diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 67953a3ed9..55842e7c7b 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -159,20 +159,32 @@ class DeviceWorkerHandler:
@cancellable
async def get_device_changes_in_shared_rooms(
- self, user_id: str, room_ids: StrCollection, from_token: StreamToken
+ self,
+ user_id: str,
+ room_ids: StrCollection,
+ from_token: StreamToken,
+ now_token: Optional[StreamToken] = None,
) -> Set[str]:
"""Get the set of users whose devices have changed who share a room with
the given user.
"""
+ now_device_lists_key = self.store.get_device_stream_token()
+ if now_token:
+ now_device_lists_key = now_token.device_list_key
+
changed_users = await self.store.get_device_list_changes_in_rooms(
- room_ids, from_token.device_list_key
+ room_ids,
+ from_token.device_list_key,
+ now_device_lists_key,
)
if changed_users is not None:
# We also check if the given user has changed their device. If
# they're in no rooms then the above query won't include them.
changed = await self.store.get_users_whose_devices_changed(
- from_token.device_list_key, [user_id]
+ from_token.device_list_key,
+ [user_id],
+ to_key=now_device_lists_key,
)
changed_users.update(changed)
return changed_users
@@ -190,7 +202,9 @@ class DeviceWorkerHandler:
tracked_users.add(user_id)
changed = await self.store.get_users_whose_devices_changed(
- from_token.device_list_key, tracked_users
+ from_token.device_list_key,
+ tracked_users,
+ to_key=now_device_lists_key,
)
return changed
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index fdaaa9006c..b7917a99d6 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1891,6 +1891,7 @@ class SyncHandler:
user_id,
sync_result_builder.joined_room_ids,
from_token=since_token,
+ now_token=sync_result_builder.now_token,
)
)
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index d98f0593bc..6bfbb18f31 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1444,7 +1444,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
@cancellable
async def get_device_list_changes_in_rooms(
- self, room_ids: Collection[str], from_id: int
+ self, room_ids: Collection[str], from_id: int, to_id: int
) -> Optional[Set[str]]:
"""Return the set of users whose devices have changed in the given rooms
since the given stream ID.
@@ -1462,7 +1462,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
sql = """
SELECT DISTINCT user_id FROM device_lists_changes_in_room
- WHERE {clause} AND stream_id > ?
+ WHERE {clause} AND stream_id > ? AND stream_id <= ?
"""
def _get_device_list_changes_in_rooms_txn(
@@ -1479,6 +1479,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
self.database_engine, "room_id", chunk
)
args.append(from_id)
+ args.append(to_id)
changes |= await self.db_pool.runInteraction(
"get_device_list_changes_in_rooms",
|