diff options
author | Eric Eastwood <erice@element.io> | 2021-09-21 15:06:28 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-21 15:06:28 -0500 |
commit | 51e2db35983953b13e536331ec2f6ad4cae7e0f1 (patch) | |
tree | 823c37f186796e8cb2b5beada4fb351477481770 /synapse/storage | |
parent | Add type hints for event streams. (#10856) (diff) | |
download | synapse-51e2db35983953b13e536331ec2f6ad4cae7e0f1.tar.xz |
Rename MSC2716 things from `chunk` to `batch` to match `/batch_send` endpoint (#10838)
See https://github.com/matrix-org/matrix-doc/pull/2716#discussion_r684574497 Dropping support for older MSC2716 room versions so we don't have to worry about supporting both chunk and batch events.
Diffstat (limited to 'synapse/storage')
6 files changed, 102 insertions, 42 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 047782eb06..10184d6ae7 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1034,13 +1034,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas LIMIT ? """ - # Find any chunk connections of a given insertion event - chunk_connection_query = """ + # Find any batch connections of a given insertion event + batch_connection_query = """ SELECT e.depth, c.event_id FROM insertion_events AS i - /* Find the chunk that connects to the given insertion event */ - INNER JOIN chunk_events AS c - ON i.next_chunk_id = c.chunk_id - /* Get the depth of the chunk start event from the events table */ + /* Find the batch that connects to the given insertion event */ + INNER JOIN batch_events AS c + ON i.next_batch_id = c.batch_id + /* Get the depth of the batch start event from the events table */ INNER JOIN events AS e USING (event_id) /* Find an insertion event which matches the given event_id */ WHERE i.event_id = ? @@ -1077,12 +1077,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_results.add(event_id) - # Try and find any potential historical chunks of message history. + # Try and find any potential historical batches of message history. # # First we look for an insertion event connected to the current # event (by prev_event). If we find any, we need to go and try to - # find any chunk events connected to the insertion event (by - # chunk_id). If we find any, we'll add them to the queue and + # find any batch events connected to the insertion event (by + # batch_id). If we find any, we'll add them to the queue and # navigate up the DAG like normal in the next iteration of the loop. txn.execute( connected_insertion_event_query, (event_id, limit - len(event_results)) @@ -1097,17 +1097,17 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas connected_insertion_event = row[1] queue.put((-connected_insertion_event_depth, connected_insertion_event)) - # Find any chunk connections for the given insertion event + # Find any batch connections for the given insertion event txn.execute( - chunk_connection_query, + batch_connection_query, (connected_insertion_event, limit - len(event_results)), ) - chunk_start_event_id_results = txn.fetchall() + batch_start_event_id_results = txn.fetchall() logger.debug( - "_get_backfill_events: chunk_start_event_id_results %s", - chunk_start_event_id_results, + "_get_backfill_events: batch_start_event_id_results %s", + batch_start_event_id_results, ) - for row in chunk_start_event_id_results: + for row in batch_start_event_id_results: if row[1] not in event_results: queue.put((-row[0], row[1])) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index dec7e8594e..584f818ff3 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1509,7 +1509,7 @@ class PersistEventsStore: self._handle_event_relations(txn, event) self._handle_insertion_event(txn, event) - self._handle_chunk_event(txn, event) + self._handle_batch_event(txn, event) # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) @@ -1790,23 +1790,23 @@ class PersistEventsStore: ): return - next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID) - if next_chunk_id is None: - # Invalid insertion event without next chunk ID + next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID) + if next_batch_id is None: + # Invalid insertion event without next batch ID return logger.debug( - "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event + "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event ) - # Keep track of the insertion event and the chunk ID + # Keep track of the insertion event and the batch ID self.db_pool.simple_insert_txn( txn, table="insertion_events", values={ "event_id": event.event_id, "room_id": event.room_id, - "next_chunk_id": next_chunk_id, + "next_batch_id": next_batch_id, }, ) @@ -1822,8 +1822,8 @@ class PersistEventsStore: }, ) - def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase): - """Handles inserting the chunk edges/connections between the chunk event + def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase): + """Handles inserting the batch edges/connections between the batch event and an insertion event. Part of MSC2716. Args: @@ -1831,11 +1831,11 @@ class PersistEventsStore: event: The event to process """ - if event.type != EventTypes.MSC2716_CHUNK: - # Not a chunk event + if event.type != EventTypes.MSC2716_BATCH: + # Not a batch event return - # Skip processing a chunk event if the room version doesn't + # Skip processing a batch event if the room version doesn't # support it or the event is not from the room creator. room_version = self.store.get_room_version_txn(txn, event.room_id) room_creator = self.db_pool.simple_select_one_onecol_txn( @@ -1852,35 +1852,35 @@ class PersistEventsStore: ): return - chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID) - if chunk_id is None: - # Invalid chunk event without a chunk ID + batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID) + if batch_id is None: + # Invalid batch event without a batch ID return - logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event) + logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event) - # Keep track of the insertion event and the chunk ID + # Keep track of the insertion event and the batch ID self.db_pool.simple_insert_txn( txn, - table="chunk_events", + table="batch_events", values={ "event_id": event.event_id, "room_id": event.room_id, - "chunk_id": chunk_id, + "batch_id": batch_id, }, ) - # When we receive an event with a `chunk_id` referencing the - # `next_chunk_id` of the insertion event, we can remove it from the + # When we receive an event with a `batch_id` referencing the + # `next_batch_id` of the insertion event, we can remove it from the # `insertion_event_extremities` table. sql = """ DELETE FROM insertion_event_extremities WHERE event_id IN ( SELECT event_id FROM insertion_events - WHERE next_chunk_id = ? + WHERE next_batch_id = ? ) """ - txn.execute(sql, (chunk_id,)) + txn.execute(sql, (batch_id,)) def _handle_redaction(self, txn, redacted_event_id): """Handles receiving a redaction and checking whether we need to remove diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py index 54fa361d3e..a383388757 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py @@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_by_chunk_id(self, chunk_id: str) -> Optional[str]: + async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]: """Retrieve a insertion event ID. Args: - chunk_id: The chunk ID of the insertion event to retrieve. + batch_id: The batch ID of the insertion event to retrieve. Returns: The event_id of an insertion event, or None if there is no known @@ -30,7 +30,7 @@ class RoomBatchStore(SQLBaseStore): """ return await self.db_pool.simple_select_one_onecol( table="insertion_events", - keyvalues={"next_chunk_id": chunk_id}, + keyvalues={"next_batch_id": batch_id}, retcol="event_id", allow_none=True, ) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index af9cc69949..aa2ce44c6c 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -14,7 +14,7 @@ # When updating these values, please leave a short summary of the changes below. -SCHEMA_VERSION = 63 +SCHEMA_VERSION = 64 """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the diff --git a/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres new file mode 100644 index 0000000000..5f38993208 --- /dev/null +++ b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.postgres @@ -0,0 +1,23 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE insertion_events RENAME COLUMN next_chunk_id TO next_batch_id; +DROP INDEX insertion_events_next_chunk_id; +CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id); + +ALTER TABLE chunk_events RENAME TO batch_events; +ALTER TABLE batch_events RENAME COLUMN chunk_id TO batch_id; +DROP INDEX chunk_events_chunk_id; +CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id); diff --git a/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite new file mode 100644 index 0000000000..4989563995 --- /dev/null +++ b/synapse/storage/schema/main/delta/64/01msc2716_chunk_to_batch_rename.sql.sqlite @@ -0,0 +1,37 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Re-create the insertion_events table since SQLite doesn't support better +-- renames for columns (next_chunk_id -> next_batch_id) +DROP TABLE insertion_events; +CREATE TABLE IF NOT EXISTS insertion_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + next_batch_id TEXT NOT NULL +); +CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id); +CREATE INDEX IF NOT EXISTS insertion_events_next_batch_id ON insertion_events(next_batch_id); + +-- Re-create the chunk_events table since SQLite doesn't support better renames +-- for columns (chunk_id -> batch_id) +DROP TABLE chunk_events; +CREATE TABLE IF NOT EXISTS batch_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + batch_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS batch_events_event_id ON batch_events(event_id); +CREATE INDEX IF NOT EXISTS batch_events_batch_id ON batch_events(batch_id); |