summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-07-25 16:00:38 +0100
committerErik Johnston <erik@matrix.org>2018-07-25 16:22:56 +0100
commit78a691d005b925b6211a3d090add34c6efb1c0f4 (patch)
tree56d8c0ce2d5317e563bd9d04a184e814222dd57b /synapse/storage
parentMerge pull request #3603 from matrix-org/erikj/handle_outliers (diff)
downloadsynapse-78a691d005b925b6211a3d090add34c6efb1c0f4.tar.xz
Split out DB writes in federation handler
This will allow us to easily add an internal replication API to proxy
these reqeusts to master, so that we can move federation APIs to
workers.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py14
1 files changed, 12 insertions, 2 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 200f5ec95f..e3910ed282 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -231,12 +231,18 @@ class EventsStore(EventsWorkerStore):
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+    @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
         Args:
             events_and_contexts: list of tuples of (event, context)
-            backfilled: ?
+            backfilled (bool): Whether the results are retrieved from federation
+                via backfill or not. Used to determine if they're "new" events
+                which might update the current state etc.
+
+        Returns:
+            Deferred[int]: he stream ordering of the latest persisted event
         """
         partitioned = {}
         for event, ctx in events_and_contexts:
@@ -253,10 +259,14 @@ class EventsStore(EventsWorkerStore):
         for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
-        return make_deferred_yieldable(
+        yield make_deferred_yieldable(
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
+
+        defer.returnValue(max_persisted_id)
+
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False):