diff options
author | Erik Johnston <erik@matrix.org> | 2024-05-15 15:11:50 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2024-05-15 15:11:50 +0100 |
commit | 2b8e932749bb29227aadc6bf6b6eb1bb30b02740 (patch) | |
tree | 77d6a6489fa4dd7b6750ee5d762923a638faa3fc | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-2b8e932749bb29227aadc6bf6b6eb1bb30b02740.tar.xz |
Cap the top stream ID when fetching changed devices
-rw-r--r-- | synapse/handlers/device.py | 22 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 5 |
3 files changed, 22 insertions, 6 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 67953a3ed9..55842e7c7b 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -159,20 +159,32 @@ class DeviceWorkerHandler: @cancellable async def get_device_changes_in_shared_rooms( - self, user_id: str, room_ids: StrCollection, from_token: StreamToken + self, + user_id: str, + room_ids: StrCollection, + from_token: StreamToken, + now_token: Optional[StreamToken] = None, ) -> Set[str]: """Get the set of users whose devices have changed who share a room with the given user. """ + now_device_lists_key = self.store.get_device_stream_token() + if now_token: + now_device_lists_key = now_token.device_list_key + changed_users = await self.store.get_device_list_changes_in_rooms( - room_ids, from_token.device_list_key + room_ids, + from_token.device_list_key, + now_device_lists_key, ) if changed_users is not None: # We also check if the given user has changed their device. If # they're in no rooms then the above query won't include them. changed = await self.store.get_users_whose_devices_changed( - from_token.device_list_key, [user_id] + from_token.device_list_key, + [user_id], + to_key=now_device_lists_key, ) changed_users.update(changed) return changed_users @@ -190,7 +202,9 @@ class DeviceWorkerHandler: tracked_users.add(user_id) changed = await self.store.get_users_whose_devices_changed( - from_token.device_list_key, tracked_users + from_token.device_list_key, + tracked_users, + to_key=now_device_lists_key, ) return changed diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0bef58351c..c833f1ef96 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1808,6 +1808,7 @@ class SyncHandler: user_id, sync_result_builder.joined_room_ids, from_token=since_token, + now_token=sync_result_builder.now_token, ) ) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index d98f0593bc..6bfbb18f31 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1444,7 +1444,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): @cancellable async def get_device_list_changes_in_rooms( - self, room_ids: Collection[str], from_id: int + self, room_ids: Collection[str], from_id: int, to_id: int ) -> Optional[Set[str]]: """Return the set of users whose devices have changed in the given rooms since the given stream ID. @@ -1462,7 +1462,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): sql = """ SELECT DISTINCT user_id FROM device_lists_changes_in_room - WHERE {clause} AND stream_id > ? + WHERE {clause} AND stream_id > ? AND stream_id <= ? """ def _get_device_list_changes_in_rooms_txn( @@ -1479,6 +1479,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): self.database_engine, "room_id", chunk ) args.append(from_id) + args.append(to_id) changes |= await self.db_pool.runInteraction( "get_device_list_changes_in_rooms", |