diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 906a405031..2f482af3a1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -142,15 +142,14 @@ class _EventPeristenceQueue(object):
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
- # handle_queue_loop runs in the sentinel logcontext, so
- # there is no need to preserve_fn when running the
- # callbacks on the deferred.
try:
ret = yield per_item_callback(item)
+ except Exception:
+ with PreserveLoggingContext():
+ item.deferred.errback()
+ else:
with PreserveLoggingContext():
item.deferred.callback(ret)
- except Exception:
- item.deferred.errback()
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
@@ -521,7 +520,6 @@ class EventsStore(EventsWorkerStore):
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
- "room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
@@ -575,11 +573,13 @@ class EventsStore(EventsWorkerStore):
for ev, ctx in events_context:
if ctx.state_group is None:
- # I don't think this can happen, but let's double-check
- raise Exception(
- "Context for new extremity event %s has no state "
- "group" % (ev.event_id, ),
- )
+ # This should only happen for outlier events.
+ if not ev.internal_metadata.is_outlier():
+ raise Exception(
+ "Context for new event %s has no state "
+ "group" % (ev.event_id, ),
+ )
+ continue
if ctx.state_group in state_groups_map:
continue
@@ -607,7 +607,7 @@ class EventsStore(EventsWorkerStore):
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding.
for ev, ctx in events_context:
- if event_id == ev.event_id:
+ if event_id == ev.event_id and ctx.state_group is not None:
event_id_to_state_group[event_id] = ctx.state_group
break
else:
@@ -1137,7 +1137,7 @@ class EventsStore(EventsWorkerStore):
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
- [(ev.event_id,) for ev, _ in events_and_contexts]
+ [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
@@ -1188,7 +1188,6 @@ class EventsStore(EventsWorkerStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
|