diff --git a/changelog.d/16159.misc b/changelog.d/16159.misc
new file mode 100644
index 0000000000..04cdd1afaf
--- /dev/null
+++ b/changelog.d/16159.misc
@@ -0,0 +1 @@
+Reduce scope of locks when paginating to alleviate DB contention.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2b93b8c621..29cd45550a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -60,6 +60,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
@@ -152,6 +153,7 @@ class FederationHandler:
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._notifier = hs.get_notifier()
+ self._worker_locks = hs.get_worker_locks_handler()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
@@ -200,7 +202,7 @@ class FederationHandler:
@trace
@tag_args
async def maybe_backfill(
- self, room_id: str, current_depth: int, limit: int
+ self, room_id: str, current_depth: int, limit: int, record_time: bool = True
) -> bool:
"""Checks the database to see if we should backfill before paginating,
and if so do.
@@ -213,21 +215,25 @@ class FederationHandler:
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
+ record_time: Whether to record the time it takes to backfill.
Returns:
True if we actually tried to backfill something, otherwise False.
"""
# Starting the processing time here so we can include the room backfill
# linearizer lock queue in the timing
- processing_start_time = self.clock.time_msec()
+ processing_start_time = self.clock.time_msec() if record_time else 0
async with self._room_backfill.queue(room_id):
- return await self._maybe_backfill_inner(
- room_id,
- current_depth,
- limit,
- processing_start_time=processing_start_time,
- )
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_PAGINATION_LOCK_NAME, room_id, write=False
+ ):
+ return await self._maybe_backfill_inner(
+ room_id,
+ current_depth,
+ limit,
+ processing_start_time=processing_start_time,
+ )
@trace
@tag_args
@@ -305,12 +311,21 @@ class FederationHandler:
# of history that extends all the way back to where we are currently paginating
# and it's within the 100 events that are returned from `/backfill`.
if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            # Check that we actually have later backfill points; if not, just return.
+ have_later_backfill_points = await self.store.get_backfill_points_in_room(
+ room_id=room_id,
+ current_depth=MAX_DEPTH,
+ limit=1,
+ )
+ if not have_later_backfill_points:
+ return False
+
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
)
run_as_background_process(
"_maybe_backfill_inner_anyway_with_max_depth",
- self._maybe_backfill_inner,
+ self.maybe_backfill,
room_id=room_id,
# We use `MAX_DEPTH` so that we find all backfill points next
# time (all events are below the `MAX_DEPTH`)
@@ -319,7 +334,7 @@ class FederationHandler:
# We don't want to start another timing observation from this
# nested recursive call. The top-most call can record the time
# overall otherwise the smaller one will throw off the results.
- processing_start_time=None,
+ record_time=False,
)
# We return `False` because we're backfilling in the background and there is
# no new events immediately for the caller to know about yet.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 1be6ebc6d9..e5ac9096cc 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -487,155 +487,150 @@ class PaginationHandler:
room_token = from_token.room_key
- async with self._worker_locks.acquire_read_write_lock(
- PURGE_PAGINATION_LOCK_NAME, room_id, write=False
- ):
- (membership, member_event_id) = (None, None)
- if not use_admin_priviledge:
- (
- membership,
- member_event_id,
- ) = await self.auth.check_user_in_room_or_world_readable(
- room_id, requester, allow_departed_users=True
+ (membership, member_event_id) = (None, None)
+ if not use_admin_priviledge:
+ (
+ membership,
+ member_event_id,
+ ) = await self.auth.check_user_in_room_or_world_readable(
+ room_id, requester, allow_departed_users=True
+ )
+
+ if pagin_config.direction == Direction.BACKWARDS:
+ # if we're going backwards, we might need to backfill. This
+ # requires that we have a topo token.
+ if room_token.topological:
+ curr_topo = room_token.topological
+ else:
+ curr_topo = await self.store.get_current_topological_token(
+ room_id, room_token.stream
)
- if pagin_config.direction == Direction.BACKWARDS:
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- curr_topo = room_token.topological
- else:
- curr_topo = await self.store.get_current_topological_token(
- room_id, room_token.stream
- )
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ if (
+ pagin_config.direction == Direction.BACKWARDS
+ and not use_admin_priviledge
+ and membership == Membership.LEAVE
+ ):
+ # This is only None if the room is world_readable, in which case
+ # "Membership.JOIN" would have been returned and we should never hit
+ # this branch.
+ assert member_event_id
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- if (
- pagin_config.direction == Direction.BACKWARDS
- and not use_admin_priviledge
- and membership == Membership.LEAVE
- ):
- # This is only None if the room is world_readable, in which case
- # "Membership.JOIN" would have been returned and we should never hit
- # this branch.
- assert member_event_id
+ leave_token = await self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ assert leave_token.topological is not None
- leave_token = await self.store.get_topological_token_for_event(
- member_event_id
+ if leave_token.topological < curr_topo:
+ from_token = from_token.copy_and_replace(
+ StreamKeyType.ROOM, leave_token
)
- assert leave_token.topological is not None
- if leave_token.topological < curr_topo:
- from_token = from_token.copy_and_replace(
- StreamKeyType.ROOM, leave_token
- )
+ to_room_key = None
+ if pagin_config.to_token:
+ to_room_key = pagin_config.to_token.room_key
+
+ # Initially fetch the events from the database. With any luck, we can return
+ # these without blocking on backfill (handled below).
+ events, next_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
- to_room_key = None
- if pagin_config.to_token:
- to_room_key = pagin_config.to_token.room_key
-
- # Initially fetch the events from the database. With any luck, we can return
- # these without blocking on backfill (handled below).
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
+ if pagin_config.direction == Direction.BACKWARDS:
+ # We use a `Set` because there can be multiple events at a given depth
+            # and we only care about looking at the unique continuum of depths to
+ # find gaps.
+ event_depths: Set[int] = {event.depth for event in events}
+ sorted_event_depths = sorted(event_depths)
+
+ # Inspect the depths of the returned events to see if there are any gaps
+ found_big_gap = False
+ number_of_gaps = 0
+ previous_event_depth = (
+ sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
)
-
- if pagin_config.direction == Direction.BACKWARDS:
- # We use a `Set` because there can be multiple events at a given depth
- # and we only care about looking at the unique continum of depths to
- # find gaps.
- event_depths: Set[int] = {event.depth for event in events}
- sorted_event_depths = sorted(event_depths)
-
- # Inspect the depths of the returned events to see if there are any gaps
- found_big_gap = False
- number_of_gaps = 0
- previous_event_depth = (
- sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
- )
- for event_depth in sorted_event_depths:
- # We don't expect a negative depth but we'll just deal with it in
- # any case by taking the absolute value to get the true gap between
- # any two integers.
- depth_gap = abs(event_depth - previous_event_depth)
- # A `depth_gap` of 1 is a normal continuous chain to the next event
- # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
- # also possible there is no event at a given depth but we can't ever
- # know that for sure)
- if depth_gap > 1:
- number_of_gaps += 1
-
- # We only tolerate a small number single-event long gaps in the
- # returned events because those are most likely just events we've
- # failed to pull in the past. Anything longer than that is probably
- # a sign that we're missing a decent chunk of history and we should
- # try to backfill it.
- #
- # XXX: It's possible we could tolerate longer gaps if we checked
- # that a given events `prev_events` is one that has failed pull
- # attempts and we could just treat it like a dead branch of history
- # for now or at least something that we don't need the block the
- # client on to try pulling.
- #
- # XXX: If we had something like MSC3871 to indicate gaps in the
- # timeline to the client, we could also get away with any sized gap
- # and just have the client refetch the holes as they see fit.
- if depth_gap > 2:
- found_big_gap = True
- break
- previous_event_depth = event_depth
-
- # Backfill in the foreground if we found a big gap, have too many holes,
- # or we don't have enough events to fill the limit that the client asked
- # for.
- missing_too_many_events = (
- number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+ for event_depth in sorted_event_depths:
+ # We don't expect a negative depth but we'll just deal with it in
+ # any case by taking the absolute value to get the true gap between
+ # any two integers.
+ depth_gap = abs(event_depth - previous_event_depth)
+ # A `depth_gap` of 1 is a normal continuous chain to the next event
+ # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+ # also possible there is no event at a given depth but we can't ever
+ # know that for sure)
+ if depth_gap > 1:
+ number_of_gaps += 1
+
+                # We only tolerate a small number of single-event long gaps in the
+ # returned events because those are most likely just events we've
+ # failed to pull in the past. Anything longer than that is probably
+ # a sign that we're missing a decent chunk of history and we should
+ # try to backfill it.
+ #
+ # XXX: It's possible we could tolerate longer gaps if we checked
+                # that a given event's `prev_events` is one that has failed pull
+                # attempts and we could just treat it like a dead branch of history
+                # for now or at least something that we don't need to block the
+                # client on to try pulling.
+ #
+ # XXX: If we had something like MSC3871 to indicate gaps in the
+ # timeline to the client, we could also get away with any sized gap
+ # and just have the client refetch the holes as they see fit.
+ if depth_gap > 2:
+ found_big_gap = True
+ break
+ previous_event_depth = event_depth
+
+ # Backfill in the foreground if we found a big gap, have too many holes,
+ # or we don't have enough events to fill the limit that the client asked
+ # for.
+ missing_too_many_events = (
+ number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+ )
+ not_enough_events_to_fill_response = len(events) < pagin_config.limit
+ if (
+ found_big_gap
+ or missing_too_many_events
+ or not_enough_events_to_fill_response
+ ):
+ did_backfill = await self.hs.get_federation_handler().maybe_backfill(
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
)
- not_enough_events_to_fill_response = len(events) < pagin_config.limit
- if (
- found_big_gap
- or missing_too_many_events
- or not_enough_events_to_fill_response
- ):
- did_backfill = (
- await self.hs.get_federation_handler().maybe_backfill(
- room_id,
- curr_topo,
- limit=pagin_config.limit,
- )
- )
- # If we did backfill something, refetch the events from the database to
- # catch anything new that might have been added since we last fetched.
- if did_backfill:
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
- )
- else:
- # Otherwise, we can backfill in the background for eventual
- # consistency's sake but we don't need to block the client waiting
- # for a costly federation call and processing.
- run_as_background_process(
- "maybe_backfill_in_the_background",
- self.hs.get_federation_handler().maybe_backfill,
- room_id,
- curr_topo,
+ # If we did backfill something, refetch the events from the database to
+ # catch anything new that might have been added since we last fetched.
+ if did_backfill:
+ events, next_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
limit=pagin_config.limit,
+ event_filter=event_filter,
)
+ else:
+ # Otherwise, we can backfill in the background for eventual
+ # consistency's sake but we don't need to block the client waiting
+ # for a costly federation call and processing.
+ run_as_background_process(
+ "maybe_backfill_in_the_background",
+ self.hs.get_federation_handler().maybe_backfill,
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
+ )
- next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
+ next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
# if no events are returned from pagination, that implies
# we have reached the end of the available events.
|