summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/directory.py2
-rw-r--r--synapse/handlers/e2e_keys.py19
-rw-r--r--synapse/handlers/e2e_room_keys.py130
-rw-r--r--synapse/handlers/federation.py85
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/pagination.py106
6 files changed, 251 insertions, 95 deletions
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 69051101a6..a07d2f1a17 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
             if not service.is_interested_in_alias(room_alias.to_string()):
                 raise SynapseError(
                     400,
-                    "This application service has not reserved" " this kind of alias.",
+                    "This application service has not reserved this kind of alias.",
                     errcode=Codes.EXCLUSIVE,
                 )
         else:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
 from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.types import (
     UserID,
     get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
 
         self._edu_updater = SigningKeyEduUpdater(hs, self)
 
+        self._is_master = hs.config.worker_app is None
+        if not self._is_master:
+            self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
+                hs
+            )
+
         federation_registry = hs.get_federation_registry()
 
         # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
                 # probably be tracking their device lists. However, we haven't
                 # done an initial sync on the device list so we do it now.
                 try:
-                    user_devices = yield self.device_handler.device_list_updater.user_device_resync(
-                        user_id
-                    )
+                    if self._is_master:
+                        user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+                            user_id
+                        )
+                    else:
+                        user_devices = yield self._user_device_resync_client(
+                            user_id=user_id
+                        )
+
                     user_devices = user_devices["devices"]
                     for device in user_devices:
                         results[user_id] = {device["device_id"]: device["keys"]}
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 0cea445f0d..f1b4424a02 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
                 rooms
             session_id(string): session ID to delete keys for, for None to delete keys
                 for all sessions
+        Raises:
+            NotFoundError: if the backup version does not exist
         Returns:
-            A deferred of the deletion transaction
+            A dict containing the count and etag for the backup version
         """
 
         # lock for consistency with uploading
         with (yield self._upload_linearizer.queue(user_id)):
+            # make sure the backup version exists
+            try:
+                version_info = yield self.store.get_e2e_room_keys_version_info(
+                    user_id, version
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise NotFoundError("Unknown backup version")
+                else:
+                    raise
+
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+            version_etag = version_info["etag"] + 1
+            yield self.store.update_e2e_room_keys_version(
+                user_id, version, None, version_etag
+            )
+
+            count = yield self.store.count_e2e_room_keys(user_id, version)
+            return {"etag": str(version_etag), "count": count}
+
     @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
@@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
             }
         }
 
+        Returns:
+            A dict containing the count and etag for the backup version
+
         Raises:
             NotFoundError: if there are no versions defined
             RoomKeysVersionError: if the uploaded version is not the current version
@@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object):
                     else:
                         raise
 
-            # go through the room_keys.
-            # XXX: this should/could be done concurrently, given we're in a lock.
+            # Fetch any existing room keys for the sessions that have been
+            # submitted.  Then compare them with the submitted keys.  If the
+            # key is new, insert it; if the key should be updated, then update
+            # it; otherwise, drop it.
+            existing_keys = yield self.store.get_e2e_room_keys_multi(
+                user_id, version, room_keys["rooms"]
+            )
+            to_insert = []  # batch the inserts together
+            changed = False  # if anything has changed, we need to update the etag
             for room_id, room in iteritems(room_keys["rooms"]):
-                for session_id, session in iteritems(room["sessions"]):
-                    yield self._upload_room_key(
-                        user_id, version, room_id, session_id, session
+                for session_id, room_key in iteritems(room["sessions"]):
+                    log_kv(
+                        {
+                            "message": "Trying to upload room key",
+                            "room_id": room_id,
+                            "session_id": session_id,
+                            "user_id": user_id,
+                        }
                     )
-
-    @defer.inlineCallbacks
-    def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
-        """Upload a given room_key for a given room and session into a given
-        version of the backup.  Merges the key with any which might already exist.
-
-        Args:
-            user_id(str): the user whose backup we're setting
-            version(str): the version ID of the backup we're updating
-            room_id(str): the ID of the room whose keys we're setting
-            session_id(str): the session whose room_key we're setting
-            room_key(dict): the room_key being set
-        """
-        log_kv(
-            {
-                "message": "Trying to upload room key",
-                "room_id": room_id,
-                "session_id": session_id,
-                "user_id": user_id,
-            }
-        )
-        # get the room_key for this particular row
-        current_room_key = None
-        try:
-            current_room_key = yield self.store.get_e2e_room_key(
-                user_id, version, room_id, session_id
-            )
-        except StoreError as e:
-            if e.code == 404:
-                log_kv(
-                    {
-                        "message": "Room key not found.",
-                        "room_id": room_id,
-                        "user_id": user_id,
-                    }
+                    current_room_key = existing_keys.get(room_id, {}).get(session_id)
+                    if current_room_key:
+                        if self._should_replace_room_key(current_room_key, room_key):
+                            log_kv({"message": "Replacing room key."})
+                            # updates are done one at a time in the DB, so send
+                            # updates right away rather than batching them up,
+                            # like we do with the inserts
+                            yield self.store.update_e2e_room_key(
+                                user_id, version, room_id, session_id, room_key
+                            )
+                            changed = True
+                        else:
+                            log_kv({"message": "Not replacing room_key."})
+                    else:
+                        log_kv(
+                            {
+                                "message": "Room key not found.",
+                                "room_id": room_id,
+                                "user_id": user_id,
+                            }
+                        )
+                        log_kv({"message": "Replacing room key."})
+                        to_insert.append((room_id, session_id, room_key))
+                        changed = True
+
+            if len(to_insert):
+                yield self.store.add_e2e_room_keys(user_id, version, to_insert)
+
+            version_etag = version_info["etag"]
+            if changed:
+                version_etag = version_etag + 1
+                yield self.store.update_e2e_room_keys_version(
+                    user_id, version, None, version_etag
                 )
-            else:
-                raise
 
-        if self._should_replace_room_key(current_room_key, room_key):
-            log_kv({"message": "Replacing room key."})
-            yield self.store.set_e2e_room_key(
-                user_id, version, room_id, session_id, room_key
-            )
-        else:
-            log_kv({"message": "Not replacing room_key."})
+            count = yield self.store.count_e2e_room_keys(user_id, version)
+            return {"etag": str(version_etag), "count": count}
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
                     raise NotFoundError("Unknown backup version")
                 else:
                     raise
+
+            res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
             return res
 
     @trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0e904f2da0..a5ae7b77d1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2040,8 +2040,10 @@ class FederationHandler(BaseHandler):
             auth_events (dict[(str, str)->synapse.events.EventBase]):
                 Map from (event_type, state_key) to event
 
-                What we expect the event's auth_events to be, based on the event's
-                position in the dag. I think? maybe??
+                Normally, our calculated auth_events based on the state of the room
+                at the event's position in the DAG, though occasionally (eg if the
+                event is an outlier), may be the auth events claimed by the remote
+                server.
 
                 Also NB that this function adds entries to it.
         Returns:
@@ -2091,30 +2093,35 @@ class FederationHandler(BaseHandler):
             origin (str):
             event (synapse.events.EventBase):
             context (synapse.events.snapshot.EventContext):
+
             auth_events (dict[(str, str)->synapse.events.EventBase]):
+                Map from (event_type, state_key) to event
+
+                Normally, our calculated auth_events based on the state of the room
+                at the event's position in the DAG, though occasionally (eg if the
+                event is an outlier), may be the auth events claimed by the remote
+                server.
+
+                Also NB that this function adds entries to it.
 
         Returns:
             defer.Deferred[EventContext]: updated context
         """
         event_auth_events = set(event.auth_event_ids())
 
-        if event.is_state():
-            event_key = (event.type, event.state_key)
-        else:
-            event_key = None
-
-        # if the event's auth_events refers to events which are not in our
-        # calculated auth_events, we need to fetch those events from somewhere.
-        #
-        # we start by fetching them from the store, and then try calling /event_auth/.
+        # missing_auth is the set of the event's auth_events which we don't yet have
+        # in auth_events.
         missing_auth = event_auth_events.difference(
             e.event_id for e in auth_events.values()
         )
 
+        # if we have missing events, we need to fetch those events from somewhere.
+        #
+        # we start by checking if they are in the store, and then try calling /event_auth/.
         if missing_auth:
             # TODO: can we use store.have_seen_events here instead?
             have_events = yield self.store.get_seen_events_with_rejections(missing_auth)
-            logger.debug("Got events %s from store", have_events)
+            logger.debug("Found events %s in the store", have_events)
             missing_auth.difference_update(have_events.keys())
         else:
             have_events = {}
@@ -2169,15 +2176,17 @@ class FederationHandler(BaseHandler):
                     event.auth_event_ids()
                 )
             except Exception:
-                # FIXME:
                 logger.exception("Failed to get auth chain")
 
         if event.internal_metadata.is_outlier():
+            # XXX: given that, for an outlier, we'll be working with the
+            # event's *claimed* auth events rather than those we calculated:
+            # (a) is there any point in this test, since different_auth below will
+            # obviously be empty
+            # (b) alternatively, why don't we do it earlier?
             logger.info("Skipping auth_event fetch for outlier")
             return context
 
-        # FIXME: Assumes we have and stored all the state for all the
-        # prev_events
         different_auth = event_auth_events.difference(
             e.event_id for e in auth_events.values()
         )
@@ -2191,27 +2200,22 @@ class FederationHandler(BaseHandler):
             different_auth,
         )
 
+        # now we state-resolve between our own idea of the auth events, and the remote's
+        # idea of them.
+
         room_version = yield self.store.get_room_version(event.room_id)
+        different_event_ids = [
+            d for d in different_auth if d in have_events and not have_events[d]
+        ]
 
-        different_events = yield make_deferred_yieldable(
-            defer.gatherResults(
-                [
-                    run_in_background(
-                        self.store.get_event, d, allow_none=True, allow_rejected=False
-                    )
-                    for d in different_auth
-                    if d in have_events and not have_events[d]
-                ],
-                consumeErrors=True,
-            )
-        ).addErrback(unwrapFirstError)
+        if different_event_ids:
+            # XXX: currently this checks for redactions but I'm not convinced that is
+            # necessary?
+            different_events = yield self.store.get_events_as_list(different_event_ids)
 
-        if different_events:
             local_view = dict(auth_events)
             remote_view = dict(auth_events)
-            remote_view.update(
-                {(d.type, d.state_key): d for d in different_events if d}
-            )
+            remote_view.update({(d.type, d.state_key): d for d in different_events})
 
             new_state = yield self.state_handler.resolve_events(
                 room_version,
@@ -2231,13 +2235,13 @@ class FederationHandler(BaseHandler):
             auth_events.update(new_state)
 
             context = yield self._update_context_for_auth_events(
-                event, context, auth_events, event_key
+                event, context, auth_events
             )
 
         return context
 
     @defer.inlineCallbacks
-    def _update_context_for_auth_events(self, event, context, auth_events, event_key):
+    def _update_context_for_auth_events(self, event, context, auth_events):
         """Update the state_ids in an event context after auth event resolution,
         storing the changes as a new state group.
 
@@ -2246,18 +2250,21 @@ class FederationHandler(BaseHandler):
 
             context (synapse.events.snapshot.EventContext): initial event context
 
-            auth_events (dict[(str, str)->str]): Events to update in the event
+            auth_events (dict[(str, str)->EventBase]): Events to update in the event
                 context.
 
-            event_key ((str, str)): (type, state_key) for the current event.
-                this will not be included in the current_state in the context.
-
         Returns:
             Deferred[EventContext]: new event context
         """
+        # exclude the state key of the new event from the current_state in the context.
+        if event.is_state():
+            event_key = (event.type, event.state_key)
+        else:
+            event_key = None
         state_updates = {
             k: a.event_id for k, a in iteritems(auth_events) if k != event_key
         }
+
         current_state_ids = yield context.get_current_state_ids(self.store)
         current_state_ids = dict(current_state_ids)
 
@@ -2459,7 +2466,7 @@ class FederationHandler(BaseHandler):
                 room_version, event_dict, event, context
             )
 
-            EventValidator().validate_new(event)
+            EventValidator().validate_new(event, self.config)
 
             # We need to tell the transaction queue to send this out, even
             # though the sender isn't a local user.
@@ -2574,7 +2581,7 @@ class FederationHandler(BaseHandler):
         event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder
         )
-        EventValidator().validate_new(event)
+        EventValidator().validate_new(event, self.config)
         return (event, context)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d682dc2b7a..155ed6e06a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -417,7 +417,7 @@ class EventCreationHandler(object):
                     403, "You must be in the room to create an alias for it"
                 )
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         return (event, context)
 
@@ -634,7 +634,7 @@ class EventCreationHandler(object):
         if requester:
             context.app_service = requester.app_service
 
-        self.validator.validate_new(event)
+        self.validator.validate_new(event, self.config)
 
         # If this event is an annotation then we check that that the sender
         # can't annotate the same way twice (e.g. stops users from liking an
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 260a4351ca..8514ddc600 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import logging
 
+from six import iteritems
+
 from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import ReadWriteLock
@@ -80,6 +83,109 @@ class PaginationHandler(object):
         self._purges_by_id = {}
         self._event_serializer = hs.get_event_client_serializer()
 
+        self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime
+
+        if hs.config.retention_enabled:
+            # Run the purge jobs described in the configuration file.
+            for job in hs.config.retention_purge_jobs:
+                self.clock.looping_call(
+                    run_as_background_process,
+                    job["interval"],
+                    "purge_history_for_rooms_in_range",
+                    self.purge_history_for_rooms_in_range,
+                    job["shortest_max_lifetime"],
+                    job["longest_max_lifetime"],
+                )
+
+    @defer.inlineCallbacks
+    def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+        """Purge outdated events from rooms within the given retention range.
+
+        If a default retention policy is defined in the server's configuration and its
+        'max_lifetime' is within this range, also targets rooms which don't have a
+        retention policy.
+
+        Args:
+            min_ms (int|None): Duration in milliseconds that define the lower limit of
+                the range to handle (exclusive). If None, it means that the range has no
+                lower limit.
+            max_ms (int|None): Duration in milliseconds that define the upper limit of
+                the range to handle (inclusive). If None, it means that the range has no
+                upper limit.
+        """
+        # We want the storage layer to to include rooms with no retention policy in its
+        # return value only if a default retention policy is defined in the server's
+        # configuration and that policy's 'max_lifetime' is either lower (or equal) than
+        # max_ms or higher than min_ms (or both).
+        if self._retention_default_max_lifetime is not None:
+            include_null = True
+
+            if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
+                # The default max_lifetime is lower than (or equal to) min_ms.
+                include_null = False
+
+            if max_ms is not None and max_ms < self._retention_default_max_lifetime:
+                # The default max_lifetime is higher than max_ms.
+                include_null = False
+        else:
+            include_null = False
+
+        rooms = yield self.store.get_rooms_for_retention_period_in_range(
+            min_ms, max_ms, include_null
+        )
+
+        for room_id, retention_policy in iteritems(rooms):
+            if room_id in self._purges_in_progress_by_room:
+                logger.warning(
+                    "[purge] not purging room %s as there's an ongoing purge running"
+                    " for this room",
+                    room_id,
+                )
+                continue
+
+            max_lifetime = retention_policy["max_lifetime"]
+
+            if max_lifetime is None:
+                # If max_lifetime is None, it means that include_null equals True,
+                # therefore we can safely assume that there is a default policy defined
+                # in the server's configuration.
+                max_lifetime = self._retention_default_max_lifetime
+
+            # Figure out what token we should start purging at.
+            ts = self.clock.time_msec() - max_lifetime
+
+            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+
+            r = yield self.store.get_room_event_after_stream_ordering(
+                room_id, stream_ordering,
+            )
+            if not r:
+                logger.warning(
+                    "[purge] purging events not possible: No event found "
+                    "(ts %i => stream_ordering %i)",
+                    ts,
+                    stream_ordering,
+                )
+                continue
+
+            (stream, topo, _event_id) = r
+            token = "t%d-%d" % (topo, stream)
+
+            purge_id = random_string(16)
+
+            self._purges_by_id[purge_id] = PurgeStatus()
+
+            logger.info(
+                "Starting purging events in room %s (purge_id %s)" % (room_id, purge_id)
+            )
+
+            # We want to purge everything, including local events, and to run the purge in
+            # the background so that it's not blocking any other operation apart from
+            # other purges in the same room.
+            run_as_background_process(
+                "_purge_history", self._purge_history, purge_id, room_id, token, True,
+            )
+
     def start_purge_history(self, room_id, token, delete_local_events=False):
         """Start off a history purge on a room.