diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 906a405031..e8e5a0fe44 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,8 @@ from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
@@ -65,7 +67,13 @@ state_delta_reuse_delta_counter = Counter(
def encode_json(json_object):
- return frozendict_json_encoder.encode(json_object)
+ """
+ Encode a Python object as JSON and return it in a Unicode string.
+ """
+ out = frozendict_json_encoder.encode(json_object)
+ if isinstance(out, bytes):
+ out = out.decode('utf8')
+ return out
class _EventPeristenceQueue(object):
@@ -142,15 +150,14 @@ class _EventPeristenceQueue(object):
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
- # handle_queue_loop runs in the sentinel logcontext, so
- # there is no need to preserve_fn when running the
- # callbacks on the deferred.
try:
ret = yield per_item_callback(item)
+ except Exception:
+ with PreserveLoggingContext():
+ item.deferred.errback()
+ else:
with PreserveLoggingContext():
item.deferred.callback(ret)
- except Exception:
- item.deferred.errback()
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
@@ -194,7 +201,9 @@ def _retry_on_integrity_error(func):
return f
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -232,12 +241,18 @@ class EventsStore(EventsWorkerStore):
self._state_resolution_handler = hs.get_state_resolution_handler()
+ @defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
- backfilled: ?
+ backfilled (bool): Whether the results are retrieved from federation
+ via backfill or not. Used to determine if they're "new" events
+ which might update the current state etc.
+
+ Returns:
+ Deferred[int]: the stream ordering of the latest persisted event
"""
partitioned = {}
for event, ctx in events_and_contexts:
@@ -254,10 +269,14 @@ class EventsStore(EventsWorkerStore):
for room_id in partitioned:
self._maybe_start_persisting(room_id)
- return make_deferred_yieldable(
+ yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
+ max_persisted_id = yield self._stream_id_gen.get_current_token()
+
+ defer.returnValue(max_persisted_id)
+
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False):
@@ -521,7 +540,6 @@ class EventsStore(EventsWorkerStore):
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
- "room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
@@ -575,11 +593,13 @@ class EventsStore(EventsWorkerStore):
for ev, ctx in events_context:
if ctx.state_group is None:
- # I don't think this can happen, but let's double-check
- raise Exception(
- "Context for new extremity event %s has no state "
- "group" % (ev.event_id, ),
- )
+ # This should only happen for outlier events.
+ if not ev.internal_metadata.is_outlier():
+ raise Exception(
+ "Context for new event %s has no state "
+ "group" % (ev.event_id, ),
+ )
+ continue
if ctx.state_group in state_groups_map:
continue
@@ -607,7 +627,7 @@ class EventsStore(EventsWorkerStore):
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding.
for ev, ctx in events_context:
- if event_id == ev.event_id:
+ if event_id == ev.event_id and ctx.state_group is not None:
event_id_to_state_group[event_id] = ctx.state_group
break
else:
@@ -1054,7 +1074,7 @@ class EventsStore(EventsWorkerStore):
metadata_json = encode_json(
event.internal_metadata.get_dict()
- ).decode("UTF-8")
+ )
sql = (
"UPDATE event_json SET internal_metadata = ?"
@@ -1137,7 +1157,7 @@ class EventsStore(EventsWorkerStore):
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
- [(ev.event_id,) for ev, _ in events_and_contexts]
+ [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
@@ -1168,8 +1188,8 @@ class EventsStore(EventsWorkerStore):
"room_id": event.room_id,
"internal_metadata": encode_json(
event.internal_metadata.get_dict()
- ).decode("UTF-8"),
- "json": encode_json(event_dict(event)).decode("UTF-8"),
+ ),
+ "json": encode_json(event_dict(event)),
}
for event, _ in events_and_contexts
],
@@ -1188,7 +1208,6 @@ class EventsStore(EventsWorkerStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
|