-rw-r--r--  changelog.d/10498.feature                                                 1
-rwxr-xr-x  scripts-dev/complement.sh                                                 2
-rw-r--r--  synapse/handlers/federation.py                                          119
-rw-r--r--  synapse/storage/database.py                                              14
-rw-r--r--  synapse/storage/databases/main/event_federation.py                      114
-rw-r--r--  synapse/storage/databases/main/events.py                                 24
-rw-r--r--  synapse/storage/schema/__init__.py                                        2
-rw-r--r--  synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql   24
8 files changed, 265 insertions, 35 deletions
diff --git a/changelog.d/10498.feature b/changelog.d/10498.feature
new file mode 100644
index 0000000000..5df896572d
--- /dev/null
+++ b/changelog.d/10498.feature
@@ -0,0 +1 @@
+Add support for "marker" events which make historical events discoverable to servers that already have all of the scrollback history (part of MSC2716).
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index cba015d942..5d0ef8dd3a 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
 fi
 
 # Run the tests!
-go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
+go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8197b60b76..8b602e3813 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -42,6 +42,7 @@ from twisted.internet import defer
 
 from synapse import event_auth
 from synapse.api.constants import (
+    EventContentFields,
     EventTypes,
     Membership,
     RejectedReason,
@@ -262,7 +263,12 @@ class FederationHandler(BaseHandler):
 
         state = None
 
-        # Get missing pdus if necessary.
+        # Check that the event passes auth based on the state at the event. This is
+        # done for events that are to be added to the timeline (non-outliers).
+        #
+        # Get missing pdus if necessary:
+        #  - Fetching any missing prev events to fill in gaps in the graph
+        #  - Fetching state if we have a hole in the graph
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
             min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -432,6 +438,13 @@ class FederationHandler(BaseHandler):
                         affected=event_id,
                     )
 
+        # A second round of checks for all events. Check that the event passes
+        # auth based on `auth_events`, which allows us to assert that the event
+        # would have been allowed at some point. If an event passes this check,
+        # it's OK for it to be used as part of a returned `/state` request, as either
+        # a) we received the event as part of the original join and so trust it, or
+        # b) we'll do a state resolution with existing state before it becomes
+        # part of the "current state", which adds more protection.
         await self._process_received_pdu(origin, pdu, state=state)
 
     async def _get_missing_events_for_pdu(
@@ -889,6 +902,79 @@ class FederationHandler(BaseHandler):
                     "resync_device_due_to_pdu", self._resync_device, event.sender
                 )
 
+        await self._handle_marker_event(origin, event)
+
+    async def _handle_marker_event(self, origin: str, marker_event: EventBase):
+        """Handles backfilling the insertion event when we receive a marker
+        event that points to one.
+
+        Args:
+            origin: Origin of the event; this server will be asked for the insertion event
+            marker_event: The event to process
+        """
+
+        if marker_event.type != EventTypes.MSC2716_MARKER:
+            # Not a marker event
+            return
+
+        if marker_event.rejected_reason is not None:
+            # Rejected event
+            return
+
+        # Skip processing a marker event if the room version doesn't
+        # support it.
+        room_version = await self.store.get_room_version(marker_event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        logger.debug("_handle_marker_event: received %s", marker_event)
+
+        insertion_event_id = marker_event.content.get(
+            EventContentFields.MSC2716_MARKER_INSERTION
+        )
+
+        if insertion_event_id is None:
+            # Nothing to retrieve then (invalid marker)
+            return
+
+        logger.debug(
+            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+        )
+
+        await self._get_events_and_persist(
+            origin,
+            marker_event.room_id,
+            [insertion_event_id],
+        )
+
+        insertion_event = await self.store.get_event(
+            insertion_event_id, allow_none=True
+        )
+        if insertion_event is None:
+            logger.warning(
+                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+                origin,
+                insertion_event_id,
+                marker_event.event_id,
+            )
+            return
+
+        logger.debug(
+            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
+        await self.store.insert_insertion_extremity(
+            insertion_event_id, marker_event.room_id
+        )
+
+        logger.debug(
+            "_handle_marker_event: insertion extremity added for %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
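
For context, a minimal sketch of the event shapes `_handle_marker_event` deals with. The unstable type and field names are assumptions based on MSC2716 at the time of this change, and all values are illustrative:

    # Hedged sketch of the MSC2716 event shapes; identifiers assumed from
    # the MSC's unstable prefixes, values made up for illustration.

    # An insertion event, sent at the start of a batch of historical
    # messages. Its next_chunk_id names the chunk that will connect to it.
    insertion_event = {
        "type": "org.matrix.msc2716.insertion",
        "room_id": "!room:example.org",
        "content": {"org.matrix.msc2716.next_chunk_id": "chunk-1"},
    }

    # The marker event that arrives in the live timeline over federation.
    # Its content points at the insertion event so that servers which
    # already have all of the scrollback history know to backfill it.
    marker_event = {
        "type": "org.matrix.msc2716.marker",
        "room_id": "!room:example.org",
        "content": {
            # EventContentFields.MSC2716_MARKER_INSERTION in the handler
            "org.matrix.msc2716.marker.insertion": "$insertion_event_id",
        },
    }

    # The handler's extraction step then boils down to:
    insertion_event_id = marker_event["content"].get(
        "org.matrix.msc2716.marker.insertion"
    )
    assert insertion_event_id == "$insertion_event_id"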
@@ -1057,9 +1143,19 @@ class FederationHandler(BaseHandler):
     async def _maybe_backfill_inner(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
-        extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
+        oldest_events_with_depth = (
+            await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
+        )
+        insertion_events_to_be_backfilled = (
+            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
+        )
+        logger.debug(
+            "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
+            oldest_events_with_depth,
+            insertion_events_to_be_backfilled,
+        )
 
-        if not extremities:
+        if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
             logger.debug("Not backfilling as no extremeties found.")
             return False
 
@@ -1089,10 +1185,12 @@ class FederationHandler(BaseHandler):
         #   state *before* the event, ignoring the special casing certain event
         #   types have.
 
-        forward_events = await self.store.get_successor_events(list(extremities))
+        forward_event_ids = await self.store.get_successor_events(
+            list(oldest_events_with_depth)
+        )
 
         extremities_events = await self.store.get_events(
-            forward_events,
+            forward_event_ids,
             redact_behaviour=EventRedactBehaviour.AS_IS,
             get_prev_content=False,
         )
@@ -1106,10 +1204,19 @@ class FederationHandler(BaseHandler):
             redact=False,
             check_history_visibility_only=True,
         )
+        logger.debug(
+            "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
+        )
 
-        if not filtered_extremities:
+        if not filtered_extremities and not insertion_events_to_be_backfilled:
             return False
 
+        extremities = {
+            **oldest_events_with_depth,
+            # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
+            **insertion_events_to_be_backfilled,
+        }
+
         # Check if we reached a point where we should start backfilling.
         sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
         max_depth = sorted_extremeties_tuple[0][1]
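
To make the merged-extremity bookkeeping concrete, a self-contained sketch with made-up event IDs and depths, mirroring the merge and depth sort above:

    # Hedged sketch of the extremity merging above, with made-up data.
    oldest_events_with_depth = {"$backward_a": 5, "$backward_b": 12}
    insertion_events_to_be_backfilled = {"$insertion_c": 9}

    # Merge both event_id -> depth maps. As the TODO notes, the insertion
    # extremities currently skip the history-visibility filtering that the
    # ordinary backwards extremities go through.
    extremities = {**oldest_events_with_depth, **insertion_events_to_be_backfilled}

    # Deepest extremity first, as in the sorted(...) call above.
    sorted_extremities = sorted(extremities.items(), key=lambda e: -int(e[1]))
    max_depth = sorted_extremities[0][1]  # 12, from $backward_b

    # If the user's scrollback has reached the deepest extremity, it is
    # worth asking another server to backfill.
    current_depth = 10
    if current_depth <= max_depth:
        print("within pagination range of an extremity; backfill")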
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index c8015a3848..95d2caff62 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -941,13 +941,13 @@ class DatabasePool:
 
         `lock` should generally be set to True (the default), but can be set
         to False if either of the following are true:
-
-        * there is a UNIQUE INDEX on the key columns. In this case a conflict
-          will cause an IntegrityError in which case this function will retry
-          the update.
-
-        * we somehow know that we are the only thread which will be updating
-          this table.
+            1. there is a UNIQUE INDEX on the key columns. In this case a
+               conflict will cause an IntegrityError, in which case this
+               function will retry the update.
+            2. we somehow know that we are the only thread which will be
+               updating this table.
+        As an additional note, this parameter only matters for old SQLite
+        versions, because we will use native upserts otherwise.
 
         Args:
             table: The table to upsert into
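
To make the docstring's distinction concrete, a hedged sketch of an emulated upsert on a bare sqlite3 connection, the code path where `lock` actually matters; this is not Synapse's implementation, and the table is only illustrative:

    import sqlite3

    # Minimal emulated upsert: UPDATE first, INSERT if nothing matched.
    # Newer SQLite and Postgres use a native INSERT ... ON CONFLICT instead.
    def emulated_upsert(conn: sqlite3.Connection, event_id: str, room_id: str) -> None:
        cur = conn.execute(
            "UPDATE insertion_event_extremities SET room_id = ? WHERE event_id = ?",
            (room_id, event_id),
        )
        if cur.rowcount == 0:
            # Without a table lock, a concurrent writer can race us between
            # the UPDATE and the INSERT; a UNIQUE index on event_id turns
            # that race into an IntegrityError the caller can catch and retry.
            conn.execute(
                "INSERT INTO insertion_event_extremities (event_id, room_id) VALUES (?, ?)",
                (event_id, room_id),
            )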
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 44018c1c31..bddf5ef192 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -671,27 +671,97 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
-    async def get_oldest_events_with_depth_in_room(self, room_id):
+    async def get_oldest_event_ids_with_depth_in_room(self, room_id: str) -> Dict[str, int]:
+        """Gets the oldest events (backwards extremities) in the room along with
+        their approximate depth.
+
+        We use this function to compare whether someone's current scrollback
+        depth is within pagination range of the event extremities. If the
+        current depth is close to the depth of a given oldest event, we can
+        trigger a backfill.
+
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
+            # Assemble a dictionary with event_id -> depth for the oldest events
+            # we know of in the room. Backwards extremities are the oldest events
+            # we know of in the room, but we only know of them because some other
+            # event referenced them via prev_events; they aren't persisted in our
+            # database yet (meaning we don't know their depth specifically). So we
+            # need to look for the approximate depth from the events connected to
+            # the current backwards extremities.
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM events as e
+                /**
+                 * Get the edge connections from the event_edges table
+                 * so we can see whether this event's prev_events points
+                 * to a backward extremity in the next join.
+                 */
+                INNER JOIN event_edges as g
+                ON g.event_id = e.event_id
+                /**
+                 * We find the "oldest" events in the room by looking for
+                 * events connected to backwards extremities (oldest events
+                 * in the room that we know of so far).
+                 */
+                INNER JOIN event_backward_extremities as b
+                ON g.prev_event_id = b.event_id
+                WHERE b.room_id = ? AND g.is_state is ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id, False))
+
+            return dict(txn)
+
         return await self.db_pool.runInteraction(
-            "get_oldest_events_with_depth_in_room",
-            self.get_oldest_events_with_depth_in_room_txn,
+            "get_oldest_event_ids_with_depth_in_room",
+            get_oldest_event_ids_with_depth_in_room_txn,
             room_id,
         )
 
-    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sql = (
-            "SELECT b.event_id, MAX(e.depth) FROM events as e"
-            " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id"
-            " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id"
-            " WHERE b.room_id = ? AND g.is_state is ?"
-            " GROUP BY b.event_id"
-        )
+    async def get_insertion_event_backwards_extremities_in_room(
+        self, room_id: str
+    ) -> Dict[str, int]:
+        """Get the insertion events we know about that we haven't backfilled yet.
 
-        txn.execute(sql, (room_id, False))
+        We use this function to compare whether someone's current scrollback
+        depth is within pagination range of an insertion event. If the current
+        depth is close to the depth of a given insertion event, we can trigger
+        a backfill.
 
-        return dict(txn)
+        Args:
+            room_id: Room where we want to find the insertion events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                /* We only want insertion events that are also marked as backwards extremities */
+                INNER JOIN insertion_event_extremities as b USING (event_id)
+                /* Get the depth of the insertion event from the events table */
+                INNER JOIN events AS e USING (event_id)
+                WHERE b.room_id = ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_insertion_event_backwards_extremities_in_room",
+            get_insertion_event_backwards_extremities_in_room_txn,
+            room_id,
+        )
 
     async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
         """Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -1041,7 +1111,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                     if row[1] not in event_results:
                         queue.put((-row[0], row[1]))
 
-            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
             prev_event_id_results = txn.fetchall()
             logger.debug(
@@ -1136,6 +1205,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
+        await self.db_pool.simple_upsert(
+            table="insertion_event_extremities",
+            keyvalues={"event_id": event_id},
+            values={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            insertion_values={},
+            desc="insert_insertion_extremity",
+            lock=False,
+        )
+
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:
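
The two new queries in this file both return event_id -> depth maps via `dict(txn)`. A toy, self-contained run of the insertion-extremities query (in-memory SQLite, schema pared down to the columns the query touches):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE insertion_events (event_id TEXT, next_chunk_id TEXT);
        CREATE TABLE insertion_event_extremities (event_id TEXT, room_id TEXT);
        CREATE TABLE events (event_id TEXT, depth INTEGER);

        INSERT INTO insertion_events VALUES ('$insert_a', 'chunk-1');
        INSERT INTO insertion_event_extremities VALUES ('$insert_a', '!room');
        INSERT INTO events VALUES ('$insert_a', 7);
        """
    )

    rows = conn.execute(
        """
        SELECT b.event_id, MAX(e.depth) FROM insertion_events AS i
        /* Only insertion events that are also backwards extremities */
        INNER JOIN insertion_event_extremities AS b USING (event_id)
        /* Pull the depth of the insertion event from the events table */
        INNER JOIN events AS e USING (event_id)
        WHERE b.room_id = ?
        GROUP BY b.event_id
        """,
        ("!room",),
    )
    print(dict(rows))  # {'$insert_a': 7}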
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 86baf397fb..40b53274fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1845,6 +1845,18 @@ class PersistEventsStore:
             },
         )
 
+        # When we receive an event with a `chunk_id` referencing the
+        # `next_chunk_id` of an insertion event, we can remove that insertion
+        # event from the `insertion_event_extremities` table.
+        sql = """
+            DELETE FROM insertion_event_extremities WHERE event_id IN (
+                SELECT event_id FROM insertion_events
+                WHERE next_chunk_id = ?
+            )
+        """
+
+        txn.execute(sql, (chunk_id,))
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
@@ -2101,15 +2113,17 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
+        # From the events passed in, add all of the prev events as backwards extremities.
+        # Ignore any events that are already backwards extremities or outliers.
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
+            "   SELECT 1 FROM event_backward_extremities"
+            "   WHERE event_id = ? AND room_id = ?"
             " )"
             " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
+            "   SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            "   AND outlier = ?"
             " )"
         )
 
@@ -2123,6 +2137,8 @@ class PersistEventsStore:
             ],
         )
 
+        # Delete the events we've now fetched from the backwards extremities
+        # table, since their prev events have become the new backwards extremities.
         query = (
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 36340a652a..fd4dd67d91 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 61
+SCHEMA_VERSION = 62
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
diff --git a/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
new file mode 100644
index 0000000000..b731ef284a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
@@ -0,0 +1,24 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Add a table that keeps track of which "insertion" events need to be backfilled
+CREATE TABLE IF NOT EXISTS insertion_event_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_extremities_room_id ON insertion_event_extremities(room_id);