diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 6b347b1749..ee129c8689 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -79,6 +79,7 @@ class EventTypes(object):
RoomHistoryVisibility = "m.room.history_visibility"
CanonicalAlias = "m.room.canonical_alias"
+ Encryption = "m.room.encryption"
RoomAvatar = "m.room.avatar"
RoomEncryption = "m.room.encryption"
GuestAccess = "m.room.guest_access"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index ff89259dec..e91697049c 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -328,9 +328,23 @@ class RoomKeysVersionError(SynapseError):
self.current_version = current_version
+class UnsupportedRoomVersionError(SynapseError):
+ """The client's request to create a room used a room version that the server does
+ not support."""
+ def __init__(self):
+ super(UnsupportedRoomVersionError, self).__init__(
+ code=400,
+ msg="Homeserver does not support this room version",
+ errcode=Codes.UNSUPPORTED_ROOM_VERSION,
+ )
+
+
class IncompatibleRoomVersionError(SynapseError):
- """A server is trying to join a room whose version it does not support."""
+ """A server is trying to join a room whose version it does not support.
+ Unlike UnsupportedRoomVersionError, it is specific to the case of the make_join
+ failing.
+ """
def __init__(self, room_version):
super(IncompatibleRoomVersionError, self).__init__(
code=400,
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index 485b3d0237..b2895355a8 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -77,9 +77,9 @@ class RoomVersions(object):
EventFormatVersions.V2,
StateResolutionVersions.V2,
)
- EVENTID_NOSLASH_TEST = RoomVersion(
- "eventid-noslash-test",
- RoomDisposition.UNSTABLE,
+ V4 = RoomVersion(
+ "4",
+ RoomDisposition.STABLE,
EventFormatVersions.V3,
StateResolutionVersions.V2,
)
@@ -95,6 +95,6 @@ KNOWN_ROOM_VERSIONS = {
RoomVersions.V2,
RoomVersions.V3,
RoomVersions.STATE_V2_TEST,
- RoomVersions.EVENTID_NOSLASH_TEST,
+ RoomVersions.V4,
)
} # type: dict[str, RoomVersion]
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 727fdc54d8..5c4fc8ff21 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
from .api import ApiConfig
from .appservice import AppServiceConfig
from .captcha import CaptchaConfig
@@ -36,20 +37,41 @@ from .saml2_config import SAML2Config
from .server import ServerConfig
from .server_notices_config import ServerNoticesConfig
from .spam_checker import SpamCheckerConfig
+from .stats import StatsConfig
from .tls import TlsConfig
from .user_directory import UserDirectoryConfig
from .voip import VoipConfig
from .workers import WorkerConfig
-class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig,
- RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
- VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
- AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
- JWTConfig, PasswordConfig, EmailConfig,
- WorkerConfig, PasswordAuthProviderConfig, PushConfig,
- SpamCheckerConfig, GroupsConfig, UserDirectoryConfig,
- ConsentConfig,
- ServerNoticesConfig, RoomDirectoryConfig,
- ):
+class HomeServerConfig(
+ ServerConfig,
+ TlsConfig,
+ DatabaseConfig,
+ LoggingConfig,
+ RatelimitConfig,
+ ContentRepositoryConfig,
+ CaptchaConfig,
+ VoipConfig,
+ RegistrationConfig,
+ MetricsConfig,
+ ApiConfig,
+ AppServiceConfig,
+ KeyConfig,
+ SAML2Config,
+ CasConfig,
+ JWTConfig,
+ PasswordConfig,
+ EmailConfig,
+ WorkerConfig,
+ PasswordAuthProviderConfig,
+ PushConfig,
+ SpamCheckerConfig,
+ GroupsConfig,
+ UserDirectoryConfig,
+ ConsentConfig,
+ StatsConfig,
+ ServerNoticesConfig,
+ RoomDirectoryConfig,
+):
pass
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 1309bce3ee..693288f938 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -123,6 +123,14 @@ class RegistrationConfig(Config):
# link. ``%%(app)s`` can be used as a placeholder for the ``app_name`` parameter
# from the ``email`` section.
#
+ # Once this feature is enabled, Synapse will look for registered users without an
+ # expiration date at startup and will add one to every account it found using the
+ # current settings at that time.
+ # This means that, if a validity period is set, and Synapse is restarted (it will
+ # then derive an expiration date from the current validity period), and some time
+ # after that the validity period changes and Synapse is restarted, the users'
+ # expiration dates won't be updated unless their account is manually renewed.
+ #
#account_validity:
# enabled: True
# period: 6w
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
new file mode 100644
index 0000000000..80fc1b9dd0
--- /dev/null
+++ b/synapse/config/stats.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division
+
+import sys
+
+from ._base import Config
+
+
+class StatsConfig(Config):
+ """Stats Configuration
+ Configuration for the behaviour of synapse's stats engine
+ """
+
+ def read_config(self, config):
+ self.stats_enabled = True
+ self.stats_bucket_size = 86400
+ self.stats_retention = sys.maxsize
+ stats_config = config.get("stats", None)
+ if stats_config:
+ self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
+ self.stats_bucket_size = (
+ self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
+ )
+ self.stats_retention = (
+ self.parse_duration(
+ stats_config.get("retention", "%ds" % (sys.maxsize,))
+ )
+ / 1000
+ )
+
+ def default_config(self, config_dir_path, server_name, **kwargs):
+ return """
+ # Local statistics collection. Used in populating the room directory.
+ #
+ # 'bucket_size' controls how large each statistics timeslice is. It can
+ # be defined in a human readable short form -- e.g. "1d", "1y".
+ #
+ # 'retention' controls how long historical statistics will be kept for.
+ # It can be defined in a human readable short form -- e.g. "1d", "1y".
+ #
+ #
+ #stats:
+ # enabled: true
+ # bucket_size: 1d
+ # retention: 1y
+ """
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index badeb903fc..1edd19cc13 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -21,6 +21,7 @@ import six
from unpaddedbase64 import encode_base64
+from synapse.api.errors import UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@@ -369,12 +370,15 @@ def room_version_to_event_format(room_version):
Returns:
int
+
+ Raises:
+ UnsupportedRoomVersionError if the room version is unknown
"""
v = KNOWN_ROOM_VERSIONS.get(room_version)
if not v:
- # We should have already checked version, so this should not happen
- raise RuntimeError("Unrecognized room version %s" % (room_version,))
+ # this can happen if support is withdrawn for a room version
+ raise UnsupportedRoomVersionError()
return v.event_format
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index fba27177c7..1fe995f212 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -18,6 +18,7 @@ import attr
from twisted.internet import defer
from synapse.api.constants import MAX_DEPTH
+from synapse.api.errors import UnsupportedRoomVersionError
from synapse.api.room_versions import (
KNOWN_EVENT_FORMAT_VERSIONS,
KNOWN_ROOM_VERSIONS,
@@ -178,9 +179,8 @@ class EventBuilderFactory(object):
"""
v = KNOWN_ROOM_VERSIONS.get(room_version)
if not v:
- raise Exception(
- "No event format defined for version %r" % (room_version,)
- )
+ # this can happen if support is withdrawn for a room version
+ raise UnsupportedRoomVersionError()
return self.for_room_version(v, key_values)
def for_room_version(self, room_version, key_values):
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index df60828dba..4c28c1dc3c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
IncompatibleRoomVersionError,
NotFoundError,
SynapseError,
+ UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import compute_event_signature
@@ -198,11 +199,22 @@ class FederationServer(FederationBase):
try:
room_version = yield self.store.get_room_version(room_id)
- format_ver = room_version_to_event_format(room_version)
except NotFoundError:
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
continue
+ try:
+ format_ver = room_version_to_event_format(room_version)
+ except UnsupportedRoomVersionError:
+ # this can happen if support for a given room version is withdrawn,
+ # so that we still get events for said room.
+ logger.info(
+ "Ignoring PDU for room %s with unknown version %s",
+ room_id,
+ room_version,
+ )
+ continue
+
event = event_from_pdu_json(p, format_ver)
pdus_by_room.setdefault(room_id, []).append(event)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 71d4bfb690..e2f5b23e33 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1916,6 +1916,11 @@ class FederationHandler(BaseHandler):
event.room_id, latest_event_ids=extrem_ids,
)
+ logger.debug(
+ "Doing soft-fail check for %s: state %s",
+ event.event_id, current_state_ids,
+ )
+
# Now check if event pass auth against said current state
auth_types = auth_types_for_event(event)
current_state_ids = [
@@ -1932,7 +1937,7 @@ class FederationHandler(BaseHandler):
self.auth.check(room_version, event, auth_events=current_auth_events)
except AuthError as e:
logger.warn(
- "Failed current state auth resolution for %r because %s",
+ "Soft-failing %r because %s",
event, e,
)
event.internal_metadata.soft_failed = True
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b2c33a922..792edc7579 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RelationTypes
from synapse.api.errors import (
AuthError,
Codes,
@@ -601,6 +601,20 @@ class EventCreationHandler(object):
self.validator.validate_new(event)
+ # If this event is an annotation then we check that that the sender
+ # can't annotate the same way twice (e.g. stops users from liking an
+ # event multiple times).
+ relation = event.content.get("m.relates_to", {})
+ if relation.get("rel_type") == RelationTypes.ANNOTATION:
+ relates_to = relation["event_id"]
+ aggregation_key = relation["key"]
+
+ already_exists = yield self.store.has_user_annotated_event(
+ relates_to, event.type, aggregation_key, event.sender,
+ )
+ if already_exists:
+ raise SynapseError(400, "Can't send same reaction twice")
+
logger.debug(
"Created event %s",
event.event_id,
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
new file mode 100644
index 0000000000..0e92b405ba
--- /dev/null
+++ b/synapse/handlers/stats.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import UserID
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class StatsHandler(StateDeltasHandler):
+ """Handles keeping the *_stats tables updated with a simple time-series of
+ information about the users, rooms and media on the server, such that admins
+ have some idea of who is consuming their resources.
+
+ Heavily derived from UserDirectoryHandler
+ """
+
+ def __init__(self, hs):
+ super(StatsHandler, self).__init__(hs)
+ self.hs = hs
+ self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
+ self.server_name = hs.hostname
+ self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
+ self.is_mine_id = hs.is_mine_id
+ self.stats_bucket_size = hs.config.stats_bucket_size
+
+ # The current position in the current_state_delta stream
+ self.pos = None
+
+ # Guard to ensure we only process deltas one at a time
+ self._is_processing = False
+
+ if hs.config.stats_enabled:
+ self.notifier.add_replication_callback(self.notify_new_event)
+
+ # We kick this off so that we don't have to wait for a change before
+ # we start populating stats
+ self.clock.call_later(0, self.notify_new_event)
+
+ def notify_new_event(self):
+ """Called when there may be more deltas to process
+ """
+ if not self.hs.config.stats_enabled:
+ return
+
+ if self._is_processing:
+ return
+
+ @defer.inlineCallbacks
+ def process():
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._is_processing = False
+
+ self._is_processing = True
+ run_as_background_process("stats.notify_new_event", process)
+
+ @defer.inlineCallbacks
+ def _unsafe_process(self):
+ # If self.pos is None then means we haven't fetched it from DB
+ if self.pos is None:
+ self.pos = yield self.store.get_stats_stream_pos()
+
+ # If still None then the initial background update hasn't happened yet
+ if self.pos is None:
+ defer.returnValue(None)
+
+ # Loop round handling deltas until we're up to date
+ while True:
+ with Measure(self.clock, "stats_delta"):
+ deltas = yield self.store.get_current_state_deltas(self.pos)
+ if not deltas:
+ return
+
+ logger.info("Handling %d state deltas", len(deltas))
+ yield self._handle_deltas(deltas)
+
+ self.pos = deltas[-1]["stream_id"]
+ yield self.store.update_stats_stream_pos(self.pos)
+
+ event_processing_positions.labels("stats").set(self.pos)
+
+ @defer.inlineCallbacks
+ def _handle_deltas(self, deltas):
+ """
+ Called with the state deltas to process
+ """
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ stream_id = delta["stream_id"]
+ prev_event_id = delta["prev_event_id"]
+
+ logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+ token = yield self.store.get_earliest_token_for_room_stats(room_id)
+
+ # If the earliest token to begin from is larger than our current
+ # stream ID, skip processing this delta.
+ if token is not None and token >= stream_id:
+ logger.debug(
+ "Ignoring: %s as earlier than this room's initial ingestion event",
+ event_id,
+ )
+ continue
+
+ if event_id is None and prev_event_id is None:
+ # Errr...
+ continue
+
+ event_content = {}
+
+ if event_id is not None:
+ event_content = (yield self.store.get_event(event_id)).content or {}
+
+ # quantise time to the nearest bucket
+ now = yield self.store.get_received_ts(event_id)
+ now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+
+ if typ == EventTypes.Member:
+ # we could use _get_key_change here but it's a bit inefficient
+ # given we're not testing for a specific result; might as well
+ # just grab the prev_membership and membership strings and
+ # compare them.
+ prev_event_content = {}
+ if prev_event_id is not None:
+ prev_event_content = (
+ yield self.store.get_event(prev_event_id)
+ ).content
+
+ membership = event_content.get("membership", Membership.LEAVE)
+ prev_membership = prev_event_content.get("membership", Membership.LEAVE)
+
+ if prev_membership == membership:
+ continue
+
+ if prev_membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "joined_members", -1
+ )
+ elif prev_membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "invited_members", -1
+ )
+ elif prev_membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "left_members", -1
+ )
+ elif prev_membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "banned_members", -1
+ )
+ else:
+ err = "%s is not a valid prev_membership" % (repr(prev_membership),)
+ logger.error(err)
+ raise ValueError(err)
+
+ if membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "joined_members", +1
+ )
+ elif membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "invited_members", +1
+ )
+ elif membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "left_members", +1
+ )
+ elif membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, "banned_members", +1
+ )
+ else:
+ err = "%s is not a valid membership" % (repr(membership),)
+ logger.error(err)
+ raise ValueError(err)
+
+ user_id = state_key
+ if self.is_mine_id(user_id):
+ # update user_stats as it's one of our users
+ public = yield self._is_public_room(room_id)
+
+ if membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now,
+ "user",
+ user_id,
+ "public_rooms" if public else "private_rooms",
+ -1,
+ )
+ elif membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now,
+ "user",
+ user_id,
+ "public_rooms" if public else "private_rooms",
+ +1,
+ )
+
+ elif typ == EventTypes.Create:
+ # Newly created room. Add it with all blank portions.
+ yield self.store.update_room_state(
+ room_id,
+ {
+ "join_rules": None,
+ "history_visibility": None,
+ "encryption": None,
+ "name": None,
+ "topic": None,
+ "avatar": None,
+ "canonical_alias": None,
+ },
+ )
+
+ elif typ == EventTypes.JoinRules:
+ yield self.store.update_room_state(
+ room_id, {"join_rules": event_content.get("join_rule")}
+ )
+
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
+
+ elif typ == EventTypes.RoomHistoryVisibility:
+ yield self.store.update_room_state(
+ room_id,
+ {"history_visibility": event_content.get("history_visibility")},
+ )
+
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "history_visibility", "world_readable"
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
+
+ elif typ == EventTypes.Encryption:
+ yield self.store.update_room_state(
+ room_id, {"encryption": event_content.get("algorithm")}
+ )
+ elif typ == EventTypes.Name:
+ yield self.store.update_room_state(
+ room_id, {"name": event_content.get("name")}
+ )
+ elif typ == EventTypes.Topic:
+ yield self.store.update_room_state(
+ room_id, {"topic": event_content.get("topic")}
+ )
+ elif typ == EventTypes.RoomAvatar:
+ yield self.store.update_room_state(
+ room_id, {"avatar": event_content.get("url")}
+ )
+ elif typ == EventTypes.CanonicalAlias:
+ yield self.store.update_room_state(
+ room_id, {"canonical_alias": event_content.get("alias")}
+ )
+
+ @defer.inlineCallbacks
+ def update_public_room_stats(self, ts, room_id, is_public):
+ """
+ Increment/decrement a user's number of public rooms when a room they are
+ in changes to/from public visibility.
+
+ Args:
+ ts (int): Timestamp in seconds
+ room_id (str)
+ is_public (bool)
+ """
+ # For now, blindly iterate over all local users in the room so that
+ # we can handle the whole problem of copying buckets over as needed
+ user_ids = yield self.store.get_users_in_room(room_id)
+
+ for user_id in user_ids:
+ if self.hs.is_mine(UserID.from_string(user_id)):
+ yield self.store.update_stats_delta(
+ ts, "user", user_id, "public_rooms", +1 if is_public else -1
+ )
+ yield self.store.update_stats_delta(
+ ts, "user", user_id, "private_rooms", -1 if is_public else +1
+ )
+
+ @defer.inlineCallbacks
+ def _is_public_room(self, room_id):
+ join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
+ history_visibility = yield self.state.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility
+ )
+
+ if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
+ (
+ history_visibility
+ and history_visibility.content.get("history_visibility")
+ == "world_readable"
+ )
+ ):
+ defer.returnValue(True)
+ else:
+ defer.returnValue(False)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index fdcfb90a7e..e3f828c4bb 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -16,7 +16,12 @@
import logging
-from pkg_resources import DistributionNotFound, VersionConflict, get_distribution
+from pkg_resources import (
+ DistributionNotFound,
+ Requirement,
+ VersionConflict,
+ get_provider,
+)
logger = logging.getLogger(__name__)
@@ -91,7 +96,13 @@ CONDITIONAL_REQUIREMENTS = {
# ACME support is required to provision TLS certificates from authorities
# that use the protocol, such as Let's Encrypt.
- "acme": ["txacme>=0.9.2"],
+ "acme": [
+ "txacme>=0.9.2",
+
+ # txacme depends on eliot. Eliot 1.8.0 is incompatible with
+ # python 3.5.2, as per https://github.com/itamarst/eliot/issues/418
+ 'eliot<1.8.0;python_version<"3.5.3"',
+ ],
"saml2": ["pysaml2>=4.5.0"],
"systemd": ["systemd-python>=231"],
@@ -125,10 +136,10 @@ class DependencyException(Exception):
@property
def dependencies(self):
for i in self.args[0]:
- yield '"' + i + '"'
+ yield "'" + i + "'"
-def check_requirements(for_feature=None, _get_distribution=get_distribution):
+def check_requirements(for_feature=None):
deps_needed = []
errors = []
@@ -139,7 +150,7 @@ def check_requirements(for_feature=None, _get_distribution=get_distribution):
for dependency in reqs:
try:
- _get_distribution(dependency)
+ _check_requirement(dependency)
except VersionConflict as e:
deps_needed.append(dependency)
errors.append(
@@ -157,7 +168,7 @@ def check_requirements(for_feature=None, _get_distribution=get_distribution):
for dependency in OPTS:
try:
- _get_distribution(dependency)
+ _check_requirement(dependency)
except VersionConflict as e:
deps_needed.append(dependency)
errors.append(
@@ -175,6 +186,23 @@ def check_requirements(for_feature=None, _get_distribution=get_distribution):
raise DependencyException(deps_needed)
+def _check_requirement(dependency_string):
+ """Parses a dependency string, and checks if the specified requirement is installed
+
+ Raises:
+ VersionConflict if the requirement is installed, but with the the wrong version
+ DistributionNotFound if nothing is found to provide the requirement
+ """
+ req = Requirement.parse(dependency_string)
+
+ # first check if the markers specify that this requirement needs installing
+ if req.marker is not None and not req.marker.evaluate():
+ # not required for this environment
+ return
+
+ get_provider(req)
+
+
if __name__ == "__main__":
import sys
diff --git a/synapse/server.py b/synapse/server.py
index 80d40b9272..9229a68a8d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -72,6 +72,7 @@ from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
from synapse.handlers.set_password import SetPasswordHandler
+from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler
from synapse.handlers.user_directory import UserDirectoryHandler
@@ -139,6 +140,7 @@ class HomeServer(object):
'acme_handler',
'auth_handler',
'device_handler',
+ 'stats_handler',
'e2e_keys_handler',
'e2e_room_keys_handler',
'event_handler',
@@ -191,6 +193,7 @@ class HomeServer(object):
REQUIRED_ON_MASTER_STARTUP = [
"user_directory_handler",
+ "stats_handler"
]
# This is overridden in derived application classes
@@ -474,6 +477,9 @@ class HomeServer(object):
def build_secrets(self):
return Secrets()
+ def build_stats_handler(self):
+ return StatsHandler(self)
+
def build_spam_checker(self):
return SpamChecker(self)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 7522d3fd57..66675d08ae 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -55,6 +55,7 @@ from .roommember import RoomMemberStore
from .search import SearchStore
from .signatures import SignatureStore
from .state import StateStore
+from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionStore
@@ -100,6 +101,7 @@ class DataStore(
GroupServerStore,
UserErasureStore,
MonthlyActiveUsersStore,
+ StatsStore,
RelationsStore,
):
def __init__(self, db_conn, hs):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 983ce026e1..fa6839ceca 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -227,6 +229,8 @@ class SQLBaseStore(object):
# A set of tables that are not safe to use native upserts in.
self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys())
+ self._account_validity = self.hs.config.account_validity
+
# We add the user_directory_search table to the blacklist on SQLite
# because the existing search table does not have an index, making it
# unsafe to use native upserts.
@@ -243,6 +247,14 @@ class SQLBaseStore(object):
self._check_safe_to_upsert,
)
+ if self._account_validity.enabled:
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "account_validity_set_expiration_dates",
+ self._set_expiration_date_when_missing,
+ )
+
@defer.inlineCallbacks
def _check_safe_to_upsert(self):
"""
@@ -275,6 +287,52 @@ class SQLBaseStore(object):
self._check_safe_to_upsert,
)
+ @defer.inlineCallbacks
+ def _set_expiration_date_when_missing(self):
+ """
+ Retrieves the list of registered users that don't have an expiration date, and
+ adds an expiration date for each of them.
+ """
+
+ def select_users_with_no_expiration_date_txn(txn):
+ """Retrieves the list of registered users with no expiration date from the
+ database.
+ """
+ sql = (
+ "SELECT users.name FROM users"
+ " LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
+ " WHERE account_validity.user_id is NULL;"
+ )
+ txn.execute(sql, [])
+
+ res = self.cursor_to_dict(txn)
+ if res:
+ for user in res:
+ self.set_expiration_date_for_user_txn(txn, user["name"])
+
+ yield self.runInteraction(
+ "get_users_with_no_expiration_date",
+ select_users_with_no_expiration_date_txn,
+ )
+
+ def set_expiration_date_for_user_txn(self, txn, user_id):
+ """Sets an expiration date to the account with the given user ID.
+
+ Args:
+ user_id (str): User ID to set an expiration date for.
+ """
+ now_ms = self._clock.time_msec()
+ expiration_ts = now_ms + self._account_validity.period
+ self._simple_insert_txn(
+ txn,
+ "account_validity",
+ values={
+ "user_id": user_id,
+ "expiration_ts_ms": expiration_ts,
+ "email_sent": False,
+ },
+ )
+
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 881d6d0126..2ffc27ff41 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -575,10 +575,11 @@ class EventsStore(
def _get_events(txn, batch):
sql = """
- SELECT prev_event_id
+ SELECT prev_event_id, internal_metadata
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
+ LEFT JOIN event_json USING (event_id)
WHERE
prev_event_id IN (%s)
AND NOT events.outlier
@@ -588,7 +589,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(r[0] for r in txn)
+ results.extend(
+ r[0]
+ for r in txn
+ if not json.loads(r[1]).get("soft_failed")
+ )
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index e96f801631..0c569d0c6c 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -614,18 +614,27 @@ class EventsWorkerStore(SQLBaseStore):
return self.runInteraction("get_rejection_reasons", f)
- def _get_state_event_counts_txn(self, txn, room_id):
+ def _get_current_state_event_counts_txn(self, txn, room_id):
"""
- See get_state_event_counts.
+ See get_current_state_event_counts.
"""
sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
txn.execute(sql, (room_id,))
row = txn.fetchone()
return row[0] if row else 0
- def get_state_event_counts(self, room_id):
+ def _get_total_state_event_counts_txn(self, txn, room_id):
"""
- Gets the total number of state events in a room.
+ See get_total_state_event_counts.
+ """
+ sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
+ txn.execute(sql, (room_id,))
+ row = txn.fetchone()
+ return row[0] if row else 0
+
+ def get_current_state_event_counts(self, room_id):
+ """
+ Gets the current number of state events in a room.
Args:
room_id (str)
@@ -634,7 +643,7 @@ class EventsWorkerStore(SQLBaseStore):
Deferred[int]
"""
return self.runInteraction(
- "get_state_event_counts", self._get_state_event_counts_txn, room_id
+ "get_current_state_event_counts", self._get_current_state_event_counts_txn, room_id
)
@defer.inlineCallbacks
@@ -648,7 +657,7 @@ class EventsWorkerStore(SQLBaseStore):
Returns:
Deferred[dict[str:int]] of complexity version to complexity.
"""
- state_events = yield self.get_state_event_counts(room_id)
+ state_events = yield self.get_current_state_event_counts(room_id)
# Call this one "v1", so we can introduce new ones as we want to develop
# it.
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 03a06a83d6..4cf159ba81 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -725,17 +727,7 @@ class RegistrationStore(
raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
if self._account_validity.enabled:
- now_ms = self.clock.time_msec()
- expiration_ts = now_ms + self._account_validity.period
- self._simple_insert_txn(
- txn,
- "account_validity",
- values={
- "user_id": user_id,
- "expiration_ts_ms": expiration_ts,
- "email_sent": False,
- }
- )
+ self.set_expiration_date_for_user_txn(txn, user_id)
if token:
# it's possible for this to get a conflict, but only for a single user
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 493abe405e..4c83800cca 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -280,7 +280,7 @@ class RelationsWorkerStore(SQLBaseStore):
having_clause = ""
sql = """
- SELECT type, aggregation_key, COUNT(*), MAX(stream_ordering)
+ SELECT type, aggregation_key, COUNT(DISTINCT sender), MAX(stream_ordering)
FROM event_relations
INNER JOIN events USING (event_id)
WHERE {where_clause}
@@ -350,9 +350,7 @@ class RelationsWorkerStore(SQLBaseStore):
"""
def _get_applicable_edit_txn(txn):
- txn.execute(
- sql, (event_id, RelationTypes.REPLACE,)
- )
+ txn.execute(sql, (event_id, RelationTypes.REPLACE))
row = txn.fetchone()
if row:
return row[0]
@@ -367,6 +365,50 @@ class RelationsWorkerStore(SQLBaseStore):
edit_event = yield self.get_event(edit_id, allow_none=True)
defer.returnValue(edit_event)
+ def has_user_annotated_event(self, parent_id, event_type, aggregation_key, sender):
+ """Check if a user has already annotated an event with the same key
+ (e.g. already liked an event).
+
+ Args:
+ parent_id (str): The event being annotated
+ event_type (str): The event type of the annotation
+ aggregation_key (str): The aggregation key of the annotation
+ sender (str): The sender of the annotation
+
+ Returns:
+ Deferred[bool]
+ """
+
+ sql = """
+ SELECT 1 FROM event_relations
+ INNER JOIN events USING (event_id)
+ WHERE
+ relates_to_id = ?
+ AND relation_type = ?
+ AND type = ?
+ AND sender = ?
+ AND aggregation_key = ?
+ LIMIT 1;
+ """
+
+ def _get_if_user_has_annotated_event(txn):
+ txn.execute(
+ sql,
+ (
+ parent_id,
+ RelationTypes.ANNOTATION,
+ event_type,
+ sender,
+ aggregation_key,
+ ),
+ )
+
+ return bool(txn.fetchone())
+
+ return self.runInteraction(
+ "get_if_user_has_annotated_event", _get_if_user_has_annotated_event
+ )
+
class RelationsStore(RelationsWorkerStore):
def _handle_event_relations(self, txn, event):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 57df17bcc2..4bd1669458 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -142,6 +142,38 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return self.runInteraction("get_room_summary", _get_room_summary_txn)
+ def _get_user_count_in_room_txn(self, txn, room_id, membership):
+ """
+ See get_user_count_in_room.
+ """
+ sql = (
+ "SELECT count(*) FROM room_memberships as m"
+ " INNER JOIN current_state_events as c"
+ " ON m.event_id = c.event_id "
+ " AND m.room_id = c.room_id "
+ " AND m.user_id = c.state_key"
+ " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
+ )
+
+ txn.execute(sql, (room_id, membership))
+ row = txn.fetchone()
+ return row[0]
+
+ def get_user_count_in_room(self, room_id, membership):
+ """
+ Get the user count in a room with a particular membership.
+
+ Args:
+ room_id (str)
+ membership (Membership)
+
+ Returns:
+ Deferred[int]
+ """
+ return self.runInteraction(
+ "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
+ )
+
@cached()
def get_invited_rooms_for_user(self, user_id):
""" Get all the rooms the user is invited to
diff --git a/synapse/storage/schema/delta/54/stats.sql b/synapse/storage/schema/delta/54/stats.sql
new file mode 100644
index 0000000000..652e58308e
--- /dev/null
+++ b/synapse/storage/schema/delta/54/stats.sql
@@ -0,0 +1,80 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE stats_stream_pos (
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT,
+ CHECK (Lock='X')
+);
+
+INSERT INTO stats_stream_pos (stream_id) VALUES (null);
+
+CREATE TABLE user_stats (
+ user_id TEXT NOT NULL,
+ ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+ public_rooms INT NOT NULL,
+ private_rooms INT NOT NULL
+);
+
+CREATE UNIQUE INDEX user_stats_user_ts ON user_stats(user_id, ts);
+
+CREATE TABLE room_stats (
+ room_id TEXT NOT NULL,
+ ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+ current_state_events INT NOT NULL,
+ joined_members INT NOT NULL,
+ invited_members INT NOT NULL,
+ left_members INT NOT NULL,
+ banned_members INT NOT NULL,
+ state_events INT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_room_ts ON room_stats(room_id, ts);
+
+-- cache of current room state; useful for the publicRooms list
+CREATE TABLE room_state (
+ room_id TEXT NOT NULL,
+ join_rules TEXT,
+ history_visibility TEXT,
+ encryption TEXT,
+ name TEXT,
+ topic TEXT,
+ avatar TEXT,
+ canonical_alias TEXT
+ -- get aliases straight from the right table
+);
+
+CREATE UNIQUE INDEX room_state_room ON room_state(room_id);
+
+CREATE TABLE room_stats_earliest_token (
+ room_id TEXT NOT NULL,
+ token BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX room_stats_earliest_token_idx ON room_stats_earliest_token(room_id);
+
+-- Set up staging tables
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('populate_stats_createtables', '{}');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_rooms', '{}', 'populate_stats_createtables');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_cleanup', '{}', 'populate_stats_process_rooms');
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
index 31a0279b18..5fdb442104 100644
--- a/synapse/storage/state_deltas.py
+++ b/synapse/storage/state_deltas.py
@@ -84,10 +84,16 @@ class StateDeltasStore(SQLBaseStore):
"get_current_state_deltas", get_current_state_deltas_txn
)
- def get_max_stream_id_in_current_state_deltas(self):
- return self._simple_select_one_onecol(
+ def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
+ return self._simple_select_one_onecol_txn(
+ txn,
table="current_state_delta_stream",
keyvalues={},
retcol="COALESCE(MAX(stream_id), -1)",
- desc="get_max_stream_id_in_current_state_deltas",
+ )
+
+ def get_max_stream_id_in_current_state_deltas(self):
+ return self.runInteraction(
+ "get_max_stream_id_in_current_state_deltas",
+ self._get_max_stream_id_in_current_state_deltas_txn,
)
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
new file mode 100644
index 0000000000..71b80a891d
--- /dev/null
+++ b/synapse/storage/stats.py
@@ -0,0 +1,450 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018, 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.storage.state_deltas import StateDeltasStore
+from synapse.util.caches.descriptors import cached
+
+logger = logging.getLogger(__name__)
+
+# these fields track absolutes (e.g. total number of rooms on the server)
+ABSOLUTE_STATS_FIELDS = {
+ "room": (
+ "current_state_events",
+ "joined_members",
+ "invited_members",
+ "left_members",
+ "banned_members",
+ "state_events",
+ ),
+ "user": ("public_rooms", "private_rooms"),
+}
+
+TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+TEMP_TABLE = "_temp_populate_stats"
+
+
+class StatsStore(StateDeltasStore):
+ def __init__(self, db_conn, hs):
+ super(StatsStore, self).__init__(db_conn, hs)
+
+ self.server_name = hs.hostname
+ self.clock = self.hs.get_clock()
+ self.stats_enabled = hs.config.stats_enabled
+ self.stats_bucket_size = hs.config.stats_bucket_size
+
+ self.register_background_update_handler(
+ "populate_stats_createtables", self._populate_stats_createtables
+ )
+ self.register_background_update_handler(
+ "populate_stats_process_rooms", self._populate_stats_process_rooms
+ )
+ self.register_background_update_handler(
+ "populate_stats_cleanup", self._populate_stats_cleanup
+ )
+
+ @defer.inlineCallbacks
+ def _populate_stats_createtables(self, progress, batch_size):
+
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_createtables")
+ defer.returnValue(1)
+
+ # Get all the rooms that we want to process.
+ def _make_staging_area(txn):
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ sql = (
+ "CREATE TABLE IF NOT EXISTS "
+ + TEMP_TABLE
+ + "_position(position TEXT NOT NULL)"
+ )
+ txn.execute(sql)
+
+ # Get rooms we want to process from the database
+ sql = """
+ SELECT room_id, count(*) FROM current_state_events
+ GROUP BY room_id
+ """
+ txn.execute(sql)
+ rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
+ self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
+ del rooms
+
+ new_pos = yield self.get_max_stream_id_in_current_state_deltas()
+ yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
+ yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
+ self.get_earliest_token_for_room_stats.invalidate_all()
+
+ yield self._end_background_update("populate_stats_createtables")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_stats_cleanup(self, progress, batch_size):
+ """
+ Update the user directory stream position, then clean up the old tables.
+ """
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_cleanup")
+ defer.returnValue(1)
+
+ position = yield self._simple_select_one_onecol(
+ TEMP_TABLE + "_position", None, "position"
+ )
+ yield self.update_stats_stream_pos(position)
+
+ def _delete_staging_area(txn):
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
+ txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+
+ yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+
+ yield self._end_background_update("populate_stats_cleanup")
+ defer.returnValue(1)
+
+ @defer.inlineCallbacks
+ def _populate_stats_process_rooms(self, progress, batch_size):
+
+ if not self.stats_enabled:
+ yield self._end_background_update("populate_stats_process_rooms")
+ defer.returnValue(1)
+
+ # If we don't have progress filed, delete everything.
+ if not progress:
+ yield self.delete_all_stats()
+
+ def _get_next_batch(txn):
+ # Only fetch 250 rooms, so we don't fetch too many at once, even
+ # if those 250 rooms have less than batch_size state events.
+ sql = """
+ SELECT room_id, events FROM %s_rooms
+ ORDER BY events DESC
+ LIMIT 250
+ """ % (
+ TEMP_TABLE,
+ )
+ txn.execute(sql)
+ rooms_to_work_on = txn.fetchall()
+
+ if not rooms_to_work_on:
+ return None
+
+ # Get how many are left to process, so we can give status on how
+ # far we are in processing
+ txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+ progress["remaining"] = txn.fetchone()[0]
+
+ return rooms_to_work_on
+
+ rooms_to_work_on = yield self.runInteraction(
+ "populate_stats_temp_read", _get_next_batch
+ )
+
+ # No more rooms -- complete the transaction.
+ if not rooms_to_work_on:
+ yield self._end_background_update("populate_stats_process_rooms")
+ defer.returnValue(1)
+
+ logger.info(
+ "Processing the next %d rooms of %d remaining",
+ (len(rooms_to_work_on), progress["remaining"]),
+ )
+
+ # Number of state events we've processed by going through each room
+ processed_event_count = 0
+
+ for room_id, event_count in rooms_to_work_on:
+
+ current_state_ids = yield self.get_current_state_ids(room_id)
+
+ join_rules = yield self.get_event(
+ current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
+ )
+ history_visibility = yield self.get_event(
+ current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
+ allow_none=True,
+ )
+ encryption = yield self.get_event(
+ current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
+ )
+ name = yield self.get_event(
+ current_state_ids.get((EventTypes.Name, "")), allow_none=True
+ )
+ topic = yield self.get_event(
+ current_state_ids.get((EventTypes.Topic, "")), allow_none=True
+ )
+ avatar = yield self.get_event(
+ current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
+ )
+ canonical_alias = yield self.get_event(
+ current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
+ )
+
+ def _or_none(x, arg):
+ if x:
+ return x.content.get(arg)
+ return None
+
+ yield self.update_room_state(
+ room_id,
+ {
+ "join_rules": _or_none(join_rules, "join_rule"),
+ "history_visibility": _or_none(
+ history_visibility, "history_visibility"
+ ),
+ "encryption": _or_none(encryption, "algorithm"),
+ "name": _or_none(name, "name"),
+ "topic": _or_none(topic, "topic"),
+ "avatar": _or_none(avatar, "url"),
+ "canonical_alias": _or_none(canonical_alias, "alias"),
+ },
+ )
+
+ now = self.hs.get_reactor().seconds()
+
+ # quantise time to the nearest bucket
+ now = (now // self.stats_bucket_size) * self.stats_bucket_size
+
+ def _fetch_data(txn):
+
+ # Get the current token of the room
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+ current_state_events = len(current_state_ids)
+ joined_members = self._get_user_count_in_room_txn(
+ txn, room_id, Membership.JOIN
+ )
+ invited_members = self._get_user_count_in_room_txn(
+ txn, room_id, Membership.INVITE
+ )
+ left_members = self._get_user_count_in_room_txn(
+ txn, room_id, Membership.LEAVE
+ )
+ banned_members = self._get_user_count_in_room_txn(
+ txn, room_id, Membership.BAN
+ )
+ total_state_events = self._get_total_state_event_counts_txn(
+ txn, room_id
+ )
+
+ self._update_stats_txn(
+ txn,
+ "room",
+ room_id,
+ now,
+ {
+ "bucket_size": self.stats_bucket_size,
+ "current_state_events": current_state_events,
+ "joined_members": joined_members,
+ "invited_members": invited_members,
+ "left_members": left_members,
+ "banned_members": banned_members,
+ "state_events": total_state_events,
+ },
+ )
+ self._simple_insert_txn(
+ txn,
+ "room_stats_earliest_token",
+ {"room_id": room_id, "token": current_token},
+ )
+
+ yield self.runInteraction("update_room_stats", _fetch_data)
+
+ # We've finished a room. Delete it from the table.
+ yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+ yield self.runInteraction(
+ "populate_stats",
+ self._background_update_progress_txn,
+ "populate_stats_process_rooms",
+ progress,
+ )
+
+ processed_event_count += event_count
+
+ if processed_event_count > batch_size:
+ # Don't process any more rooms, we've hit our batch size.
+ defer.returnValue(processed_event_count)
+
+ defer.returnValue(processed_event_count)
+
+ def delete_all_stats(self):
+ """
+ Delete all statistics records.
+ """
+
+ def _delete_all_stats_txn(txn):
+ txn.execute("DELETE FROM room_state")
+ txn.execute("DELETE FROM room_stats")
+ txn.execute("DELETE FROM room_stats_earliest_token")
+ txn.execute("DELETE FROM user_stats")
+
+ return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
+
+ def get_stats_stream_pos(self):
+ return self._simple_select_one_onecol(
+ table="stats_stream_pos",
+ keyvalues={},
+ retcol="stream_id",
+ desc="stats_stream_pos",
+ )
+
+ def update_stats_stream_pos(self, stream_id):
+ return self._simple_update_one(
+ table="stats_stream_pos",
+ keyvalues={},
+ updatevalues={"stream_id": stream_id},
+ desc="update_stats_stream_pos",
+ )
+
+ def update_room_state(self, room_id, fields):
+ """
+ Args:
+ room_id (str)
+ fields (dict[str:Any])
+ """
+ return self._simple_upsert(
+ table="room_state",
+ keyvalues={"room_id": room_id},
+ values=fields,
+ desc="update_room_state",
+ )
+
+ def get_deltas_for_room(self, room_id, start, size=100):
+ """
+ Get statistics deltas for a given room.
+
+ Args:
+ room_id (str)
+ start (int): Pagination start. Number of entries, not timestamp.
+ size (int): How many entries to return.
+
+ Returns:
+ Deferred[list[dict]], where the dict has the keys of
+ ABSOLUTE_STATS_FIELDS["room"] and "ts".
+ """
+ return self._simple_select_list_paginate(
+ "room_stats",
+ {"room_id": room_id},
+ "ts",
+ start,
+ size,
+ retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+ order_direction="DESC",
+ )
+
+ def get_all_room_state(self):
+ return self._simple_select_list(
+ "room_state", None, retcols=("name", "topic", "canonical_alias")
+ )
+
+ @cached()
+ def get_earliest_token_for_room_stats(self, room_id):
+ """
+ Fetch the "earliest token". This is used by the room stats delta
+ processor to ignore deltas that have been processed between the
+ start of the background task and any particular room's stats
+ being calculated.
+
+ Returns:
+ Deferred[int]
+ """
+ return self._simple_select_one_onecol(
+ "room_stats_earliest_token",
+ {"room_id": room_id},
+ retcol="token",
+ allow_none=True,
+ )
+
+ def update_stats(self, stats_type, stats_id, ts, fields):
+ table, id_col = TYPE_TO_ROOM[stats_type]
+ return self._simple_upsert(
+ table=table,
+ keyvalues={id_col: stats_id, "ts": ts},
+ values=fields,
+ desc="update_stats",
+ )
+
+ def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
+ table, id_col = TYPE_TO_ROOM[stats_type]
+ return self._simple_upsert_txn(
+ txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+ )
+
+ def update_stats_delta(self, ts, stats_type, stats_id, field, value):
+ def _update_stats_delta(txn):
+ table, id_col = TYPE_TO_ROOM[stats_type]
+
+ sql = (
+ "SELECT * FROM %s"
+ " WHERE %s=? and ts=("
+ " SELECT MAX(ts) FROM %s"
+ " WHERE %s=?"
+ ")"
+ ) % (table, id_col, table, id_col)
+ txn.execute(sql, (stats_id, stats_id))
+ rows = self.cursor_to_dict(txn)
+ if len(rows) == 0:
+ # silently skip as we don't have anything to apply a delta to yet.
+ # this tries to minimise any race between the initial sync and
+ # subsequent deltas arriving.
+ return
+
+ current_ts = ts
+ latest_ts = rows[0]["ts"]
+ if current_ts < latest_ts:
+ # This one is in the past, but we're just encountering it now.
+ # Mark it as part of the current bucket.
+ current_ts = latest_ts
+ elif ts != latest_ts:
+ # we have to copy our absolute counters over to the new entry.
+ values = {
+ key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
+ }
+ values[id_col] = stats_id
+ values["ts"] = ts
+ values["bucket_size"] = self.stats_bucket_size
+
+ self._simple_insert_txn(txn, table=table, values=values)
+
+ # actually update the new value
+ if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
+ self._simple_update_txn(
+ txn,
+ table=table,
+ keyvalues={id_col: stats_id, "ts": current_ts},
+ updatevalues={field: value},
+ )
+ else:
+ sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
+ table,
+ field,
+ field,
+ id_col,
+ )
+ txn.execute(sql, (value, stats_id, current_ts))
+
+ return self.runInteraction("update_stats_delta", _update_stats_delta)
|