summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/event_federation.py88
-rw-r--r--synapse/storage/databases/main/events.py91
-rw-r--r--synapse/storage/databases/main/state.py50
3 files changed, 209 insertions, 20 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f4a00b0736..547e43ab98 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
 
-        query = (
-            "SELECT depth, prev_event_id FROM event_edges"
-            " INNER JOIN events"
-            " ON prev_event_id = events.event_id"
-            " WHERE event_edges.event_id = ?"
-            " AND event_edges.is_state = ?"
-            " LIMIT ?"
-        )
+        # Look for the prev_event_id connected to the given event_id
+        query = """
+            SELECT depth, prev_event_id FROM event_edges
+            /* Get the depth of the prev_event_id from the events table */
+            INNER JOIN events
+            ON prev_event_id = events.event_id
+            /* Find an event which matches the given event_id */
+            WHERE event_edges.event_id = ?
+            AND event_edges.is_state = ?
+            LIMIT ?
+        """
+
+        # Look for the "insertion" events connected to the given event_id
+        connected_insertion_event_query = """
+            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+            /* Get the depth of the insertion event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which points via prev_events to the given event_id */
+            WHERE i.insertion_prev_event_id = ?
+            LIMIT ?
+        """
+
+        # Find any chunk connections of a given insertion event
+        chunk_connection_query = """
+            SELECT e.depth, c.event_id FROM insertion_events AS i
+            /* Find the chunk that connects to the given insertion event */
+            INNER JOIN chunk_events AS c
+            ON i.next_chunk_id = c.chunk_id
+            /* Get the depth of the chunk start event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which matches the given event_id */
+            WHERE i.event_id = ?
+            LIMIT ?
+        """
 
+        # In a PriorityQueue, the lowest valued entries are retrieved first.
+        # We're using depth as the priority in the queue.
+        # Depth is lowest at the oldest-in-time message and highest and
+        # newest-in-time message. We add events to the queue with a negative depth so that
+        # we process the newest-in-time messages first going backwards in time.
         queue = PriorityQueue()
 
         for event_id in event_list:
@@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
             event_results.add(event_id)
 
+            # Try and find any potential historical chunks of message history.
+            #
+            # First we look for an insertion event connected to the current
+            # event (by prev_event). If we find any, we need to go and try to
+            # find any chunk events connected to the insertion event (by
+            # chunk_id). If we find any, we'll add them to the queue and
+            # navigate up the DAG like normal in the next iteration of the loop.
+            txn.execute(
+                connected_insertion_event_query, (event_id, limit - len(event_results))
+            )
+            connected_insertion_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: connected_insertion_event_query %s",
+                connected_insertion_event_id_results,
+            )
+            for row in connected_insertion_event_id_results:
+                connected_insertion_event_depth = row[0]
+                connected_insertion_event = row[1]
+                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+
+                # Find any chunk connections for the given insertion event
+                txn.execute(
+                    chunk_connection_query,
+                    (connected_insertion_event, limit - len(event_results)),
+                )
+                chunk_start_event_id_results = txn.fetchall()
+                logger.debug(
+                    "_get_backfill_events: chunk_start_event_id_results %s",
+                    chunk_start_event_id_results,
+                )
+                for row in chunk_start_event_id_results:
+                    if row[1] not in event_results:
+                        queue.put((-row[0], row[1]))
+
+            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
+            prev_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+            )
 
-            for row in txn:
+            for row in prev_event_id_results:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201d4..86baf397fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
 
             self._handle_event_relations(txn, event)
 
+            self._handle_insertion_event(txn, event)
+            self._handle_chunk_event(txn, event)
+
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
             if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
         if rel_type == RelationTypes.REPLACE:
             txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
+    def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles keeping track of insertion events and edges/connections.
+        Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_INSERTION:
+            # Not a insertion event
+            return
+
+        # Skip processing a insertion event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+        if next_chunk_id is None:
+            # Invalid insertion event without next chunk ID
+            return
+
+        logger.debug(
+            "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+        )
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="insertion_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "next_chunk_id": next_chunk_id,
+            },
+        )
+
+        # Insert an edge for every prev_event connection
+        for prev_event_id in event.prev_events:
+            self.db_pool.simple_insert_txn(
+                txn,
+                table="insertion_event_edges",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "insertion_prev_event_id": prev_event_id,
+                },
+            )
+
+    def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles inserting the chunk edges/connections between the chunk event
+        and an insertion event. Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_CHUNK:
+            # Not a chunk event
+            return
+
+        # Skip processing a chunk event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+        if chunk_id is None:
+            # Invalid chunk event without a chunk ID
+            return
+
+        logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="chunk_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "chunk_id": chunk_id,
+            },
+        )
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 1757064a68..8e22da99ae 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateFilter
@@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
-
         Raises:
             NotFoundError: if the room is unknown
+            UnsupportedRoomVersionError: if the room uses an unknown room version.
+                Typically this happens if support for the room's version has been
+                removed from Synapse.
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_txn",
+            self.get_room_version_txn,
+            room_id,
+        )
 
+    def get_room_version_txn(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> RoomVersion:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
+        Raises:
+            NotFoundError: if the room is unknown
             UnsupportedRoomVersionError: if the room uses an unknown room version.
                 Typically this happens if support for the room's version has been
                 removed from Synapse.
         """
-        room_version_id = await self.get_room_version_id(room_id)
+        room_version_id = self.get_room_version_id_txn(txn, room_id)
         v = KNOWN_ROOM_VERSIONS.get(room_version_id)
 
         if not v:
@@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     @cached(max_entries=10000)
     async def get_room_version_id(self, room_id: str) -> str:
         """Get the room_version of a given room
+        Raises:
+            NotFoundError: if the room is unknown
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_id_txn",
+            self.get_room_version_id_txn,
+            room_id,
+        )
 
+    def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
         Raises:
             NotFoundError: if the room is unknown
         """
@@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         # First we try looking up room version from the database, but for old
         # rooms we might not have added the room version to it yet so we fall
         # back to previous behaviour and look in current state events.
-
+        #
         # We really should have an entry in the rooms table for every room we
         # care about, but let's be a bit paranoid (at least while the background
         # update is happening) to avoid breaking existing rooms.
-        version = await self.db_pool.simple_select_one_onecol(
+        room_version = self.db_pool.simple_select_one_onecol_txn(
+            txn,
             table="rooms",
             keyvalues={"room_id": room_id},
             retcol="room_version",
-            desc="get_room_version",
             allow_none=True,
         )
 
-        if version is not None:
-            return version
+        if room_version is None:
+            raise NotFoundError("Could not room_version for %s" % (room_id,))
 
-        # Retrieve the room's create event
-        create_event = await self.get_create_event_for_room(room_id)
-        return create_event.content.get("room_version", "1")
+        return room_version
 
     async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
         """Get the predecessor of an upgraded room if it exists.