summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-09-21 15:06:28 -0500
committerGitHub <noreply@github.com>2021-09-21 15:06:28 -0500
commit51e2db35983953b13e536331ec2f6ad4cae7e0f1 (patch)
tree823c37f186796e8cb2b5beada4fb351477481770 /synapse/storage
parentAdd type hints for event streams. (#10856) (diff)
downloadsynapse-51e2db35983953b13e536331ec2f6ad4cae7e0f1.tar.xz
Rename MSC2716 things from `chunk` to `batch` to match `/batch_send` endpoint (#10838)
See https://github.com/matrix-org/matrix-doc/pull/2716#discussion_r684574497

Dropping support for older MSC2716 room versions so we don't have to worry about
supporting both chunk and batch events.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/event_federation.py30
-rw-r--r--synapse/storage/databases/main/events.py46
-rw-r--r--synapse/storage/databases/main/room_batch.py6
-rw-r--r--synapse/storage/schema/__init__.py2
-rw-r--r--synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres23
-rw-r--r--synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite37
6 files changed, 102 insertions, 42 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 047782eb06..10184d6ae7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1034,13 +1034,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             LIMIT ?
         """
 
-        # Find any chunk connections of a given insertion event
-        chunk_connection_query = """
+        # Find any batch connections of a given insertion event
+        batch_connection_query = """
             SELECT e.depth, c.event_id FROM insertion_events AS i
-            /* Find the chunk that connects to the given insertion event */
-            INNER JOIN chunk_events AS c
-            ON i.next_chunk_id = c.chunk_id
-            /* Get the depth of the chunk start event from the events table */
+            /* Find the batch that connects to the given insertion event */
+            INNER JOIN batch_events AS c
+            ON i.next_batch_id = c.batch_id
+            /* Get the depth of the batch start event from the events table */
             INNER JOIN events AS e USING (event_id)
             /* Find an insertion event which matches the given event_id */
             WHERE i.event_id = ?
@@ -1077,12 +1077,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
             event_results.add(event_id)
 
-            # Try and find any potential historical chunks of message history.
+            # Try and find any potential historical batches of message history.
             #
             # First we look for an insertion event connected to the current
             # event (by prev_event). If we find any, we need to go and try to
-            # find any chunk events connected to the insertion event (by
-            # chunk_id). If we find any, we'll add them to the queue and
+            # find any batch events connected to the insertion event (by
+            # batch_id). If we find any, we'll add them to the queue and
             # navigate up the DAG like normal in the next iteration of the loop.
             txn.execute(
                 connected_insertion_event_query, (event_id, limit - len(event_results))
@@ -1097,17 +1097,17 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 connected_insertion_event = row[1]
                 queue.put((-connected_insertion_event_depth, connected_insertion_event))
 
-                # Find any chunk connections for the given insertion event
+                # Find any batch connections for the given insertion event
                 txn.execute(
-                    chunk_connection_query,
+                    batch_connection_query,
                     (connected_insertion_event, limit - len(event_results)),
                 )
-                chunk_start_event_id_results = txn.fetchall()
+                batch_start_event_id_results = txn.fetchall()
                 logger.debug(
-                    "_get_backfill_events: chunk_start_event_id_results %s",
-                    chunk_start_event_id_results,
+                    "_get_backfill_events: batch_start_event_id_results %s",
+                    batch_start_event_id_results,
                 )
-                for row in chunk_start_event_id_results:
+                for row in batch_start_event_id_results:
                     if row[1] not in event_results:
                         queue.put((-row[0], row[1]))
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index dec7e8594e..584f818ff3 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1509,7 +1509,7 @@ class PersistEventsStore:
             self._handle_event_relations(txn, event)
 
             self._handle_insertion_event(txn, event)
-            self._handle_chunk_event(txn, event)
+            self._handle_batch_event(txn, event)
 
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
@@ -1790,23 +1790,23 @@ class PersistEventsStore:
         ):
             return
 
-        next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
-        if next_chunk_id is None:
-            # Invalid insertion event without next chunk ID
+        next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
+        if next_batch_id is None:
+            # Invalid insertion event without next batch ID
             return
 
         logger.debug(
-            "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+            "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
         )
 
-        # Keep track of the insertion event and the chunk ID
+        # Keep track of the insertion event and the batch ID
         self.db_pool.simple_insert_txn(
             txn,
             table="insertion_events",
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "next_chunk_id": next_chunk_id,
+                "next_batch_id": next_batch_id,
             },
         )
 
@@ -1822,8 +1822,8 @@ class PersistEventsStore:
                 },
             )
 
-    def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
-        """Handles inserting the chunk edges/connections between the chunk event
+    def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles inserting the batch edges/connections between the batch event
         and an insertion event. Part of MSC2716.
 
         Args:
@@ -1831,11 +1831,11 @@ class PersistEventsStore:
             event: The event to process
         """
 
-        if event.type != EventTypes.MSC2716_CHUNK:
-            # Not a chunk event
+        if event.type != EventTypes.MSC2716_BATCH:
+            # Not a batch event
             return
 
-        # Skip processing a chunk event if the room version doesn't
+        # Skip processing a batch event if the room version doesn't
         # support it or the event is not from the room creator.
         room_version = self.store.get_room_version_txn(txn, event.room_id)
         room_creator = self.db_pool.simple_select_one_onecol_txn(
@@ -1852,35 +1852,35 @@ class PersistEventsStore:
         ):
             return
 
-        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
-        if chunk_id is None:
-            # Invalid chunk event without a chunk ID
+        batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
+        if batch_id is None:
+            # Invalid batch event without a batch ID
             return
 
-        logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+        logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
 
-        # Keep track of the insertion event and the chunk ID
+        # Keep track of the insertion event and the batch ID
         self.db_pool.simple_insert_txn(
             txn,
-            table="chunk_events",
+            table="batch_events",
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "chunk_id": chunk_id,
+                "batch_id": batch_id,
             },
         )
 
-        # When we receive an event with a `chunk_id` referencing the
-        # `next_chunk_id` of the insertion event, we can remove it from the
+        # When we receive an event with a `batch_id` referencing the
+        # `next_batch_id` of the insertion event, we can remove it from the
         # `insertion_event_extremities` table.
         sql = """
             DELETE FROM insertion_event_extremities WHERE event_id IN (
                 SELECT event_id FROM insertion_events
-                WHERE next_chunk_id = ?
+                WHERE next_batch_id = ?
             )
         """
 
-        txn.execute(sql, (chunk_id,))
+        txn.execute(sql, (batch_id,))
 
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index 54fa361d3e..a383388757 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore
 
 
 class RoomBatchStore(SQLBaseStore):
-    async def get_insertion_event_by_chunk_id(self, chunk_id: str) -> Optional[str]:
+    async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
         """Retrieve a insertion event ID.
 
         Args:
-            chunk_id: The chunk ID of the insertion event to retrieve.
+            batch_id: The batch ID of the insertion event to retrieve.
 
         Returns:
             The event_id of an insertion event, or None if there is no known
@@ -30,7 +30,7 @@ class RoomBatchStore(SQLBaseStore):
         """
         return await self.db_pool.simple_select_one_onecol(
             table="insertion_events",
-            keyvalues={"next_chunk_id": chunk_id},
+            keyvalues={"next_batch_id": batch_id},
             retcol="event_id",
             allow_none=True,
         )
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index af9cc69949..aa2ce44c6c 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -14,7 +14,7 @@
 
 # When updating these values, please leave a short summary of the changes below.
 
-SCHEMA_VERSION = 63
+SCHEMA_VERSION = 64
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
diff --git a/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres
new file mode 100644
index 0000000000..5f38993208
--- /dev/null
+++ b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE insertion_events RENAME COLUMN next_chunk_id TO next_batch_id;
+DROP INDEX insertion_events_next_chunk_id;
+CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id);
+
+ALTER TABLE chunk_events RENAME TO batch_events;
+ALTER TABLE batch_events RENAME COLUMN chunk_id TO batch_id;
+DROP INDEX chunk_events_chunk_id;
+CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id);
diff --git a/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite
new file mode 100644
index 0000000000..4989563995
--- /dev/null
+++ b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite
@@ -0,0 +1,37 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Re-create the insertion_events table since SQLite doesn't support better
+-- renames for columns (next_chunk_id -> next_batch_id)
+DROP TABLE insertion_events;
+CREATE TABLE IF NOT EXISTS insertion_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    next_batch_id TEXT NOT NULL
+);
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
+CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id);
+
+-- Re-create the chunk_events table since SQLite doesn't support better renames
+-- for columns (chunk_id -> batch_id)
+DROP TABLE chunk_events;
+CREATE TABLE IF NOT EXISTS batch_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    batch_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS batch_events_event_id ON batch_events(event_id);
+CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id);