summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-07-28 10:46:37 -0500
committerGitHub <noreply@github.com>2021-07-28 10:46:37 -0500
commitd0b294ad974c05621426369a00be6bf05c4af997 (patch)
tree1a4ab4505665684c30ea50dddde1890af4a928b6 /synapse/storage/databases/main
parentMerge tag 'v1.39.0rc3' into develop (diff)
downloadsynapse-d0b294ad974c05621426369a00be6bf05c4af997.tar.xz
Make historical events discoverable from backfill for servers without any scrollback history (MSC2716) (#10245)
* Make historical messages available to federated servers

Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716

Follow-up to https://github.com/matrix-org/synapse/pull/9247

* Debug message not available on federation

* Add base starting insertion point when no chunk ID is provided

* Fix messages from multiple senders in historical chunk

Follow-up to https://github.com/matrix-org/synapse/pull/9247

Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716

---

Previously, Synapse would throw a 403,
`Cannot force another user to join.`,
because we were trying to use `?user_id` from a single virtual user
which did not match with messages from other users in the chunk.

* Remove debug lines

* Messing with selecting insertion event extremeties

* Move db schema change to new version

* Add more better comments

* Make a fake requester with just what we need

See https://github.com/matrix-org/synapse/pull/10276#discussion_r660999080

* Store insertion events in table

* Make base insertion event float off on its own

See https://github.com/matrix-org/synapse/pull/10250#issuecomment-875711889

Conflicts:
	synapse/rest/client/v1/room.py

* Validate that the app service can actually control the given user

See https://github.com/matrix-org/synapse/pull/10276#issuecomment-876316455

Conflicts:
	synapse/rest/client/v1/room.py

* Add some better comments on what we're trying to check for

* Continue debugging

* Share validation logic

* Add inserted historical messages to /backfill response

* Remove debug sql queries

* Some marker event implemntation trials

* Clean up PR

* Rename insertion_event_id to just event_id

* Add some better sql comments

* More accurate description

* Add changelog

* Make it clear what MSC the change is part of

* Add more detail on which insertion event came through

* Address review and improve sql queries

* Only use event_id as unique constraint

* Fix test case where insertion event is already in the normal DAG

* Remove debug changes

* Switch to chunk events so we can auth via power_levels

Previously, we were using `content.chunk_id` to connect one
chunk to another. But these events can be from any `sender`
and we can't tell who should be able to send historical events.
We know we only want the application service to do it but these
events have the sender of a real historical message, not the
application service user ID as the sender. Other federated homeservers
also have no indicator which senders are an application service on
the originating homeserver.

So we want to auth all of the MSC2716 events via power_levels
and have them be sent by the application service with proper
PL levels in the room.

* Switch to chunk events for federation

* Add unstable room version to support new historical PL

* Fix federated events being rejected for no state_groups

Add fix from https://github.com/matrix-org/synapse/pull/10439
until it merges.

* Only connect base insertion event to prev_event_ids

Per discussion with @erikjohnston,
https://matrix.to/#/!UytJQHLQYfvYWsGrGY:jki.re/$12bTUiObDFdHLAYtT7E-BvYRp3k_xv8w0dUQHibasJk?via=jki.re&via=matrix.org

* Make it possible to get the room_version with txn

* Allow but ignore historical events in unsupported room version

See https://github.com/matrix-org/synapse/pull/10245#discussion_r675592489

We can't reject historical events on unsupported room versions because homeservers without knowledge of MSC2716 or the new room version don't reject historical events either.

Since we can't rely on the auth check here to stop historical events on unsupported room versions, I've added some additional checks in the processing/persisting code (`synapse/storage/databases/main/events.py` ->  `_handle_insertion_event` and `_handle_chunk_event`). I've had to do some refactoring so there is method to fetch the room version by `txn`.

* Move to unique index syntax

See https://github.com/matrix-org/synapse/pull/10245#discussion_r675638509

* High-level document how the insertion->chunk lookup works

* Remove create_event fallback for room_versions

See https://github.com/matrix-org/synapse/pull/10245/files#r677641879

* Use updated method name
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/event_federation.py88
-rw-r--r--synapse/storage/databases/main/events.py91
-rw-r--r--synapse/storage/databases/main/state.py50
3 files changed, 209 insertions, 20 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f4a00b0736..547e43ab98 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
 
-        query = (
-            "SELECT depth, prev_event_id FROM event_edges"
-            " INNER JOIN events"
-            " ON prev_event_id = events.event_id"
-            " WHERE event_edges.event_id = ?"
-            " AND event_edges.is_state = ?"
-            " LIMIT ?"
-        )
+        # Look for the prev_event_id connected to the given event_id
+        query = """
+            SELECT depth, prev_event_id FROM event_edges
+            /* Get the depth of the prev_event_id from the events table */
+            INNER JOIN events
+            ON prev_event_id = events.event_id
+            /* Find an event which matches the given event_id */
+            WHERE event_edges.event_id = ?
+            AND event_edges.is_state = ?
+            LIMIT ?
+        """
+
+        # Look for the "insertion" events connected to the given event_id
+        connected_insertion_event_query = """
+            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+            /* Get the depth of the insertion event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which points via prev_events to the given event_id */
+            WHERE i.insertion_prev_event_id = ?
+            LIMIT ?
+        """
+
+        # Find any chunk connections of a given insertion event
+        chunk_connection_query = """
+            SELECT e.depth, c.event_id FROM insertion_events AS i
+            /* Find the chunk that connects to the given insertion event */
+            INNER JOIN chunk_events AS c
+            ON i.next_chunk_id = c.chunk_id
+            /* Get the depth of the chunk start event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which matches the given event_id */
+            WHERE i.event_id = ?
+            LIMIT ?
+        """
 
+        # In a PriorityQueue, the lowest valued entries are retrieved first.
+        # We're using depth as the priority in the queue.
+        # Depth is lowest at the oldest-in-time message and highest and
+        # newest-in-time message. We add events to the queue with a negative depth so that
+        # we process the newest-in-time messages first going backwards in time.
         queue = PriorityQueue()
 
         for event_id in event_list:
@@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
             event_results.add(event_id)
 
+            # Try and find any potential historical chunks of message history.
+            #
+            # First we look for an insertion event connected to the current
+            # event (by prev_event). If we find any, we need to go and try to
+            # find any chunk events connected to the insertion event (by
+            # chunk_id). If we find any, we'll add them to the queue and
+            # navigate up the DAG like normal in the next iteration of the loop.
+            txn.execute(
+                connected_insertion_event_query, (event_id, limit - len(event_results))
+            )
+            connected_insertion_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: connected_insertion_event_query %s",
+                connected_insertion_event_id_results,
+            )
+            for row in connected_insertion_event_id_results:
+                connected_insertion_event_depth = row[0]
+                connected_insertion_event = row[1]
+                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+
+                # Find any chunk connections for the given insertion event
+                txn.execute(
+                    chunk_connection_query,
+                    (connected_insertion_event, limit - len(event_results)),
+                )
+                chunk_start_event_id_results = txn.fetchall()
+                logger.debug(
+                    "_get_backfill_events: chunk_start_event_id_results %s",
+                    chunk_start_event_id_results,
+                )
+                for row in chunk_start_event_id_results:
+                    if row[1] not in event_results:
+                        queue.put((-row[0], row[1]))
+
+            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
+            prev_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+            )
 
-            for row in txn:
+            for row in prev_event_id_results:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201d4..86baf397fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
 
             self._handle_event_relations(txn, event)
 
+            self._handle_insertion_event(txn, event)
+            self._handle_chunk_event(txn, event)
+
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
             if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
         if rel_type == RelationTypes.REPLACE:
             txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
+    def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles keeping track of insertion events and edges/connections.
+        Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_INSERTION:
+            # Not a insertion event
+            return
+
+        # Skip processing a insertion event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+        if next_chunk_id is None:
+            # Invalid insertion event without next chunk ID
+            return
+
+        logger.debug(
+            "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+        )
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="insertion_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "next_chunk_id": next_chunk_id,
+            },
+        )
+
+        # Insert an edge for every prev_event connection
+        for prev_event_id in event.prev_events:
+            self.db_pool.simple_insert_txn(
+                txn,
+                table="insertion_event_edges",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "insertion_prev_event_id": prev_event_id,
+                },
+            )
+
+    def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles inserting the chunk edges/connections between the chunk event
+        and an insertion event. Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_CHUNK:
+            # Not a chunk event
+            return
+
+        # Skip processing a chunk event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+        if chunk_id is None:
+            # Invalid chunk event without a chunk ID
+            return
+
+        logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="chunk_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "chunk_id": chunk_id,
+            },
+        )
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 1757064a68..8e22da99ae 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateFilter
@@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
-
         Raises:
             NotFoundError: if the room is unknown
+            UnsupportedRoomVersionError: if the room uses an unknown room version.
+                Typically this happens if support for the room's version has been
+                removed from Synapse.
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_txn",
+            self.get_room_version_txn,
+            room_id,
+        )
 
+    def get_room_version_txn(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> RoomVersion:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
+        Raises:
+            NotFoundError: if the room is unknown
             UnsupportedRoomVersionError: if the room uses an unknown room version.
                 Typically this happens if support for the room's version has been
                 removed from Synapse.
         """
-        room_version_id = await self.get_room_version_id(room_id)
+        room_version_id = self.get_room_version_id_txn(txn, room_id)
         v = KNOWN_ROOM_VERSIONS.get(room_version_id)
 
         if not v:
@@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     @cached(max_entries=10000)
     async def get_room_version_id(self, room_id: str) -> str:
         """Get the room_version of a given room
+        Raises:
+            NotFoundError: if the room is unknown
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_id_txn",
+            self.get_room_version_id_txn,
+            room_id,
+        )
 
+    def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
         Raises:
             NotFoundError: if the room is unknown
         """
@@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         # First we try looking up room version from the database, but for old
         # rooms we might not have added the room version to it yet so we fall
         # back to previous behaviour and look in current state events.
-
+        #
         # We really should have an entry in the rooms table for every room we
         # care about, but let's be a bit paranoid (at least while the background
         # update is happening) to avoid breaking existing rooms.
-        version = await self.db_pool.simple_select_one_onecol(
+        room_version = self.db_pool.simple_select_one_onecol_txn(
+            txn,
             table="rooms",
             keyvalues={"room_id": room_id},
             retcol="room_version",
-            desc="get_room_version",
             allow_none=True,
         )
 
-        if version is not None:
-            return version
+        if room_version is None:
+            raise NotFoundError("Could not room_version for %s" % (room_id,))
 
-        # Retrieve the room's create event
-        create_event = await self.get_create_event_for_room(room_id)
-        return create_event.content.get("room_version", "1")
+        return room_version
 
     async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
         """Get the predecessor of an upgraded room if it exists.