diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 593932adb7..8c1a230d13 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -937,9 +937,18 @@ class FederationHandler(BaseHandler):
return events
- async def maybe_backfill(self, room_id, current_depth):
+ async def maybe_backfill(self, room_id: str, current_depth: int, limit: int):
"""Checks the database to see if we should backfill before paginating,
and if so, do so.
+
+ Args:
+ room_id: The room to backfill in.
+ current_depth: The depth we're paginating from. This is used to
+ decide if we should backfill and what extremities to
+ use.
+ limit: The number of events that the pagination request will
+ return. This is used as part of the heuristic to decide if we
+ should backfill.
"""
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
@@ -998,16 +1007,47 @@ class FederationHandler(BaseHandler):
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]
- # We don't want to specify too many extremities as it causes the backfill
- # request URI to be too long.
- extremities = dict(sorted_extremeties_tuple[:5])
-
- if current_depth > max_depth:
+ # If we're approaching an extremity we trigger a backfill, otherwise we
+ # no-op.
+ if current_depth - 2 * limit > max_depth:
logger.debug(
- "Not backfilling as we don't need to. %d < %d", max_depth, current_depth
+ "Not backfilling as we don't need to. %d < %d - 2 * %d",
+ max_depth,
+ current_depth,
+ limit,
)
return
+ logger.debug(
+ "room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s",
+ room_id,
+ current_depth,
+ max_depth,
+ sorted_extremeties_tuple,
+ )
+
+ # We ignore extremities that have a greater depth than our current depth
+ # as:
+ # 1. we don't really care about getting events that have happened
+ # before our current position; and
+ # 2. we have likely previously tried and failed to backfill from that
+ # extremity, so to avoid getting "stuck" requesting the same
+ # backfill repeatedly we drop those extremities.
+ filtered_sorted_extremeties_tuple = [
+ t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
+ ]
+
+ # However, we need to check that the filtered extremities are non-empty.
+ # If they are empty then we can either a) bail or b) still attempt to
+ # backfill. We opt to try backfilling anyway just in case we do get
+ # relevant events.
+ if filtered_sorted_extremeties_tuple:
+ sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
+
+ # We don't want to specify too many extremities as it causes the backfill
+ # request URI to be too long.
+ extremities = dict(sorted_extremeties_tuple[:5])
+
# Now we need to decide which hosts to hit first.
# First we try hosts that are already in the room
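The hunk above is the heart of the change: instead of skipping backfill whenever the pagination point is past the deepest known extremity, it only skips when the point is more than two pages (2 * limit) of events ahead of it, and it prefers extremities no deeper than the current position. Below is a minimal standalone sketch of that heuristic; the names (`should_backfill`, `pick_extremities`) are illustrative only and not part of Synapse's API.

from typing import Dict, List, Tuple


def should_backfill(current_depth: int, limit: int, max_extremity_depth: int) -> bool:
    # Mirrors the early-return condition above: only skip backfilling when we
    # are more than two pages of events away from the deepest old extremity.
    return current_depth - 2 * limit <= max_extremity_depth


def pick_extremities(
    extremities: Dict[str, int], current_depth: int, max_count: int = 5
) -> List[Tuple[str, int]]:
    # Sort by depth, deepest first, as the handler does.
    sorted_extrems = sorted(extremities.items(), key=lambda e: -int(e[1]))
    # Drop extremities deeper than our current position (likely already tried
    # and failed), but fall back to the unfiltered list rather than bailing.
    filtered = [t for t in sorted_extrems if int(t[1]) <= current_depth]
    chosen = filtered or sorted_extrems
    # Cap the number of extremities so the backfill request URI stays short.
    return chosen[:max_count]


# Example: paginating at depth 100 with a page size of 10 backfills whenever
# the deepest old extremity is at depth 80 or greater.
assert should_backfill(current_depth=100, limit=10, max_extremity_depth=85)
assert not should_backfill(current_depth=100, limit=10, max_extremity_depth=75)
assert pick_extremities({"$a": 120, "$b": 90, "$c": 40}, current_depth=100) == [
    ("$b", 90),
    ("$c", 40),
]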
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 487420bb5d..77061cf61f 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -335,7 +335,7 @@ class PaginationHandler(object):
if room_token.topological:
max_topo = room_token.topological
else:
- max_topo = await self.store.get_max_topological_token(
+ max_topo = await self.store.get_current_topological_token(
room_id, room_token.stream
)
@@ -351,7 +351,7 @@ class PaginationHandler(object):
source_config.from_key = str(leave_token)
await self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
+ room_id, max_topo, limit=pagin_config.limit,
)
events, next_key = await self.store.paginate_room_events(
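For context, here is a hedged sketch of the call flow in this handler, using stand-in objects (`FakeStore`, `FakeFederationHandler`, a made-up room ID) rather than Synapse's real store and handlers: the pagination token is resolved to a topological position, and the page size is passed along so the heuristic above can use it.

import asyncio


class FakeStore:
    # Stand-in for the storage layer; pretend the first event at or after the
    # given stream position sits at topological depth 100.
    async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
        return 100


class FakeFederationHandler:
    def __init__(self) -> None:
        self.calls = []

    async def maybe_backfill(self, room_id: str, current_depth: int, limit: int) -> None:
        self.calls.append((room_id, current_depth, limit))


async def paginate(store, federation, room_id: str, stream_key: int, limit: int) -> None:
    # Resolve the stream token to a topological position, then let the
    # federation handler decide whether to backfill for a page of `limit` events.
    max_topo = await store.get_current_topological_token(room_id, stream_key)
    await federation.maybe_backfill(room_id, max_topo, limit=limit)


store, federation = FakeStore(), FakeFederationHandler()
asyncio.run(paginate(store, federation, "!room:example.org", stream_key=5, limit=10))
assert federation.calls == [("!room:example.org", 100, 10)]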
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index aaf225894e..b98b88b877 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -605,23 +605,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
lambda row: "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
)
- def get_max_topological_token(self, room_id, stream_key):
- """Get the max topological token in a room before the given stream
+ def get_current_topological_token(self, room_id, stream_key):
+ """Gets the topological token in a room after or at the given stream
ordering.
Args:
- room_id (str)
- stream_key (int)
-
- Returns:
- Deferred[int]
+ room_id: The room to get the topological token for.
+ stream_key: The stream ordering to map to a topological token.
"""
sql = (
- "SELECT coalesce(max(topological_ordering), 0) FROM events"
- " WHERE room_id = ? AND stream_ordering < ?"
+ "SELECT coalesce(MIN(topological_ordering), 0) FROM events"
+ " WHERE room_id = ? AND stream_ordering >= ?"
)
return self.db_pool.execute(
- "get_max_topological_token", None, sql, room_id, stream_key
+ "get_current_topological_token", None, sql, room_id, stream_key
).addCallback(lambda r: r[0][0] if r else 0)
def _get_max_topological_txn(self, txn, room_id):
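To make the query change concrete, here is a self-contained sketch using an in-memory SQLite table shaped like the two columns these queries touch (this is not Synapse's real schema): the removed query returned the topological ordering of the last event strictly before the stream position, while the added one returns that of the first event at or after it.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (room_id TEXT, stream_ordering INTEGER, topological_ordering INTEGER)"
)
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [("!r:x", 1, 10), ("!r:x", 2, 20), ("!r:x", 3, 30)],
)

old_sql = (
    "SELECT coalesce(max(topological_ordering), 0) FROM events"
    " WHERE room_id = ? AND stream_ordering < ?"
)
new_sql = (
    "SELECT coalesce(MIN(topological_ordering), 0) FROM events"
    " WHERE room_id = ? AND stream_ordering >= ?"
)

# For a pagination token pointing at stream position 2:
assert conn.execute(old_sql, ("!r:x", 2)).fetchone()[0] == 10  # last event before it
assert conn.execute(new_sql, ("!r:x", 2)).fetchone()[0] == 20  # first event at/after it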