diff options
-rw-r--r-- | synapse/handlers/federation.py | 54 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 17 |
3 files changed, 56 insertions, 19 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 593932adb7..8c1a230d13 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -937,9 +937,18 @@ class FederationHandler(BaseHandler): return events - async def maybe_backfill(self, room_id, current_depth): + async def maybe_backfill(self, room_id: str, current_depth: int, limit: int): """Checks the database to see if we should backfill before paginating, and if so do. + + Args: + room_id + current_depth: The depth from which we're paginating from. This is + used to decide if we should backfill and what extremities to + use. + limit: The number of events that the pagination request will + return. This is used as part of the heuristic to decide if we + should back paginate. """ extremities = await self.store.get_oldest_events_with_depth_in_room(room_id) @@ -998,16 +1007,47 @@ class FederationHandler(BaseHandler): sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1])) max_depth = sorted_extremeties_tuple[0][1] - # We don't want to specify too many extremities as it causes the backfill - # request URI to be too long. - extremities = dict(sorted_extremeties_tuple[:5]) - - if current_depth > max_depth: + # If we're approaching an extremity we trigger a backfill, otherwise we + # no-op. + if current_depth - 2 * limit > max_depth: logger.debug( - "Not backfilling as we don't need to. %d < %d", max_depth, current_depth + "Not backfilling as we don't need to. %d < %d - 2 * %d", + max_depth, + current_depth, + limit, ) return + logger.debug( + "room_id: %s, backfill: current_depth: %s, max_depth: %s, extrems: %s", + room_id, + current_depth, + max_depth, + sorted_extremeties_tuple, + ) + + # We ignore extremities that have a greater depth than our current depth + # as: + # 1. we don't really care about getting events that have happened + # before our current position; and + # 2. 
we have likely previously tried and failed to backfill from that + #         extremity, so to avoid getting "stuck" requesting the same + #         backfill repeatedly we drop those extremities. + filtered_sorted_extremeties_tuple = [ + t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth + ] + + # However, we need to check that the filtered extremities are non-empty. + # If they are empty then we can either a) bail or b) still attempt to + # backfill. We opt to try backfilling anyway just in case we do get + # relevant events. + if filtered_sorted_extremeties_tuple: + sorted_extremeties_tuple = filtered_sorted_extremeties_tuple + + # We don't want to specify too many extremities as it causes the backfill + # request URI to be too long. + extremities = dict(sorted_extremeties_tuple[:5]) + # Now we need to decide which hosts to hit first. # First we try hosts that are already in the room diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 487420bb5d..77061cf61f 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -335,7 +335,7 @@ class PaginationHandler(object): if room_token.topological: max_topo = room_token.topological else: - max_topo = await self.store.get_max_topological_token( + max_topo = await self.store.get_current_topological_token( room_id, room_token.stream ) @@ -351,7 +351,7 @@ class PaginationHandler(object): source_config.from_key = str(leave_token) await self.hs.get_handlers().federation_handler.maybe_backfill( - room_id, max_topo + room_id, max_topo, limit=pagin_config.limit, ) events, next_key = await self.store.paginate_room_events( diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index aaf225894e..b98b88b877 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -605,23 +605,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): lambda row: "t%d-%d" % (row["topological_ordering"], 
row["stream_ordering"]) ) - def get_max_topological_token(self, room_id, stream_key): - """Get the max topological token in a room before the given stream + def get_current_topological_token(self, room_id, stream_key): + """Gets the topological token in a room after or at the given stream ordering. Args: - room_id (str) - stream_key (int) - - Returns: - Deferred[int] + room_id + stream_key """ sql = ( - "SELECT coalesce(max(topological_ordering), 0) FROM events" - " WHERE room_id = ? AND stream_ordering < ?" + "SELECT coalesce(MIN(topological_ordering), 0) FROM events" + " WHERE room_id = ? AND stream_ordering >= ?" ) return self.db_pool.execute( - "get_max_topological_token", None, sql, room_id, stream_key + "get_current_topological_token", None, sql, room_id, stream_key ).addCallback(lambda r: r[0][0] if r else 0) def _get_max_topological_txn(self, txn, room_id): |