author     Patrick Cloke <clokep@users.noreply.github.com>  2023-11-09 14:40:30 -0500
committer  GitHub <noreply@github.com>                      2023-11-09 14:40:30 -0500
commit     9f514dd0fbf539d97b6d9f8f590aaf668fb3280e (patch)
tree       80463e04fdef771952a8b5712b6a944c9660a5f2
parent     Convert simple_select_one_txn and simple_select_one to return tuples. (#16612) (diff)
Use _invalidate_cache_and_stream_bulk in more places. (#16616)
This takes advantage of the new bulk method in more places to
invalidate caches for many keys at once (and then to stream those
invalidations over replication).
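
For context, here is a minimal, self-contained sketch of what a bulk invalidation helper of this shape could look like. The helper names and the "stream" step below are illustrative assumptions, not Synapse's actual implementation:

from typing import Callable, Collection, Tuple

def _invalidate_local_cache(cached_func: Callable, key: Tuple) -> None:
    # Placeholder: drop the cached entry for `key` on this worker.
    print(f"invalidate {cached_func.__name__}{key}")

def _stream_invalidations(cache_name: str, keys: Collection[Tuple]) -> None:
    # Placeholder: send one replication message covering all keys at once.
    print(f"stream invalidation of {cache_name} for {len(keys)} keys")

def invalidate_cache_and_stream_bulk(
    cached_func: Callable, key_tuples: Collection[Tuple]
) -> None:
    # Invalidate many cache keys in one pass, then stream them as a single
    # batch instead of looping and streaming per key (the pattern this
    # commit adopts at each call site below).
    keys = list(key_tuples)
    if not keys:
        return
    for key in keys:
        _invalidate_local_cache(cached_func, key)
    _stream_invalidations(cached_func.__name__, keys)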
Diffstat
-rw-r--r--  changelog.d/16613.feature                              2
-rw-r--r--  changelog.d/16616.feature                              1
-rw-r--r--  synapse/storage/databases/main/account_data.py        24
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py   15
-rw-r--r--  synapse/storage/databases/main/keys.py                 17
-rw-r--r--  synapse/storage/databases/main/presence.py              9
-rw-r--r--  synapse/storage/databases/main/purge_events.py         31
-rw-r--r--  synapse/storage/databases/main/registration.py         20
8 files changed, 72 insertions, 47 deletions
diff --git a/changelog.d/16613.feature b/changelog.d/16613.feature
index 254d04a90e..419c56fb83 100644
--- a/changelog.d/16613.feature
+++ b/changelog.d/16613.feature
@@ -1 +1 @@
-Improve the performance of claiming encryption keys in multi-worker deployments.
+Improve the performance of some operations in multi-worker deployments.
diff --git a/changelog.d/16616.feature b/changelog.d/16616.feature
new file mode 100644
index 0000000000..419c56fb83
--- /dev/null
+++ b/changelog.d/16616.feature
@@ -0,0 +1 @@
+Improve the performance of some operations in multi-worker deployments.
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d7482a1f4e..07f9b65af3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
         )
 
         # Invalidate the cache for any ignored users which were added or removed.
-        for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
-            self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.ignored_by,
+            [
+                (ignored_user_id,)
+                for ignored_user_id in (
+                    previously_ignored_users ^ currently_ignored_users
+                )
+            ],
+        )
         self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
 
     async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
                 )
 
                 # Invalidate the cache for ignored users which were removed.
-                for ignored_user_id in previously_ignored_users:
-                    self._invalidate_cache_and_stream(
-                        txn, self.ignored_by, (ignored_user_id,)
-                    )
+                self._invalidate_cache_and_stream_bulk(
+                    txn,
+                    self.ignored_by,
+                    [
+                        (ignored_user_id,)
+                        for ignored_user_id in previously_ignored_users
+                    ],
+                )
 
                 # Invalidate for this user the cache tracking ignored users.
                 self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
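
The account_data.py hunks feed the bulk helper with a comprehension over the symmetric difference of the two sets, i.e. only users whose ignored-status actually changed. A toy illustration with made-up user IDs:

previously_ignored_users = {"@a:example.org", "@b:example.org"}
currently_ignored_users = {"@b:example.org", "@c:example.org"}

# Only @a (removed) and @c (added) changed; @b stays ignored and is skipped.
changed = previously_ignored_users ^ currently_ignored_users
cache_keys = [(user_id,) for user_id in changed]
print(sorted(cache_keys))  # [('@a:example.org',), ('@c:example.org',)]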
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 0061805150..9c46c5d7bd 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
                 )
 
                 # Iterate the parent IDs and invalidate caches.
-                for parent_id in {r[1] for r in relations_to_insert}:
-                    cache_tuple = (parent_id,)
-                    self._invalidate_cache_and_stream(  # type: ignore[attr-defined]
-                        txn, self.get_relations_for_event, cache_tuple  # type: ignore[attr-defined]
-                    )
-                    self._invalidate_cache_and_stream(  # type: ignore[attr-defined]
-                        txn, self.get_thread_summary, cache_tuple  # type: ignore[attr-defined]
-                    )
+                cache_tuples = {(r[1],) for r in relations_to_insert}
+                self._invalidate_cache_and_stream_bulk(  # type: ignore[attr-defined]
+                    txn, self.get_relations_for_event, cache_tuples  # type: ignore[attr-defined]
+                )
+                self._invalidate_cache_and_stream_bulk(  # type: ignore[attr-defined]
+                    txn, self.get_thread_summary, cache_tuples  # type: ignore[attr-defined]
+                )
 
             if results:
                 latest_event_id = results[-1][0]
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ce88772f9e..c700872fdc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
             # invalidate takes a tuple corresponding to the params of
             # _get_server_keys_json. _get_server_keys_json only takes one
             # param, which is itself the 2-tuple (server_name, key_id).
-            for key_id in verify_keys:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_server_keys_json, ((server_name, key_id),)
-                )
-                self._invalidate_cache_and_stream(
-                    txn, self.get_server_key_json_for_remote, (server_name, key_id)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_server_keys_json,
+                [((server_name, key_id),) for key_id in verify_keys],
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_server_key_json_for_remote,
+                [(server_name, key_id) for key_id in verify_keys],
+            )
 
         await self.db_pool.runInteraction(
             "store_server_keys_response", store_server_keys_response_txn
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 3b444d2d07..0198bb09d2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
                 # for their user ID.
                 value_values=[(presence_stream_id,) for _ in user_ids],
             )
-            for user_id in user_ids:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_full_presence_stream_token_for_user, (user_id,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_full_presence_stream_token_for_user,
+                [(user_id,) for user_id in user_ids],
+            )
 
         return await self.db_pool.runInteraction(
             "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 1e11bf2706..c3b3e2baaf 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")
 
-        for event_id, should_delete in event_rows:
-            self._invalidate_cache_and_stream(
-                txn, self._get_state_group_for_event, (event_id,)
-            )
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self._get_state_group_for_event,
+            [(event_id,) for event_id, _ in event_rows],
+        )
 
-            # XXX: This is racy, since have_seen_events could be called between the
-            #    transaction completing and the invalidation running. On the other hand,
-            #    that's no different to calling `have_seen_events` just before the
-            #    event is deleted from the database.
+        # XXX: This is racy, since have_seen_events could be called between the
+        #    transaction completing and the invalidation running. On the other hand,
+        #    that's no different to calling `have_seen_events` just before the
+        #    event is deleted from the database.
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.have_seen_event,
+            [
+                (room_id, event_id)
+                for event_id, should_delete in event_rows
+                if should_delete
+            ],
+        )
+
+        for event_id, should_delete in event_rows:
             if should_delete:
-                self._invalidate_cache_and_stream(
-                    txn, self.have_seen_event, (room_id, event_id)
-                )
                 self.invalidate_get_event_cache_after_txn(txn, event_id)
 
         logger.info("[purge] done")
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dec9858575..2c3f30e2eb 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 updatevalues={"shadow_banned": shadow_banned},
             )
             # In order for this to apply immediately, clear the cache for this user.
-            tokens = self.db_pool.simple_select_onecol_txn(
+            tokens = self.db_pool.simple_select_list_txn(
                 txn,
                 table="access_tokens",
                 keyvalues={"user_id": user_id},
-                retcol="token",
+                retcols=("token",),
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn, self.get_user_by_access_token, tokens
             )
-            for token in tokens:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
         await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
             )
             tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
 
-            for token, _, _ in tokens_and_devices:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_user_by_access_token,
+                [(token,) for token, _, _ in tokens_and_devices],
+            )
 
             txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)