summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-03-13 22:11:58 +0000
committerMatthew Hodgson <matthew@matrix.org>2018-03-13 22:11:58 +0000
commit12350e3f9aeac73f80d1c5bb35ef9e5d0887dec0 (patch)
tree1db11e8ee3cd835de2c8d33aaf9534573d0ac357 /synapse/handlers
parentensure we always include the members for a given timeline block (diff)
parenttypoe (diff)
downloadsynapse-12350e3f9aeac73f80d1c5bb35ef9e5d0887dec0.tar.xz
merge proper fix to bug 2969
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/device.py9
-rw-r--r--synapse/handlers/devicemessage.py2
-rw-r--r--synapse/handlers/directory.py4
-rw-r--r--synapse/handlers/e2e_keys.py4
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/handlers/message.py123
-rw-r--r--synapse/handlers/presence.py11
-rw-r--r--synapse/handlers/profile.py4
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/register.py26
-rw-r--r--synapse/handlers/room_list.py2
-rw-r--r--synapse/handlers/room_member.py103
-rw-r--r--synapse/handlers/typing.py2
13 files changed, 223 insertions, 73 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 0e83453851..40f3d24678 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -37,14 +37,15 @@ class DeviceHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
         self.federation_sender = hs.get_federation_sender()
-        self.federation = hs.get_replication_layer()
 
         self._edu_updater = DeviceListEduUpdater(hs, self)
 
-        self.federation.register_edu_handler(
+        federation_registry = hs.get_federation_registry()
+
+        federation_registry.register_edu_handler(
             "m.device_list_update", self._edu_updater.incoming_device_list_update,
         )
-        self.federation.register_query_handler(
+        federation_registry.register_query_handler(
             "user_devices", self.on_federation_query_user_devices,
         )
 
@@ -430,7 +431,7 @@ class DeviceListEduUpdater(object):
 
     def __init__(self, hs, device_handler):
         self.store = hs.get_datastore()
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_client()
         self.clock = hs.get_clock()
         self.device_handler = device_handler
 
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index d996aa90bb..f147a20b73 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -37,7 +37,7 @@ class DeviceMessageHandler(object):
         self.is_mine = hs.is_mine
         self.federation = hs.get_federation_sender()
 
-        hs.get_replication_layer().register_edu_handler(
+        hs.get_federation_registry().register_edu_handler(
             "m.direct_to_device", self.on_direct_to_device_edu
         )
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8580ada60a..c5b6e75e03 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -36,8 +36,8 @@ class DirectoryHandler(BaseHandler):
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
 
-        self.federation = hs.get_replication_layer()
-        self.federation.register_query_handler(
+        self.federation = hs.get_federation_client()
+        hs.get_federation_registry().register_query_handler(
             "directory", self.on_directory_query
         )
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 9aa95f89e6..31b1ece13e 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
 class E2eKeysHandler(object):
     def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_client()
         self.device_handler = hs.get_device_handler()
         self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
@@ -40,7 +40,7 @@ class E2eKeysHandler(object):
         # doesn't really work as part of the generic query API, because the
         # query request requires an object POST, but we abuse the
         # "query handler" interface.
-        self.federation.register_query_handler(
+        hs.get_federation_registry().register_query_handler(
             "client_keys", self.on_federation_query_client_keys
         )
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 520612683e..080aca3d71 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,7 +68,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_replication_layer()
+        self.replication_layer = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -78,8 +78,6 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
 
-        self.replication_layer.set_handler(self)
-
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dd00d8a86c..4f97c8db79 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,7 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,9 +25,10 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 from synapse.replication.http.send_event import send_event_to_master
 
@@ -41,6 +43,36 @@ import ujson
 logger = logging.getLogger(__name__)
 
 
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -50,15 +82,88 @@ class MessageHandler(BaseHandler):
         self.clock = hs.get_clock()
 
         self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
 
-    @defer.inlineCallbacks
-    def purge_history(self, room_id, topological_ordering,
-                      delete_local_events=False):
-        with (yield self.pagination_lock.write(room_id)):
-            yield self.store.purge_history(
-                room_id, topological_ordering, delete_local_events,
+    def start_purge_history(self, room_id, topological_ordering,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
             )
 
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, topological_ordering, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, topological_ordering,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, topological_ordering, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            reactor.callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
                      as_client_event=True, event_filter=None):
@@ -562,7 +667,7 @@ class EventCreationHandler(object):
             event (FrozenEvent)
             context (EventContext)
             ratelimit (bool)
-            extra_users (list(str)): Any extra users to notify about event
+            extra_users (list(UserID)): Any extra users to notify about event
         """
 
         try:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cb158ba962..a5e501897c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -93,29 +93,30 @@ class PresenceHandler(object):
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.replication = hs.get_replication_layer()
         self.federation = hs.get_federation_sender()
 
         self.state = hs.get_state_handler()
 
-        self.replication.register_edu_handler(
+        federation_registry = hs.get_federation_registry()
+
+        federation_registry.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        self.replication.register_edu_handler(
+        federation_registry.register_edu_handler(
             "m.presence_invite",
             lambda origin, content: self.invite_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.replication.register_edu_handler(
+        federation_registry.register_edu_handler(
             "m.presence_accept",
             lambda origin, content: self.accept_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.replication.register_edu_handler(
+        federation_registry.register_edu_handler(
             "m.presence_deny",
             lambda origin, content: self.deny_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c9c2879038..cb710fe796 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,8 +31,8 @@ class ProfileHandler(BaseHandler):
     def __init__(self, hs):
         super(ProfileHandler, self).__init__(hs)
 
-        self.federation = hs.get_replication_layer()
-        self.federation.register_query_handler(
+        self.federation = hs.get_federation_client()
+        hs.get_federation_registry().register_query_handler(
             "profile", self.on_profile_query
         )
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 0525765272..3f215c2b4e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler):
         self.store = hs.get_datastore()
         self.hs = hs
         self.federation = hs.get_federation_sender()
-        hs.get_replication_layer().register_edu_handler(
+        hs.get_federation_registry().register_edu_handler(
             "m.receipt", self._received_remote_receipt
         )
         self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 9021d4d57f..ed5939880a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -446,16 +446,34 @@ class RegistrationHandler(BaseHandler):
         return self.hs.get_auth_handler()
 
     @defer.inlineCallbacks
-    def guest_access_token_for(self, medium, address, inviter_user_id):
+    def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
+        """Get a guest access token for a 3PID, creating a guest account if
+        one doesn't already exist.
+
+        Args:
+            medium (str)
+            address (str)
+            inviter_user_id (str): The user ID who is trying to invite the
+                3PID
+
+        Returns:
+            Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+            3PID guest account.
+        """
         access_token = yield self.store.get_3pid_guest_access_token(medium, address)
         if access_token:
-            defer.returnValue(access_token)
+            user_info = yield self.auth.get_user_by_access_token(
+                access_token
+            )
 
-        _, access_token = yield self.register(
+            defer.returnValue((user_info["user"].to_string(), access_token))
+
+        user_id, access_token = yield self.register(
             generate_token=True,
             make_guest=True
         )
         access_token = yield self.store.save_or_get_3pid_guest_access_token(
             medium, address, access_token, inviter_user_id
         )
-        defer.returnValue(access_token)
+
+        defer.returnValue((user_id, access_token))
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index dfa09141ed..5d81f59b44 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler):
     def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
                                 search_filter=None, include_all_networks=False,
                                 third_party_instance_id=None,):
-        repl_layer = self.hs.get_replication_layer()
+        repl_layer = self.hs.get_federation_client()
         if search_filter:
             # We can't cache when asking for search
             return repl_layer.get_public_rooms(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed3b97730d..0127cf4166 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -55,7 +55,6 @@ class RoomMemberHandler(object):
         self.registration_handler = hs.get_handlers().registration_handler
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_hander = hs.get_event_creation_handler()
-        self.replication_layer = hs.get_replication_layer()
 
         self.member_linearizer = Linearizer(name="member")
 
@@ -138,7 +137,20 @@ class RoomMemberHandler(object):
         defer.returnValue(event)
 
     @defer.inlineCallbacks
-    def remote_join(self, remote_room_hosts, room_id, user, content):
+    def _remote_join(self, remote_room_hosts, room_id, user, content):
+        """Try and join a room that this server is not in
+
+        Args:
+            remote_room_hosts (list[str]): List of servers that can be used
+                to join via.
+            room_id (str): Room that we are trying to join
+            user (UserID): User who is trying to join
+            content (dict): A dict that should be used as the content of the
+                join event.
+
+        Returns:
+            Deferred
+        """
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
@@ -155,6 +167,43 @@ class RoomMemberHandler(object):
         yield user_joined_room(self.distributor, user, room_id)
 
     @defer.inlineCallbacks
+    def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+        """Attempt to reject an invite for a room this server is not in. If we
+        fail to do so we locally mark the invite as rejected.
+
+        Args:
+            remote_room_hosts (list[str]): List of servers to use to try and
+                reject invite
+            room_id (str)
+            target (UserID): The user rejecting the invite
+
+        Returns:
+            Deferred[dict]: A dictionary to be returned to the client, may
+            include event_id etc, or nothing if we locally rejected
+        """
+        fed_handler = self.federation_handler
+        try:
+            ret = yield fed_handler.do_remotely_reject_invite(
+                remote_room_hosts,
+                room_id,
+                target.to_string(),
+            )
+            defer.returnValue(ret)
+        except Exception as e:
+            # if we were unable to reject the exception, just mark
+            # it as rejected on our end and plough ahead.
+            #
+            # The 'except' clause is very broad, but we need to
+            # capture everything from DNS failures upwards
+            #
+            logger.warn("Failed to reject invite: %s", e)
+
+            yield self.store.locally_reject_invite(
+                target.to_string(), room_id
+            )
+            defer.returnValue({})
+
+    @defer.inlineCallbacks
     def update_membership(
             self,
             requester,
@@ -212,7 +261,7 @@ class RoomMemberHandler(object):
         # if this is a join with a 3pid signature, we may need to turn a 3pid
         # invite into a normal invite before we can handle the join.
         if third_party_signed is not None:
-            yield self.replication_layer.exchange_third_party_invite(
+            yield self.federation_handler.exchange_third_party_invite(
                 third_party_signed["sender"],
                 target.to_string(),
                 room_id,
@@ -292,7 +341,7 @@ class RoomMemberHandler(object):
                     raise AuthError(403, "Guest access not allowed")
 
             if not is_host_in_room:
-                inviter = yield self.get_inviter(target.to_string(), room_id)
+                inviter = yield self._get_inviter(target.to_string(), room_id)
                 if inviter and not self.hs.is_mine(inviter):
                     remote_room_hosts.append(inviter.domain)
 
@@ -306,7 +355,7 @@ class RoomMemberHandler(object):
                 if requester.is_guest:
                     content["kind"] = "guest"
 
-                ret = yield self.remote_join(
+                ret = yield self._remote_join(
                     remote_room_hosts, room_id, target, content
                 )
                 defer.returnValue(ret)
@@ -314,7 +363,7 @@ class RoomMemberHandler(object):
         elif effective_membership_state == Membership.LEAVE:
             if not is_host_in_room:
                 # perhaps we've been invited
-                inviter = yield self.get_inviter(target.to_string(), room_id)
+                inviter = yield self._get_inviter(target.to_string(), room_id)
                 if not inviter:
                     raise SynapseError(404, "Not a known room")
 
@@ -328,28 +377,10 @@ class RoomMemberHandler(object):
                 else:
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    fed_handler = self.federation_handler
-                    try:
-                        ret = yield fed_handler.do_remotely_reject_invite(
-                            remote_room_hosts,
-                            room_id,
-                            target.to_string(),
-                        )
-                        defer.returnValue(ret)
-                    except Exception as e:
-                        # if we were unable to reject the exception, just mark
-                        # it as rejected on our end and plough ahead.
-                        #
-                        # The 'except' clause is very broad, but we need to
-                        # capture everything from DNS failures upwards
-                        #
-                        logger.warn("Failed to reject invite: %s", e)
-
-                        yield self.store.locally_reject_invite(
-                            target.to_string(), room_id
-                        )
-
-                        defer.returnValue({})
+                    res = yield self._remote_reject_invite(
+                        remote_room_hosts, room_id, target,
+                    )
+                    defer.returnValue(res)
 
         res = yield self._local_membership_update(
             requester=requester,
@@ -496,7 +527,7 @@ class RoomMemberHandler(object):
         defer.returnValue((RoomID.from_string(room_id), servers))
 
     @defer.inlineCallbacks
-    def get_inviter(self, user_id, room_id):
+    def _get_inviter(self, user_id, room_id):
         invite = yield self.store.get_invite_for_user_in_room(
             user_id=user_id,
             room_id=room_id,
@@ -573,7 +604,7 @@ class RoomMemberHandler(object):
             if "mxid" in data:
                 if "signatures" not in data:
                     raise AuthError(401, "No signatures on 3pid binding")
-                yield self.verify_any_signature(data, id_server)
+                yield self._verify_any_signature(data, id_server)
                 defer.returnValue(data["mxid"])
 
         except IOError as e:
@@ -581,7 +612,7 @@ class RoomMemberHandler(object):
             defer.returnValue(None)
 
     @defer.inlineCallbacks
-    def verify_any_signature(self, data, server_hostname):
+    def _verify_any_signature(self, data, server_hostname):
         if server_hostname not in data["signatures"]:
             raise AuthError(401, "No signature from server %s" % (server_hostname,))
         for key_name, signature in data["signatures"][server_hostname].items():
@@ -735,20 +766,16 @@ class RoomMemberHandler(object):
         }
 
         if self.config.invite_3pid_guest:
-            registration_handler = self.registration_handler
-            guest_access_token = yield registration_handler.guest_access_token_for(
+            rh = self.registration_handler
+            guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest(
                 medium=medium,
                 address=address,
                 inviter_user_id=inviter_user_id,
             )
 
-            guest_user_info = yield self.auth.get_user_by_access_token(
-                guest_access_token
-            )
-
             invite_config.update({
                 "guest_access_token": guest_access_token,
-                "guest_user_id": guest_user_info["user"].to_string(),
+                "guest_user_id": guest_user_id,
             })
 
         data = yield self.simple_http_client.post_urlencoded_get_json(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 82dedbbc99..77c0cf146f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -56,7 +56,7 @@ class TypingHandler(object):
 
         self.federation = hs.get_federation_sender()
 
-        hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
+        hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
 
         hs.get_distributor().observe("user_left_room", self.user_left_room)