From a2f6d31a63012531935566e380dfb5edd81dcbd0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 11:56:52 +0100 Subject: Refactor get_user_ids_changed to pull less from DB When a client asks for users whose devices have changed since a token we used to pull *all* users from the database since the token, which could easily be thousands of rows for old tokens. This PR changes this to only check for changes for users the client is actually interested in. Fixes #5553 --- synapse/storage/devices.py | 51 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 10 deletions(-) (limited to 'synapse/storage/devices.py') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 3413a46675..3af0171f75 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -391,22 +391,53 @@ class DeviceWorkerStore(SQLBaseStore): return now_stream_id, [] - @defer.inlineCallbacks - def get_user_whose_devices_changed(self, from_key): - """Get set of users whose devices have changed since `from_key`. + def get_user_whose_devices_changed(self, from_key, user_ids): + """Get set of users whose devices have changed since `from_key` that + are in the given list of user_ids. + + Args: + user_ids (Iterable[str]) + from_key: The device lists stream token + + Returns: + Deferred[set[str]]: The set of user_ids whose devices have changed + since `from_key` """ from_key = int(from_key) - changed = self._device_list_stream_cache.get_all_entities_changed(from_key) - if changed is not None: - defer.returnValue(set(changed)) + + # Get set of users who *may* have changed. Users not in the returned + # list have definitely not changed. + to_check = list( + self._device_list_stream_cache.get_entities_changed(user_ids, from_key) + ) + + if not to_check: + return defer.succeed(set()) + + # We now check the database for all users in `to_check`, in batches. + batch_size = 100 + chunks = [ + to_check[i : i + batch_size] for i in range(0, len(to_check), batch_size) + ] sql = """ - SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? + SELECT DISTINCT user_id FROM device_lists_stream + WHERE stream_id > ? + AND user_id IN (%s) """ - rows = yield self._execute( - "get_user_whose_devices_changed", None, sql, from_key + + def _get_user_whose_devices_changed_txn(txn): + changes = set() + + for chunk in chunks: + txn.execute(sql % (",".join("?" for _ in chunk),), [from_key] + chunk) + changes.update(user_id for user_id, in txn) + + return changes + + return self.runInteraction( + "get_user_whose_devices_changed", _get_user_whose_devices_changed_txn ) - defer.returnValue(set(row[0] for row in rows)) def get_all_device_list_changes_for_remotes(self, from_key, to_key): """Return a list of `(stream_id, user_id, destination)` which is the -- cgit 1.5.1 From 806a06daf2b30691c2c69e32d1ff2e104436bbc4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 19:09:10 +0100 Subject: Rename get_users_whose_devices_changed --- synapse/handlers/device.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/storage/devices.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage/devices.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2b6c2117f9..99e8413092 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -106,7 +106,7 @@ class DeviceWorkerHandler(BaseHandler): users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id ) - changed = yield self.store.get_user_whose_devices_changed( + changed = yield self.store.get_users_whose_devices_changed( from_token.device_list_key, users_who_share_room ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8249e75ecd..f70ebfdee7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1080,7 +1080,7 @@ class SyncHandler(object): # weren't in the previous sync *or* they left and rejoined. changed = users_who_share_room & set(newly_joined_or_invited_users) - changed_users = yield self.store.get_user_whose_devices_changed( + changed_users = yield self.store.get_users_whose_devices_changed( since_token.device_list_key, users_who_share_room ) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 3af0171f75..97f6cd2754 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -391,7 +391,7 @@ class DeviceWorkerStore(SQLBaseStore): return now_stream_id, [] - def get_user_whose_devices_changed(self, from_key, user_ids): + def get_users_whose_devices_changed(self, from_key, user_ids): """Get set of users whose devices have changed since `from_key` that are in the given list of user_ids. @@ -426,7 +426,7 @@ class DeviceWorkerStore(SQLBaseStore): AND user_id IN (%s) """ - def _get_user_whose_devices_changed_txn(txn): + def _get_users_whose_devices_changed_txn(txn): changes = set() for chunk in chunks: @@ -436,7 +436,7 @@ class DeviceWorkerStore(SQLBaseStore): return changes return self.runInteraction( - "get_user_whose_devices_changed", _get_user_whose_devices_changed_txn + "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn ) def get_all_device_list_changes_for_remotes(self, from_key, to_key): -- cgit 1.5.1 From f335e77d5330d13cbaf61b7b903980bae60761d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 19:10:38 +0100 Subject: Use batch_iter and correct docstring --- synapse/storage/devices.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) (limited to 'synapse/storage/devices.py') diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 97f6cd2754..44324bf400 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -24,6 +24,7 @@ from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore +from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList logger = logging.getLogger(__name__) @@ -396,8 +397,8 @@ class DeviceWorkerStore(SQLBaseStore): are in the given list of user_ids. Args: + from_key (str): The device lists stream token user_ids (Iterable[str]) - from_key: The device lists stream token Returns: Deferred[set[str]]: The set of user_ids whose devices have changed @@ -414,23 +415,19 @@ class DeviceWorkerStore(SQLBaseStore): if not to_check: return defer.succeed(set()) - # We now check the database for all users in `to_check`, in batches. - batch_size = 100 - chunks = [ - to_check[i : i + batch_size] for i in range(0, len(to_check), batch_size) - ] - - sql = """ - SELECT DISTINCT user_id FROM device_lists_stream - WHERE stream_id > ? - AND user_id IN (%s) - """ - def _get_users_whose_devices_changed_txn(txn): changes = set() - for chunk in chunks: - txn.execute(sql % (",".join("?" for _ in chunk),), [from_key] + chunk) + sql = """ + SELECT DISTINCT user_id FROM device_lists_stream + WHERE stream_id > ? + AND user_id IN (%s) + """ + + for chunk in batch_iter(to_check, 100): + txn.execute( + sql % (",".join("?" for _ in chunk),), [from_key] + list(chunk) + ) changes.update(user_id for user_id, in txn) return changes -- cgit 1.5.1 From 729f5a4fb6654e6c9beb68a3edbb8dbbae076e3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Jun 2019 16:06:23 +0100 Subject: Review comments --- synapse/handlers/sync.py | 8 ++++---- synapse/storage/devices.py | 4 +--- 2 files changed, 5 insertions(+), 7 deletions(-) (limited to 'synapse/storage/devices.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4f737d0a12..a3f550554f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1089,9 +1089,9 @@ class SyncHandler(object): # for anymore. # # For the first step we check: - # 1. if any users we share a room with have updated their devices, + # a. if any users we share a room with have updated their devices, # and - # 2. we also check if we've joined any new rooms, or if a user has + # b. we also check if we've joined any new rooms, or if a user has # joined a room we're in. # # For the second step we just find any users we no longer share a @@ -1102,12 +1102,12 @@ class SyncHandler(object): user_id ) - # Step 1, check for changes in devices of users we share a room with + # Step 1a, check for changes in devices of users we share a room with users_that_have_changed = yield self.store.get_users_whose_devices_changed( since_token.device_list_key, users_who_share_room ) - # Step 2, check for newly joined rooms + # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: joined_users = yield self.state.get_current_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 44324bf400..d2b113a4e7 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -425,9 +425,7 @@ class DeviceWorkerStore(SQLBaseStore): """ for chunk in batch_iter(to_check, 100): - txn.execute( - sql % (",".join("?" for _ in chunk),), [from_key] + list(chunk) - ) + txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk) changes.update(user_id for user_id, in txn) return changes -- cgit 1.5.1