diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2a1e567ce0..9a6c2fd47a 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -47,6 +47,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventCacheEntry
from synapse.storage.databases.main.search import SearchEntry
+from synapse.storage.engines.postgres import PostgresEngine
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.storage.util.sequence import SequenceGenerator
from synapse.types import StateMap, get_domain_from_id
@@ -364,6 +365,20 @@ class PersistEventsStore:
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
+ # We check that the room still exists for events we're trying to
+ # persist. This is to protect against races with deleting a room.
+ #
+ # SQLite has no row-level locking (`FOR SHARE`), so we only check on Postgres.
+ if isinstance(self.database_engine, PostgresEngine):
+ for room_id in {e.room_id for e, _ in events_and_contexts}:
+ txn.execute(
+ "SELECT room_version FROM rooms WHERE room_id = ? FOR SHARE",
+ (room_id,),
+ )
+ row = txn.fetchone()
+ if row is None:
+ raise Exception(f"Room does not exist {room_id}")
+
# stream orderings should have been assigned by now
assert min_stream_order
assert max_stream_order
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 2e3818e432..bfc85b3add 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -324,7 +324,12 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
)
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
- # First we fetch all the state groups that should be deleted, before
+ # We *immediately* delete the room from the rooms table. This ensures
+ # that we don't race when persisting events (as that transaction checks
+ # that the room exists).
+ txn.execute("DELETE FROM rooms WHERE room_id = ?", (room_id,))
+
+ # Next, we fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
"""
@@ -403,7 +408,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"room_stats_state",
"room_stats_current",
"room_stats_earliest_token",
- "rooms",
"stream_ordering_to_exterm",
"users_in_public_rooms",
"users_who_share_private_rooms",
|