diff options
Diffstat (limited to 'synapse/handlers/pagination.py')
-rw-r--r-- | synapse/handlers/pagination.py | 267 |
1 files changed, 131 insertions, 136 deletions
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 1be6ebc6d9..e5ac9096cc 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -487,155 +487,150 @@ class PaginationHandler: room_token = from_token.room_key - async with self._worker_locks.acquire_read_write_lock( - PURGE_PAGINATION_LOCK_NAME, room_id, write=False - ): - (membership, member_event_id) = (None, None) - if not use_admin_priviledge: - ( - membership, - member_event_id, - ) = await self.auth.check_user_in_room_or_world_readable( - room_id, requester, allow_departed_users=True + (membership, member_event_id) = (None, None) + if not use_admin_priviledge: + ( + membership, + member_event_id, + ) = await self.auth.check_user_in_room_or_world_readable( + room_id, requester, allow_departed_users=True + ) + + if pagin_config.direction == Direction.BACKWARDS: + # if we're going backwards, we might need to backfill. This + # requires that we have a topo token. + if room_token.topological: + curr_topo = room_token.topological + else: + curr_topo = await self.store.get_current_topological_token( + room_id, room_token.stream ) - if pagin_config.direction == Direction.BACKWARDS: - # if we're going backwards, we might need to backfill. This - # requires that we have a topo token. - if room_token.topological: - curr_topo = room_token.topological - else: - curr_topo = await self.store.get_current_topological_token( - room_id, room_token.stream - ) + # If they have left the room then clamp the token to be before + # they left the room, to save the effort of loading from the + # database. + if ( + pagin_config.direction == Direction.BACKWARDS + and not use_admin_priviledge + and membership == Membership.LEAVE + ): + # This is only None if the room is world_readable, in which case + # "Membership.JOIN" would have been returned and we should never hit + # this branch. + assert member_event_id - # If they have left the room then clamp the token to be before - # they left the room, to save the effort of loading from the - # database. - if ( - pagin_config.direction == Direction.BACKWARDS - and not use_admin_priviledge - and membership == Membership.LEAVE - ): - # This is only None if the room is world_readable, in which case - # "Membership.JOIN" would have been returned and we should never hit - # this branch. - assert member_event_id + leave_token = await self.store.get_topological_token_for_event( + member_event_id + ) + assert leave_token.topological is not None - leave_token = await self.store.get_topological_token_for_event( - member_event_id + if leave_token.topological < curr_topo: + from_token = from_token.copy_and_replace( + StreamKeyType.ROOM, leave_token ) - assert leave_token.topological is not None - if leave_token.topological < curr_topo: - from_token = from_token.copy_and_replace( - StreamKeyType.ROOM, leave_token - ) + to_room_key = None + if pagin_config.to_token: + to_room_key = pagin_config.to_token.room_key + + # Initially fetch the events from the database. With any luck, we can return + # these without blocking on backfill (handled below). + events, next_key = await self.store.paginate_room_events( + room_id=room_id, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, + limit=pagin_config.limit, + event_filter=event_filter, + ) - to_room_key = None - if pagin_config.to_token: - to_room_key = pagin_config.to_token.room_key - - # Initially fetch the events from the database. With any luck, we can return - # these without blocking on backfill (handled below). - events, next_key = await self.store.paginate_room_events( - room_id=room_id, - from_key=from_token.room_key, - to_key=to_room_key, - direction=pagin_config.direction, - limit=pagin_config.limit, - event_filter=event_filter, + if pagin_config.direction == Direction.BACKWARDS: + # We use a `Set` because there can be multiple events at a given depth + # and we only care about looking at the unique continum of depths to + # find gaps. + event_depths: Set[int] = {event.depth for event in events} + sorted_event_depths = sorted(event_depths) + + # Inspect the depths of the returned events to see if there are any gaps + found_big_gap = False + number_of_gaps = 0 + previous_event_depth = ( + sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0 ) - - if pagin_config.direction == Direction.BACKWARDS: - # We use a `Set` because there can be multiple events at a given depth - # and we only care about looking at the unique continum of depths to - # find gaps. - event_depths: Set[int] = {event.depth for event in events} - sorted_event_depths = sorted(event_depths) - - # Inspect the depths of the returned events to see if there are any gaps - found_big_gap = False - number_of_gaps = 0 - previous_event_depth = ( - sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0 - ) - for event_depth in sorted_event_depths: - # We don't expect a negative depth but we'll just deal with it in - # any case by taking the absolute value to get the true gap between - # any two integers. - depth_gap = abs(event_depth - previous_event_depth) - # A `depth_gap` of 1 is a normal continuous chain to the next event - # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's - # also possible there is no event at a given depth but we can't ever - # know that for sure) - if depth_gap > 1: - number_of_gaps += 1 - - # We only tolerate a small number single-event long gaps in the - # returned events because those are most likely just events we've - # failed to pull in the past. Anything longer than that is probably - # a sign that we're missing a decent chunk of history and we should - # try to backfill it. - # - # XXX: It's possible we could tolerate longer gaps if we checked - # that a given events `prev_events` is one that has failed pull - # attempts and we could just treat it like a dead branch of history - # for now or at least something that we don't need the block the - # client on to try pulling. - # - # XXX: If we had something like MSC3871 to indicate gaps in the - # timeline to the client, we could also get away with any sized gap - # and just have the client refetch the holes as they see fit. - if depth_gap > 2: - found_big_gap = True - break - previous_event_depth = event_depth - - # Backfill in the foreground if we found a big gap, have too many holes, - # or we don't have enough events to fill the limit that the client asked - # for. - missing_too_many_events = ( - number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD + for event_depth in sorted_event_depths: + # We don't expect a negative depth but we'll just deal with it in + # any case by taking the absolute value to get the true gap between + # any two integers. + depth_gap = abs(event_depth - previous_event_depth) + # A `depth_gap` of 1 is a normal continuous chain to the next event + # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's + # also possible there is no event at a given depth but we can't ever + # know that for sure) + if depth_gap > 1: + number_of_gaps += 1 + + # We only tolerate a small number single-event long gaps in the + # returned events because those are most likely just events we've + # failed to pull in the past. Anything longer than that is probably + # a sign that we're missing a decent chunk of history and we should + # try to backfill it. + # + # XXX: It's possible we could tolerate longer gaps if we checked + # that a given events `prev_events` is one that has failed pull + # attempts and we could just treat it like a dead branch of history + # for now or at least something that we don't need the block the + # client on to try pulling. + # + # XXX: If we had something like MSC3871 to indicate gaps in the + # timeline to the client, we could also get away with any sized gap + # and just have the client refetch the holes as they see fit. + if depth_gap > 2: + found_big_gap = True + break + previous_event_depth = event_depth + + # Backfill in the foreground if we found a big gap, have too many holes, + # or we don't have enough events to fill the limit that the client asked + # for. + missing_too_many_events = ( + number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD + ) + not_enough_events_to_fill_response = len(events) < pagin_config.limit + if ( + found_big_gap + or missing_too_many_events + or not_enough_events_to_fill_response + ): + did_backfill = await self.hs.get_federation_handler().maybe_backfill( + room_id, + curr_topo, + limit=pagin_config.limit, ) - not_enough_events_to_fill_response = len(events) < pagin_config.limit - if ( - found_big_gap - or missing_too_many_events - or not_enough_events_to_fill_response - ): - did_backfill = ( - await self.hs.get_federation_handler().maybe_backfill( - room_id, - curr_topo, - limit=pagin_config.limit, - ) - ) - # If we did backfill something, refetch the events from the database to - # catch anything new that might have been added since we last fetched. - if did_backfill: - events, next_key = await self.store.paginate_room_events( - room_id=room_id, - from_key=from_token.room_key, - to_key=to_room_key, - direction=pagin_config.direction, - limit=pagin_config.limit, - event_filter=event_filter, - ) - else: - # Otherwise, we can backfill in the background for eventual - # consistency's sake but we don't need to block the client waiting - # for a costly federation call and processing. - run_as_background_process( - "maybe_backfill_in_the_background", - self.hs.get_federation_handler().maybe_backfill, - room_id, - curr_topo, + # If we did backfill something, refetch the events from the database to + # catch anything new that might have been added since we last fetched. + if did_backfill: + events, next_key = await self.store.paginate_room_events( + room_id=room_id, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, limit=pagin_config.limit, + event_filter=event_filter, ) + else: + # Otherwise, we can backfill in the background for eventual + # consistency's sake but we don't need to block the client waiting + # for a costly federation call and processing. + run_as_background_process( + "maybe_backfill_in_the_background", + self.hs.get_federation_handler().maybe_backfill, + room_id, + curr_topo, + limit=pagin_config.limit, + ) - next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) + next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key) # if no events are returned from pagination, that implies # we have reached the end of the available events. |