summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py142
1 files changed, 95 insertions, 47 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8d6bd7976d..38bebbf598 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -136,7 +136,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_receive_pdu(
-            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+            self, origin, pdu, sent_to_us_directly=False,
     ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
@@ -145,7 +145,8 @@ class FederationHandler(BaseHandler):
             origin (str): server which initiated the /send/ transaction. Will
                 be used to fetch missing events or state.
             pdu (FrozenEvent): received PDU
-            get_missing (bool): True if we should fetch missing prev_events
+            sent_to_us_directly (bool): True if this event was pushed to us; False if
+                we pulled it as the result of a missing prev_event.
 
         Returns (Deferred): completes with None
         """
@@ -250,7 +251,7 @@ class FederationHandler(BaseHandler):
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
                 missing_prevs = prevs - seen
-                if get_missing and missing_prevs:
+                if sent_to_us_directly and missing_prevs:
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
@@ -282,24 +283,46 @@ class FederationHandler(BaseHandler):
                         room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
 
-            if sent_to_us_directly and prevs - seen:
-                # If they have sent it to us directly, and the server
-                # isn't telling us about the auth events that it's
-                # made a message referencing, we explode
-                logger.warn(
-                    "[%s %s] Failed to fetch %d prev events: rejecting",
-                    room_id, event_id, len(prevs - seen),
-                )
-                raise FederationError(
-                    "ERROR",
-                    403,
-                    (
-                        "Your server isn't divulging details about prev_events "
-                        "referenced in this event."
-                    ),
-                    affected=pdu.event_id,
-                )
-            elif prevs - seen:
+            if prevs - seen:
+                # We've still not been able to get all of the prev_events for this event.
+                #
+                # In this case, we need to fall back to asking another server in the
+                # federation for the state at this event. That's ok provided we then
+                # resolve the state against other bits of the DAG before using it (which
+                # will ensure that you can't just take over a room by sending an event,
+                # withholding its prev_events, and declaring yourself to be an admin in
+                # the subsequent state request).
+                #
+                # Now, if we're pulling this event as a missing prev_event, then clearly
+                # this event is not going to become the only forward-extremity and we are
+                # guaranteed to resolve its state against our existing forward
+                # extremities, so that should be fine.
+                #
+                # On the other hand, if this event was pushed to us, it is possible for
+                # it to become the only forward-extremity in the room, and we would then
+                # trust its state to be the state for the whole room. This is very bad.
+                # Further, if the event was pushed to us, there is no excuse for us not to
+                # have all the prev_events. We therefore reject any such events.
+                #
+                # XXX this really feels like it could/should be merged with the above,
+                # but there is an interaction with min_depth that I'm not really
+                # following.
+
+                if sent_to_us_directly:
+                    logger.warn(
+                        "[%s %s] Failed to fetch %d prev events: rejecting",
+                        room_id, event_id, len(prevs - seen),
+                    )
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        (
+                            "Your server isn't divulging details about prev_events "
+                            "referenced in this event."
+                        ),
+                        affected=pdu.event_id,
+                    )
+
                 # Calculate the state of the previous events, and
                 # de-conflict them to find the current state.
                 state_groups = []
@@ -316,14 +339,26 @@ class FederationHandler(BaseHandler):
                             "[%s %s] Requesting state at missing prev_event %s",
                             room_id, event_id, p,
                         )
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, room_id, p,
+
+                        with logcontext.nested_logging_context(p):
+                            # note that if any of the missing prevs share missing state or
+                            # auth events, the requests to fetch those events are deduped
+                            # by the get_pdu_cache in federation_client.
+                            remote_state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
                             )
-                        )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
+
+                            # XXX hrm I'm not convinced that duplicate events will compare
+                            # for equality, so I'm not sure this does what the author
+                            # hoped.
+                            auth_chains.update(got_auth_chain)
+
+                            state_group = {
+                                (x.type, x.state_key): x.event_id for x in remote_state
+                            }
+                            state_groups.append(state_group)
 
                     # Resolve any conflicting state
                     def fetch(ev_ids):
@@ -460,20 +495,21 @@ class FederationHandler(BaseHandler):
                 "[%s %s] Handling received prev_event %s",
                 room_id, event_id, ev.event_id,
             )
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    ev,
-                    get_missing=False
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn(
-                        "[%s %s] Received prev_event %s failed history check.",
-                        room_id, event_id, ev.event_id,
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
                     )
-                else:
-                    raise
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
     @defer.inlineCallbacks
     def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -549,6 +585,10 @@ class FederationHandler(BaseHandler):
                     })
                     seen_ids.add(e.event_id)
 
+                logger.info(
+                    "[%s %s] persisting newly-received auth/state events %s",
+                    room_id, event_id, [e["event"].event_id for e in event_infos]
+                )
                 yield self._handle_new_events(origin, event_infos)
 
             try:
@@ -1112,7 +1152,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1558,15 +1599,22 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))