diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 38395c66ab..a5a6869079 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,6 +23,7 @@ from synapse.crypto.event_signing import compute_event_reference_hash
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
+from contextlib import contextmanager
import logging
@@ -41,17 +42,25 @@ class EventsStore(SQLBaseStore):
self.min_token -= 1
stream_ordering = self.min_token
+ if stream_ordering is None:
+ stream_ordering_manager = yield self._stream_id_gen.get_next(self)
+ else:
+ @contextmanager
+ def stream_ordering_manager():
+ yield stream_ordering
+
try:
- yield self.runInteraction(
- "persist_event",
- self._persist_event_txn,
- event=event,
- context=context,
- backfilled=backfilled,
- stream_ordering=stream_ordering,
- is_new_state=is_new_state,
- current_state=current_state,
- )
+ with stream_ordering_manager as stream_ordering:
+ yield self.runInteraction(
+ "persist_event",
+ self._persist_event_txn,
+ event=event,
+ context=context,
+ backfilled=backfilled,
+ stream_ordering=stream_ordering,
+ is_new_state=is_new_state,
+ current_state=current_state,
+ )
except _RollbackButIsFineException:
pass
@@ -95,15 +104,6 @@ class EventsStore(SQLBaseStore):
# Remove the any existing cache entries for the event_id
txn.call_after(self._invalidate_get_event_cache, event.event_id)
- if stream_ordering is None:
- with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
- return self._persist_event_txn(
- txn, event, context, backfilled,
- stream_ordering=stream_ordering,
- is_new_state=is_new_state,
- current_state=current_state,
- )
-
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
@@ -135,19 +135,17 @@ class EventsStore(SQLBaseStore):
outlier = event.internal_metadata.is_outlier()
if not outlier:
- self._store_state_groups_txn(txn, event, context)
-
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
- have_persisted = self._simple_select_one_onecol_txn(
+ have_persisted = self._simple_select_one_txn(
txn,
- table="event_json",
+ table="events",
keyvalues={"event_id": event.event_id},
- retcol="event_id",
+ retcols=["event_id", "outlier"],
allow_none=True,
)
@@ -162,7 +160,9 @@ class EventsStore(SQLBaseStore):
# if we are persisting an event that we had persisted as an outlier,
# but is no longer one.
if have_persisted:
- if not outlier:
+ if not outlier and have_persisted["outlier"]:
+ self._store_state_groups_txn(txn, event, context)
+
sql = (
"UPDATE event_json SET internal_metadata = ?"
" WHERE event_id = ?"
@@ -182,6 +182,9 @@ class EventsStore(SQLBaseStore):
)
return
+ if not outlier:
+ self._store_state_groups_txn(txn, event, context)
+
self._handle_prev_events(
txn,
outlier=outlier,
|