diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2b93b8c621..29cd45550a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -60,6 +60,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.events.validator import EventValidator from synapse.federation.federation_client import InvalidResponseError +from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace @@ -152,6 +153,7 @@ class FederationHandler: self._device_handler = hs.get_device_handler() self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator() self._notifier = hs.get_notifier() + self._worker_locks = hs.get_worker_locks_handler() self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs @@ -200,7 +202,7 @@ class FederationHandler: @trace @tag_args async def maybe_backfill( - self, room_id: str, current_depth: int, limit: int + self, room_id: str, current_depth: int, limit: int, record_time: bool = True ) -> bool: """Checks the database to see if we should backfill before paginating, and if so do. @@ -213,21 +215,25 @@ class FederationHandler: limit: The number of events that the pagination request will return. This is used as part of the heuristic to decide if we should back paginate. + record_time: Whether to record the time it takes to backfill. Returns: True if we actually tried to backfill something, otherwise False. """ # Starting the processing time here so we can include the room backfill # linearizer lock queue in the timing - processing_start_time = self.clock.time_msec() + processing_start_time = self.clock.time_msec() if record_time else 0 async with self._room_backfill.queue(room_id): - return await self._maybe_backfill_inner( - room_id, - current_depth, - limit, - processing_start_time=processing_start_time, - ) + async with self._worker_locks.acquire_read_write_lock( + PURGE_PAGINATION_LOCK_NAME, room_id, write=False + ): + return await self._maybe_backfill_inner( + room_id, + current_depth, + limit, + processing_start_time=processing_start_time, + ) @trace @tag_args @@ -305,12 +311,21 @@ class FederationHandler: # of history that extends all the way back to where we are currently paginating # and it's within the 100 events that are returned from `/backfill`. if not sorted_backfill_points and current_depth != MAX_DEPTH: + # Check that we actually have later backfill points, if not just return. + have_later_backfill_points = await self.store.get_backfill_points_in_room( + room_id=room_id, + current_depth=MAX_DEPTH, + limit=1, + ) + if not have_later_backfill_points: + return False + logger.debug( "_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points." ) run_as_background_process( "_maybe_backfill_inner_anyway_with_max_depth", - self._maybe_backfill_inner, + self.maybe_backfill, room_id=room_id, # We use `MAX_DEPTH` so that we find all backfill points next # time (all events are below the `MAX_DEPTH`) @@ -319,7 +334,7 @@ class FederationHandler: # We don't want to start another timing observation from this # nested recursive call. The top-most call can record the time # overall otherwise the smaller one will throw off the results. - processing_start_time=None, + record_time=False, ) # We return `False` because we're backfilling in the background and there is # no new events immediately for the caller to know about yet. |