summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py34
1 files changed, 27 insertions, 7 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2f482af3a1..e8e5a0fe44 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,8 @@ from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
@@ -65,7 +67,13 @@ state_delta_reuse_delta_counter = Counter(
 
 
 def encode_json(json_object):
-    return frozendict_json_encoder.encode(json_object)
+    """
+    Encode a Python object as JSON and return it in a Unicode string.
+    """
+    out = frozendict_json_encoder.encode(json_object)
+    if isinstance(out, bytes):
+        out = out.decode('utf8')
+    return out
 
 
 class _EventPeristenceQueue(object):
@@ -193,7 +201,9 @@ def _retry_on_integrity_error(func):
     return f
 
 
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
@@ -231,12 +241,18 @@ class EventsStore(EventsWorkerStore):
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+    @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
         Write events to the database
         Args:
             events_and_contexts: list of tuples of (event, context)
-            backfilled: ?
+            backfilled (bool): Whether the results are retrieved from federation
+                via backfill or not. Used to determine if they're "new" events
+                which might update the current state etc.
+
+        Returns:
+            Deferred[int]: the stream ordering of the latest persisted event
         """
         partitioned = {}
         for event, ctx in events_and_contexts:
@@ -253,10 +269,14 @@ class EventsStore(EventsWorkerStore):
         for room_id in partitioned:
             self._maybe_start_persisting(room_id)
 
-        return make_deferred_yieldable(
+        yield make_deferred_yieldable(
             defer.gatherResults(deferreds, consumeErrors=True)
         )
 
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
+
+        defer.returnValue(max_persisted_id)
+
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False):
@@ -1054,7 +1074,7 @@ class EventsStore(EventsWorkerStore):
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
-                ).decode("UTF-8")
+                )
 
                 sql = (
                     "UPDATE event_json SET internal_metadata = ?"
@@ -1168,8 +1188,8 @@ class EventsStore(EventsWorkerStore):
                     "room_id": event.room_id,
                     "internal_metadata": encode_json(
                         event.internal_metadata.get_dict()
-                    ).decode("UTF-8"),
-                    "json": encode_json(event_dict(event)).decode("UTF-8"),
+                    ),
+                    "json": encode_json(event_dict(event)),
                 }
                 for event, _ in events_and_contexts
             ],