diff options
author | Eric Eastwood <erice@element.io> | 2021-09-21 15:06:28 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-21 15:06:28 -0500 |
commit | 51e2db35983953b13e536331ec2f6ad4cae7e0f1 (patch) | |
tree | 823c37f186796e8cb2b5beada4fb351477481770 /synapse/storage/databases/main | |
parent | Add type hints for event streams. (#10856) (diff) | |
download | synapse-51e2db35983953b13e536331ec2f6ad4cae7e0f1.tar.xz |
Rename MSC2716 things from `chunk` to `batch` to match `/batch_send` endpoint (#10838)
See https://github.com/matrix-org/matrix-doc/pull/2716#discussion_r684574497 Dropping support for older MSC2716 room versions so we don't have to worry about supporting both chunk and batch events.
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 30 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 46 | ||||
-rw-r--r-- | synapse/storage/databases/main/room_batch.py | 6 |
3 files changed, 41 insertions, 41 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 047782eb06..10184d6ae7 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1034,13 +1034,13 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas LIMIT ? """ - # Find any chunk connections of a given insertion event - chunk_connection_query = """ + # Find any batch connections of a given insertion event + batch_connection_query = """ SELECT e.depth, c.event_id FROM insertion_events AS i - /* Find the chunk that connects to the given insertion event */ - INNER JOIN chunk_events AS c - ON i.next_chunk_id = c.chunk_id - /* Get the depth of the chunk start event from the events table */ + /* Find the batch that connects to the given insertion event */ + INNER JOIN batch_events AS c + ON i.next_batch_id = c.batch_id + /* Get the depth of the batch start event from the events table */ INNER JOIN events AS e USING (event_id) /* Find an insertion event which matches the given event_id */ WHERE i.event_id = ? @@ -1077,12 +1077,12 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_results.add(event_id) - # Try and find any potential historical chunks of message history. + # Try and find any potential historical batches of message history. # # First we look for an insertion event connected to the current # event (by prev_event). If we find any, we need to go and try to - # find any chunk events connected to the insertion event (by - # chunk_id). If we find any, we'll add them to the queue and + # find any batch events connected to the insertion event (by + # batch_id). If we find any, we'll add them to the queue and # navigate up the DAG like normal in the next iteration of the loop. txn.execute( connected_insertion_event_query, (event_id, limit - len(event_results)) @@ -1097,17 +1097,17 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas connected_insertion_event = row[1] queue.put((-connected_insertion_event_depth, connected_insertion_event)) - # Find any chunk connections for the given insertion event + # Find any batch connections for the given insertion event txn.execute( - chunk_connection_query, + batch_connection_query, (connected_insertion_event, limit - len(event_results)), ) - chunk_start_event_id_results = txn.fetchall() + batch_start_event_id_results = txn.fetchall() logger.debug( - "_get_backfill_events: chunk_start_event_id_results %s", - chunk_start_event_id_results, + "_get_backfill_events: batch_start_event_id_results %s", + batch_start_event_id_results, ) - for row in chunk_start_event_id_results: + for row in batch_start_event_id_results: if row[1] not in event_results: queue.put((-row[0], row[1])) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index dec7e8594e..584f818ff3 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1509,7 +1509,7 @@ class PersistEventsStore: self._handle_event_relations(txn, event) self._handle_insertion_event(txn, event) - self._handle_chunk_event(txn, event) + self._handle_batch_event(txn, event) # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) @@ -1790,23 +1790,23 @@ class PersistEventsStore: ): return - next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID) - if next_chunk_id is None: - # Invalid insertion event without next chunk ID + next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID) + if next_batch_id is None: + # Invalid insertion event without next batch ID return logger.debug( - "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event + "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event ) - # Keep track of the insertion event and the chunk ID + # Keep track of the insertion event and the batch ID self.db_pool.simple_insert_txn( txn, table="insertion_events", values={ "event_id": event.event_id, "room_id": event.room_id, - "next_chunk_id": next_chunk_id, + "next_batch_id": next_batch_id, }, ) @@ -1822,8 +1822,8 @@ class PersistEventsStore: }, ) - def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase): - """Handles inserting the chunk edges/connections between the chunk event + def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase): + """Handles inserting the batch edges/connections between the batch event and an insertion event. Part of MSC2716. Args: @@ -1831,11 +1831,11 @@ class PersistEventsStore: event: The event to process """ - if event.type != EventTypes.MSC2716_CHUNK: - # Not a chunk event + if event.type != EventTypes.MSC2716_BATCH: + # Not a batch event return - # Skip processing a chunk event if the room version doesn't + # Skip processing a batch event if the room version doesn't # support it or the event is not from the room creator. room_version = self.store.get_room_version_txn(txn, event.room_id) room_creator = self.db_pool.simple_select_one_onecol_txn( @@ -1852,35 +1852,35 @@ class PersistEventsStore: ): return - chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID) - if chunk_id is None: - # Invalid chunk event without a chunk ID + batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID) + if batch_id is None: + # Invalid batch event without a batch ID return - logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event) + logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event) - # Keep track of the insertion event and the chunk ID + # Keep track of the insertion event and the batch ID self.db_pool.simple_insert_txn( txn, - table="chunk_events", + table="batch_events", values={ "event_id": event.event_id, "room_id": event.room_id, - "chunk_id": chunk_id, + "batch_id": batch_id, }, ) - # When we receive an event with a `chunk_id` referencing the - # `next_chunk_id` of the insertion event, we can remove it from the + # When we receive an event with a `batch_id` referencing the + # `next_batch_id` of the insertion event, we can remove it from the # `insertion_event_extremities` table. sql = """ DELETE FROM insertion_event_extremities WHERE event_id IN ( SELECT event_id FROM insertion_events - WHERE next_chunk_id = ? + WHERE next_batch_id = ? ) """ - txn.execute(sql, (chunk_id,)) + txn.execute(sql, (batch_id,)) def _handle_redaction(self, txn, redacted_event_id): """Handles receiving a redaction and checking whether we need to remove diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py index 54fa361d3e..a383388757 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py @@ -18,11 +18,11 @@ from synapse.storage._base import SQLBaseStore class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_by_chunk_id(self, chunk_id: str) -> Optional[str]: + async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]: """Retrieve a insertion event ID. Args: - chunk_id: The chunk ID of the insertion event to retrieve. + batch_id: The batch ID of the insertion event to retrieve. Returns: The event_id of an insertion event, or None if there is no known @@ -30,7 +30,7 @@ class RoomBatchStore(SQLBaseStore): """ return await self.db_pool.simple_select_one_onecol( table="insertion_events", - keyvalues={"next_chunk_id": chunk_id}, + keyvalues={"next_batch_id": batch_id}, retcol="event_id", allow_none=True, ) |