Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/auth.py        |  32
-rw-r--r--  synapse/handlers/device.py      |  34
-rw-r--r--  synapse/handlers/federation.py  | 202
-rw-r--r--  synapse/handlers/identity.py    |  37
4 files changed, 239 insertions, 66 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e7a1bb7246..fffba34383 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,6 +1,5 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -48,7 +47,6 @@ class AuthHandler(BaseHandler):
             LoginType.PASSWORD: self._check_password_auth,
             LoginType.RECAPTCHA: self._check_recaptcha,
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
-            LoginType.MSISDN: self._check_msisdn,
             LoginType.DUMMY: self._check_dummy_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -309,47 +307,31 @@ class AuthHandler(BaseHandler):
                 defer.returnValue(True)
         raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
-    def _check_email_identity(self, authdict, _):
-        return self._check_threepid('email', authdict)
-
-    def _check_msisdn(self, authdict, _):
-        return self._check_threepid('msisdn', authdict)
-
     @defer.inlineCallbacks
-    def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
-
-    @defer.inlineCallbacks
-    def _check_threepid(self, medium, authdict):
+    def _check_email_identity(self, authdict, _):
         yield run_on_reactor()
 
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
         threepid_creds = authdict['threepid_creds']
-
         identity_handler = self.hs.get_handlers().identity_handler
 
-        logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
+        logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
         threepid = yield identity_handler.threepid_from_creds(threepid_creds)
 
         if not threepid:
             raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
-        if threepid['medium'] != medium:
-            raise LoginError(
-                401,
-                "Expecting threepid of type '%s', got '%s'" % (
-                    medium, threepid['medium'],
-                ),
-                errcode=Codes.UNAUTHORIZED
-            )
-
         threepid['threepid_creds'] = authdict['threepid_creds']
 
         defer.returnValue(threepid)
 
+    @defer.inlineCallbacks
+    def _check_dummy_auth(self, authdict, _):
+        yield run_on_reactor()
+        defer.returnValue(True)
+
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
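The hunk above folds the removed _check_threepid helper back into _check_email_identity: the checker pulls threepid_creds out of the client's auth dict, asks the identity handler to resolve it into a validated threepid, and raises LoginError(401) if validation fails. A minimal sketch of driving such a checker directly, assuming a hypothetical auth_handler instance and illustrative placeholder credentials (this is not Synapse's real test harness):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def run_email_stage(auth_handler):
        # threepid_creds come from the client's earlier requestToken call;
        # the values here are illustrative placeholders.
        authdict = {
            "threepid_creds": {
                "client_secret": "s3cret",
                "id_server": "matrix.org",
                "sid": "1234",
            },
        }
        # Raises LoginError(400, Codes.MISSING_PARAM) if threepid_creds is
        # absent, LoginError(401) if the identity server has not validated
        # the session, and otherwise returns the validated threepid dict.
        threepid = yield auth_handler._check_email_identity(authdict, {})
        defer.returnValue(threepid)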
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9374c085db..1b007d4945 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -170,6 +170,40 @@ class DeviceHandler(BaseHandler):
         yield self.notify_device_update(user_id, [device_id])
 
     @defer.inlineCallbacks
+    def delete_devices(self, user_id, device_ids):
+        """ Delete several devices
+
+        Args:
+            user_id (str): The user whose devices are to be deleted
+            device_ids (list[str]): The IDs of the devices to delete
+
+        Returns:
+            defer.Deferred: resolves when the devices have been deleted
+        """
+
+        try:
+            yield self.store.delete_devices(user_id, device_ids)
+        except errors.StoreError as e:
+            if e.code == 404:
+                # no match
+                pass
+            else:
+                raise
+
+        # Delete access tokens and e2e keys for each device. Not optimised
+        # because this is not considered a critical path.
+        for device_id in device_ids:
+            yield self.store.user_delete_access_tokens(
+                user_id, device_id=device_id,
+                delete_refresh_tokens=True,
+            )
+            yield self.store.delete_e2e_keys_by_device(
+                user_id=user_id, device_id=device_id
+            )
+
+        yield self.notify_device_update(user_id, device_ids)
+
+    @defer.inlineCallbacks
     def update_device(self, user_id, device_id, content):
         """ Update the given device
 
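delete_devices above treats a 404 from the store as success (the device rows are already gone), then clears access tokens and e2e keys per device, and deliberately sends a single device-list notification for the whole batch rather than one per device. A hedged usage sketch, assuming a DeviceHandler instance named handler (the names are illustrative):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def wipe_devices(handler, user_id, device_ids):
        # One call deletes the devices, revokes their access tokens,
        # removes their e2e keys, and emits a single device-list update
        # covering the whole batch.
        yield handler.delete_devices(user_id, device_ids)

    # e.g. wipe_devices(handler, "@alice:example.com", ["ABCDEFGH", "IJKLMNOP"])

Batching the notify_device_update call matters for federation: remote servers see one device-list change for the user instead of a burst of per-device updates.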
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ed0fa51e7f..d0c2b4d6ed 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -31,7 +31,7 @@ from synapse.util.logcontext import (
 )
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
@@ -79,12 +79,204 @@ class FederationHandler(BaseHandler):
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
+        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
+
+    @defer.inlineCallbacks
+    @log_function
+    def on_receive_pdu(self, origin, pdu, get_missing=True):
+        """ Process a PDU received via a federation /send/ transaction, or
+        via backfill of missing prev_events
+
+        Args:
+            origin (str): server which initiated the /send/ transaction. Will
+                be used to fetch missing events or state.
+            pdu (FrozenEvent): received PDU
+            get_missing (bool): True if we should fetch missing prev_events
+
+        Returns (Deferred): completes with None
+        """
+
+        # We reprocess pdus when we have seen them only as outliers
+        existing = yield self.get_persisted_pdu(
+            origin, pdu.event_id, do_auth=False
+        )
+
+        # FIXME: Currently we fetch an event again when we already have it
+        # if it has been marked as an outlier.
+
+        already_seen = (
+            existing and (
+                not existing.internal_metadata.is_outlier()
+                or pdu.internal_metadata.is_outlier()
+            )
+        )
+        if already_seen:
+            logger.debug("Already seen pdu %s", pdu.event_id)
+            return
+
+        state = None
+
+        auth_chain = []
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+
+        fetch_state = False
+
+        # Get missing pdus if necessary.
+        if not pdu.internal_metadata.is_outlier():
+            # We only backfill backwards to the min depth.
+            min_depth = yield self.get_min_depth_for_context(
+                pdu.room_id
+            )
+
+            logger.debug(
+                "_handle_new_pdu min_depth for %s: %d",
+                pdu.room_id, min_depth
+            )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+
+            if min_depth and pdu.depth < min_depth:
+                # This is so that we don't notify the user about this
+                # message, to work around the fact that some events will
+                # reference really really old events we really don't want to
+                # send to the clients.
+                pdu.internal_metadata.outlier = True
+            elif min_depth and pdu.depth > min_depth:
+                if get_missing and prevs - seen:
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    logger.info(
+                        "Acquiring lock for room %r to fetch %d missing events: %r...",
+                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                    )
+                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                        logger.info(
+                            "Acquired lock for room %r to fetch %d missing events",
+                            pdu.room_id, len(prevs - seen),
+                        )
+
+                        yield self._get_missing_events_for_pdu(
+                            origin, pdu, prevs, min_depth
+                        )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+            if prevs - seen:
+                logger.info(
+                    "Still missing %d events for room %r: %r...",
+                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                )
+                fetch_state = True
+
+        if fetch_state:
+            # We need to get the state at this event, since we haven't
+            # processed all the prev events.
+            logger.debug(
+                "_handle_new_pdu getting state for %s",
+                pdu.room_id
+            )
+            try:
+                state, auth_chain = yield self.replication_layer.get_state_for_room(
+                    origin, pdu.room_id, pdu.event_id,
+                )
+            except Exception:
+                logger.exception("Failed to get state for event: %s", pdu.event_id)
+
+        yield self._process_received_pdu(
+            origin,
+            pdu,
+            state=state,
+            auth_chain=auth_chain,
+        )
+
+    @defer.inlineCallbacks
+    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+        """
+        Args:
+            origin (str): Origin of the pdu. Will be queried for the missing events
+            pdu: received pdu
+            prevs (set(str)): Set of event IDs which we are missing
+            min_depth (int): Minimum depth of events to return.
+
+        Returns:
+            Deferred<dict(str, str?)>: updated have_seen dictionary
+        """
+        # We recalculate seen, since it may have changed.
+        have_seen = yield self.store.have_events(prevs)
+        seen = set(have_seen.keys())
+
+        if not prevs - seen:
+            # nothing left to do
+            defer.returnValue(have_seen)
+
+        latest = yield self.store.get_latest_event_ids_in_room(
+            pdu.room_id
+        )
+
+        # We add the prev events that we have seen to the latest
+        # list to ensure the remote server doesn't give them to us
+        latest = set(latest)
+        latest |= seen
+
+        logger.info(
+            "Missing %d events for room %r: %r...",
+            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+        )
+
+        # XXX: we set timeout to 10s to help workaround
+        # https://github.com/matrix-org/synapse/issues/1733.
+        # The reason is to avoid holding the linearizer lock
+        # whilst processing inbound /send transactions, causing
+        # FDs to stack up and block other inbound transactions
+        # which empirically can currently take up to 30 minutes.
+        #
+        # N.B. this explicitly disables retry attempts.
+        #
+        # N.B. this also increases our chances of falling back to
+        # fetching fresh state for the room if the missing event
+        # can't be found, which slightly reduces our security.
+        # It may also increase our DAG extremity count for the room,
+        # causing additional state resolution?  See #1760.
+        # However, fetching state doesn't hold the linearizer lock
+        # apparently.
+        #
+        # see https://github.com/matrix-org/synapse/pull/1744
+
+        missing_events = yield self.replication_layer.get_missing_events(
+            origin,
+            pdu.room_id,
+            earliest_events_ids=list(latest),
+            latest_events=[pdu],
+            limit=10,
+            min_depth=min_depth,
+            timeout=10000,
+        )
+
+        # We want to sort these by depth so we process them and
+        # tell clients about them in order.
+        missing_events.sort(key=lambda x: x.depth)
+
+        for e in missing_events:
+            yield self.on_receive_pdu(
+                origin,
+                e,
+                get_missing=False
+            )
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+        defer.returnValue(have_seen)
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
-        """ Called by the ReplicationLayer when we have a new pdu. We need to
-        do auth checks and put it through the StateHandler.
+    def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
+        """ Called when we have a new pdu. We need to do auth checks and put it
+        through the StateHandler.
 
         auth_chain and state are None if we already have the necessary state
         and prev_events in the db
@@ -738,7 +930,7 @@ class FederationHandler(BaseHandler):
                     continue
 
                 try:
-                    self.on_receive_pdu(origin, p)
+                    self._process_received_pdu(origin, p)
                 except:
                     logger.exception("Couldn't handle pdu")
 
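The federation.py changes split PDU handling in two: on_receive_pdu now decides what to do with an incoming event (drop it if already seen, mark it as an outlier if below the room's min_depth, fetch missing prev_events, or fall back to fetching full room state), while _process_received_pdu performs the auth checks and state handling. Missing-event fetches are serialised per room with the new Linearizer, which is why the 10s timeout on get_missing_events matters: a slow fetch would otherwise block every later PDU for that room. A minimal sketch of the locking pattern, using Synapse's Linearizer exactly as the hunk does (the fetch body is illustrative):

    from twisted.internet import defer
    from synapse.util.async import Linearizer

    linearizer = Linearizer("fed_room_pdu")

    @defer.inlineCallbacks
    def fetch_missing_serialised(room_id, fetch_fn):
        # Linearizer.queue(key) yields a context manager; while it is
        # held, at most one missing-event fetch runs for this room_id,
        # but PDUs for other rooms proceed concurrently.
        with (yield linearizer.queue(room_id)):
            yield fetch_fn()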
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6a53c5eb47..559e5d5a71 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,5 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -151,7 +150,7 @@ class IdentityHandler(BaseHandler):
         params.update(kwargs)
 
         try:
-            data = yield self.http_client.post_json_get_json(
+            data = yield self.http_client.post_urlencoded_get_json(
                 "https://%s%s" % (
                     id_server,
                     "/_matrix/identity/api/v1/validate/email/requestToken"
@@ -162,37 +161,3 @@ class IdentityHandler(BaseHandler):
         except CodeMessageException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e
-
-    @defer.inlineCallbacks
-    def requestMsisdnToken(
-            self, id_server, country, phone_number,
-            client_secret, send_attempt, **kwargs
-    ):
-        yield run_on_reactor()
-
-        if not self._should_trust_id_server(id_server):
-            raise SynapseError(
-                400, "Untrusted ID server '%s'" % id_server,
-                Codes.SERVER_NOT_TRUSTED
-            )
-
-        params = {
-            'country': country,
-            'phone_number': phone_number,
-            'client_secret': client_secret,
-            'send_attempt': send_attempt,
-        }
-        params.update(kwargs)
-
-        try:
-            data = yield self.http_client.post_json_get_json(
-                "https://%s%s" % (
-                    id_server,
-                    "/_matrix/identity/api/v1/validate/msisdn/requestToken"
-                ),
-                params
-            )
-            defer.returnValue(data)
-        except CodeMessageException as e:
-            logger.info("Proxied requestToken failed: %r", e)
-            raise e
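The identity.py hunk swaps post_json_get_json back for post_urlencoded_get_json, so the proxied email requestToken call sends an application/x-www-form-urlencoded body again rather than JSON. A minimal sketch of the resulting request, assuming a SimpleHttpClient-style client (the wrapper function is illustrative, not part of Synapse):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def proxy_email_request_token(http_client, id_server, email,
                                  client_secret, send_attempt):
        params = {
            "email": email,
            "client_secret": client_secret,
            "send_attempt": send_attempt,
        }
        # POSTs a urlencoded body and parses the JSON response, matching
        # the requestToken call the hunk above makes against the v1
        # identity-server API.
        data = yield http_client.post_urlencoded_get_json(
            "https://%s/_matrix/identity/api/v1/validate/email/requestToken"
            % (id_server,),
            params,
        )
        defer.returnValue(data)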