diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b127289d8d..d4b8ad6560 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -148,6 +148,7 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
+ self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c6787faea0..4ebd083055 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -233,8 +233,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
)
room_id = row.keys[0]
- self._invalidate_caches_for_room_events(room_id)
- self._invalidate_caches_for_room(room_id)
+ server_joined = row.keys.get(1, True)
+ self._invalidate_caches_for_room(room_id, server_joined)
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
@@ -388,7 +388,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._invalidate_local_get_event_cache_room_id(room_id) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
- self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
@@ -398,11 +397,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_applicable_edit", None)
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
- self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", None
- )
- self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)
@@ -417,17 +412,28 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)
def _invalidate_caches_for_room_and_stream(
- self, txn: LoggingTransaction, room_id: str
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ server_in_room: bool,
) -> None:
"""Invalidate caches associated with rooms, and stream to replication.
Used when we delete rooms.
+
+ Args:
+ txn
+ room_id
+ server_in_room: Whether the server was joined or invited to the
+ room when we deleted it.
"""
- self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
- txn.call_after(self._invalidate_caches_for_room, room_id)
+ self._send_invalidation_to_replication(
+ txn, DELETE_ROOM_CACHE_NAME, [room_id, server_in_room]
+ )
+ txn.call_after(self._invalidate_caches_for_room, room_id, server_in_room)
- def _invalidate_caches_for_room(self, room_id: str) -> None:
+ def _invalidate_caches_for_room(self, room_id: str, server_in_room: bool) -> None:
"""Invalidate caches associated with rooms.
Used when we delete rooms.
@@ -439,8 +445,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("get_account_data_for_room", None)
self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
- self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
- self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
+
+ if server_in_room:
+ self._attempt_to_invalidate_cache(
+ "get_latest_event_ids_in_room", (room_id,)
+ )
+ self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
+
+ # And delete state caches.
+ self._invalidate_state_caches_all(room_id)
+
self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,)
)
@@ -453,19 +467,13 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
"_get_partial_state_servers_at_join", (room_id,)
)
self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
- self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_current_hosts_in_room_ordered", (room_id,)
)
- self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
- # And delete state caches.
-
- self._invalidate_state_caches_all(room_id)
-
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
) -> None:
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 3b81ed943c..e4d156bb8e 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -22,6 +22,7 @@
import logging
from typing import Any, List, Set, Tuple, cast
+from synapse.api.constants import Membership
from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main import CacheInvalidationWorkerStore
@@ -376,6 +377,20 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
(room_id,),
)
+ server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
+ txn,
+ room_id,
+ host=self.hs.hostname,
+ membership=Membership.JOIN,
+ )
+ if not server_in_room:
+ server_in_room = self._check_host_room_membership_txn( # type: ignore[attr-defined]
+ txn,
+ room_id,
+ host=self.hs.hostname,
+ membership=Membership.INVITE,
+ )
+
# First, fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
@@ -503,6 +518,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
- self._invalidate_caches_for_room_and_stream(txn, room_id)
+ self._invalidate_caches_for_room_and_stream(txn, room_id, server_in_room)
return state_groups
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 9fddbb2caf..90020c7cb6 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -963,6 +963,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
async def _check_host_room_membership(
self, room_id: str, host: str, membership: str
) -> bool:
+ return await self.db_pool.runInteraction(
+ "is_host_joined",
+ self._check_host_room_membership_txn,
+ room_id,
+ host,
+ membership,
+ )
+
+ def _check_host_room_membership_txn(
+ self, txn: LoggingTransaction, room_id: str, host: str, membership: str
+ ) -> bool:
if "%" in host or "_" in host:
raise Exception("Invalid host name")
@@ -980,14 +991,14 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
# the returned user actually has the correct domain.
like_clause = "%:" + host
- rows = await self.db_pool.execute(
- "is_host_joined", sql, membership, room_id, like_clause
- )
+ txn.execute(sql, (membership, room_id, like_clause))
+
+ row = txn.fetchone()
- if not rows:
+ if not row:
return False
- user_id = rows[0][0]
+ user_id = row[0]
if get_domain_from_id(user_id) != host:
# This can only happen if the host name has something funky in it
raise Exception("Invalid host name")
|