diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2b93b8c621..29cd45550a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -60,6 +60,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
@@ -152,6 +153,7 @@ class FederationHandler:
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._notifier = hs.get_notifier()
+ self._worker_locks = hs.get_worker_locks_handler()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
@@ -200,7 +202,7 @@ class FederationHandler:
@trace
@tag_args
async def maybe_backfill(
- self, room_id: str, current_depth: int, limit: int
+ self, room_id: str, current_depth: int, limit: int, record_time: bool = True
) -> bool:
"""Checks the database to see if we should backfill before paginating,
and if so do.
@@ -213,21 +215,25 @@ class FederationHandler:
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
+ record_time: Whether to record the time it takes to backfill.
Returns:
True if we actually tried to backfill something, otherwise False.
"""
# Starting the processing time here so we can include the room backfill
# linearizer lock queue in the timing
- processing_start_time = self.clock.time_msec()
+ processing_start_time = self.clock.time_msec() if record_time else 0
async with self._room_backfill.queue(room_id):
- return await self._maybe_backfill_inner(
- room_id,
- current_depth,
- limit,
- processing_start_time=processing_start_time,
- )
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_PAGINATION_LOCK_NAME, room_id, write=False
+ ):
+ return await self._maybe_backfill_inner(
+ room_id,
+ current_depth,
+ limit,
+ processing_start_time=processing_start_time,
+ )
@trace
@tag_args
@@ -305,12 +311,21 @@ class FederationHandler:
# of history that extends all the way back to where we are currently paginating
# and it's within the 100 events that are returned from `/backfill`.
if not sorted_backfill_points and current_depth != MAX_DEPTH:
+ # Check that we actually have later backfill points, if not just return.
+ have_later_backfill_points = await self.store.get_backfill_points_in_room(
+ room_id=room_id,
+ current_depth=MAX_DEPTH,
+ limit=1,
+ )
+ if not have_later_backfill_points:
+ return False
+
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
)
run_as_background_process(
"_maybe_backfill_inner_anyway_with_max_depth",
- self._maybe_backfill_inner,
+ self.maybe_backfill,
room_id=room_id,
# We use `MAX_DEPTH` so that we find all backfill points next
# time (all events are below the `MAX_DEPTH`)
@@ -319,7 +334,7 @@ class FederationHandler:
# We don't want to start another timing observation from this
# nested recursive call. The top-most call can record the time
# overall otherwise the smaller one will throw off the results.
- processing_start_time=None,
+ record_time=False,
)
# We return `False` because we're backfilling in the background and there is
# no new events immediately for the caller to know about yet.
|