diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f59d0479b5..2b6c2117f9 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -101,9 +101,13 @@ class DeviceWorkerHandler(BaseHandler):
room_ids = yield self.store.get_rooms_for_user(user_id)
- # First we check if any devices have changed
+ # First we check if any devices have changed for users that we share
+ # rooms with.
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
changed = yield self.store.get_user_whose_devices_changed(
- from_token.device_list_key
+ from_token.device_list_key, users_who_share_room
)
# Then work out if any users have since joined
@@ -188,10 +192,6 @@ class DeviceWorkerHandler(BaseHandler):
break
if possibly_changed or possibly_left:
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
-
# Take the intersection of the users whose devices may have changed
# and those that actually still share a room with the user
possibly_joined = possibly_changed & users_who_share_room
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c5188a1f8e..8249e75ecd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1062,10 +1062,6 @@ class SyncHandler(object):
since_token = sync_result_builder.since_token
if since_token and since_token.device_list_key:
- changed = yield self.store.get_user_whose_devices_changed(
- since_token.device_list_key
- )
-
# TODO: Be more clever than this, i.e. remove users who we already
# share a room with?
for room_id in newly_joined_rooms:
@@ -1076,21 +1072,23 @@ class SyncHandler(object):
left_users = yield self.state.get_current_users_in_room(room_id)
newly_left_users.update(left_users)
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+
# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
- changed.update(newly_joined_or_invited_users)
-
- if not changed and not newly_left_users:
- defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
+ changed = users_who_share_room & set(newly_joined_or_invited_users)
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
+ changed_users = yield self.store.get_user_whose_devices_changed(
+ since_token.device_list_key, users_who_share_room
)
+ changed.update(changed_users)
+
defer.returnValue(
DeviceLists(
- changed=users_who_share_room & changed,
- left=set(newly_left_users) - users_who_share_room,
+ changed=changed, left=set(newly_left_users) - users_who_share_room
)
)
else:
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 3413a46675..3af0171f75 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -391,22 +391,53 @@ class DeviceWorkerStore(SQLBaseStore):
return now_stream_id, []
- @defer.inlineCallbacks
- def get_user_whose_devices_changed(self, from_key):
- """Get set of users whose devices have changed since `from_key`.
+ def get_user_whose_devices_changed(self, from_key, user_ids):
+ """Get set of users whose devices have changed since `from_key` that
+ are in the given list of user_ids.
+
+ Args:
+ user_ids (Iterable[str])
+ from_key: The device lists stream token
+
+ Returns:
+ Deferred[set[str]]: The set of user_ids whose devices have changed
+ since `from_key`
"""
from_key = int(from_key)
- changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
- if changed is not None:
- defer.returnValue(set(changed))
+
+ # Get set of users who *may* have changed. Users not in the returned
+ # list have definitely not changed.
+ to_check = list(
+ self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
+ )
+
+ if not to_check:
+ return defer.succeed(set())
+
+ # We now check the database for all users in `to_check`, in batches.
+ batch_size = 100
+ chunks = [
+ to_check[i : i + batch_size] for i in range(0, len(to_check), batch_size)
+ ]
sql = """
- SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
+ SELECT DISTINCT user_id FROM device_lists_stream
+ WHERE stream_id > ?
+ AND user_id IN (%s)
"""
- rows = yield self._execute(
- "get_user_whose_devices_changed", None, sql, from_key
+
+ def _get_user_whose_devices_changed_txn(txn):
+ changes = set()
+
+ for chunk in chunks:
+ txn.execute(sql % (",".join("?" for _ in chunk),), [from_key] + chunk)
+ changes.update(user_id for user_id, in txn)
+
+ return changes
+
+ return self.runInteraction(
+ "get_user_whose_devices_changed", _get_user_whose_devices_changed_txn
)
- defer.returnValue(set(row[0] for row in rows))
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the
|