summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py177
1 files changed, 122 insertions, 55 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7b9d07e4f5..610432b55a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -38,7 +38,7 @@ from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
 from synapse import event_auth
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -149,6 +149,8 @@ class FederationHandler:
         self.http_client = hs.get_proxied_blacklisted_http_client()
         self._replication = hs.get_replication_data_handler()
         self._federation_event_handler = hs.get_federation_event_handler()
+        self._device_handler = hs.get_device_handler()
+        self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
 
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
@@ -209,7 +211,7 @@ class FederationHandler:
         current_depth: int,
         limit: int,
         *,
-        processing_start_time: int,
+        processing_start_time: Optional[int],
     ) -> bool:
         """
         Checks whether the `current_depth` is at or approaching any backfill
@@ -221,12 +223,23 @@ class FederationHandler:
             room_id: The room to backfill in.
             current_depth: The depth to check at for any upcoming backfill points.
             limit: The max number of events to request from the remote federated server.
-            processing_start_time: The time when `maybe_backfill` started
-                processing. Only used for timing.
+            processing_start_time: The time when `maybe_backfill` started processing.
+                Only used for timing. If `None`, no timing observation will be made.
         """
         backwards_extremities = [
             _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
-            for event_id, depth in await self.store.get_backfill_points_in_room(room_id)
+            for event_id, depth in await self.store.get_backfill_points_in_room(
+                room_id=room_id,
+                current_depth=current_depth,
+                # We only need to end up with 5 extremities combined with the
+                # insertion event extremities to make the `/backfill` request
+                # but fetch an order of magnitude more to make sure there is
+                # enough even after we filter them by whether visible in the
+                # history. This isn't fool-proof as all backfill points within
+                # our limit could be filtered out but seems like a good amount
+                # to try with at least.
+                limit=50,
+            )
         ]
 
         insertion_events_to_be_backfilled: List[_BackfillPoint] = []
@@ -234,7 +247,12 @@ class FederationHandler:
             insertion_events_to_be_backfilled = [
                 _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
                 for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
-                    room_id
+                    room_id=room_id,
+                    current_depth=current_depth,
+                    # We only need to end up with 5 extremities combined with
+                    # the backfill points to make the `/backfill` request ...
+                    # (see the other comment above for more context).
+                    limit=50,
                 )
             ]
         logger.debug(
@@ -243,10 +261,6 @@ class FederationHandler:
             insertion_events_to_be_backfilled,
         )
 
-        if not backwards_extremities and not insertion_events_to_be_backfilled:
-            logger.debug("Not backfilling as no extremeties found.")
-            return False
-
         # we now have a list of potential places to backpaginate from. We prefer to
         # start with the most recent (ie, max depth), so let's sort the list.
         sorted_backfill_points: List[_BackfillPoint] = sorted(
@@ -267,6 +281,33 @@ class FederationHandler:
             sorted_backfill_points,
         )
 
+        # If we have no backfill points lower than the `current_depth` then
+        # either we can a) bail or b) still attempt to backfill. We opt to try
+        # backfilling anyway just in case we do get relevant events.
+        if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            logger.debug(
+                "_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
+            )
+            return await self._maybe_backfill_inner(
+                room_id=room_id,
+                # We use `MAX_DEPTH` so that we find all backfill points next
+                # time (all events are below the `MAX_DEPTH`)
+                current_depth=MAX_DEPTH,
+                limit=limit,
+                # We don't want to start another timing observation from this
+                # nested recursive call. The top-most call can record the time
+                # overall otherwise the smaller one will throw off the results.
+                processing_start_time=None,
+            )
+
+        # Even after recursing with `MAX_DEPTH`, we didn't find any
+        # backward extremities to backfill from.
+        if not sorted_backfill_points:
+            logger.debug(
+                "_maybe_backfill_inner: Not backfilling as no backward extremeties found."
+            )
+            return False
+
         # If we're approaching an extremity we trigger a backfill, otherwise we
         # no-op.
         #
@@ -276,47 +317,16 @@ class FederationHandler:
         # chose more than one times the limit in case of failure, but choosing a
         # much larger factor will result in triggering a backfill request much
         # earlier than necessary.
-        #
-        # XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
-        # care about events that have happened after our current position.
-        #
-        max_depth = sorted_backfill_points[0].depth
-        if current_depth - 2 * limit > max_depth:
+        max_depth_of_backfill_points = sorted_backfill_points[0].depth
+        if current_depth - 2 * limit > max_depth_of_backfill_points:
             logger.debug(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
-                max_depth,
+                max_depth_of_backfill_points,
                 current_depth,
                 limit,
             )
             return False
 
-        # We ignore extremities that have a greater depth than our current depth
-        # as:
-        #    1. we don't really care about getting events that have happened
-        #       after our current position; and
-        #    2. we have likely previously tried and failed to backfill from that
-        #       extremity, so to avoid getting "stuck" requesting the same
-        #       backfill repeatedly we drop those extremities.
-        #
-        # However, we need to check that the filtered extremities are non-empty.
-        # If they are empty then either we can a) bail or b) still attempt to
-        # backfill. We opt to try backfilling anyway just in case we do get
-        # relevant events.
-        #
-        filtered_sorted_backfill_points = [
-            t for t in sorted_backfill_points if t.depth <= current_depth
-        ]
-        if filtered_sorted_backfill_points:
-            logger.debug(
-                "_maybe_backfill_inner: backfill points before current depth: %s",
-                filtered_sorted_backfill_points,
-            )
-            sorted_backfill_points = filtered_sorted_backfill_points
-        else:
-            logger.debug(
-                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
-            )
-
         # For performance's sake, we only want to paginate from a particular extremity
         # if we can actually see the events we'll get. Otherwise, we'd just spend a lot
         # of resources to get redacted events. We check each extremity in turn and
@@ -402,11 +412,22 @@ class FederationHandler:
         # First we try hosts that are already in the room.
         # TODO: HEURISTIC ALERT.
         likely_domains = (
-            await self._storage_controllers.state.get_current_hosts_in_room(room_id)
+            await self._storage_controllers.state.get_current_hosts_in_room_ordered(
+                room_id
+            )
         )
 
         async def try_backfill(domains: Collection[str]) -> bool:
             # TODO: Should we try multiple of these at a time?
+
+            # Number of contacted remote homeservers that have denied our backfill
+            # request with a 4xx code.
+            denied_count = 0
+
+            # Maximum number of contacted remote homeservers that can deny our
+            # backfill request with 4xx codes before we give up.
+            max_denied_count = 5
+
             for dom in domains:
                 # We don't want to ask our own server for information we don't have
                 if dom == self.server_name:
@@ -425,13 +446,33 @@ class FederationHandler:
                     continue
                 except HttpResponseException as e:
                     if 400 <= e.code < 500:
-                        raise e.to_synapse_error()
+                        logger.warning(
+                            "Backfill denied from %s because %s [%d/%d]",
+                            dom,
+                            e,
+                            denied_count,
+                            max_denied_count,
+                        )
+                        denied_count += 1
+                        if denied_count >= max_denied_count:
+                            return False
+                        continue
 
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
                 except CodeMessageException as e:
                     if 400 <= e.code < 500:
-                        raise
+                        logger.warning(
+                            "Backfill denied from %s because %s [%d/%d]",
+                            dom,
+                            e,
+                            denied_count,
+                            max_denied_count,
+                        )
+                        denied_count += 1
+                        if denied_count >= max_denied_count:
+                            return False
+                        continue
 
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
@@ -450,10 +491,15 @@ class FederationHandler:
 
             return False
 
-        processing_end_time = self.clock.time_msec()
-        backfill_processing_before_timer.observe(
-            (processing_end_time - processing_start_time) / 1000
-        )
+        # If we have the `processing_start_time`, then we can make an
+        # observation. We wouldn't have the `processing_start_time` in the case
+        # where `_maybe_backfill_inner` is recursively called to find any
+        # backfill points regardless of `current_depth`.
+        if processing_start_time is not None:
+            processing_end_time = self.clock.time_msec()
+            backfill_processing_before_timer.observe(
+                (processing_end_time - processing_start_time) / 1000
+            )
 
         success = await try_backfill(likely_domains)
         if success:
@@ -581,7 +627,11 @@ class FederationHandler:
                 # Mark the room as having partial state.
                 # The background process is responsible for unmarking this flag,
                 # even if the join fails.
-                await self.store.store_partial_state_room(room_id, ret.servers_in_room)
+                await self.store.store_partial_state_room(
+                    room_id=room_id,
+                    servers=ret.servers_in_room,
+                    device_lists_stream_id=self.store.get_device_stream_token(),
+                )
 
             try:
                 max_stream_id = (
@@ -606,6 +656,14 @@ class FederationHandler:
                     room_id,
                 )
                 raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
+            else:
+                # Record the join event id for future use (when we finish the full
+                # join). We have to do this after persisting the event to keep foreign
+                # key constraints intact.
+                if ret.partial_state:
+                    await self.store.write_partial_state_rooms_join_event_id(
+                        room_id, event.event_id
+                    )
             finally:
                 # Always kick off the background process that asynchronously fetches
                 # state for the room.
@@ -944,9 +1002,15 @@ class FederationHandler:
         )
 
         context = EventContext.for_outlier(self._storage_controllers)
-        await self._federation_event_handler.persist_events_and_notify(
-            event.room_id, [(event, context)]
-        )
+
+        await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context)
+        try:
+            await self._federation_event_handler.persist_events_and_notify(
+                event.room_id, [(event, context)]
+            )
+        except Exception:
+            await self.store.remove_push_actions_from_staging(event.event_id)
+            raise
 
         return event
 
@@ -1612,6 +1676,9 @@ class FederationHandler:
                 #   https://github.com/matrix-org/synapse/issues/12994
                 await self.state_handler.update_current_state(room_id)
 
+                logger.info("Handling any pending device list updates")
+                await self._device_handler.handle_room_un_partial_stated(room_id)
+
                 logger.info("Clearing partial-state flag for %s", room_id)
                 success = await self.store.clear_partial_state_room(room_id)
                 if success: