diff --git a/synapse/storage/schema/delta/49/event_chunks.py b/synapse/storage/schema/delta/49/event_chunks.py
index 7d8d711600..50040a779c 100644
--- a/synapse/storage/schema/delta/49/event_chunks.py
+++ b/synapse/storage/schema/delta/49/event_chunks.py
@@ -53,11 +53,23 @@ CREATE INDEX chunk_backwards_extremities_event_id ON chunk_backwards_extremities
CREATE TABLE chunk_linearized (
chunk_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
- ordering DOUBLE PRECISION NOT NULL
+ next_chunk_id BIGINT, -- The chunk directly after this chunk, or NULL if last chunk
+ numerator BIGINT NOT NULL,
+ denominator BIGINT NOT NULL
);
CREATE UNIQUE INDEX chunk_linearized_id ON chunk_linearized (chunk_id);
-CREATE INDEX chunk_linearized_ordering ON chunk_linearized (room_id, ordering);
+CREATE UNIQUE INDEX chunk_linearized_next_id ON chunk_linearized (
+ next_chunk_id, room_id
+);
+
+-- Records the first chunk in a room.
+CREATE TABLE chunk_linearized_first (
+ chunk_id BIGINT NOT NULL,
+ room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX chunk_linearized_first_id ON chunk_linearized_first (room_id);
INSERT into background_updates (update_name, progress_json)
VALUES ('event_fields_chunk_id', '{}');
@@ -69,10 +81,6 @@ def run_create(cur, database_engine, *args, **kwargs):
for statement in get_statements(SQL.splitlines()):
cur.execute(statement)
- # We now go through and assign chunk IDs for all forward extremities.
- # Note that we know that extremities can't reference each other, so we
- # can simply assign each event a new chunk ID with an arbitrary order.
-
txn = LoggingTransaction(
cur, "schema_update", database_engine, [], [],
)
@@ -86,6 +94,7 @@ def run_create(cur, database_engine, *args, **kwargs):
next_chunk_id = 1
room_to_next_order = {}
+ prev_chunks_by_room = {}
for row in rows:
chunk_id = next_chunk_id
@@ -101,19 +110,41 @@ def run_create(cur, database_engine, *args, **kwargs):
updatevalues={"chunk_id": chunk_id},
)
- ordering = room_to_next_order.get(room_id, 0)
+ ordering = room_to_next_order.get(room_id, 1)
room_to_next_order[room_id] = ordering + 1
+ prev_chunks = prev_chunks_by_room.setdefault(room_id, [])
+
SQLBaseStore._simple_insert_txn(
txn,
table="chunk_linearized",
values={
"chunk_id": chunk_id,
"room_id": row["room_id"],
- "ordering": 0,
+ "numerator": ordering,
+ "denominator": 1,
},
)
+ if prev_chunks:
+ SQLBaseStore._simple_update_one_txn(
+ txn,
+ table="chunk_linearized",
+ keyvalues={"chunk_id": prev_chunks[-1]},
+ updatevalues={"next_chunk_id": chunk_id},
+ )
+ else:
+ SQLBaseStore._simple_insert_txn(
+ txn,
+ table="chunk_linearized_first",
+ values={
+ "chunk_id": chunk_id,
+ "room_id": row["room_id"],
+ },
+ )
+
+ prev_chunks.append(chunk_id)
+
def run_upgrade(*args, **kwargs):
pass
|