diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 18def01f50..b19c424ba9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -52,16 +52,6 @@ event_counter = Counter(
)
-def encode_json(json_object):
- """
- Encode a Python object as JSON and return it in a Unicode string.
- """
- out = frozendict_json_encoder.encode(json_object)
- if isinstance(out, bytes):
- out = out.decode("utf8")
- return out
-
-
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
@@ -341,6 +331,10 @@ class PersistEventsStore:
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
+ # stream orderings should have been assigned by now
+ assert min_stream_order
+ assert max_stream_order
+
self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremeties,
@@ -432,12 +426,12 @@ class PersistEventsStore:
# so that async background tasks get told what happened.
sql = """
INSERT INTO current_state_delta_stream
- (stream_id, room_id, type, state_key, event_id, prev_event_id)
- SELECT ?, room_id, type, state_key, null, event_id
+ (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, room_id, type, state_key, null, event_id
FROM current_state_events
WHERE room_id = ?
"""
- txn.execute(sql, (stream_id, room_id))
+ txn.execute(sql, (stream_id, self._instance_name, room_id))
self.db_pool.simple_delete_txn(
txn, table="current_state_events", keyvalues={"room_id": room_id},
@@ -458,8 +452,8 @@ class PersistEventsStore:
#
sql = """
INSERT INTO current_state_delta_stream
- (stream_id, room_id, type, state_key, event_id, prev_event_id)
- SELECT ?, ?, ?, ?, ?, (
+ (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, ?, (
SELECT event_id FROM current_state_events
WHERE room_id = ? AND type = ? AND state_key = ?
)
@@ -469,6 +463,7 @@ class PersistEventsStore:
(
(
stream_id,
+ self._instance_name,
room_id,
etype,
state_key,
@@ -743,7 +738,9 @@ class PersistEventsStore:
logger.exception("")
raise
- metadata_json = encode_json(event.internal_metadata.get_dict())
+ metadata_json = frozendict_json_encoder.encode(
+ event.internal_metadata.get_dict()
+ )
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))
@@ -759,6 +756,7 @@ class PersistEventsStore:
"event_stream_ordering": stream_order,
"event_id": event.event_id,
"state_group": state_group_id,
+ "instance_name": self._instance_name,
},
)
@@ -797,10 +795,10 @@ class PersistEventsStore:
{
"event_id": event.event_id,
"room_id": event.room_id,
- "internal_metadata": encode_json(
+ "internal_metadata": frozendict_json_encoder.encode(
event.internal_metadata.get_dict()
),
- "json": encode_json(event_dict(event)),
+ "json": frozendict_json_encoder.encode(event_dict(event)),
"format_version": event.format_version,
}
for event, _ in events_and_contexts
|