diff options
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 88 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 91 | ||||
-rw-r--r-- | synapse/storage/databases/main/state.py | 50 |
3 files changed, 209 insertions, 20 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f4a00b0736..547e43ab98 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # We want to make sure that we do a breadth-first, "depth" ordered # search. - query = ( - "SELECT depth, prev_event_id FROM event_edges" - " INNER JOIN events" - " ON prev_event_id = events.event_id" - " WHERE event_edges.event_id = ?" - " AND event_edges.is_state = ?" - " LIMIT ?" - ) + # Look for the prev_event_id connected to the given event_id + query = """ + SELECT depth, prev_event_id FROM event_edges + /* Get the depth of the prev_event_id from the events table */ + INNER JOIN events + ON prev_event_id = events.event_id + /* Find an event which matches the given event_id */ + WHERE event_edges.event_id = ? + AND event_edges.is_state = ? + LIMIT ? + """ + + # Look for the "insertion" events connected to the given event_id + connected_insertion_event_query = """ + SELECT e.depth, i.event_id FROM insertion_event_edges AS i + /* Get the depth of the insertion event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which points via prev_events to the given event_id */ + WHERE i.insertion_prev_event_id = ? + LIMIT ? + """ + + # Find any chunk connections of a given insertion event + chunk_connection_query = """ + SELECT e.depth, c.event_id FROM insertion_events AS i + /* Find the chunk that connects to the given insertion event */ + INNER JOIN chunk_events AS c + ON i.next_chunk_id = c.chunk_id + /* Get the depth of the chunk start event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which matches the given event_id */ + WHERE i.event_id = ? + LIMIT ? + """ + # In a PriorityQueue, the lowest valued entries are retrieved first. + # We're using depth as the priority in the queue. + # Depth is lowest at the oldest-in-time message and highest and + # newest-in-time message. We add events to the queue with a negative depth so that + # we process the newest-in-time messages first going backwards in time. queue = PriorityQueue() for event_id in event_list: @@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_results.add(event_id) + # Try and find any potential historical chunks of message history. + # + # First we look for an insertion event connected to the current + # event (by prev_event). If we find any, we need to go and try to + # find any chunk events connected to the insertion event (by + # chunk_id). If we find any, we'll add them to the queue and + # navigate up the DAG like normal in the next iteration of the loop. + txn.execute( + connected_insertion_event_query, (event_id, limit - len(event_results)) + ) + connected_insertion_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: connected_insertion_event_query %s", + connected_insertion_event_id_results, + ) + for row in connected_insertion_event_id_results: + connected_insertion_event_depth = row[0] + connected_insertion_event = row[1] + queue.put((-connected_insertion_event_depth, connected_insertion_event)) + + # Find any chunk connections for the given insertion event + txn.execute( + chunk_connection_query, + (connected_insertion_event, limit - len(event_results)), + ) + chunk_start_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: chunk_start_event_id_results %s", + chunk_start_event_id_results, + ) + for row in chunk_start_event_id_results: + if row[1] not in event_results: + queue.put((-row[0], row[1])) + + # Navigate up the DAG by prev_event txn.execute(query, (event_id, False, limit - len(event_results))) + prev_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: prev_event_ids %s", prev_event_id_results + ) - for row in txn: + for row in prev_event_id_results: if row[1] not in event_results: queue.put((-row[0], row[1])) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a396a201d4..86baf397fb 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1502,6 +1502,9 @@ class PersistEventsStore: self._handle_event_relations(txn, event) + self._handle_insertion_event(txn, event) + self._handle_chunk_event(txn, event) + # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) if labels: @@ -1754,6 +1757,94 @@ class PersistEventsStore: if rel_type == RelationTypes.REPLACE: txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) + def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): + """Handles keeping track of insertion events and edges/connections. + Part of MSC2716. + + Args: + txn: The database transaction object + event: The event to process + """ + + if event.type != EventTypes.MSC2716_INSERTION: + # Not a insertion event + return + + # Skip processing a insertion event if the room version doesn't + # support it. + room_version = self.store.get_room_version_txn(txn, event.room_id) + if not room_version.msc2716_historical: + return + + next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID) + if next_chunk_id is None: + # Invalid insertion event without next chunk ID + return + + logger.debug( + "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event + ) + + # Keep track of the insertion event and the chunk ID + self.db_pool.simple_insert_txn( + txn, + table="insertion_events", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "next_chunk_id": next_chunk_id, + }, + ) + + # Insert an edge for every prev_event connection + for prev_event_id in event.prev_events: + self.db_pool.simple_insert_txn( + txn, + table="insertion_event_edges", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "insertion_prev_event_id": prev_event_id, + }, + ) + + def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase): + """Handles inserting the chunk edges/connections between the chunk event + and an insertion event. Part of MSC2716. + + Args: + txn: The database transaction object + event: The event to process + """ + + if event.type != EventTypes.MSC2716_CHUNK: + # Not a chunk event + return + + # Skip processing a chunk event if the room version doesn't + # support it. + room_version = self.store.get_room_version_txn(txn, event.room_id) + if not room_version.msc2716_historical: + return + + chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID) + if chunk_id is None: + # Invalid chunk event without a chunk ID + return + + logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event) + + # Keep track of the insertion event and the chunk ID + self.db_pool.simple_insert_txn( + txn, + table="chunk_events", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "chunk_id": chunk_id, + }, + ) + def _handle_redaction(self, txn, redacted_event_id): """Handles receiving a redaction and checking whether we need to remove any redacted relations from the database. diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 1757064a68..8e22da99ae 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.state import StateFilter @@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room - Raises: NotFoundError: if the room is unknown + UnsupportedRoomVersionError: if the room uses an unknown room version. + Typically this happens if support for the room's version has been + removed from Synapse. + """ + return await self.db_pool.runInteraction( + "get_room_version_txn", + self.get_room_version_txn, + room_id, + ) + def get_room_version_txn( + self, txn: LoggingTransaction, room_id: str + ) -> RoomVersion: + """Get the room_version of a given room + Args: + txn: Transaction object + room_id: The room_id of the room you are trying to get the version for + Raises: + NotFoundError: if the room is unknown UnsupportedRoomVersionError: if the room uses an unknown room version. Typically this happens if support for the room's version has been removed from Synapse. """ - room_version_id = await self.get_room_version_id(room_id) + room_version_id = self.get_room_version_id_txn(txn, room_id) v = KNOWN_ROOM_VERSIONS.get(room_version_id) if not v: @@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): @cached(max_entries=10000) async def get_room_version_id(self, room_id: str) -> str: """Get the room_version of a given room + Raises: + NotFoundError: if the room is unknown + """ + return await self.db_pool.runInteraction( + "get_room_version_id_txn", + self.get_room_version_id_txn, + room_id, + ) + def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str: + """Get the room_version of a given room + Args: + txn: Transaction object + room_id: The room_id of the room you are trying to get the version for Raises: NotFoundError: if the room is unknown """ @@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # First we try looking up room version from the database, but for old # rooms we might not have added the room version to it yet so we fall # back to previous behaviour and look in current state events. - + # # We really should have an entry in the rooms table for every room we # care about, but let's be a bit paranoid (at least while the background # update is happening) to avoid breaking existing rooms. - version = await self.db_pool.simple_select_one_onecol( + room_version = self.db_pool.simple_select_one_onecol_txn( + txn, table="rooms", keyvalues={"room_id": room_id}, retcol="room_version", - desc="get_room_version", allow_none=True, ) - if version is not None: - return version + if room_version is None: + raise NotFoundError("Could not room_version for %s" % (room_id,)) - # Retrieve the room's create event - create_event = await self.get_create_event_for_room(room_id) - return create_event.content.get("room_version", "1") + return room_version async def get_room_predecessor(self, room_id: str) -> Optional[dict]: """Get the predecessor of an upgraded room if it exists. |