summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/device.py14
-rw-r--r--synapse/handlers/sync.py70
-rw-r--r--synapse/storage/devices.py50
3 files changed, 97 insertions, 37 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f59d0479b5..99e8413092 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -101,9 +101,13 @@ class DeviceWorkerHandler(BaseHandler):
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
 
-        # First we check if any devices have changed
-        changed = yield self.store.get_user_whose_devices_changed(
-            from_token.device_list_key
+        # First we check if any devices have changed for users that we share
+        # rooms with.
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id
+        )
+        changed = yield self.store.get_users_whose_devices_changed(
+            from_token.device_list_key, users_who_share_room
         )
 
         # Then work out if any users have since joined
@@ -188,10 +192,6 @@ class DeviceWorkerHandler(BaseHandler):
                         break
 
         if possibly_changed or possibly_left:
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-
             # Take the intersection of the users whose devices may have changed
             # and those that actually still share a room with the user
             possibly_joined = possibly_changed & users_who_share_room
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c5188a1f8e..a3f550554f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1058,40 +1058,74 @@ class SyncHandler(object):
         newly_left_rooms,
         newly_left_users,
     ):
+        """Generate the DeviceLists section of sync
+
+        Args:
+            sync_result_builder (SyncResultBuilder)
+            newly_joined_rooms (set[str]): Set of rooms user has joined since
+                previous sync
+            newly_joined_or_invited_users (set[str]): Set of users that have
+                joined or been invited to a room since previous sync.
+            newly_left_rooms (set[str]): Set of rooms user has left since
+                previous sync
+            newly_left_users (set[str]): Set of users that have left a room
+                we're in since previous sync
+
+        Returns:
+            Deferred[DeviceLists]
+        """
+
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
+        # We're going to mutate these fields, so lets copy them rather than
+        # assume they won't get used later.
+        newly_joined_or_invited_users = set(newly_joined_or_invited_users)
+        newly_left_users = set(newly_left_users)
+
         if since_token and since_token.device_list_key:
-            changed = yield self.store.get_user_whose_devices_changed(
-                since_token.device_list_key
+            # We want to figure out what user IDs the client should refetch
+            # device keys for, and which users we aren't going to track changes
+            # for anymore.
+            #
+            # For the first step we check:
+            #   a. if any users we share a room with have updated their devices,
+            #      and
+            #   b. we also check if we've joined any new rooms, or if a user has
+            #      joined a room we're in.
+            #
+            # For the second step we just find any users we no longer share a
+            # room with by looking at all users that have left a room plus users
+            # that were in a room we've left.
+
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
+
+            # Step 1a, check for changes in devices of users we share a room with
+            users_that_have_changed = yield self.store.get_users_whose_devices_changed(
+                since_token.device_list_key, users_who_share_room
             )
 
-            # TODO: Be more clever than this, i.e. remove users who we already
-            # share a room with?
+            # Step 1b, check for newly joined rooms
             for room_id in newly_joined_rooms:
                 joined_users = yield self.state.get_current_users_in_room(room_id)
                 newly_joined_or_invited_users.update(joined_users)
 
-            for room_id in newly_left_rooms:
-                left_users = yield self.state.get_current_users_in_room(room_id)
-                newly_left_users.update(left_users)
-
             # TODO: Check that these users are actually new, i.e. either they
             # weren't in the previous sync *or* they left and rejoined.
-            changed.update(newly_joined_or_invited_users)
+            users_that_have_changed.update(newly_joined_or_invited_users)
 
-            if not changed and not newly_left_users:
-                defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
+            # Now find users that we no longer track
+            for room_id in newly_left_rooms:
+                left_users = yield self.state.get_current_users_in_room(room_id)
+                newly_left_users.update(left_users)
 
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
+            # Remove any users that we still share a room with.
+            newly_left_users -= users_who_share_room
 
             defer.returnValue(
-                DeviceLists(
-                    changed=users_who_share_room & changed,
-                    left=set(newly_left_users) - users_who_share_room,
-                )
+                DeviceLists(changed=users_that_have_changed, left=newly_left_users)
             )
         else:
             defer.returnValue(DeviceLists(changed=[], left=[]))
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 3413a46675..d2b113a4e7 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -24,6 +24,7 @@ from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import Cache, SQLBaseStore, db_to_json
 from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.util import batch_iter
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
 logger = logging.getLogger(__name__)
@@ -391,22 +392,47 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return now_stream_id, []
 
-    @defer.inlineCallbacks
-    def get_user_whose_devices_changed(self, from_key):
-        """Get set of users whose devices have changed since `from_key`.
+    def get_users_whose_devices_changed(self, from_key, user_ids):
+        """Get set of users whose devices have changed since `from_key` that
+        are in the given list of user_ids.
+
+        Args:
+            from_key (str): The device lists stream token
+            user_ids (Iterable[str])
+
+        Returns:
+            Deferred[set[str]]: The set of user_ids whose devices have changed
+            since `from_key`
         """
         from_key = int(from_key)
-        changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
-        if changed is not None:
-            defer.returnValue(set(changed))
 
-        sql = """
-            SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
-        """
-        rows = yield self._execute(
-            "get_user_whose_devices_changed", None, sql, from_key
+        # Get set of users who *may* have changed. Users not in the returned
+        # list have definitely not changed.
+        to_check = list(
+            self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
+        )
+
+        if not to_check:
+            return defer.succeed(set())
+
+        def _get_users_whose_devices_changed_txn(txn):
+            changes = set()
+
+            sql = """
+                SELECT DISTINCT user_id FROM device_lists_stream
+                WHERE stream_id > ?
+                AND user_id IN (%s)
+            """
+
+            for chunk in batch_iter(to_check, 100):
+                txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
+                changes.update(user_id for user_id, in txn)
+
+            return changes
+
+        return self.runInteraction(
+            "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
         )
-        defer.returnValue(set(row[0] for row in rows))
 
     def get_all_device_list_changes_for_remotes(self, from_key, to_key):
         """Return a list of `(stream_id, user_id, destination)` which is the