diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d5257acb7d..19b8728db9 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -40,6 +40,11 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+# How many single event gaps we tolerate returning in a `/messages` response before we
+# backfill and try to fill in the history. This is an arbitrarily picked number so feel
+# free to tune it in the future.
+BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+
@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
@@ -486,35 +491,35 @@ class PaginationHandler:
room_id, room_token.stream
)
- if not use_admin_priviledge and membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
-
- # This is only None if the room is world_readable, in which
- # case "JOIN" would have been returned.
- assert member_event_id
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ if (
+ pagin_config.direction == Direction.BACKWARDS
+ and not use_admin_priviledge
+ and membership == Membership.LEAVE
+ ):
+ # This is only None if the room is world_readable, in which case
+ # "Membership.JOIN" would have been returned and we should never hit
+ # this branch.
+ assert member_event_id
+
+ leave_token = await self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ assert leave_token.topological is not None
- leave_token = await self.store.get_topological_token_for_event(
- member_event_id
+ if leave_token.topological < curr_topo:
+ from_token = from_token.copy_and_replace(
+ StreamKeyType.ROOM, leave_token
)
- assert leave_token.topological is not None
-
- if leave_token.topological < curr_topo:
- from_token = from_token.copy_and_replace(
- StreamKeyType.ROOM, leave_token
- )
-
- await self.hs.get_federation_handler().maybe_backfill(
- room_id,
- curr_topo,
- limit=pagin_config.limit,
- )
to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key
+ # Initially fetch the events from the database. With any luck, we can return
+ # these without blocking on backfill (handled below).
events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=from_token.room_key,
@@ -524,6 +529,94 @@ class PaginationHandler:
event_filter=event_filter,
)
+ if pagin_config.direction == Direction.BACKWARDS:
+ # We use a `Set` because there can be multiple events at a given depth
+ # and we only care about looking at the unique continum of depths to
+ # find gaps.
+ event_depths: Set[int] = {event.depth for event in events}
+ sorted_event_depths = sorted(event_depths)
+
+ # Inspect the depths of the returned events to see if there are any gaps
+ found_big_gap = False
+ number_of_gaps = 0
+ previous_event_depth = (
+ sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
+ )
+ for event_depth in sorted_event_depths:
+ # We don't expect a negative depth but we'll just deal with it in
+ # any case by taking the absolute value to get the true gap between
+ # any two integers.
+ depth_gap = abs(event_depth - previous_event_depth)
+ # A `depth_gap` of 1 is a normal continuous chain to the next event
+ # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+ # also possible there is no event at a given depth but we can't ever
+ # know that for sure)
+ if depth_gap > 1:
+ number_of_gaps += 1
+
+ # We only tolerate a small number single-event long gaps in the
+ # returned events because those are most likely just events we've
+ # failed to pull in the past. Anything longer than that is probably
+ # a sign that we're missing a decent chunk of history and we should
+ # try to backfill it.
+ #
+ # XXX: It's possible we could tolerate longer gaps if we checked
+ # that a given events `prev_events` is one that has failed pull
+ # attempts and we could just treat it like a dead branch of history
+ # for now or at least something that we don't need the block the
+ # client on to try pulling.
+ #
+ # XXX: If we had something like MSC3871 to indicate gaps in the
+ # timeline to the client, we could also get away with any sized gap
+ # and just have the client refetch the holes as they see fit.
+ if depth_gap > 2:
+ found_big_gap = True
+ break
+ previous_event_depth = event_depth
+
+ # Backfill in the foreground if we found a big gap, have too many holes,
+ # or we don't have enough events to fill the limit that the client asked
+ # for.
+ missing_too_many_events = (
+ number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+ )
+ not_enough_events_to_fill_response = len(events) < pagin_config.limit
+ if (
+ found_big_gap
+ or missing_too_many_events
+ or not_enough_events_to_fill_response
+ ):
+ did_backfill = (
+ await self.hs.get_federation_handler().maybe_backfill(
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
+ )
+ )
+
+ # If we did backfill something, refetch the events from the database to
+ # catch anything new that might have been added since we last fetched.
+ if did_backfill:
+ events, next_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
+ else:
+ # Otherwise, we can backfill in the background for eventual
+ # consistency's sake but we don't need to block the client waiting
+ # for a costly federation call and processing.
+ run_as_background_process(
+ "maybe_backfill_in_the_background",
+ self.hs.get_federation_handler().maybe_backfill,
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
+ )
+
next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
# if no events are returned from pagination, that implies
|