summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-10-23 13:13:53 +0100
committerErik Johnston <erik@matrix.org>2017-10-23 13:13:53 +0100
commitffba97807746f6f0d7a8820c0d980f188dcad08c (patch)
tree36301e98cdc6400afe71b89fc4b0c8beccd0deea /synapse/storage/events.py
parentMerge pull request #2540 from 4nd3r/patch-1 (diff)
parentBump version and changelog (diff)
downloadsynapse-ffba97807746f6f0d7a8820c0d980f188dcad08c.tar.xz
Merge branch 'release-v0.24.0' of github.com:matrix-org/synapse v0.24.0
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py27
1 files changed, 20 insertions, 7 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7002b3752e..637640ec2a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -21,7 +21,7 @@ from synapse.events.utils import prune_event
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import (
-    preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
+    preserve_fn, PreserveLoggingContext, make_deferred_yieldable
 )
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -88,13 +88,23 @@ class _EventPeristenceQueue(object):
     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
 
+        NB: due to the normal usage pattern of this method, it does *not*
+        follow the synapse logcontext rules, and leaves the logcontext in
+        place whether or not the returned deferred is ready.
+
         Args:
             room_id (str):
             events_and_contexts (list[(EventBase, EventContext)]):
             backfilled (bool):
+
+        Returns:
+            defer.Deferred: a deferred which will resolve once the events are
+                persisted. Runs its callbacks *without* a logcontext.
         """
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
+            # if the last item in the queue has the same `backfilled` setting,
+            # we can just add these new events to that item.
             end_item = queue[-1]
             if end_item.backfilled == backfilled:
                 end_item.events_and_contexts.extend(events_and_contexts)
@@ -113,11 +123,11 @@ class _EventPeristenceQueue(object):
     def handle_queue(self, room_id, per_item_callback):
         """Attempts to handle the queue for a room if not already being handled.
 
-        The given callback will be invoked with for each item in the queue,1
+        The given callback will be invoked with for each item in the queue,
         of type _EventPersistQueueItem. The per_item_callback will continuously
         be called with new items, unless the queue becomnes empty. The return
         value of the function will be given to the deferreds waiting on the item,
-        exceptions will be passed to the deferres as well.
+        exceptions will be passed to the deferreds as well.
 
         This function should therefore be called whenever anything is added
         to the queue.
@@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore):
 
         deferreds = []
         for room_id, evs_ctxs in partitioned.iteritems():
-            d = preserve_fn(self._event_persist_queue.add_to_queue)(
+            d = self._event_persist_queue.add_to_queue(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
@@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore):
         for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
-        return preserve_context_over_deferred(
+        return make_deferred_yieldable(
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
@@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore):
 
         self._maybe_start_persisting(event.room_id)
 
-        yield preserve_context_over_deferred(deferred)
+        yield make_deferred_yieldable(deferred)
 
         max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -784,6 +794,9 @@ class EventsStore(SQLBaseStore):
                     self._invalidate_cache_and_stream(
                         txn, self.is_host_joined, (room_id, host)
                     )
+                    self._invalidate_cache_and_stream(
+                        txn, self.was_host_joined, (room_id, host)
+                    )
 
                 self._invalidate_cache_and_stream(
                     txn, self.get_users_in_room, (room_id,)
@@ -1523,7 +1536,7 @@ class EventsStore(SQLBaseStore):
         if not allow_rejected:
             rows[:] = [r for r in rows if not r["rejects"]]
 
-        res = yield preserve_context_over_deferred(defer.gatherResults(
+        res = yield make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],