From a2f6d31a63012531935566e380dfb5edd81dcbd0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 11:56:52 +0100 Subject: Refactor get_user_ids_changed to pull less from DB When a client asks for users whose devices have changed since a token we used to pull *all* users from the database since the token, which could easily be thousands of rows for old tokens. This PR changes this to only check for changes for users the client is actually interested in. Fixes #5553 --- synapse/handlers/device.py | 12 +++++------ synapse/handlers/sync.py | 22 +++++++++----------- synapse/storage/devices.py | 51 +++++++++++++++++++++++++++++++++++++--------- 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f59d0479b5..2b6c2117f9 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -101,9 +101,13 @@ class DeviceWorkerHandler(BaseHandler): room_ids = yield self.store.get_rooms_for_user(user_id) - # First we check if any devices have changed + # First we check if any devices have changed for users that we share + # rooms with. + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) changed = yield self.store.get_user_whose_devices_changed( - from_token.device_list_key + from_token.device_list_key, users_who_share_room ) # Then work out if any users have since joined @@ -188,10 +192,6 @@ class DeviceWorkerHandler(BaseHandler): break if possibly_changed or possibly_left: - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - # Take the intersection of the users whose devices may have changed # and those that actually still share a room with the user possibly_joined = possibly_changed & users_who_share_room diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c5188a1f8e..8249e75ecd 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1062,10 +1062,6 @@ class SyncHandler(object): since_token = sync_result_builder.since_token if since_token and since_token.device_list_key: - changed = yield self.store.get_user_whose_devices_changed( - since_token.device_list_key - ) - # TODO: Be more clever than this, i.e. remove users who we already # share a room with? for room_id in newly_joined_rooms: @@ -1076,21 +1072,23 @@ class SyncHandler(object): left_users = yield self.state.get_current_users_in_room(room_id) newly_left_users.update(left_users) + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + # TODO: Check that these users are actually new, i.e. either they # weren't in the previous sync *or* they left and rejoined. - changed.update(newly_joined_or_invited_users) - - if not changed and not newly_left_users: - defer.returnValue(DeviceLists(changed=[], left=newly_left_users)) + changed = users_who_share_room & set(newly_joined_or_invited_users) - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id + changed_users = yield self.store.get_user_whose_devices_changed( + since_token.device_list_key, users_who_share_room ) + changed.update(changed_users) + defer.returnValue( DeviceLists( - changed=users_who_share_room & changed, - left=set(newly_left_users) - users_who_share_room, + changed=changed, left=set(newly_left_users) - users_who_share_room ) ) else: diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 3413a46675..3af0171f75 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -391,22 +391,53 @@ class DeviceWorkerStore(SQLBaseStore): return now_stream_id, [] - @defer.inlineCallbacks - def get_user_whose_devices_changed(self, from_key): - """Get set of users whose devices have changed since `from_key`. + def get_user_whose_devices_changed(self, from_key, user_ids): + """Get set of users whose devices have changed since `from_key` that + are in the given list of user_ids. + + Args: + user_ids (Iterable[str]) + from_key: The device lists stream token + + Returns: + Deferred[set[str]]: The set of user_ids whose devices have changed + since `from_key` """ from_key = int(from_key) - changed = self._device_list_stream_cache.get_all_entities_changed(from_key) - if changed is not None: - defer.returnValue(set(changed)) + + # Get set of users who *may* have changed. Users not in the returned + # list have definitely not changed. + to_check = list( + self._device_list_stream_cache.get_entities_changed(user_ids, from_key) + ) + + if not to_check: + return defer.succeed(set()) + + # We now check the database for all users in `to_check`, in batches. + batch_size = 100 + chunks = [ + to_check[i : i + batch_size] for i in range(0, len(to_check), batch_size) + ] sql = """ - SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? + SELECT DISTINCT user_id FROM device_lists_stream + WHERE stream_id > ? + AND user_id IN (%s) """ - rows = yield self._execute( - "get_user_whose_devices_changed", None, sql, from_key + + def _get_user_whose_devices_changed_txn(txn): + changes = set() + + for chunk in chunks: + txn.execute(sql % (",".join("?" for _ in chunk),), [from_key] + chunk) + changes.update(user_id for user_id, in txn) + + return changes + + return self.runInteraction( + "get_user_whose_devices_changed", _get_user_whose_devices_changed_txn ) - defer.returnValue(set(row[0] for row in rows)) def get_all_device_list_changes_for_remotes(self, from_key, to_key): """Return a list of `(stream_id, user_id, destination)` which is the -- cgit 1.4.1 From 508c3ce3d71b4711f6e50e7f1e74d71cb46f61d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 12:03:49 +0100 Subject: Newsfile --- changelog.d/5559.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5559.misc diff --git a/changelog.d/5559.misc b/changelog.d/5559.misc new file mode 100644 index 0000000000..b77b383459 --- /dev/null +++ b/changelog.d/5559.misc @@ -0,0 +1 @@ +Optimise devices changed query to not pull unnecessary rows from the database, reducing database load. -- cgit 1.4.1 From 806a06daf2b30691c2c69e32d1ff2e104436bbc4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 19:09:10 +0100 Subject: Rename get_users_whose_devices_changed --- synapse/handlers/device.py | 2 +- synapse/handlers/sync.py | 2 +- synapse/storage/devices.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2b6c2117f9..99e8413092 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -106,7 +106,7 @@ class DeviceWorkerHandler(BaseHandler): users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id ) - changed = yield self.store.get_user_whose_devices_changed( + changed = yield self.store.get_users_whose_devices_changed( from_token.device_list_key, users_who_share_room ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 8249e75ecd..f70ebfdee7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1080,7 +1080,7 @@ class SyncHandler(object): # weren't in the previous sync *or* they left and rejoined. changed = users_who_share_room & set(newly_joined_or_invited_users) - changed_users = yield self.store.get_user_whose_devices_changed( + changed_users = yield self.store.get_users_whose_devices_changed( since_token.device_list_key, users_who_share_room ) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 3af0171f75..97f6cd2754 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -391,7 +391,7 @@ class DeviceWorkerStore(SQLBaseStore): return now_stream_id, [] - def get_user_whose_devices_changed(self, from_key, user_ids): + def get_users_whose_devices_changed(self, from_key, user_ids): """Get set of users whose devices have changed since `from_key` that are in the given list of user_ids. @@ -426,7 +426,7 @@ class DeviceWorkerStore(SQLBaseStore): AND user_id IN (%s) """ - def _get_user_whose_devices_changed_txn(txn): + def _get_users_whose_devices_changed_txn(txn): changes = set() for chunk in chunks: @@ -436,7 +436,7 @@ class DeviceWorkerStore(SQLBaseStore): return changes return self.runInteraction( - "get_user_whose_devices_changed", _get_user_whose_devices_changed_txn + "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn ) def get_all_device_list_changes_for_remotes(self, from_key, to_key): -- cgit 1.4.1 From f335e77d5330d13cbaf61b7b903980bae60761d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 19:10:38 +0100 Subject: Use batch_iter and correct docstring --- synapse/storage/devices.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 97f6cd2754..44324bf400 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -24,6 +24,7 @@ from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore +from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList logger = logging.getLogger(__name__) @@ -396,8 +397,8 @@ class DeviceWorkerStore(SQLBaseStore): are in the given list of user_ids. Args: + from_key (str): The device lists stream token user_ids (Iterable[str]) - from_key: The device lists stream token Returns: Deferred[set[str]]: The set of user_ids whose devices have changed @@ -414,23 +415,19 @@ class DeviceWorkerStore(SQLBaseStore): if not to_check: return defer.succeed(set()) - # We now check the database for all users in `to_check`, in batches. - batch_size = 100 - chunks = [ - to_check[i : i + batch_size] for i in range(0, len(to_check), batch_size) - ] - - sql = """ - SELECT DISTINCT user_id FROM device_lists_stream - WHERE stream_id > ? - AND user_id IN (%s) - """ - def _get_users_whose_devices_changed_txn(txn): changes = set() - for chunk in chunks: - txn.execute(sql % (",".join("?" for _ in chunk),), [from_key] + chunk) + sql = """ + SELECT DISTINCT user_id FROM device_lists_stream + WHERE stream_id > ? + AND user_id IN (%s) + """ + + for chunk in batch_iter(to_check, 100): + txn.execute( + sql % (",".join("?" for _ in chunk),), [from_key] + list(chunk) + ) changes.update(user_id for user_id, in txn) return changes -- cgit 1.4.1 From 8624db3194789cdc98e4d2a8e0da324609497fb6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 19:30:35 +0100 Subject: Refactor and comment sync device list code --- synapse/handlers/sync.py | 70 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f70ebfdee7..4f737d0a12 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1058,38 +1058,74 @@ class SyncHandler(object): newly_left_rooms, newly_left_users, ): + """Generate the DeviceLists section of sync + + Args: + sync_result_builder (SyncResultBuilder) + newly_joined_rooms (set[str]): Set of rooms user has joined since + previous sync + newly_joined_or_invited_users (set[str]): Set of users that have + joined or been invited to a room since previous sync. + newly_left_rooms (set[str]): Set of rooms user has left since + previous sync + newly_left_users (set[str]): Set of users that have left a room + we're in since previous sync + + Returns: + Deferred[DeviceLists] + """ + user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token - if since_token and since_token.device_list_key: - # TODO: Be more clever than this, i.e. remove users who we already - # share a room with? - for room_id in newly_joined_rooms: - joined_users = yield self.state.get_current_users_in_room(room_id) - newly_joined_or_invited_users.update(joined_users) + # We're going to mutate these fields, so lets copy them rather than + # assume they won't get used later. + newly_joined_or_invited_users = set(newly_joined_or_invited_users) + newly_left_users = set(newly_left_users) - for room_id in newly_left_rooms: - left_users = yield self.state.get_current_users_in_room(room_id) - newly_left_users.update(left_users) + if since_token and since_token.device_list_key: + # We want to figure out what user IDs the client should refetch + # device keys for, and which users we aren't going to track changes + # for anymore. + # + # For the first step we check: + # 1. if any users we share a room with have updated their devices, + # and + # 2. we also check if we've joined any new rooms, or if a user has + # joined a room we're in. + # + # For the second step we just find any users we no longer share a + # room with by looking at all users that have left a room plus users + # that were in a room we've left. users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id ) + # Step 1, check for changes in devices of users we share a room with + users_that_have_changed = yield self.store.get_users_whose_devices_changed( + since_token.device_list_key, users_who_share_room + ) + + # Step 2, check for newly joined rooms + for room_id in newly_joined_rooms: + joined_users = yield self.state.get_current_users_in_room(room_id) + newly_joined_or_invited_users.update(joined_users) + # TODO: Check that these users are actually new, i.e. either they # weren't in the previous sync *or* they left and rejoined. - changed = users_who_share_room & set(newly_joined_or_invited_users) + users_that_have_changed.update(newly_joined_or_invited_users) - changed_users = yield self.store.get_users_whose_devices_changed( - since_token.device_list_key, users_who_share_room - ) + # Now find users that we no longer track + for room_id in newly_left_rooms: + left_users = yield self.state.get_current_users_in_room(room_id) + newly_left_users.update(left_users) - changed.update(changed_users) + # Remove any users that we still share a room with. + newly_left_users -= users_who_share_room defer.returnValue( - DeviceLists( - changed=changed, left=set(newly_left_users) - users_who_share_room - ) + DeviceLists(changed=users_that_have_changed, left=newly_left_users) ) else: defer.returnValue(DeviceLists(changed=[], left=[])) -- cgit 1.4.1 From 82028d723b1533832c22b2acced1ff5d1a0fb51a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Jun 2019 19:33:11 +0100 Subject: Move changelog --- changelog.d/5559.feature | 1 + changelog.d/5559.misc | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 changelog.d/5559.feature delete mode 100644 changelog.d/5559.misc diff --git a/changelog.d/5559.feature b/changelog.d/5559.feature new file mode 100644 index 0000000000..b77b383459 --- /dev/null +++ b/changelog.d/5559.feature @@ -0,0 +1 @@ +Optimise devices changed query to not pull unnecessary rows from the database, reducing database load. diff --git a/changelog.d/5559.misc b/changelog.d/5559.misc deleted file mode 100644 index b77b383459..0000000000 --- a/changelog.d/5559.misc +++ /dev/null @@ -1 +0,0 @@ -Optimise devices changed query to not pull unnecessary rows from the database, reducing database load. -- cgit 1.4.1 From 729f5a4fb6654e6c9beb68a3edbb8dbbae076e3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Jun 2019 16:06:23 +0100 Subject: Review comments --- synapse/handlers/sync.py | 8 ++++---- synapse/storage/devices.py | 4 +--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4f737d0a12..a3f550554f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1089,9 +1089,9 @@ class SyncHandler(object): # for anymore. # # For the first step we check: - # 1. if any users we share a room with have updated their devices, + # a. if any users we share a room with have updated their devices, # and - # 2. we also check if we've joined any new rooms, or if a user has + # b. we also check if we've joined any new rooms, or if a user has # joined a room we're in. # # For the second step we just find any users we no longer share a @@ -1102,12 +1102,12 @@ class SyncHandler(object): user_id ) - # Step 1, check for changes in devices of users we share a room with + # Step 1a, check for changes in devices of users we share a room with users_that_have_changed = yield self.store.get_users_whose_devices_changed( since_token.device_list_key, users_who_share_room ) - # Step 2, check for newly joined rooms + # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: joined_users = yield self.state.get_current_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 44324bf400..d2b113a4e7 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -425,9 +425,7 @@ class DeviceWorkerStore(SQLBaseStore): """ for chunk in batch_iter(to_check, 100): - txn.execute( - sql % (",".join("?" for _ in chunk),), [from_key] + list(chunk) - ) + txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk) changes.update(user_id for user_id, in txn) return changes -- cgit 1.4.1