diff --git a/changelog.d/14161.bugfix b/changelog.d/14161.bugfix
new file mode 100644
index 0000000000..aed4d9e386
--- /dev/null
+++ b/changelog.d/14161.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
\ No newline at end of file
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d4104462b5..cfd4780add 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1502,21 +1502,15 @@ class EventsWorkerStore(SQLBaseStore):
Returns:
a dict {event_id -> bool}
"""
- # if the event cache contains the event, obviously we've seen it.
-
- cache_results = {
- event_id
- for event_id in event_ids
- if await self._get_event_cache.contains((event_id,))
- }
- results = dict.fromkeys(cache_results, True)
- remaining = [
- event_id for event_id in event_ids if event_id not in cache_results
- ]
- if not remaining:
- return results
+ # TODO: We used to query the _get_event_cache here as a fast-path before
+ # hitting the database. For if an event were in the cache, we've presumably
+ # seen it before.
+ #
+ # But this is currently an invalid assumption due to the _get_event_cache
+ # not being invalidated when purging events from a room. The optimisation can
+ # be re-added after https://github.com/matrix-org/synapse/issues/13476
- def have_seen_events_txn(txn: LoggingTransaction) -> None:
+ def have_seen_events_txn(txn: LoggingTransaction) -> Dict[str, bool]:
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
@@ -1524,16 +1518,17 @@ class EventsWorkerStore(SQLBaseStore):
sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
- txn.database_engine, "e.event_id", remaining
+ txn.database_engine, "e.event_id", event_ids
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}
# ... and then we can update the results for each key
- results.update({eid: (eid in found_events) for eid in remaining})
+ return {eid: (eid in found_events) for eid in event_ids}
- await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
- return results
+ return await self.db_pool.runInteraction(
+ "have_seen_events", have_seen_events_txn
+ )
@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index 32a798d74b..5773172ab8 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -90,18 +90,6 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
self.assertEqual(res, {self.event_ids[0]})
self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
- def test_query_via_event_cache(self):
- # fetch an event into the event cache
- self.get_success(self.store.get_event(self.event_ids[0]))
-
- # looking it up should now cause no db hits
- with LoggingContext(name="test") as ctx:
- res = self.get_success(
- self.store.have_seen_events(self.room_id, [self.event_ids[0]])
- )
- self.assertEqual(res, {self.event_ids[0]})
- self.assertEqual(ctx.get_resource_usage().db_txn_count, 0)
-
def test_persisting_event_invalidates_cache(self):
"""
Test to make sure that the `have_seen_event` cache
|