diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..a3c260ddc4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state,
current_state=current_state,
)
- self.get_room_events_max_id.invalidate()
except _RollbackButIsFineException:
pass
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
        # Remove any existing cache entries for the event_id
self._invalidate_get_event_cache(event.event_id)
+ if stream_ordering is None:
+ with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+ return self._persist_event_txn(
+ txn, event, context, backfilled,
+ stream_ordering=stream_ordering,
+ is_new_state=is_new_state,
+ current_state=current_state,
+ )
+
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
+ self._simple_delete_txn(
+ txn,
+ table="current_state_events",
+ keyvalues={"room_id": event.room_id},
)
for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
"type": s.type,
"state_key": s.state_key,
},
- or_replace=True,
)
if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
"type": event.type,
"state_key": event.state_key,
},
- or_replace=True,
)
for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
event.depth
)
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
@@ -169,7 +168,7 @@ class EventsStore(SQLBaseStore):
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
- )
+ ).decode("UTF-8")
# If we have already persisted this event, we don't need to do any
# more processing.
@@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore):
)
txn.execute(
sql,
- (metadata_json.decode("UTF-8"), event.event_id,)
+ (metadata_json, event.event_id,)
)
sql = (
- "UPDATE events SET outlier = 0"
+ "UPDATE events SET outlier = ?"
" WHERE event_id = ?"
)
txn.execute(
sql,
- (event.event_id,)
+ (False, event.event_id,)
)
return
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
- elif event.type == EventTypes.Feedback:
- self._store_feedback_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
@@ -224,10 +229,9 @@ class EventsStore(SQLBaseStore):
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "internal_metadata": metadata_json.decode("UTF-8"),
+ "internal_metadata": metadata_json,
"json": encode_canonical_json(event_dict).decode("UTF-8"),
},
- or_replace=True,
)
content = encode_canonical_json(
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
"depth": event.depth,
}
- if stream_ordering is not None:
- vals["stream_ordering"] = stream_ordering
-
unrec = {
k: v
for k, v in event.get_dict().items()
@@ -264,25 +265,53 @@ class EventsStore(SQLBaseStore):
unrec
).decode("UTF-8")
- try:
- self._simple_insert_txn(
- txn,
- "events",
- vals,
- or_replace=(not outlier),
- or_ignore=bool(outlier),
- )
- except:
- logger.warn(
- "Failed to persist, probably duplicate: %s",
- event.event_id,
- exc_info=True,
+ sql = (
+ "INSERT INTO events"
+ " (stream_ordering, topological_ordering, event_id, type,"
+ " room_id, content, processed, outlier, depth)"
+ " VALUES (?,?,?,?,?,?,?,?,?)"
+ )
+
+ txn.execute(
+ sql,
+ (
+ stream_ordering, event.depth, event.event_id, event.type,
+ event.room_id, content, True, outlier, event.depth
)
- raise _RollbackButIsFineException("_persist_event")
+ )
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
+ for hash_alg, hash_base64 in event.hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_event_content_hash_txn(
+ txn, event.event_id, hash_alg, hash_bytes,
+ )
+
+ for prev_event_id, prev_hashes in event.prev_events:
+ for alg, hash_base64 in prev_hashes.items():
+ hash_bytes = decode_base64(hash_base64)
+ self._store_prev_event_hash_txn(
+ txn, event.event_id, prev_event_id, alg, hash_bytes
+ )
+
+ for auth_id, _ in event.auth_events:
+ self._simple_insert_txn(
+ txn,
+ table="event_auth",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "auth_id": auth_id,
+ },
+ )
+
+ (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+ self._store_event_reference_hash_txn(
+ txn, event.event_id, ref_alg, ref_hash_bytes
+ )
+
if event.is_state():
vals = {
"event_id": event.event_id,
@@ -301,18 +330,6 @@ class EventsStore(SQLBaseStore):
vals,
)
- if is_new_state and not context.rejected:
- self._simple_insert_txn(
- txn,
- "current_state_events",
- {
- "event_id": event.event_id,
- "room_id": event.room_id,
- "type": event.type,
- "state_key": event.state_key,
- },
- )
-
for e_id, h in event.prev_state:
self._simple_insert_txn(
txn,
@@ -321,39 +338,24 @@ class EventsStore(SQLBaseStore):
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
- "is_state": 1,
+ "is_state": True,
},
)
- for hash_alg, hash_base64 in event.hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_event_content_hash_txn(
- txn, event.event_id, hash_alg, hash_bytes,
- )
-
- for prev_event_id, prev_hashes in event.prev_events:
- for alg, hash_base64 in prev_hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_prev_event_hash_txn(
- txn, event.event_id, prev_event_id, alg, hash_bytes
+ if is_new_state and not context.rejected:
+ self._simple_upsert_txn(
+ txn,
+ "current_state_events",
+ keyvalues={
+ "room_id": event.room_id,
+ "type": event.type,
+ "state_key": event.state_key,
+ },
+ values={
+ "event_id": event.event_id,
+ }
)
- for auth_id, _ in event.auth_events:
- self._simple_insert_txn(
- txn,
- table="event_auth",
- values={
- "event_id": event.event_id,
- "room_id": event.room_id,
- "auth_id": auth_id,
- },
- )
-
- (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
- self._store_event_reference_hash_txn(
- txn, event.event_id, ref_alg, ref_hash_bytes
- )
-
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
self._invalidate_get_event_cache(event.redacts)