summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py31
-rw-r--r--synapse/storage/databases/main/cache.py134
-rw-r--r--synapse/storage/databases/main/events_worker.py9
-rw-r--r--synapse/storage/databases/main/purge_events.py8
4 files changed, 177 insertions, 5 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 481fec72fe..fe4a763411 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -86,9 +86,14 @@ class SQLBaseStore(metaclass=ABCMeta):
             room_id: Room where state changed
             members_changed: The user_ids of members that have changed
         """
+
+        # XXX: If you add something to this function make sure you add it to
+        # `_invalidate_state_caches_all` as well.
+
         # If there were any membership changes, purge the appropriate caches.
         for host in {get_domain_from_id(u) for u in members_changed}:
             self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
+            self._attempt_to_invalidate_cache("is_host_invited", (room_id, host))
         if members_changed:
             self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
             self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
@@ -117,6 +122,32 @@ class SQLBaseStore(metaclass=ABCMeta):
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
         self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
 
+    def _invalidate_state_caches_all(self, room_id: str) -> None:
+        """Invalidates caches that are based on the current state, but does
+        not stream invalidations down replication.
+
+        Same as `_invalidate_state_caches`, except that works when we don't know
+        which memberships have changed.
+
+        Args:
+            room_id: Room where state changed
+        """
+        self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
+        self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
+        self._attempt_to_invalidate_cache("is_host_invited", None)
+        self._attempt_to_invalidate_cache("is_host_joined", None)
+        self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
+        self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
+        self._attempt_to_invalidate_cache("get_number_joined_users_in_room", (room_id,))
+        self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
+        self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
+        self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
+        self._attempt_to_invalidate_cache(
+            "get_rooms_for_user_with_stream_ordering", None
+        )
+        self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
+
     def _attempt_to_invalidate_cache(
         self, cache_name: str, key: Optional[Collection[Any]]
     ) -> bool:
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 46fa0a73f9..6e1c7d681f 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -46,6 +46,12 @@ logger = logging.getLogger(__name__)
 # based on the current state when notifying workers over replication.
 CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
 
+# As above, but for invalidating event caches on history deletion
+PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
+
+# As above, but for invalidating room caches on room deletion
+DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
+
 
 class CacheInvalidationWorkerStore(SQLBaseStore):
     def __init__(
@@ -175,6 +181,23 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                     room_id = row.keys[0]
                     members_changed = set(row.keys[1:])
                     self._invalidate_state_caches(room_id, members_changed)
+                elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
+                    if row.keys is None:
+                        raise Exception(
+                            "Can't send an 'invalidate all' for 'purge history' cache"
+                        )
+
+                    room_id = row.keys[0]
+                    self._invalidate_caches_for_room_events(room_id)
+                elif row.cache_func == DELETE_ROOM_CACHE_NAME:
+                    if row.keys is None:
+                        raise Exception(
+                            "Can't send an 'invalidate all' for 'delete room' cache"
+                        )
+
+                    room_id = row.keys[0]
+                    self._invalidate_caches_for_room_events(room_id)
+                    self._invalidate_caches_for_room(room_id)
                 else:
                     self._attempt_to_invalidate_cache(row.cache_func, row.keys)
 
@@ -226,6 +249,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         relates_to: Optional[str],
         backfilled: bool,
     ) -> None:
+        # XXX: If you add something to this function make sure you add it to
+        # `_invalidate_caches_for_room_events` as well.
+
         # This invalidates any local in-memory cached event objects, the original
         # process triggering the invalidation is responsible for clearing any external
         # cached objects.
@@ -271,6 +297,106 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
             self._attempt_to_invalidate_cache("get_threads", (room_id,))
 
+    def _invalidate_caches_for_room_events_and_stream(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> None:
+        """Invalidate caches associated with events in a room, and stream to
+        replication.
+
+        Used when we delete events a room, but don't know which events we've
+        deleted.
+        """
+
+        self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id])
+        txn.call_after(self._invalidate_caches_for_room_events, room_id)
+
+    def _invalidate_caches_for_room_events(self, room_id: str) -> None:
+        """Invalidate caches associated with events in a room, and stream to
+        replication.
+
+        Used when we delete events in a room, but don't know which events we've
+        deleted.
+        """
+
+        self._invalidate_local_get_event_cache_all()  # type: ignore[attr-defined]
+
+        self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
+        self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
+        self._attempt_to_invalidate_cache(
+            "get_unread_event_push_actions_by_room_for_user", (room_id,)
+        )
+
+        self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
+        self._attempt_to_invalidate_cache("get_relations_for_event", None)
+        self._attempt_to_invalidate_cache("get_applicable_edit", None)
+        self._attempt_to_invalidate_cache("get_thread_id", None)
+        self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
+        self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
+        self._attempt_to_invalidate_cache(
+            "get_rooms_for_user_with_stream_ordering", None
+        )
+        self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache("get_references_for_event", None)
+        self._attempt_to_invalidate_cache("get_thread_summary", None)
+        self._attempt_to_invalidate_cache("get_thread_participated", None)
+        self._attempt_to_invalidate_cache("get_threads", (room_id,))
+
+        self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
+
+        self._attempt_to_invalidate_cache("get_event_ordering", None)
+        self._attempt_to_invalidate_cache("is_partial_state_event", None)
+        self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
+
+    def _invalidate_caches_for_room_and_stream(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> None:
+        """Invalidate caches associated with rooms, and stream to replication.
+
+        Used when we delete rooms.
+        """
+
+        self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
+        txn.call_after(self._invalidate_caches_for_room, room_id)
+
+    def _invalidate_caches_for_room(self, room_id: str) -> None:
+        """Invalidate caches associated with rooms.
+
+        Used when we delete rooms.
+        """
+
+        # If we've deleted the room then we also need to purge all event caches.
+        self._invalidate_caches_for_room_events(room_id)
+
+        self._attempt_to_invalidate_cache("get_account_data_for_room", None)
+        self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
+        self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
+        self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
+        self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
+        self._attempt_to_invalidate_cache(
+            "get_unread_event_push_actions_by_room_for_user", (room_id,)
+        )
+        self._attempt_to_invalidate_cache(
+            "_get_linearized_receipts_for_room", (room_id,)
+        )
+        self._attempt_to_invalidate_cache("is_room_blocked", (room_id,))
+        self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,))
+        self._attempt_to_invalidate_cache(
+            "_get_partial_state_servers_at_join", (room_id,)
+        )
+        self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
+        self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
+        self._attempt_to_invalidate_cache(
+            "get_current_hosts_in_room_ordered", (room_id,)
+        )
+        self._attempt_to_invalidate_cache("did_forget", None)
+        self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
+        self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
+        self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
+
+        # And delete state caches.
+
+        self._invalidate_state_caches_all(room_id)
+
     async def invalidate_cache_and_stream(
         self, cache_name: str, keys: Tuple[Any, ...]
     ) -> None:
@@ -377,6 +503,14 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 "Can't stream invalidate all with magic current state cache"
             )
 
+        if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None:
+            raise Exception(
+                "Can't stream invalidate all with magic purge history cache"
+            )
+
+        if cache_name == DELETE_ROOM_CACHE_NAME and keys is None:
+            raise Exception("Can't stream invalidate all with magic delete room cache")
+
         if isinstance(self.database_engine, PostgresEngine):
             assert self._cache_id_gen is not None
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a39bc90974..d93ffc4efa 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -903,6 +903,15 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_ref.pop(event_id, None)
         self._current_event_fetches.pop(event_id, None)
 
+    def _invalidate_local_get_event_cache_all(self) -> None:
+        """Clears the in-memory get event caches.
+
+        Used when we purge room history.
+        """
+        self._get_event_cache.clear()
+        self._event_ref.clear()
+        self._current_event_fetches.clear()
+
     async def _get_events_from_cache(
         self, events: Iterable[str], update_metrics: bool = True
     ) -> Dict[str, EventCacheEntry]:
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index efbd3e75d9..9773c1fcd2 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -308,6 +308,8 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
 
         logger.info("[purge] done")
 
+        self._invalidate_caches_for_room_events_and_stream(txn, room_id)
+
         return referenced_state_groups
 
     async def purge_room(self, room_id: str) -> List[int]:
@@ -485,10 +487,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         #       index on them. In any case we should be clearing out 'stream' tables
         #       periodically anyway (#5888)
 
-        # TODO: we could probably usefully do a bunch more cache invalidation here
-
-        # XXX: as with purge_history, this is racy, but no worse than other races
-        #   that already exist.
-        self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
+        self._invalidate_caches_for_room_and_stream(txn, room_id)
 
         return state_groups