summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/16159.misc1
-rw-r--r--synapse/handlers/federation.py35
-rw-r--r--synapse/handlers/pagination.py267
3 files changed, 157 insertions, 146 deletions
diff --git a/changelog.d/16159.misc b/changelog.d/16159.misc
new file mode 100644
index 0000000000..04cdd1afaf
--- /dev/null
+++ b/changelog.d/16159.misc
@@ -0,0 +1 @@
+Reduce scope of locks when paginating to alleviate DB contention.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2b93b8c621..29cd45550a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -60,6 +60,7 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
 from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
 from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
@@ -152,6 +153,7 @@ class FederationHandler:
         self._device_handler = hs.get_device_handler()
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
         self._notifier = hs.get_notifier()
+        self._worker_locks = hs.get_worker_locks_handler()
 
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
@@ -200,7 +202,7 @@ class FederationHandler:
     @trace
     @tag_args
     async def maybe_backfill(
-        self, room_id: str, current_depth: int, limit: int
+        self, room_id: str, current_depth: int, limit: int, record_time: bool = True
     ) -> bool:
         """Checks the database to see if we should backfill before paginating,
         and if so do.
@@ -213,21 +215,25 @@ class FederationHandler:
             limit: The number of events that the pagination request will
                 return. This is used as part of the heuristic to decide if we
                 should back paginate.
+            record_time: Whether to record the time it takes to backfill.
 
         Returns:
             True if we actually tried to backfill something, otherwise False.
         """
         # Starting the processing time here so we can include the room backfill
         # linearizer lock queue in the timing
-        processing_start_time = self.clock.time_msec()
+        processing_start_time = self.clock.time_msec() if record_time else 0
 
         async with self._room_backfill.queue(room_id):
-            return await self._maybe_backfill_inner(
-                room_id,
-                current_depth,
-                limit,
-                processing_start_time=processing_start_time,
-            )
+            async with self._worker_locks.acquire_read_write_lock(
+                PURGE_PAGINATION_LOCK_NAME, room_id, write=False
+            ):
+                return await self._maybe_backfill_inner(
+                    room_id,
+                    current_depth,
+                    limit,
+                    processing_start_time=processing_start_time,
+                )
 
     @trace
     @tag_args
@@ -305,12 +311,21 @@ class FederationHandler:
         # of history that extends all the way back to where we are currently paginating
         # and it's within the 100 events that are returned from `/backfill`.
         if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            # Check that we actually have later backfill points, if not just return.
+            have_later_backfill_points = await self.store.get_backfill_points_in_room(
+                room_id=room_id,
+                current_depth=MAX_DEPTH,
+                limit=1,
+            )
+            if not have_later_backfill_points:
+                return False
+
             logger.debug(
                 "_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
             )
             run_as_background_process(
                 "_maybe_backfill_inner_anyway_with_max_depth",
-                self._maybe_backfill_inner,
+                self.maybe_backfill,
                 room_id=room_id,
                 # We use `MAX_DEPTH` so that we find all backfill points next
                 # time (all events are below the `MAX_DEPTH`)
@@ -319,7 +334,7 @@ class FederationHandler:
                 # We don't want to start another timing observation from this
                 # nested recursive call. The top-most call can record the time
                 # overall otherwise the smaller one will throw off the results.
-                processing_start_time=None,
+                record_time=False,
             )
             # We return `False` because we're backfilling in the background and there is
             # no new events immediately for the caller to know about yet.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 1be6ebc6d9..e5ac9096cc 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -487,155 +487,150 @@ class PaginationHandler:
 
         room_token = from_token.room_key
 
-        async with self._worker_locks.acquire_read_write_lock(
-            PURGE_PAGINATION_LOCK_NAME, room_id, write=False
-        ):
-            (membership, member_event_id) = (None, None)
-            if not use_admin_priviledge:
-                (
-                    membership,
-                    member_event_id,
-                ) = await self.auth.check_user_in_room_or_world_readable(
-                    room_id, requester, allow_departed_users=True
+        (membership, member_event_id) = (None, None)
+        if not use_admin_priviledge:
+            (
+                membership,
+                member_event_id,
+            ) = await self.auth.check_user_in_room_or_world_readable(
+                room_id, requester, allow_departed_users=True
+            )
+
+        if pagin_config.direction == Direction.BACKWARDS:
+            # if we're going backwards, we might need to backfill. This
+            # requires that we have a topo token.
+            if room_token.topological:
+                curr_topo = room_token.topological
+            else:
+                curr_topo = await self.store.get_current_topological_token(
+                    room_id, room_token.stream
                 )
 
-            if pagin_config.direction == Direction.BACKWARDS:
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    curr_topo = room_token.topological
-                else:
-                    curr_topo = await self.store.get_current_topological_token(
-                        room_id, room_token.stream
-                    )
+        # If they have left the room then clamp the token to be before
+        # they left the room, to save the effort of loading from the
+        # database.
+        if (
+            pagin_config.direction == Direction.BACKWARDS
+            and not use_admin_priviledge
+            and membership == Membership.LEAVE
+        ):
+            # This is only None if the room is world_readable, in which case
+            # "Membership.JOIN" would have been returned and we should never hit
+            # this branch.
+            assert member_event_id
 
-            # If they have left the room then clamp the token to be before
-            # they left the room, to save the effort of loading from the
-            # database.
-            if (
-                pagin_config.direction == Direction.BACKWARDS
-                and not use_admin_priviledge
-                and membership == Membership.LEAVE
-            ):
-                # This is only None if the room is world_readable, in which case
-                # "Membership.JOIN" would have been returned and we should never hit
-                # this branch.
-                assert member_event_id
+            leave_token = await self.store.get_topological_token_for_event(
+                member_event_id
+            )
+            assert leave_token.topological is not None
 
-                leave_token = await self.store.get_topological_token_for_event(
-                    member_event_id
+            if leave_token.topological < curr_topo:
+                from_token = from_token.copy_and_replace(
+                    StreamKeyType.ROOM, leave_token
                 )
-                assert leave_token.topological is not None
 
-                if leave_token.topological < curr_topo:
-                    from_token = from_token.copy_and_replace(
-                        StreamKeyType.ROOM, leave_token
-                    )
+        to_room_key = None
+        if pagin_config.to_token:
+            to_room_key = pagin_config.to_token.room_key
+
+        # Initially fetch the events from the database. With any luck, we can return
+        # these without blocking on backfill (handled below).
+        events, next_key = await self.store.paginate_room_events(
+            room_id=room_id,
+            from_key=from_token.room_key,
+            to_key=to_room_key,
+            direction=pagin_config.direction,
+            limit=pagin_config.limit,
+            event_filter=event_filter,
+        )
 
-            to_room_key = None
-            if pagin_config.to_token:
-                to_room_key = pagin_config.to_token.room_key
-
-            # Initially fetch the events from the database. With any luck, we can return
-            # these without blocking on backfill (handled below).
-            events, next_key = await self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=from_token.room_key,
-                to_key=to_room_key,
-                direction=pagin_config.direction,
-                limit=pagin_config.limit,
-                event_filter=event_filter,
+        if pagin_config.direction == Direction.BACKWARDS:
+            # We use a `Set` because there can be multiple events at a given depth
+            # and we only care about looking at the unique continum of depths to
+            # find gaps.
+            event_depths: Set[int] = {event.depth for event in events}
+            sorted_event_depths = sorted(event_depths)
+
+            # Inspect the depths of the returned events to see if there are any gaps
+            found_big_gap = False
+            number_of_gaps = 0
+            previous_event_depth = (
+                sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
             )
-
-            if pagin_config.direction == Direction.BACKWARDS:
-                # We use a `Set` because there can be multiple events at a given depth
-                # and we only care about looking at the unique continum of depths to
-                # find gaps.
-                event_depths: Set[int] = {event.depth for event in events}
-                sorted_event_depths = sorted(event_depths)
-
-                # Inspect the depths of the returned events to see if there are any gaps
-                found_big_gap = False
-                number_of_gaps = 0
-                previous_event_depth = (
-                    sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
-                )
-                for event_depth in sorted_event_depths:
-                    # We don't expect a negative depth but we'll just deal with it in
-                    # any case by taking the absolute value to get the true gap between
-                    # any two integers.
-                    depth_gap = abs(event_depth - previous_event_depth)
-                    # A `depth_gap` of 1 is a normal continuous chain to the next event
-                    # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
-                    # also possible there is no event at a given depth but we can't ever
-                    # know that for sure)
-                    if depth_gap > 1:
-                        number_of_gaps += 1
-
-                    # We only tolerate a small number single-event long gaps in the
-                    # returned events because those are most likely just events we've
-                    # failed to pull in the past. Anything longer than that is probably
-                    # a sign that we're missing a decent chunk of history and we should
-                    # try to backfill it.
-                    #
-                    # XXX: It's possible we could tolerate longer gaps if we checked
-                    # that a given events `prev_events` is one that has failed pull
-                    # attempts and we could just treat it like a dead branch of history
-                    # for now or at least something that we don't need the block the
-                    # client on to try pulling.
-                    #
-                    # XXX: If we had something like MSC3871 to indicate gaps in the
-                    # timeline to the client, we could also get away with any sized gap
-                    # and just have the client refetch the holes as they see fit.
-                    if depth_gap > 2:
-                        found_big_gap = True
-                        break
-                    previous_event_depth = event_depth
-
-                # Backfill in the foreground if we found a big gap, have too many holes,
-                # or we don't have enough events to fill the limit that the client asked
-                # for.
-                missing_too_many_events = (
-                    number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+            for event_depth in sorted_event_depths:
+                # We don't expect a negative depth but we'll just deal with it in
+                # any case by taking the absolute value to get the true gap between
+                # any two integers.
+                depth_gap = abs(event_depth - previous_event_depth)
+                # A `depth_gap` of 1 is a normal continuous chain to the next event
+                # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+                # also possible there is no event at a given depth but we can't ever
+                # know that for sure)
+                if depth_gap > 1:
+                    number_of_gaps += 1
+
+                # We only tolerate a small number single-event long gaps in the
+                # returned events because those are most likely just events we've
+                # failed to pull in the past. Anything longer than that is probably
+                # a sign that we're missing a decent chunk of history and we should
+                # try to backfill it.
+                #
+                # XXX: It's possible we could tolerate longer gaps if we checked
+                # that a given events `prev_events` is one that has failed pull
+                # attempts and we could just treat it like a dead branch of history
+                # for now or at least something that we don't need the block the
+                # client on to try pulling.
+                #
+                # XXX: If we had something like MSC3871 to indicate gaps in the
+                # timeline to the client, we could also get away with any sized gap
+                # and just have the client refetch the holes as they see fit.
+                if depth_gap > 2:
+                    found_big_gap = True
+                    break
+                previous_event_depth = event_depth
+
+            # Backfill in the foreground if we found a big gap, have too many holes,
+            # or we don't have enough events to fill the limit that the client asked
+            # for.
+            missing_too_many_events = (
+                number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+            )
+            not_enough_events_to_fill_response = len(events) < pagin_config.limit
+            if (
+                found_big_gap
+                or missing_too_many_events
+                or not_enough_events_to_fill_response
+            ):
+                did_backfill = await self.hs.get_federation_handler().maybe_backfill(
+                    room_id,
+                    curr_topo,
+                    limit=pagin_config.limit,
                 )
-                not_enough_events_to_fill_response = len(events) < pagin_config.limit
-                if (
-                    found_big_gap
-                    or missing_too_many_events
-                    or not_enough_events_to_fill_response
-                ):
-                    did_backfill = (
-                        await self.hs.get_federation_handler().maybe_backfill(
-                            room_id,
-                            curr_topo,
-                            limit=pagin_config.limit,
-                        )
-                    )
 
-                    # If we did backfill something, refetch the events from the database to
-                    # catch anything new that might have been added since we last fetched.
-                    if did_backfill:
-                        events, next_key = await self.store.paginate_room_events(
-                            room_id=room_id,
-                            from_key=from_token.room_key,
-                            to_key=to_room_key,
-                            direction=pagin_config.direction,
-                            limit=pagin_config.limit,
-                            event_filter=event_filter,
-                        )
-                else:
-                    # Otherwise, we can backfill in the background for eventual
-                    # consistency's sake but we don't need to block the client waiting
-                    # for a costly federation call and processing.
-                    run_as_background_process(
-                        "maybe_backfill_in_the_background",
-                        self.hs.get_federation_handler().maybe_backfill,
-                        room_id,
-                        curr_topo,
+                # If we did backfill something, refetch the events from the database to
+                # catch anything new that might have been added since we last fetched.
+                if did_backfill:
+                    events, next_key = await self.store.paginate_room_events(
+                        room_id=room_id,
+                        from_key=from_token.room_key,
+                        to_key=to_room_key,
+                        direction=pagin_config.direction,
                         limit=pagin_config.limit,
+                        event_filter=event_filter,
                     )
+            else:
+                # Otherwise, we can backfill in the background for eventual
+                # consistency's sake but we don't need to block the client waiting
+                # for a costly federation call and processing.
+                run_as_background_process(
+                    "maybe_backfill_in_the_background",
+                    self.hs.get_federation_handler().maybe_backfill,
+                    room_id,
+                    curr_topo,
+                    limit=pagin_config.limit,
+                )
 
-            next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
+        next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
 
         # if no events are returned from pagination, that implies
         # we have reached the end of the available events.