diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 047782eb06..10184d6ae7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1034,13 +1034,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
LIMIT ?
"""
- # Find any chunk connections of a given insertion event
- chunk_connection_query = """
+ # Find any batch connections of a given insertion event
+ batch_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
- /* Find the chunk that connects to the given insertion event */
- INNER JOIN chunk_events AS c
- ON i.next_chunk_id = c.chunk_id
- /* Get the depth of the chunk start event from the events table */
+ /* Find the batch that connects to the given insertion event */
+ INNER JOIN batch_events AS c
+ ON i.next_batch_id = c.batch_id
+ /* Get the depth of the batch start event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which matches the given event_id */
WHERE i.event_id = ?
@@ -1077,12 +1077,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_results.add(event_id)
- # Try and find any potential historical chunks of message history.
+ # Try and find any potential historical batches of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we need to go and try to
- # find any chunk events connected to the insertion event (by
- # chunk_id). If we find any, we'll add them to the queue and
+ # find any batch events connected to the insertion event (by
+ # batch_id). If we find any, we'll add them to the queue and
# navigate up the DAG like normal in the next iteration of the loop.
txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
@@ -1097,17 +1097,17 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
- # Find any chunk connections for the given insertion event
+ # Find any batch connections for the given insertion event
txn.execute(
- chunk_connection_query,
+ batch_connection_query,
(connected_insertion_event, limit - len(event_results)),
)
- chunk_start_event_id_results = txn.fetchall()
+ batch_start_event_id_results = txn.fetchall()
logger.debug(
- "_get_backfill_events: chunk_start_event_id_results %s",
- chunk_start_event_id_results,
+ "_get_backfill_events: batch_start_event_id_results %s",
+ batch_start_event_id_results,
)
- for row in chunk_start_event_id_results:
+ for row in batch_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index dec7e8594e..584f818ff3 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1509,7 +1509,7 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
self._handle_insertion_event(txn, event)
- self._handle_chunk_event(txn, event)
+ self._handle_batch_event(txn, event)
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
@@ -1790,23 +1790,23 @@ class PersistEventsStore:
):
return
- next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
- if next_chunk_id is None:
- # Invalid insertion event without next chunk ID
+ next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
+ if next_batch_id is None:
+ # Invalid insertion event without next batch ID
return
logger.debug(
- "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+ "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
)
- # Keep track of the insertion event and the chunk ID
+ # Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
table="insertion_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "next_chunk_id": next_chunk_id,
+ "next_batch_id": next_batch_id,
},
)
@@ -1822,8 +1822,8 @@ class PersistEventsStore:
},
)
- def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
- """Handles inserting the chunk edges/connections between the chunk event
+ def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles inserting the batch edges/connections between the batch event
and an insertion event. Part of MSC2716.
Args:
@@ -1831,11 +1831,11 @@ class PersistEventsStore:
event: The event to process
"""
- if event.type != EventTypes.MSC2716_CHUNK:
- # Not a chunk event
+ if event.type != EventTypes.MSC2716_BATCH:
+ # Not a batch event
return
- # Skip processing a chunk event if the room version doesn't
+ # Skip processing a batch event if the room version doesn't
# support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
room_creator = self.db_pool.simple_select_one_onecol_txn(
@@ -1852,35 +1852,35 @@ class PersistEventsStore:
):
return
- chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
- if chunk_id is None:
- # Invalid chunk event without a chunk ID
+ batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
+ if batch_id is None:
+ # Invalid batch event without a batch ID
return
- logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+ logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
- # Keep track of the insertion event and the chunk ID
+ # Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
- table="chunk_events",
+ table="batch_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
- "chunk_id": chunk_id,
+ "batch_id": batch_id,
},
)
- # When we receive an event with a `chunk_id` referencing the
- # `next_chunk_id` of the insertion event, we can remove it from the
+ # When we receive an event with a `batch_id` referencing the
+ # `next_batch_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
- WHERE next_chunk_id = ?
+ WHERE next_batch_id = ?
)
"""
- txn.execute(sql, (chunk_id,))
+ txn.execute(sql, (batch_id,))
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index 54fa361d3e..a383388757 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore
class RoomBatchStore(SQLBaseStore):
- async def get_insertion_event_by_chunk_id(self, chunk_id: str) -> Optional[str]:
+ async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
"""Retrieve a insertion event ID.
Args:
- chunk_id: The chunk ID of the insertion event to retrieve.
+ batch_id: The batch ID of the insertion event to retrieve.
Returns:
The event_id of an insertion event, or None if there is no known
@@ -30,7 +30,7 @@ class RoomBatchStore(SQLBaseStore):
"""
return await self.db_pool.simple_select_one_onecol(
table="insertion_events",
- keyvalues={"next_chunk_id": chunk_id},
+ keyvalues={"next_batch_id": batch_id},
retcol="event_id",
allow_none=True,
)
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index af9cc69949..aa2ce44c6c 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -14,7 +14,7 @@
# When updating these values, please leave a short summary of the changes below.
-SCHEMA_VERSION = 63
+SCHEMA_VERSION = 64
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
diff --git a/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres
new file mode 100644
index 0000000000..5f38993208
--- /dev/null
+++ b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE insertion_events RENAME COLUMN next_chunk_id TO next_batch_id;
+DROP INDEX insertion_events_next_chunk_id;
+CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id);
+
+ALTER TABLE chunk_events RENAME TO batch_events;
+ALTER TABLE batch_events RENAME COLUMN chunk_id TO batch_id;
+DROP INDEX chunk_events_chunk_id;
+CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id);
diff --git a/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite
new file mode 100644
index 0000000000..4989563995
--- /dev/null
+++ b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite
@@ -0,0 +1,37 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Re-create the insertion_events table since SQLite doesn't support better
+-- renames for columns (next_chunk_id -> next_batch_id)
+DROP TABLE insertion_events;
+CREATE TABLE IF NOT EXISTS insertion_events(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ next_batch_id TEXT NOT NULL
+);
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
+CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id);
+
+-- Re-create the chunk_events table since SQLite doesn't support better renames
+-- for columns (chunk_id -> batch_id)
+DROP TABLE chunk_events;
+CREATE TABLE IF NOT EXISTS batch_events(
+ event_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ batch_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS batch_events_event_id ON batch_events(event_id);
+CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id);
|