summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/events.py15
-rw-r--r--synapse/storage/databases/main/purge_events.py8
2 files changed, 21 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2a1e567ce0..9a6c2fd47a 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -47,6 +47,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.events_worker import EventCacheEntry
 from synapse.storage.databases.main.search import SearchEntry
+from synapse.storage.engines.postgres import PostgresEngine
 from synapse.storage.util.id_generators import AbstractStreamIdGenerator
 from synapse.storage.util.sequence import SequenceGenerator
 from synapse.types import StateMap, get_domain_from_id
@@ -364,6 +365,20 @@ class PersistEventsStore:
         min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
+        # We check that the room still exists for events we're trying to
+        # persist. This is to protect against races with deleting a room.
+        #
+        # Annoyingly SQLite doesn't support row level locking.
+        if isinstance(self.database_engine, PostgresEngine):
+            for room_id in {e.room_id for e, _ in events_and_contexts}:
+                txn.execute(
+                    "SELECT room_version FROM rooms WHERE room_id = ? FOR SHARE",
+                    (room_id,),
+                )
+                row = txn.fetchone()
+                if row is None:
+                    raise Exception(f"Room does not exist {room_id}")
+
         # stream orderings should have been assigned by now
         assert min_stream_order
         assert max_stream_order
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 2e3818e432..bfc85b3add 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -324,7 +324,12 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         )
 
     def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
-        # First we fetch all the state groups that should be deleted, before
+        # We *immediately* delete the room from the rooms table. This ensures
+        # that we don't race when persisting events (as that transaction checks
+        # that the room exists).
+        txn.execute("DELETE FROM rooms WHERE room_id = ?", (room_id,))
+
+        # Next, we fetch all the state groups that should be deleted, before
         # we delete that information.
         txn.execute(
             """
@@ -403,7 +408,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "room_stats_state",
             "room_stats_current",
             "room_stats_earliest_token",
-            "rooms",
             "stream_ordering_to_exterm",
             "users_in_public_rooms",
             "users_who_share_private_rooms",