path: root/synapse
author    Eric Eastwood <erice@element.io>  2022-09-22 01:03:35 -0500
committer Eric Eastwood <erice@element.io>  2022-09-22 01:03:35 -0500
commit    a25821d1600d48b2759a031505190a797047b92b (patch)
tree      da72b3d9a07dd363ef9047261de33c37c8d5ec1f /synapse
parent    Calculate the stream_ordering from newest -> oldest (in the correct order) an... (diff)
download  synapse-a25821d1600d48b2759a031505190a797047b92b.tar.xz
Try sorting backfill points by tie-breaking on stream_ordering
Diffstat (limited to 'synapse')
-rw-r--r--  synapse/handlers/federation.py                      | 110
-rw-r--r--  synapse/handlers/federation_event.py                |  39
-rw-r--r--  synapse/storage/databases/main/event_federation.py  | 183
3 files changed, 220 insertions, 112 deletions
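
At a glance: backfill points were previously ordered by depth alone, so same-depth extremities came back in an unstable order. This commit tie-breaks the sort on `stream_ordering` and then `event_id`. A minimal, self-contained sketch of the new comparison key (illustrative `Point` class and values, not Synapse's `_BackfillPoint`):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class Point:
        event_id: str
        depth: int
        stream_ordering: int

    points = [
        Point("$b", depth=10, stream_ordering=3),
        Point("$a", depth=10, stream_ordering=7),
        Point("$c", depth=12, stream_ordering=1),
    ]

    # Deepest (most recent) first; ties broken by newest stream_ordering,
    # then by event_id so the order is stable across runs.
    ordered = sorted(points, key=lambda p: (-p.depth, -p.stream_ordering, p.event_id))
    assert [p.event_id for p in ordered] == ["$c", "$a", "$b"]
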
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 377ac04f8c..223f343f1a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -38,7 +38,7 @@ from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
 from synapse import event_auth
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -60,13 +60,7 @@ from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
-from synapse.logging.tracing import (
-    SynapseTags,
-    set_attribute,
-    start_active_span,
-    tag_args,
-    trace,
-)
+from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
@@ -125,6 +119,7 @@ class _BackfillPoint:
 
     event_id: str
     depth: int
+    stream_ordering: int
     type: _BackfillPointType
 
 
@@ -231,18 +226,24 @@ class FederationHandler:
                 processing. Only used for timing.
         """
         backwards_extremities = [
-            _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
-            for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
-                room_id
+            _BackfillPoint(
+                event_id, depth, stream_ordering, _BackfillPointType.BACKWARDS_EXTREMITY
+            )
+            for event_id, depth, stream_ordering in await self.store.get_backfill_points_in_room(
+                room_id=room_id,
+                current_depth=current_depth,
             )
         ]
 
         insertion_events_to_be_backfilled: List[_BackfillPoint] = []
         if self.hs.config.experimental.msc2716_enabled:
             insertion_events_to_be_backfilled = [
-                _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
-                for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
-                    room_id
+                _BackfillPoint(
+                    event_id, depth, stream_ordering, _BackfillPointType.INSERTION_PONT
+                )
+                for event_id, depth, stream_ordering in await self.store.get_insertion_event_backward_extremities_in_room(
+                    room_id=room_id,
+                    current_depth=current_depth,
                 )
             ]
         logger.debug(
@@ -251,10 +252,6 @@ class FederationHandler:
             insertion_events_to_be_backfilled,
         )
 
-        if not backwards_extremities and not insertion_events_to_be_backfilled:
-            logger.debug("Not backfilling as no extremeties found.")
-            return False
-
         # we now have a list of potential places to backpaginate from. We prefer to
         # start with the most recent (ie, max depth), so let's sort the list.
         sorted_backfill_points: List[_BackfillPoint] = sorted(
@@ -262,7 +259,7 @@ class FederationHandler:
                 backwards_extremities,
                 insertion_events_to_be_backfilled,
             ),
-            key=lambda e: -int(e.depth),
+            key=lambda e: (-e.depth, -e.stream_ordering, e.event_id),
         )
 
         logger.debug(
@@ -275,6 +272,29 @@ class FederationHandler:
             sorted_backfill_points,
         )
 
+        # If we have no backfill points lower than the `current_depth`, we can
+        # either a) bail or b) still attempt to backfill. We opt to try
+        # backfilling anyway just in case we do get relevant events.
+        if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            logger.debug(
+                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
+            )
+            return await self._maybe_backfill_inner(
+                room_id=room_id,
+                # We use `MAX_DEPTH` so that we find all backfill points next
+                # time (all events are below the `MAX_DEPTH`)
+                current_depth=MAX_DEPTH,
+                limit=limit,
+                processing_start_time=processing_start_time,
+            )
+        elif not sorted_backfill_points and current_depth == MAX_DEPTH:
+            # Even after trying again with `MAX_DEPTH`, we didn't find any
+            # backward extremities to backfill from.
+            logger.debug(
+                "_maybe_backfill_inner: Not backfilling as no backward extremeties found."
+            )
+            return False
+
         # If we're approaching an extremity we trigger a backfill, otherwise we
         # no-op.
         #
@@ -288,43 +308,16 @@ class FederationHandler:
         # XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
         # care about events that have happened after our current position.
         #
-        max_depth = sorted_backfill_points[0].depth
-        if current_depth - 2 * limit > max_depth:
+        max_depth_of_backfill_points = sorted_backfill_points[0].depth
+        if current_depth - 2 * limit > max_depth_of_backfill_points:
             logger.debug(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
-                max_depth,
+                max_depth_of_backfill_points,
                 current_depth,
                 limit,
             )
             return False
 
-        # We ignore extremities that have a greater depth than our current depth
-        # as:
-        #    1. we don't really care about getting events that have happened
-        #       after our current position; and
-        #    2. we have likely previously tried and failed to backfill from that
-        #       extremity, so to avoid getting "stuck" requesting the same
-        #       backfill repeatedly we drop those extremities.
-        #
-        # However, we need to check that the filtered extremities are non-empty.
-        # If they are empty then either we can a) bail or b) still attempt to
-        # backfill. We opt to try backfilling anyway just in case we do get
-        # relevant events.
-        #
-        filtered_sorted_backfill_points = [
-            t for t in sorted_backfill_points if t.depth <= current_depth
-        ]
-        if filtered_sorted_backfill_points:
-            logger.debug(
-                "_maybe_backfill_inner: backfill points before current depth: %s",
-                filtered_sorted_backfill_points,
-            )
-            sorted_backfill_points = filtered_sorted_backfill_points
-        else:
-            logger.debug(
-                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
-            )
-
         # For performance's sake, we only want to paginate from a particular extremity
         # if we can actually see the events we'll get. Otherwise, we'd just spend a lot
         # of resources to get redacted events. We check each extremity in turn and
@@ -346,14 +339,10 @@ class FederationHandler:
         # attempting to paginate before backfill reached the visible history.
 
         extremities_to_request: List[str] = []
-        for i, bp in enumerate(sorted_backfill_points):
+        for bp in sorted_backfill_points:
             if len(extremities_to_request) >= 5:
                 break
 
-            set_attribute(
-                SynapseTags.RESULT_PREFIX + "backfill_point" + str(i), str(bp)
-            )
-
             # For regular backwards extremities, we don't have the extremity events
             # themselves, so we need to actually check the events that reference them -
             # their "successor" events.
@@ -410,13 +399,12 @@ class FederationHandler:
             str(len(extremities_to_request)),
         )
 
-        with start_active_span("getting likely_domains"):
-            # Now we need to decide which hosts to hit first.
-            # First we try hosts that are already in the room.
-            # TODO: HEURISTIC ALERT.
-            likely_domains = (
-                await self._storage_controllers.state.get_current_hosts_in_room(room_id)
-            )
+        # Now we need to decide which hosts to hit first.
+        # First we try hosts that are already in the room.
+        # TODO: HEURISTIC ALERT.
+        likely_domains = (
+            await self._storage_controllers.state.get_current_hosts_in_room(room_id)
+        )
 
         async def try_backfill(domains: Collection[str]) -> bool:
             # TODO: Should we try multiple of these at a time?
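
Before the next file: the check kept above ("Not backfilling as we don't need to") is a simple distance heuristic, and the new `MAX_DEPTH` branch earlier in the function retries the whole search once with `current_depth = MAX_DEPTH` so extremities that only exist "after" the client's position still get one backfill attempt. A small sketch of the distance heuristic (hypothetical function name, not Synapse code):

    def worth_backfilling(current_depth: int, deepest_backfill_point: int, limit: int) -> bool:
        # Mirrors `current_depth - 2 * limit > max_depth_of_backfill_points`:
        # no-op when even the deepest backfill point is more than two
        # pages (2 * limit events) behind the client's position.
        return current_depth - 2 * limit <= deepest_backfill_point

    assert worth_backfilling(current_depth=100, deepest_backfill_point=90, limit=10)
    assert not worth_backfilling(current_depth=100, deepest_backfill_point=50, limit=10)
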
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 40fbcce53a..5aaf2dd9e0 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -649,13 +649,31 @@ class FederationEventHandler:
             #
             # We expect the events from the `/backfill` response to start from
             # `?v` and include events that preceded it (so the list will be
-            # newest -> oldest). This is at-most a convention between Synapse
-            # servers as the order is not specced.
+            # newest -> oldest, reverse chronological). This is at most a
+            # convention between Synapse servers as the order is not specced.
             #
-            # Reverse the list of events
+            # We want to calculate the `stream_ordering` from newest -> oldest
+            # (so historical events sort in the correct order) and persist them
+            # oldest -> newest to minimise thrashing from missing `prev_event`
+            # fetches.
             reverse_chronological_events = events
+            # `[::-1]` is just slice syntax to reverse the list and give us a copy
             chronological_events = reverse_chronological_events[::-1]
 
+            logger.info(
+                "backfill assumed reverse_chronological_events=%s",
+                [
+                    "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                    % (
+                        event.event_id,
+                        event.depth,
+                        event.content.get("body", event.type),
+                        event.prev_event_ids(),
+                    )
+                    for event in reverse_chronological_events
+                ],
+            )
+
             from synapse.storage.util.id_generators import AbstractStreamIdGenerator
 
             # This should only exist on instances that are configured to write
@@ -826,6 +844,21 @@ class FederationEventHandler:
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         sorted_events = sorted(events, key=lambda x: x.depth)
+
+        logger.info(
+            "backfill sorted_events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in sorted_events
+            ],
+        )
+
         for ev in sorted_events:
             with nested_logging_context(ev.event_id):
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
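
To make the ordering juggling above concrete: the `/backfill` response is assumed to arrive newest -> oldest, stream orderings are assigned in that order, the reversed copy is persisted oldest -> newest, and `_process_pulled_events` then processes shallowest-first. A toy sketch of the reversal and sort (hypothetical `Event` dataclass, not Synapse's `EventBase`):

    from dataclasses import dataclass

    @dataclass
    class Event:
        event_id: str
        depth: int

    # Assumed response order from the remote server: newest -> oldest.
    reverse_chronological_events = [Event("$3", 3), Event("$2", 2), Event("$1", 1)]

    # [::-1] returns a reversed copy: oldest -> newest, which minimises
    # missing-prev_event fetches when persisting.
    chronological_events = reverse_chronological_events[::-1]
    assert [e.event_id for e in chronological_events] == ["$1", "$2", "$3"]

    # Shallowest (oldest) first so clients hear about history in order.
    sorted_events = sorted(chronological_events, key=lambda e: e.depth)
    assert [e.event_id for e in sorted_events] == ["$1", "$2", "$3"]
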
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 0669d54822..29c461d441 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import datetime
 import itertools
 import logging
 from queue import Empty, PriorityQueue
@@ -43,7 +44,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -72,7 +73,6 @@ pdus_pruned_from_federation_queue = Counter(
 
 logger = logging.getLogger(__name__)
 
-
 # All the info we need while iterating the DAG while backfilling
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class BackfillQueueNavigationItem:
@@ -715,97 +715,179 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
     @trace
     @tag_args
-    async def get_oldest_event_ids_with_depth_in_room(
-        self, room_id: str
-    ) -> List[Tuple[str, int]]:
-        """Gets the oldest events(backwards extremities) in the room along with the
-        aproximate depth.
-
-        We use this function so that we can compare and see if someones current
-        depth at their current scrollback is within pagination range of the
-        event extremeties. If the current depth is close to the depth of given
-        oldest event, we can trigger a backfill.
+    async def get_backfill_points_in_room(
+        self,
+        room_id: str,
+        current_depth: int,
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Gets the oldest events (backwards extremities) in the room along with the
+        approximate depth. Sorted by depth, highest to lowest (descending) so the closest
+        events to the `current_depth` are first in the list.
+
+        We use this function so that we can compare and see if a client's
+        `current_depth` at their current scrollback is within pagination range
+        of the event extremities. If the `current_depth` is close to the depth
+        of a given oldest event, we can trigger a backfill.
+
+        We ignore extremities that have a greater depth than our `current_depth`
+        as:
+            1. we don't really care about getting events that have happened
+               after our current position; and
+            2. by the nature of paginating and scrolling back, we have likely
+               previously tried and failed to backfill from that extremity, so
+               to avoid getting "stuck" requesting the same backfill repeatedly
+               we drop those extremities.
 
         Args:
             room_id: Room where we want to find the oldest events
+            current_depth: The depth at the user's current scrollback position
+                because we only care about finding events older than the given
+                `current_depth` when scrolling and paginating backwards.
 
         Returns:
-            List of (event_id, depth) tuples
+            List of (event_id, depth, stream_ordering) tuples. Sorted by depth,
+            highest to lowest (descending) so the closest events to the
+            `current_depth` are first in the list. Tie-broken with `stream_ordering`,
+            then `event_id` to get a stable sort.
         """
 
-        def get_oldest_event_ids_with_depth_in_room_txn(
+        def get_backfill_points_in_room_txn(
             txn: LoggingTransaction, room_id: str
-        ) -> List[Tuple[str, int]]:
-            # Assemble a dictionary with event_id -> depth for the oldest events
+        ) -> List[Tuple[str, int, int]]:
+            # Assemble a list of (event_id, depth, stream_ordering) tuples for the oldest events
             # we know of in the room. Backwards extremities are the oldest
             # events we know of in the room but we only know of them because
-            # some other event referenced them by prev_event and aren't peristed
-            # in our database yet (meaning we don't know their depth
-            # specifically). So we need to look for the aproximate depth from
+            # some other event referenced them by prev_event and aren't
+            # persisted in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the approximate depth from
             # the events connected to the current backwards extremities.
             sql = """
-                SELECT b.event_id, MAX(e.depth) FROM events as e
+                SELECT backward_extrem.event_id, event.depth, event.stream_ordering FROM events AS event
                 /**
                  * Get the edge connections from the event_edges table
                  * so we can see whether this event's prev_events points
                  * to a backward extremity in the next join.
                  */
-                INNER JOIN event_edges as g
-                ON g.event_id = e.event_id
+                INNER JOIN event_edges AS edge
+                ON edge.event_id = event.event_id
                 /**
                  * We find the "oldest" events in the room by looking for
                 * events connected to backwards extremities (oldest events
                  * in the room that we know of so far).
                  */
-                INNER JOIN event_backward_extremities as b
-                ON g.prev_event_id = b.event_id
-                WHERE b.room_id = ? AND g.is_state is ?
-                GROUP BY b.event_id
+                INNER JOIN event_backward_extremities AS backward_extrem
+                ON edge.prev_event_id = backward_extrem.event_id
+                WHERE
+                    backward_extrem.room_id = ?
+                    /* We only care about non-state events because TODO: why */
+                    AND edge.is_state is ? /* False */
+                    /**
+                     * We only want backwards extremities that are older than or at
+                     * the same position of the given `current_depth` (where older
+                     * means less than the given depth) because we're looking backwards
+                     * from the `current_depth` when backfilling.
+                     *
+                     *                           current_depth (ignore events that come after this, ignore 2-4)
+                     *                           |
+                     * <oldest-in-time> [0]<--[1]▼<--[2]<--[3]<--[4] <newest-in-time>
+                     */
+                    AND event.depth <= ? /* current_depth */
+                /**
+                 * Sort from highest (closest to the `current_depth`) to the lowest depth
+                 * because the closest are most relevant to backfill from first.
+                 * Then tie-break on `stream_ordering` and finally on event_id, so we
+                 * get a consistent ordering which is nice when asserting things in tests.
+                 */
+                ORDER BY event.depth DESC, event.stream_ordering DESC, backward_extrem.event_id DESC
             """
 
-            txn.execute(sql, (room_id, False))
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    False,
+                    current_depth,
+                ),
+            )
 
-            return cast(List[Tuple[str, int]], txn.fetchall())
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
 
         return await self.db_pool.runInteraction(
-            "get_oldest_event_ids_with_depth_in_room",
-            get_oldest_event_ids_with_depth_in_room_txn,
+            "get_backfill_points_in_room",
+            get_backfill_points_in_room_txn,
             room_id,
         )
 
     @trace
     async def get_insertion_event_backward_extremities_in_room(
-        self, room_id: str
-    ) -> List[Tuple[str, int]]:
-        """Get the insertion events we know about that we haven't backfilled yet.
-
-        We use this function so that we can compare and see if someones current
-        depth at their current scrollback is within pagination range of the
-        insertion event. If the current depth is close to the depth of given
-        insertion event, we can trigger a backfill.
+        self,
+        room_id: str,
+        current_depth: int,
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Get the insertion events we know about that we haven't backfilled yet
+        along with the approximate depth. Sorted by depth, highest to lowest
+        (descending) so the closest events to the `current_depth` are first
+        in the list.
+
+        We use this function so that we can compare and see if someone's
+        `current_depth` at their current scrollback is within pagination range
+        of the insertion event. If the `current_depth` is close to the depth
+        of the given insertion event, we can trigger a backfill.
+
+        We ignore insertion events that have a greater depth than our `current_depth`
+        as:
+            1. we don't really care about getting events that have happened
+               after our current position; and
+            2. by the nature of paginating and scrolling back, we have likely
+               previously tried and failed to backfill from that insertion event, so
+               to avoid getting "stuck" requesting the same backfill repeatedly
+               we drop those insertion events.
 
         Args:
             room_id: Room where we want to find the oldest events
+            current_depth: The depth at the user's current scrollback position because
+                we only care about finding events older than the given
+                `current_depth` when scrolling and paginating backwards.
 
         Returns:
-            List of (event_id, depth) tuples
+            List of (event_id, depth, stream_ordering) tuples. Sorted by depth,
+            highest to lowest (descending) so the closest events to the
+            `current_depth` are first in the list. Tie-broken with `stream_ordering`,
+            then `event_id` to get a stable sort.
         """
 
         def get_insertion_event_backward_extremities_in_room_txn(
             txn: LoggingTransaction, room_id: str
-        ) -> List[Tuple[str, int]]:
+        ) -> List[Tuple[str, int, int]]:
             sql = """
-                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                SELECT
+                    insertion_event_extremity.event_id, event.depth, event.stream_ordering
                 /* We only want insertion events that are also marked as backwards extremities */
-                INNER JOIN insertion_event_extremities as b USING (event_id)
+                FROM insertion_event_extremities AS insertion_event_extremity
                 /* Get the depth of the insertion event from the events table */
-                INNER JOIN events AS e USING (event_id)
-                WHERE b.room_id = ?
-                GROUP BY b.event_id
+                INNER JOIN events AS event USING (event_id)
+                WHERE
+                    insertion_event_extremity.room_id = ?
+                    AND event.depth <= ? /* current_depth */
+                /**
+                 * Sort from highest (closest to the `current_depth`) to the lowest depth
+                 * because the closest are most relevant to backfill from first.
+                 * Then tie-break on `stream_ordering` and finally on event_id, so we
+                 * get a consistent ordering which is nice when asserting things in tests.
+                 */
+                ORDER BY event.depth DESC, event.stream_ordering DESC, insertion_event_extremity.event_id DESC
             """
 
-            txn.execute(sql, (room_id,))
-            return cast(List[Tuple[str, int]], txn.fetchall())
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    current_depth,
+                ),
+            )
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
 
         return await self.db_pool.runInteraction(
             "get_insertion_event_backward_extremities_in_room",
@@ -1539,7 +1621,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         self,
         room_id: str,
     ) -> Optional[Tuple[str, str]]:
-        """Get the next event ID in the staging area for the given room."""
+        """
+        Get the next event ID in the staging area for the given room.
+
+        Returns:
+            Tuple of the `origin` and `event_id`
+        """
 
         def _get_next_staged_event_id_for_room_txn(
             txn: LoggingTransaction,
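
Both rewritten queries share the same filter-and-sort shape. A toy sqlite3 reproduction of just that shape (single flattened table, not Synapse's real schema) demonstrating the `current_depth` filter and the three-level descending sort:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id TEXT, depth INT, stream_ordering INT)")
    conn.executemany(
        "INSERT INTO events VALUES (?, ?, ?)",
        [("$a", 10, 7), ("$b", 10, 3), ("$c", 12, 1), ("$d", 99, 9)],
    )

    current_depth = 50
    rows = conn.execute(
        """
        SELECT event_id, depth, stream_ordering FROM events
        WHERE depth <= ?  /* ignore events past the scrollback position */
        ORDER BY depth DESC, stream_ordering DESC, event_id DESC
        """,
        (current_depth,),
    ).fetchall()

    # $d is filtered out (depth 99 > 50); $a beats $b on stream_ordering.
    assert rows == [("$c", 12, 1), ("$a", 10, 7), ("$b", 10, 3)]
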