summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py65
-rw-r--r--synapse/handlers/pagination.py8
-rw-r--r--synapse/storage/databases/main/stream.py13
3 files changed, 66 insertions, 20 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 43f2986f89..014dab2940 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -943,15 +943,26 @@ class FederationHandler(BaseHandler):
 
         return events
 
-    async def maybe_backfill(self, room_id, current_depth):
+    async def maybe_backfill(
+        self, room_id: str, current_depth: int, limit: int
+    ) -> bool:
         """Checks the database to see if we should backfill before paginating,
         and if so do.
+
+        Args:
+            room_id
+            current_depth: The depth from which we're paginating from. This is
+                used to decide if we should backfill and what extremities to
+                use.
+            limit: The number of events that the pagination request will
+                return. This is used as part of the heuristic to decide if we
+                should back paginate.
         """
         extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
 
         if not extremities:
             logger.debug("Not backfilling as no extremeties found.")
-            return
+            return False
 
         # We only want to paginate if we can actually see the events we'll get,
         # as otherwise we'll just spend a lot of resources to get redacted
@@ -1004,16 +1015,54 @@ class FederationHandler(BaseHandler):
         sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
         max_depth = sorted_extremeties_tuple[0][1]
 
+        # If we're approaching an extremity we trigger a backfill, otherwise we
+        # no-op.
+        #
+        # We chose twice the limit here as then clients paginating backwards
+        # will send pagination requests that trigger backfill at least twice
+        # using the most recent extremity before it gets removed (see below). We
+        # chose more than one times the limit in case of failure, but choosing a
+        # much larger factor will result in triggering a backfill request much
+        # earlier than necessary.
+        if current_depth - 2 * limit > max_depth:
+            logger.debug(
+                "Not backfilling as we don't need to. %d < %d - 2 * %d",
+                max_depth,
+                current_depth,
+                limit,
+            )
+            return False
+
+        logger.debug(
+            "room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
+            room_id,
+            current_depth,
+            max_depth,
+            sorted_extremeties_tuple,
+        )
+
+        # We ignore extremities that have a greater depth than our current depth
+        # as:
+        #    1. we don't really care about getting events that have happened
+        #       before our current position; and
+        #    2. we have likely previously tried and failed to backfill from that
+        #       extremity, so to avoid getting "stuck" requesting the same
+        #       backfill repeatedly we drop those extremities.
+        filtered_sorted_extremeties_tuple = [
+            t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
+        ]
+
+        # However, we need to check that the filtered extremities are non-empty.
+        # If they are empty then either we can a) bail or b) still attempt to
+        # backill. We opt to try backfilling anyway just in case we do get
+        # relevant events.
+        if filtered_sorted_extremeties_tuple:
+            sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
+
         # We don't want to specify too many extremities as it causes the backfill
         # request URI to be too long.
         extremities = dict(sorted_extremeties_tuple[:5])
 
-        if current_depth > max_depth:
-            logger.debug(
-                "Not backfilling as we don't need to. %d < %d", max_depth, current_depth
-            )
-            return
-
         # Now we need to decide which hosts to hit first.
 
         # First we try hosts that are already in the room
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 34ed0e2921..6067585f9b 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -362,9 +362,9 @@ class PaginationHandler:
                 # if we're going backwards, we might need to backfill. This
                 # requires that we have a topo token.
                 if room_token.topological:
-                    max_topo = room_token.topological
+                    curr_topo = room_token.topological
                 else:
-                    max_topo = await self.store.get_max_topological_token(
+                    curr_topo = await self.store.get_current_topological_token(
                         room_id, room_token.stream
                     )
 
@@ -380,11 +380,11 @@ class PaginationHandler:
                     leave_token = await self.store.get_topological_token_for_event(
                         member_event_id
                     )
-                    if RoomStreamToken.parse(leave_token).topological < max_topo:
+                    if RoomStreamToken.parse(leave_token).topological < curr_topo:
                         source_config.from_key = str(leave_token)
 
                 await self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
+                    room_id, curr_topo, limit=source_config.limit,
                 )
 
             events, next_key = await self.store.paginate_room_events(
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index be6df8a6d1..db20a3db30 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -648,23 +648,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
         return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
 
-    async def get_max_topological_token(self, room_id: str, stream_key: int) -> int:
-        """Get the max topological token in a room before the given stream
+    async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
+        """Gets the topological token in a room after or at the given stream
         ordering.
 
         Args:
             room_id
             stream_key
-
-        Returns:
-            The maximum topological token.
         """
         sql = (
-            "SELECT coalesce(max(topological_ordering), 0) FROM events"
-            " WHERE room_id = ? AND stream_ordering < ?"
+            "SELECT coalesce(MIN(topological_ordering), 0) FROM events"
+            " WHERE room_id = ? AND stream_ordering >= ?"
         )
         row = await self.db_pool.execute(
-            "get_max_topological_token", None, sql, room_id, stream_key
+            "get_current_topological_token", None, sql, room_id, stream_key
         )
         return row[0][0] if row else 0