summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorJorik Schellekens <joriksch@gmail.com>2019-07-29 16:34:44 +0100
committerErik Johnston <erik@matrix.org>2019-07-29 16:34:44 +0100
commit85b0bd8fe05ed78548c9b2b0da768927582f7d70 (patch)
tree907e0425888dec128674b9b86427f64a76619a68 /synapse/handlers
parentFix debian packages for sid being called buster. (#5775) (diff)
downloadsynapse-85b0bd8fe05ed78548c9b2b0da768927582f7d70.tar.xz
Update the device list cache when keys/query is called (#5693)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/device.py150
-rw-r--r--synapse/handlers/e2e_keys.py60
2 files changed, 136 insertions, 74 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index d6ab337783..5c1cf83c9d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -209,12 +209,12 @@ class DeviceHandler(DeviceWorkerHandler):
 
         self.federation_sender = hs.get_federation_sender()
 
-        self._edu_updater = DeviceListEduUpdater(hs, self)
+        self.device_list_updater = DeviceListUpdater(hs, self)
 
         federation_registry = hs.get_federation_registry()
 
         federation_registry.register_edu_handler(
-            "m.device_list_update", self._edu_updater.incoming_device_list_update
+            "m.device_list_update", self.device_list_updater.incoming_device_list_update
         )
         federation_registry.register_query_handler(
             "user_devices", self.on_federation_query_user_devices
@@ -426,7 +426,7 @@ def _update_device_from_client_ips(device, client_ips):
     device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
 
 
-class DeviceListEduUpdater(object):
+class DeviceListUpdater(object):
     "Handles incoming device list updates from federation and updates the DB"
 
     def __init__(self, hs, device_handler):
@@ -519,75 +519,7 @@ class DeviceListEduUpdater(object):
             logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
 
             if resync:
-                # Fetch all devices for the user.
-                origin = get_domain_from_id(user_id)
-                try:
-                    result = yield self.federation.query_user_devices(origin, user_id)
-                except (
-                    NotRetryingDestination,
-                    RequestSendFailed,
-                    HttpResponseException,
-                ):
-                    # TODO: Remember that we are now out of sync and try again
-                    # later
-                    logger.warn("Failed to handle device list update for %s", user_id)
-                    # We abort on exceptions rather than accepting the update
-                    # as otherwise synapse will 'forget' that its device list
-                    # is out of date. If we bail then we will retry the resync
-                    # next time we get a device list update for this user_id.
-                    # This makes it more likely that the device lists will
-                    # eventually become consistent.
-                    return
-                except FederationDeniedError as e:
-                    logger.info(e)
-                    return
-                except Exception:
-                    # TODO: Remember that we are now out of sync and try again
-                    # later
-                    logger.exception(
-                        "Failed to handle device list update for %s", user_id
-                    )
-                    return
-
-                stream_id = result["stream_id"]
-                devices = result["devices"]
-
-                # If the remote server has more than ~1000 devices for this user
-                # we assume that something is going horribly wrong (e.g. a bot
-                # that logs in and creates a new device every time it tries to
-                # send a message).  Maintaining lots of devices per user in the
-                # cache can cause serious performance issues as if this request
-                # takes more than 60s to complete, internal replication from the
-                # inbound federation worker to the synapse master may time out
-                # causing the inbound federation to fail and causing the remote
-                # server to retry, causing a DoS.  So in this scenario we give
-                # up on storing the total list of devices and only handle the
-                # delta instead.
-                if len(devices) > 1000:
-                    logger.warn(
-                        "Ignoring device list snapshot for %s as it has >1K devs (%d)",
-                        user_id,
-                        len(devices),
-                    )
-                    devices = []
-
-                for device in devices:
-                    logger.debug(
-                        "Handling resync update %r/%r, ID: %r",
-                        user_id,
-                        device["device_id"],
-                        stream_id,
-                    )
-
-                yield self.store.update_remote_device_list_cache(
-                    user_id, devices, stream_id
-                )
-                device_ids = [device["device_id"] for device in devices]
-                yield self.device_handler.notify_device_update(user_id, device_ids)
-
-                # We clobber the seen updates since we've re-synced from a given
-                # point.
-                self._seen_updates[user_id] = set([stream_id])
+                yield self.user_device_resync(user_id)
             else:
                 # Simply update the single device, since we know that is the only
                 # change (because of the single prev_id matching the current cache)
@@ -634,3 +566,77 @@ class DeviceListEduUpdater(object):
             stream_id_in_updates.add(stream_id)
 
         return False
+
+    @defer.inlineCallbacks
+    def user_device_resync(self, user_id):
+        """Fetches all devices for a user and updates the device cache with them.
+
+        Args:
+            user_id (str): The user's id whose device_list will be updated.
+        Returns:
+            Deferred[dict]: a dict with device info as under the "devices" in the result of this
+            request:
+            https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
+        """
+        # Fetch all devices for the user.
+        origin = get_domain_from_id(user_id)
+        try:
+            result = yield self.federation.query_user_devices(origin, user_id)
+        except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
+            # TODO: Remember that we are now out of sync and try again
+            # later
+            logger.warn("Failed to handle device list update for %s", user_id)
+            # We abort on exceptions rather than accepting the update
+            # as otherwise synapse will 'forget' that its device list
+            # is out of date. If we bail then we will retry the resync
+            # next time we get a device list update for this user_id.
+            # This makes it more likely that the device lists will
+            # eventually become consistent.
+            return
+        except FederationDeniedError as e:
+            logger.info(e)
+            return
+        except Exception:
+            # TODO: Remember that we are now out of sync and try again
+            # later
+            logger.exception("Failed to handle device list update for %s", user_id)
+            return
+        stream_id = result["stream_id"]
+        devices = result["devices"]
+
+        # If the remote server has more than ~1000 devices for this user
+        # we assume that something is going horribly wrong (e.g. a bot
+        # that logs in and creates a new device every time it tries to
+        # send a message).  Maintaining lots of devices per user in the
+        # cache can cause serious performance issues as if this request
+        # takes more than 60s to complete, internal replication from the
+        # inbound federation worker to the synapse master may time out
+        # causing the inbound federation to fail and causing the remote
+        # server to retry, causing a DoS.  So in this scenario we give
+        # up on storing the total list of devices and only handle the
+        # delta instead.
+        if len(devices) > 1000:
+            logger.warn(
+                "Ignoring device list snapshot for %s as it has >1K devs (%d)",
+                user_id,
+                len(devices),
+            )
+            devices = []
+
+        for device in devices:
+            logger.debug(
+                "Handling resync update %r/%r, ID: %r",
+                user_id,
+                device["device_id"],
+                stream_id,
+            )
+
+        yield self.store.update_remote_device_list_cache(user_id, devices, stream_id)
+        device_ids = [device["device_id"] for device in devices]
+        yield self.device_handler.notify_device_update(user_id, device_ids)
+
+        # We clobber the seen updates since we've re-synced from a given
+        # point.
+        self._seen_updates[user_id] = set([stream_id])
+
+        defer.returnValue(result)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1300b540e3..366a0bc68b 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -65,6 +65,7 @@ class E2eKeysHandler(object):
             }
         }
         """
+
         device_keys_query = query_body.get("device_keys", {})
 
         # separate users by domain.
@@ -121,7 +122,58 @@ class E2eKeysHandler(object):
         # Now fetch any devices that we don't have in our cache
         @defer.inlineCallbacks
         def do_remote_query(destination):
+            """This is called when we are querying the device list of a user on
+            a remote homeserver and their device list is not in the device list
+            cache. If we share a room with this user and we're not querying for
+            specific user we will update the cache
+            with their device list."""
+
             destination_query = remote_queries_not_in_cache[destination]
+
+            # We first consider whether we wish to update the device list cache with
+            # the users device list. We want to track a user's devices when the
+            # authenticated user shares a room with the queried user and the query
+            # has not specified a particular device.
+            # If we update the cache for the queried user we remove them from further
+            # queries. We use the more efficient batched query_client_keys for all
+            # remaining users
+            user_ids_updated = []
+            for (user_id, device_list) in destination_query.items():
+                if user_id in user_ids_updated:
+                    continue
+
+                if device_list:
+                    continue
+
+                room_ids = yield self.store.get_rooms_for_user(user_id)
+                if not room_ids:
+                    continue
+
+                # We've decided we're sharing a room with this user and should
+                # probably be tracking their device lists. However, we haven't
+                # done an initial sync on the device list so we do it now.
+                try:
+                    user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+                        user_id
+                    )
+                    user_devices = user_devices["devices"]
+                    for device in user_devices:
+                        results[user_id] = {device["device_id"]: device["keys"]}
+                    user_ids_updated.append(user_id)
+                except Exception as e:
+                    failures[destination] = failures.get(destination, []).append(
+                        _exception_to_failure(e)
+                    )
+
+            if len(destination_query) == len(user_ids_updated):
+                # We've updated all the users in the query and we do not need to
+                # make any further remote calls.
+                return
+
+            # Remove all the users from the query which we have updated
+            for user_id in user_ids_updated:
+                destination_query.pop(user_id)
+
             try:
                 remote_result = yield self.federation.query_client_keys(
                     destination, {"device_keys": destination_query}, timeout=timeout
@@ -132,7 +184,8 @@ class E2eKeysHandler(object):
                         results[user_id] = keys
 
             except Exception as e:
-                failures[destination] = _exception_to_failure(e)
+                failure = _exception_to_failure(e)
+                failures[destination] = failure
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -234,8 +287,10 @@ class E2eKeysHandler(object):
                 for user_id, keys in remote_result["one_time_keys"].items():
                     if user_id in device_keys:
                         json_result[user_id] = keys
+
             except Exception as e:
-                failures[destination] = _exception_to_failure(e)
+                failure = _exception_to_failure(e)
+                failures[destination] = failure
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -263,6 +318,7 @@ class E2eKeysHandler(object):
 
     @defer.inlineCallbacks
     def upload_keys_for_user(self, user_id, device_id, keys):
+
         time_now = self.clock.time_msec()
 
         # TODO: Validate the JSON to make sure it has the right keys.