diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/pagination.py | 137 |
1 files changed, 115 insertions, 22 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index d5257acb7d..19b8728db9 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -40,6 +40,11 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# How many single event gaps we tolerate returning in a `/messages` response before we +# backfill and try to fill in the history. This is an arbitrarily picked number so feel +# free to tune it in the future. +BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 + @attr.s(slots=True, auto_attribs=True) class PurgeStatus: @@ -486,35 +491,35 @@ class PaginationHandler: room_id, room_token.stream ) - if not use_admin_priviledge and membership == Membership.LEAVE: - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - - # This is only None if the room is world_readable, in which - # case "JOIN" would have been returned. - assert member_event_id + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + if ( + pagin_config.direction == Direction.BACKWARDS + and not use_admin_priviledge + and membership == Membership.LEAVE + ): + # This is only None if the room is world_readable, in which case + # "Membership.JOIN" would have been returned and we should never hit + # this branch. + assert member_event_id + + leave_token = await self.store.get_topological_token_for_event( + member_event_id + ) + assert leave_token.topological is not None - leave_token = await self.store.get_topological_token_for_event( - member_event_id + if leave_token.topological < curr_topo: + from_token = from_token.copy_and_replace( + StreamKeyType.ROOM, leave_token ) - assert leave_token.topological is not None - - if leave_token.topological < curr_topo: - from_token = from_token.copy_and_replace( - StreamKeyType.ROOM, leave_token - ) - - await self.hs.get_federation_handler().maybe_backfill( - room_id, - curr_topo, - limit=pagin_config.limit, - ) to_room_key = None if pagin_config.to_token: to_room_key = pagin_config.to_token.room_key + # Initially fetch the events from the database. With any luck, we can return + # these without blocking on backfill (handled below). events, next_key = await self.store.paginate_room_events( room_id=room_id, from_key=from_token.room_key, @@ -524,6 +529,94 @@ class PaginationHandler: event_filter=event_filter, ) + if pagin_config.direction == Direction.BACKWARDS: + # We use a `Set` because there can be multiple events at a given depth + # and we only care about looking at the unique continum of depths to + # find gaps. + event_depths: Set[int] = {event.depth for event in events} + sorted_event_depths = sorted(event_depths) + + # Inspect the depths of the returned events to see if there are any gaps + found_big_gap = False + number_of_gaps = 0 + previous_event_depth = ( + sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0 + ) + for event_depth in sorted_event_depths: + # We don't expect a negative depth but we'll just deal with it in + # any case by taking the absolute value to get the true gap between + # any two integers. + depth_gap = abs(event_depth - previous_event_depth) + # A `depth_gap` of 1 is a normal continuous chain to the next event + # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's + # also possible there is no event at a given depth but we can't ever + # know that for sure) + if depth_gap > 1: + number_of_gaps += 1 + + # We only tolerate a small number single-event long gaps in the + # returned events because those are most likely just events we've + # failed to pull in the past. Anything longer than that is probably + # a sign that we're missing a decent chunk of history and we should + # try to backfill it. + # + # XXX: It's possible we could tolerate longer gaps if we checked + # that a given events `prev_events` is one that has failed pull + # attempts and we could just treat it like a dead branch of history + # for now or at least something that we don't need the block the + # client on to try pulling. + # + # XXX: If we had something like MSC3871 to indicate gaps in the + # timeline to the client, we could also get away with any sized gap + # and just have the client refetch the holes as they see fit. + if depth_gap > 2: + found_big_gap = True + break + previous_event_depth = event_depth + + # Backfill in the foreground if we found a big gap, have too many holes, + # or we don't have enough events to fill the limit that the client asked + # for. + missing_too_many_events = ( + number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD + ) + not_enough_events_to_fill_response = len(events) < pagin_config.limit + if ( + found_big_gap + or missing_too_many_events + or not_enough_events_to_fill_response + ): + did_backfill = ( + await self.hs.get_federation_handler().maybe_backfill( + room_id, + curr_topo, + limit=pagin_config.limit, + ) + ) + + # If we did backfill something, refetch the events from the database to + # catch anything new that might have been added since we last fetched. + if did_backfill: + events, next_key = await self.store.paginate_room_events( + room_id=room_id, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, + limit=pagin_config.limit, + event_filter=event_filter, + ) + else: + # Otherwise, we can backfill in the background for eventual + # consistency's sake but we don't need to block the client waiting + # for a costly federation call and processing. + run_as_background_process( + "maybe_backfill_in_the_background", + self.hs.get_federation_handler().maybe_backfill, + room_id, + curr_topo, + limit=pagin_config.limit, + ) + next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) # if no events are returned from pagination, that implies |