summary refs log tree commit diff
path: root/synapse/storage/controllers/persist_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/controllers/persist_events.py')
-rw-r--r--synapse/storage/controllers/persist_events.py27
1 files changed, 18 insertions, 9 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 35c0680365..35cd1089d6 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,6 +45,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.opentracing import (
     SynapseTags,
@@ -338,6 +339,7 @@ class EventsPersistenceStorageController:
         )
         self._state_resolution_handler = hs.get_state_resolution_handler()
         self._state_controller = state_controller
+        self.hs = hs
 
     async def _process_event_persist_queue_task(
         self,
@@ -350,15 +352,22 @@ class EventsPersistenceStorageController:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
         """
-        if isinstance(task, _PersistEventsTask):
-            return await self._persist_event_batch(room_id, task)
-        elif isinstance(task, _UpdateCurrentStateTask):
-            await self._update_current_state(room_id, task)
-            return {}
-        else:
-            raise AssertionError(
-                f"Found an unexpected task type in event persistence queue: {task}"
-            )
+
+        # Ensure that the room can't be deleted while we're persisting events to
+        # it. We might already have taken out the lock, but since this is just a
+        # "read" lock its inherently reentrant.
+        async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
+            DELETE_ROOM_LOCK_NAME, room_id, write=False
+        ):
+            if isinstance(task, _PersistEventsTask):
+                return await self._persist_event_batch(room_id, task)
+            elif isinstance(task, _UpdateCurrentStateTask):
+                await self._update_current_state(room_id, task)
+                return {}
+            else:
+                raise AssertionError(
+                    f"Found an unexpected task type in event persistence queue: {task}"
+                )
 
     @trace
     async def persist_events(