diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py
index fd3fc298b3..58177ecec1 100644
--- a/synapse/storage/databases/main/censor_events.py
+++ b/synapse/storage/databases/main/censor_events.py
@@ -194,7 +194,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase
# changed its content in the database. We can't call
# self._invalidate_cache_and_stream because self.get_event_cache isn't of the
# right type.
- txn.call_after(self._get_event_cache.invalidate, (event.event_id,))
+ self.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Send that invalidation to replication so that other workers also invalidate
# the event cache.
self._send_invalidation_to_replication(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fa2266ba20..156e1bd5ab 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1293,7 +1293,7 @@ class PersistEventsStore:
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
- txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
+ self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
# Then update the `stream_ordering` position to mark the latest
# event as the front of the room. This should not be done for
# backfilled events because backfilled events have negative
@@ -1675,7 +1675,7 @@ class PersistEventsStore:
(cache_entry.event.event_id,), cache_entry
)
- txn.call_after(prefill)
+ txn.async_call_after(prefill)
def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
"""Invalidate the caches for the redacted event.
@@ -1684,7 +1684,7 @@ class PersistEventsStore:
_invalidate_caches_for_event.
"""
assert event.redacts is not None
- txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
+ self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index f3935bfead..4435373146 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -712,17 +712,41 @@ class EventsWorkerStore(SQLBaseStore):
return event_entry_map
- async def _invalidate_get_event_cache(self, event_id: str) -> None:
- # First we invalidate the asynchronous cache instance. This may include
- # out-of-process caches such as Redis/memcache. Once complete we can
- # invalidate any in memory cache. The ordering is important here to
- # ensure we don't pull in any remote invalid value after we invalidate
- # the in-memory cache.
+ def invalidate_get_event_cache_after_txn(
+ self, txn: LoggingTransaction, event_id: str
+ ) -> None:
+ """
+ Prepares a database transaction to invalidate the get event cache for a given
+ event ID when executed successfully. This is achieved by attaching two callbacks
+ to the transaction, one to invalidate the async cache and one for the in memory
+ sync cache (importantly called in that order).
+
+ Arguments:
+ txn: the database transaction to attach the callbacks to
+ event_id: the event ID to be invalidated from caches
+ """
+
+ txn.async_call_after(self._invalidate_async_get_event_cache, event_id)
+ txn.call_after(self._invalidate_local_get_event_cache, event_id)
+
+ async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
+ """
+    Invalidates an event in the asynchronous get event cache, which may be remote.
+
+ Arguments:
+ event_id: the event ID to invalidate
+ """
+
await self._get_event_cache.invalidate((event_id,))
- self._event_ref.pop(event_id, None)
- self._current_event_fetches.pop(event_id, None)
def _invalidate_local_get_event_cache(self, event_id: str) -> None:
+ """
+ Invalidates an event in local in-memory get event caches.
+
+ Arguments:
+ event_id: the event ID to invalidate
+ """
+
self._get_event_cache.invalidate_local((event_id,))
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)
@@ -958,7 +982,13 @@ class EventsWorkerStore(SQLBaseStore):
}
row_dict = self.db_pool.new_transaction(
- conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch
+ conn,
+ "do_fetch",
+ [],
+ [],
+ [],
+ self._fetch_event_rows,
+ events_to_fetch,
)
# We only want to resolve deferreds from the main thread
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 9a63f953fb..efd136a864 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -66,6 +66,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
"initialise_mau_threepids",
[],
[],
+ [],
self._initialise_reserved_users,
hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
)
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 6d42276503..f6822707e4 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -304,7 +304,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)
- txn.call_after(self._invalidate_get_event_cache, event_id)
+ self.invalidate_get_event_cache_after_txn(txn, event_id)
logger.info("[purge] done")
|