summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-05-15 15:11:50 +0100
committerErik Johnston <erik@matrix.org>2024-05-15 15:11:50 +0100
commit2b8e932749bb29227aadc6bf6b6eb1bb30b02740 (patch)
tree77d6a6489fa4dd7b6750ee5d762923a638faa3fc
parentMerge branch 'master' into develop (diff)
downloadsynapse-2b8e932749bb29227aadc6bf6b6eb1bb30b02740.tar.xz
Cap the top stream ID when fetching changed devices
-rw-r--r--synapse/handlers/device.py22
-rw-r--r--synapse/handlers/sync.py1
-rw-r--r--synapse/storage/databases/main/devices.py5
3 files changed, 22 insertions, 6 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 67953a3ed9..55842e7c7b 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -159,20 +159,32 @@ class DeviceWorkerHandler:
 
     @cancellable
     async def get_device_changes_in_shared_rooms(
-        self, user_id: str, room_ids: StrCollection, from_token: StreamToken
+        self,
+        user_id: str,
+        room_ids: StrCollection,
+        from_token: StreamToken,
+        now_token: Optional[StreamToken] = None,
     ) -> Set[str]:
         """Get the set of users whose devices have changed who share a room with
         the given user.
         """
+        now_device_lists_key = self.store.get_device_stream_token()
+        if now_token:
+            now_device_lists_key = now_token.device_list_key
+
         changed_users = await self.store.get_device_list_changes_in_rooms(
-            room_ids, from_token.device_list_key
+            room_ids,
+            from_token.device_list_key,
+            now_device_lists_key,
         )
 
         if changed_users is not None:
             # We also check if the given user has changed their device. If
             # they're in no rooms then the above query won't include them.
             changed = await self.store.get_users_whose_devices_changed(
-                from_token.device_list_key, [user_id]
+                from_token.device_list_key,
+                [user_id],
+                to_key=now_device_lists_key,
             )
             changed_users.update(changed)
             return changed_users
@@ -190,7 +202,9 @@ class DeviceWorkerHandler:
         tracked_users.add(user_id)
 
         changed = await self.store.get_users_whose_devices_changed(
-            from_token.device_list_key, tracked_users
+            from_token.device_list_key,
+            tracked_users,
+            to_key=now_device_lists_key,
         )
 
         return changed
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0bef58351c..c833f1ef96 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1808,6 +1808,7 @@ class SyncHandler:
                 user_id,
                 sync_result_builder.joined_room_ids,
                 from_token=since_token,
+                now_token=sync_result_builder.now_token,
             )
         )
 
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index d98f0593bc..6bfbb18f31 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1444,7 +1444,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
 
     @cancellable
     async def get_device_list_changes_in_rooms(
-        self, room_ids: Collection[str], from_id: int
+        self, room_ids: Collection[str], from_id: int, to_id: int
     ) -> Optional[Set[str]]:
         """Return the set of users whose devices have changed in the given rooms
         since the given stream ID.
@@ -1462,7 +1462,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
 
         sql = """
             SELECT DISTINCT user_id FROM device_lists_changes_in_room
-            WHERE {clause} AND stream_id > ?
+            WHERE {clause} AND stream_id > ? AND stream_id <= ?
         """
 
         def _get_device_list_changes_in_rooms_txn(
@@ -1479,6 +1479,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
                 self.database_engine, "room_id", chunk
             )
             args.append(from_id)
+            args.append(to_id)
 
             changes |= await self.db_pool.runInteraction(
                 "get_device_list_changes_in_rooms",