summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/11974.misc1
-rw-r--r--synapse/handlers/sync.py68
-rw-r--r--synapse/storage/databases/main/devices.py10
-rw-r--r--synapse/storage/databases/main/roommember.py62
4 files changed, 126 insertions, 15 deletions
diff --git a/changelog.d/11974.misc b/changelog.d/11974.misc
new file mode 100644
index 0000000000..1debad2361
--- /dev/null
+++ b/changelog.d/11974.misc
@@ -0,0 +1 @@
+Optimise calculating device_list changes in `/sync`.
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index aa9a76f8a9..e6050cbce6 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1289,23 +1289,54 @@ class SyncHandler:
             # room with by looking at all users that have left a room plus users
             # that were in a room we've left.
 
-            users_who_share_room = await self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-
-            # Always tell the user about their own devices. We check as the user
-            # ID is almost certainly already included (unless they're not in any
-            # rooms) and taking a copy of the set is relatively expensive.
-            if user_id not in users_who_share_room:
-                users_who_share_room = set(users_who_share_room)
-                users_who_share_room.add(user_id)
+            users_that_have_changed = set()
 
-            tracked_users = users_who_share_room
+            joined_rooms = sync_result_builder.joined_room_ids
 
-            # Step 1a, check for changes in devices of users we share a room with
-            users_that_have_changed = await self.store.get_users_whose_devices_changed(
-                since_token.device_list_key, tracked_users
+            # Step 1a, check for changes in devices of users we share a room
+            # with
+            #
+            # We do this in two different ways depending on what we have cached.
+            # If we already have a list of all the user that have changed since
+            # the last sync then it's likely more efficient to compare the rooms
+            # they're in with the rooms the syncing user is in.
+            #
+            # If we don't have that info cached then we get all the users that
+            # share a room with our user and check if those users have changed.
+            changed_users = self.store.get_cached_device_list_changes(
+                since_token.device_list_key
             )
+            if changed_users is not None:
+                result = await self.store.get_rooms_for_users_with_stream_ordering(
+                    changed_users
+                )
+
+                for changed_user_id, entries in result.items():
+                    # Check if the changed user shares any rooms with the user,
+                    # or if the changed user is the syncing user (as we always
+                    # want to include device list updates of their own devices).
+                    if user_id == changed_user_id or any(
+                        e.room_id in joined_rooms for e in entries
+                    ):
+                        users_that_have_changed.add(changed_user_id)
+            else:
+                users_who_share_room = (
+                    await self.store.get_users_who_share_room_with_user(user_id)
+                )
+
+                # Always tell the user about their own devices. We check as the user
+                # ID is almost certainly already included (unless they're not in any
+                # rooms) and taking a copy of the set is relatively expensive.
+                if user_id not in users_who_share_room:
+                    users_who_share_room = set(users_who_share_room)
+                    users_who_share_room.add(user_id)
+
+                tracked_users = users_who_share_room
+                users_that_have_changed = (
+                    await self.store.get_users_whose_devices_changed(
+                        since_token.device_list_key, tracked_users
+                    )
+                )
 
             # Step 1b, check for newly joined rooms
             for room_id in newly_joined_rooms:
@@ -1329,7 +1360,14 @@ class SyncHandler:
                 newly_left_users.update(left_users)
 
             # Remove any users that we still share a room with.
-            newly_left_users -= users_who_share_room
+            left_users_rooms = (
+                await self.store.get_rooms_for_users_with_stream_ordering(
+                    newly_left_users
+                )
+            )
+            for user_id, entries in left_users_rooms.items():
+                if any(e.room_id in joined_rooms for e in entries):
+                    newly_left_users.discard(user_id)
 
             return DeviceLists(changed=users_that_have_changed, left=newly_left_users)
         else:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 8d845fe951..3b3a089b76 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -670,6 +670,16 @@ class DeviceWorkerStore(SQLBaseStore):
             device["device_id"]: db_to_json(device["content"]) for device in devices
         }
 
+    def get_cached_device_list_changes(
+        self,
+        from_key: int,
+    ) -> Optional[Set[str]]:
+        """Get set of users whose devices have changed since `from_key`, or None
+        if that information is not in our cache.
+        """
+
+        return self._device_list_stream_cache.get_all_entities_changed(from_key)
+
     async def get_users_whose_devices_changed(
         self, from_key: int, user_ids: Iterable[str]
     ) -> Set[str]:
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4489732fda..e48ec5f495 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -504,6 +504,68 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             for room_id, instance, stream_id in txn
         )
 
+    @cachedList(
+        cached_method_name="get_rooms_for_user_with_stream_ordering",
+        list_name="user_ids",
+    )
+    async def get_rooms_for_users_with_stream_ordering(
+        self, user_ids: Collection[str]
+    ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
+        """A batched version of `get_rooms_for_user_with_stream_ordering`.
+
+        Returns:
+            Map from user_id to set of rooms that is currently in.
+        """
+        return await self.db_pool.runInteraction(
+            "get_rooms_for_users_with_stream_ordering",
+            self._get_rooms_for_users_with_stream_ordering_txn,
+            user_ids,
+        )
+
+    def _get_rooms_for_users_with_stream_ordering_txn(
+        self, txn, user_ids: Collection[str]
+    ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
+
+        clause, args = make_in_list_sql_clause(
+            self.database_engine,
+            "c.state_key",
+            user_ids,
+        )
+
+        if self._current_state_events_membership_up_to_date:
+            sql = f"""
+                SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
+                FROM current_state_events AS c
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    c.type = 'm.room.member'
+                    AND c.membership = ?
+                    AND {clause}
+            """
+        else:
+            sql = f"""
+                SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
+                FROM current_state_events AS c
+                INNER JOIN room_memberships AS m USING (room_id, event_id)
+                INNER JOIN events AS e USING (room_id, event_id)
+                WHERE
+                    c.type = 'm.room.member'
+                    AND m.membership = ?
+                    AND {clause}
+            """
+
+        txn.execute(sql, [Membership.JOIN] + args)
+
+        result = {user_id: set() for user_id in user_ids}
+        for user_id, room_id, instance, stream_id in txn:
+            result[user_id].add(
+                GetRoomsForUserWithStreamOrdering(
+                    room_id, PersistedEventPosition(instance, stream_id)
+                )
+            )
+
+        return {user_id: frozenset(v) for user_id, v in result.items()}
+
     async def get_users_server_still_shares_room_with(
         self, user_ids: Collection[str]
     ) -> Set[str]: