diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 07e873fce4..9f8f0a0823 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ from twisted.internet import defer, reactor
from synapse.events import FrozenEvent, USE_FROZEN_DICTS
from synapse.events.utils import prune_event
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import ObservableDeferred, run_on_reactor
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
@@ -89,12 +89,14 @@ class _EventPeristenceQueue(object):
return deferred.observe()
- def handle_queue(self, room_id, callback):
+ def handle_queue(self, room_id, per_item_callback):
"""Attempts to handle the queue for a room if not already being handled.
- The given callback will be invoked with a 'queue' arg, which is a
- generator over _EventPersistQueueItem's. The queue will finish if there
- are no longer any items in the room queue.
+ The given callback will be invoked with for each item in the queue,1
+ of type _EventPersistQueueItem. The per_item_callback will continuously
+ be called with new items, unless the queue becomnes empty. The return
+ value of the function will be given to the deferreds waiting on the item,
+ exceptions will be passed to the deferres as well.
This function should therefore be called whenever anything is added
to the queue.
@@ -108,15 +110,26 @@ class _EventPeristenceQueue(object):
self._currently_persisting_rooms.add(room_id)
- try:
- callback(self._get_drainining_queue(room_id))
- finally:
- self._currently_persisting_rooms.discard(room_id)
+ @defer.inlineCallbacks
+ def handle_queue_loop():
+ try:
+ queue = self._get_drainining_queue(room_id)
+ for item in queue:
+ try:
+ ret = yield per_item_callback(item)
+ item.deferred.callback(ret)
+ except Exception as e:
+ item.deferred.errback(e)
+ finally:
+ queue = self._event_persist_queues.pop(room_id, None)
+ if queue:
+ self._event_persist_queues[room_id] = queue
+ self._currently_persisting_rooms.discard(room_id)
+
+ preserve_fn(handle_queue_loop)()
def _get_drainining_queue(self, room_id):
- queue = self._event_persist_queues.pop(room_id, None)
- if not queue:
- return
+ queue = self._event_persist_queues.setdefault(room_id, deque())
try:
while True:
@@ -180,30 +193,22 @@ class EventsStore(SQLBaseStore):
def _maybe_start_persisting(self, room_id):
@defer.inlineCallbacks
- def persisting_queue(queue):
- for item in queue:
- try:
- ret = None
- if item.current_state:
- for event, context in item.events_and_contexts:
- # There should only ever be one item in
- # events_and_contexts when current_state is
- # not None
- yield self._persist_event(
- event, context,
- current_state=item.current_state,
- backfilled=item.backfilled,
- )
- else:
- yield self._persist_events(
- item.events_and_contexts,
- backfilled=item.backfilled,
- )
- logger.info("Resolving with ret: %r", ret)
- item.deferred.callback(ret)
- except Exception as e:
- logger.exception("Failed to persist events")
- item.deferred.errback(e)
+ def persisting_queue(item):
+ if item.current_state:
+ for event, context in item.events_and_contexts:
+ # There should only ever be one item in
+ # events_and_contexts when current_state is
+ # not None
+ yield self._persist_event(
+ event, context,
+ current_state=item.current_state,
+ backfilled=item.backfilled,
+ )
+ else:
+ yield self._persist_events(
+ item.events_and_contexts,
+ backfilled=item.backfilled,
+ )
self._event_persist_queue.handle_queue(room_id, persisting_queue)
|