summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py77
1 files changed, 41 insertions, 36 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 07e873fce4..9f8f0a0823 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ from twisted.internet import defer, reactor
 from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import ObservableDeferred, run_on_reactor
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
@@ -89,12 +89,14 @@ class _EventPeristenceQueue(object):
 
         return deferred.observe()
 
-    def handle_queue(self, room_id, callback):
+    def handle_queue(self, room_id, per_item_callback):
         """Attempts to handle the queue for a room if not already being handled.
 
-        The given callback will be invoked with a 'queue' arg, which is a
-        generator over _EventPersistQueueItem's. The queue will finish if there
-        are no longer any items in the room queue.
+        The given callback will be invoked with for each item in the queue,1
+        of type _EventPersistQueueItem. The per_item_callback will continuously
+        be called with new items, unless the queue becomnes empty. The return
+        value of the function will be given to the deferreds waiting on the item,
+        exceptions will be passed to the deferres as well.
 
         This function should therefore be called whenever anything is added
         to the queue.
@@ -108,15 +110,26 @@ class _EventPeristenceQueue(object):
 
         self._currently_persisting_rooms.add(room_id)
 
-        try:
-            callback(self._get_drainining_queue(room_id))
-        finally:
-            self._currently_persisting_rooms.discard(room_id)
+        @defer.inlineCallbacks
+        def handle_queue_loop():
+            try:
+                queue = self._get_drainining_queue(room_id)
+                for item in queue:
+                    try:
+                        ret = yield per_item_callback(item)
+                        item.deferred.callback(ret)
+                    except Exception as e:
+                        item.deferred.errback(e)
+            finally:
+                queue = self._event_persist_queues.pop(room_id, None)
+                if queue:
+                    self._event_persist_queues[room_id] = queue
+                self._currently_persisting_rooms.discard(room_id)
+
+        preserve_fn(handle_queue_loop)()
 
     def _get_drainining_queue(self, room_id):
-        queue = self._event_persist_queues.pop(room_id, None)
-        if not queue:
-            return
+        queue = self._event_persist_queues.setdefault(room_id, deque())
 
         try:
             while True:
@@ -180,30 +193,22 @@ class EventsStore(SQLBaseStore):
 
     def _maybe_start_persisting(self, room_id):
         @defer.inlineCallbacks
-        def persisting_queue(queue):
-            for item in queue:
-                try:
-                    ret = None
-                    if item.current_state:
-                        for event, context in item.events_and_contexts:
-                            # There should only ever be one item in
-                            # events_and_contexts when current_state is
-                            # not None
-                            yield self._persist_event(
-                                event, context,
-                                current_state=item.current_state,
-                                backfilled=item.backfilled,
-                            )
-                    else:
-                        yield self._persist_events(
-                            item.events_and_contexts,
-                            backfilled=item.backfilled,
-                        )
-                    logger.info("Resolving with ret: %r", ret)
-                    item.deferred.callback(ret)
-                except Exception as e:
-                    logger.exception("Failed to persist events")
-                    item.deferred.errback(e)
+        def persisting_queue(item):
+            if item.current_state:
+                for event, context in item.events_and_contexts:
+                    # There should only ever be one item in
+                    # events_and_contexts when current_state is
+                    # not None
+                    yield self._persist_event(
+                        event, context,
+                        current_state=item.current_state,
+                        backfilled=item.backfilled,
+                    )
+            else:
+                yield self._persist_events(
+                    item.events_and_contexts,
+                    backfilled=item.backfilled,
+                )
 
         self._event_persist_queue.handle_queue(room_id, persisting_queue)