diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index a86230d92c..3b3416716e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -96,12 +96,16 @@ class EventsStore(SQLBaseStore):
# Remove the any existing cache entries for the event_id
self._get_event_cache.pop(event.event_id)
+ if stream_ordering is None:
+ stream_ordering = self._stream_id_gen.get_next_txn(txn)
+
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
- txn.execute(
- "DELETE FROM current_state_events WHERE room_id = ?",
- (event.room_id,)
+ self._simple_delete_txn(
+ txn,
+ table="current_state_events",
+ keyvalues={"room_id": event.room_id},
)
for s in current_state:
@@ -114,7 +118,6 @@ class EventsStore(SQLBaseStore):
"type": s.type,
"state_key": s.state_key,
},
- or_replace=True,
)
if event.is_state() and is_new_state:
@@ -128,7 +131,6 @@ class EventsStore(SQLBaseStore):
"type": event.type,
"state_key": event.state_key,
},
- or_replace=True,
)
for prev_state_id, _ in event.prev_state:
@@ -151,14 +153,6 @@ class EventsStore(SQLBaseStore):
event.depth
)
- self._handle_prev_events(
- txn,
- outlier=outlier,
- event_id=event.event_id,
- prev_events=event.prev_events,
- room_id=event.room_id,
- )
-
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
@@ -185,7 +179,7 @@ class EventsStore(SQLBaseStore):
)
txn.execute(
sql,
- (metadata_json.decode("UTF-8"), event.event_id,)
+ (buffer(metadata_json), event.event_id,)
)
sql = (
@@ -198,10 +192,16 @@ class EventsStore(SQLBaseStore):
)
return
+ self._handle_prev_events(
+ txn,
+ outlier=outlier,
+ event_id=event.event_id,
+ prev_events=event.prev_events,
+ room_id=event.room_id,
+ )
+
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
- elif event.type == EventTypes.Feedback:
- self._store_feedback_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
@@ -224,15 +224,14 @@ class EventsStore(SQLBaseStore):
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "internal_metadata": metadata_json.decode("UTF-8"),
- "json": encode_canonical_json(event_dict).decode("UTF-8"),
+ "internal_metadata": buffer(metadata_json),
+ "json": buffer(encode_canonical_json(event_dict)),
},
- or_replace=True,
)
- content = encode_canonical_json(
+ content = buffer(encode_canonical_json(
event.content
- ).decode("UTF-8")
+ ))
vals = {
"topological_ordering": event.depth,
@@ -245,9 +244,6 @@ class EventsStore(SQLBaseStore):
"depth": event.depth,
}
- if stream_ordering is not None:
- vals["stream_ordering"] = stream_ordering
-
unrec = {
k: v
for k, v in event.get_dict().items()
@@ -260,25 +256,22 @@ class EventsStore(SQLBaseStore):
]
}
- vals["unrecognized_keys"] = encode_canonical_json(
+ vals["unrecognized_keys"] = buffer(encode_canonical_json(
unrec
- ).decode("UTF-8")
+ ))
- try:
- self._simple_insert_txn(
- txn,
- "events",
- vals,
- or_replace=(not outlier),
- or_ignore=bool(outlier),
- )
- except:
- logger.warn(
- "Failed to persist, probably duplicate: %s",
- event.event_id,
- exc_info=True,
- )
- raise _RollbackButIsFineException("_persist_event")
+ sql = (
+ "INSERT INTO events"
+ " (stream_ordering, topological_ordering, event_id, type,"
+ " room_id, content, processed, outlier, depth)"
+ " VALUES (%s,?,?,?,?,?,?,?,?)"
+ ) % (stream_ordering,)
+
+ txn.execute(
+ sql,
+ (event.depth, event.event_id, event.type, event.room_id,
+ content, True, outlier, event.depth)
+ )
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
@@ -302,15 +295,17 @@ class EventsStore(SQLBaseStore):
)
if is_new_state and not context.rejected:
- self._simple_insert_txn(
+ self._simple_upsert_txn(
txn,
"current_state_events",
- {
- "event_id": event.event_id,
+ keyvalues={
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
+ values={
+ "event_id": event.event_id,
+ }
)
for e_id, h in event.prev_state:
|