diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d7482a1f4e..07f9b65af3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for any ignored users which were added or removed.
- for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
- self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.ignored_by,
+ [
+ (ignored_user_id,)
+ for ignored_user_id in (
+ previously_ignored_users ^ currently_ignored_users
+ )
+ ],
+ )
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
)
# Invalidate the cache for ignored users which were removed.
- for ignored_user_id in previously_ignored_users:
- self._invalidate_cache_and_stream(
- txn, self.ignored_by, (ignored_user_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.ignored_by,
+ [
+ (ignored_user_id,)
+ for ignored_user_id in previously_ignored_users
+ ],
+ )
# Invalidate for this user the cache tracking ignored users.
self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 0061805150..9c46c5d7bd 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
# Iterate the parent IDs and invalidate caches.
- for parent_id in {r[1] for r in relations_to_insert}:
- cache_tuple = (parent_id,)
- self._invalidate_cache_and_stream( # type: ignore[attr-defined]
- txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined]
- )
- self._invalidate_cache_and_stream( # type: ignore[attr-defined]
- txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined]
- )
+ cache_tuples = {(r[1],) for r in relations_to_insert}
+ self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
+ txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined]
+ )
+ self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined]
+ txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined]
+ )
if results:
latest_event_id = results[-1][0]
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ce88772f9e..c700872fdc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
# invalidate takes a tuple corresponding to the params of
# _get_server_keys_json. _get_server_keys_json only takes one
# param, which is itself the 2-tuple (server_name, key_id).
- for key_id in verify_keys:
- self._invalidate_cache_and_stream(
- txn, self._get_server_keys_json, ((server_name, key_id),)
- )
- self._invalidate_cache_and_stream(
- txn, self.get_server_key_json_for_remote, (server_name, key_id)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_server_keys_json,
+ [((server_name, key_id),) for key_id in verify_keys],
+ )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.get_server_key_json_for_remote,
+ [(server_name, key_id) for key_id in verify_keys],
+ )
await self.db_pool.runInteraction(
"store_server_keys_response", store_server_keys_response_txn
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 3b444d2d07..0198bb09d2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
)
- for user_id in user_ids:
- self._invalidate_cache_and_stream(
- txn, self._get_full_presence_stream_token_for_user, (user_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_full_presence_stream_token_for_user,
+ [(user_id,) for user_id in user_ids],
+ )
return await self.db_pool.runInteraction(
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 1e11bf2706..c3b3e2baaf 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")
- for event_id, should_delete in event_rows:
- self._invalidate_cache_and_stream(
- txn, self._get_state_group_for_event, (event_id,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self._get_state_group_for_event,
+ [(event_id,) for event_id, _ in event_rows],
+ )
- # XXX: This is racy, since have_seen_events could be called between the
- # transaction completing and the invalidation running. On the other hand,
- # that's no different to calling `have_seen_events` just before the
- # event is deleted from the database.
+ # XXX: This is racy, since have_seen_events could be called between the
+ # transaction completing and the invalidation running. On the other hand,
+ # that's no different to calling `have_seen_events` just before the
+ # event is deleted from the database.
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.have_seen_event,
+ [
+ (room_id, event_id)
+ for event_id, should_delete in event_rows
+ if should_delete
+ ],
+ )
+
+ for event_id, should_delete in event_rows:
if should_delete:
- self._invalidate_cache_and_stream(
- txn, self.have_seen_event, (room_id, event_id)
- )
self.invalidate_get_event_cache_after_txn(txn, event_id)
logger.info("[purge] done")
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dec9858575..2c3f30e2eb 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
updatevalues={"shadow_banned": shadow_banned},
)
# In order for this to apply immediately, clear the cache for this user.
- tokens = self.db_pool.simple_select_onecol_txn(
+ tokens = self.db_pool.simple_select_list_txn(
txn,
table="access_tokens",
keyvalues={"user_id": user_id},
- retcol="token",
+ retcols=("token",),
+ )
+ self._invalidate_cache_and_stream_bulk(
+ txn, self.get_user_by_access_token, tokens
)
- for token in tokens:
- self._invalidate_cache_and_stream(
- txn, self.get_user_by_access_token, (token,)
- )
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
)
tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
- for token, _, _ in tokens_and_devices:
- self._invalidate_cache_and_stream(
- txn, self.get_user_by_access_token, (token,)
- )
+ self._invalidate_cache_and_stream_bulk(
+ txn,
+ self.get_user_by_access_token,
+ [(token,) for token, _, _ in tokens_and_devices],
+ )
txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)
|