summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/device.py12
-rw-r--r--synapse/handlers/sync.py22
-rw-r--r--synapse/storage/devices.py51
3 files changed, 57 insertions, 28 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f59d0479b5..2b6c2117f9 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -101,9 +101,13 @@ class DeviceWorkerHandler(BaseHandler):
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
 
-        # First we check if any devices have changed
+        # First we check if any devices have changed for users that we share
+        # rooms with.
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
         changed = yield self.store.get_user_whose_devices_changed(
-            from_token.device_list_key
+            from_token.device_list_key, users_who_share_room
         )
 
         # Then work out if any users have since joined
@@ -188,10 +192,6 @@ class DeviceWorkerHandler(BaseHandler):
                         break
 
         if possibly_changed or possibly_left:
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-
             # Take the intersection of the users whose devices may have changed
             # and those that actually still share a room with the user
             possibly_joined = possibly_changed & users_who_share_room
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c5188a1f8e..8249e75ecd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1062,10 +1062,6 @@ class SyncHandler(object):
         since_token = sync_result_builder.since_token
 
         if since_token and since_token.device_list_key:
-            changed = yield self.store.get_user_whose_devices_changed(
-                since_token.device_list_key
-            )
-
             # TODO: Be more clever than this, i.e. remove users who we already
             # share a room with?
             for room_id in newly_joined_rooms:
@@ -1076,21 +1072,23 @@ class SyncHandler(object):
                 left_users = yield self.state.get_current_users_in_room(room_id)
                 newly_left_users.update(left_users)
 
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
+
             # TODO: Check that these users are actually new, i.e. either they
             # weren't in the previous sync *or* they left and rejoined.
-            changed.update(newly_joined_or_invited_users)
-
-            if not changed and not newly_left_users:
-                defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
+            changed = users_who_share_room & set(newly_joined_or_invited_users)
 
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
+            changed_users = yield self.store.get_user_whose_devices_changed(
+                since_token.device_list_key, users_who_share_room
             )
 
+            changed.update(changed_users)
+
             defer.returnValue(
                 DeviceLists(
-                    changed=users_who_share_room & changed,
-                    left=set(newly_left_users) - users_who_share_room,
+                    changed=changed, left=set(newly_left_users) - users_who_share_room
                 )
             )
         else:
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 3413a46675..3af0171f75 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -391,22 +391,53 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return now_stream_id, []
 
-    @defer.inlineCallbacks
-    def get_user_whose_devices_changed(self, from_key):
-        """Get set of users whose devices have changed since `from_key`.
+    def get_user_whose_devices_changed(self, from_key, user_ids):
+        """Get set of users whose devices have changed since `from_key` that
+        are in the given list of user_ids.
+
+        Args:
+            user_ids (Iterable[str])
+            from_key: The device lists stream token
+
+        Returns:
+            Deferred[set[str]]: The set of user_ids whose devices have changed
+            since `from_key`
         """
         from_key = int(from_key)
-        changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
-        if changed is not None:
-            defer.returnValue(set(changed))
+
+        # Get set of users who *may* have changed. Users not in the returned
+        # list have definitely not changed.
+        to_check = list(
+            self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
+        )
+
+        if not to_check:
+            return defer.succeed(set())
+
+        # We now check the database for all users in `to_check`, in batches.
+        batch_size = 100
+        chunks = [
+            to_check[i : i + batch_size] for i in range(0, len(to_check), batch_size)
+        ]
 
         sql = """
-            SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
+            SELECT DISTINCT user_id FROM device_lists_stream
+            WHERE stream_id > ?
+            AND user_id IN (%s)
         """
-        rows = yield self._execute(
-            "get_user_whose_devices_changed", None, sql, from_key
+
+        def _get_user_whose_devices_changed_txn(txn):
+            changes = set()
+
+            for chunk in chunks:
+                txn.execute(sql % (",".join("?" for _ in chunk),), [from_key] + chunk)
+                changes.update(user_id for user_id, in txn)
+
+            return changes
+
+        return self.runInteraction(
+            "get_user_whose_devices_changed", _get_user_whose_devices_changed_txn
         )
-        defer.returnValue(set(row[0] for row in rows))
 
     def get_all_device_list_changes_for_remotes(self, from_key, to_key):
         """Return a list of `(stream_id, user_id, destination)` which is the