diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4f0b43c36d..637640ec2a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -21,7 +21,7 @@ from synapse.events.utils import prune_event
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import (
- preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
+ preserve_fn, PreserveLoggingContext, make_deferred_yieldable
)
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -88,13 +88,23 @@ class _EventPeristenceQueue(object):
def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options.
+ NB: due to the normal usage pattern of this method, it does *not*
+ follow the synapse logcontext rules, and leaves the logcontext in
+ place whether or not the returned deferred is ready.
+
Args:
room_id (str):
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
+
+ Returns:
+ defer.Deferred: a deferred which will resolve once the events are
+ persisted. Runs its callbacks *without* a logcontext.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
+ # if the last item in the queue has the same `backfilled` setting,
+ # we can just add these new events to that item.
end_item = queue[-1]
if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts)
@@ -113,11 +123,11 @@ class _EventPeristenceQueue(object):
def handle_queue(self, room_id, per_item_callback):
"""Attempts to handle the queue for a room if not already being handled.
- The given callback will be invoked with for each item in the queue,1
+ The given callback will be invoked with for each item in the queue,
of type _EventPersistQueueItem. The per_item_callback will continuously
be called with new items, unless the queue becomnes empty. The return
value of the function will be given to the deferreds waiting on the item,
- exceptions will be passed to the deferres as well.
+ exceptions will be passed to the deferreds as well.
This function should therefore be called whenever anything is added
to the queue.
@@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore):
deferreds = []
for room_id, evs_ctxs in partitioned.iteritems():
- d = preserve_fn(self._event_persist_queue.add_to_queue)(
+ d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs,
backfilled=backfilled,
)
@@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore):
for room_id in partitioned:
self._maybe_start_persisting(room_id)
- return preserve_context_over_deferred(
+ return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
@@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore):
self._maybe_start_persisting(event.room_id)
- yield preserve_context_over_deferred(deferred)
+ yield make_deferred_yieldable(deferred)
max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -1526,7 +1536,7 @@ class EventsStore(SQLBaseStore):
if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]
- res = yield preserve_context_over_deferred(defer.gatherResults(
+ res = yield make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
|