summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2017-03-09 16:20:13 +0000
committerRichard van der Hoff <richard@matrix.org>2017-03-09 16:20:13 +0000
commit29235901b81b344fc28ff9f59c36257afecf0265 (patch)
tree9561901ebaf994d613b4d051b3d62d1dbab48c8d /synapse/federation/federation_server.py
parentMove sig check out of _handle_new_pdu (diff)
downloadsynapse-29235901b81b344fc28ff9f59c36257afecf0265.tar.xz
Move FederationServer._handle_new_pdu to FederationHandler
Unfortunately this significantly increases the size of the already-rather-big
FederationHandler, but the code fits more naturally here, and it paves the way
for the tighter integration that I need between handling incoming PDUs and
doing the join dance.

Other than renaming the existing `FederationHandler.on_receive_pdu` to
`_process_received_pdu` to make way for it, this just consists of the move, and
replacing `self.handler` with `self` and `self` with `self.replication_layer`.
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_server.py194
1 files changed, 1 insertions, 193 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d8df6555d1..510a176821 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -52,7 +52,6 @@ class FederationServer(FederationBase):
 
         self.auth = hs.get_auth()
 
-        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
         self._server_linearizer = Linearizer("fed_server")
 
         # We cache responses to state queries, as they take a while and often
@@ -518,198 +517,7 @@ class FederationServer(FederationBase):
                 affected=pdu.event_id,
             )
 
-        yield self._handle_new_pdu(origin, pdu, get_missing=True)
-
-    @defer.inlineCallbacks
-    @log_function
-    def _handle_new_pdu(self, origin, pdu, get_missing=True):
-        """ Process a PDU received via a federation /send/ transaction, or
-        via backfill of missing prev_events
-
-        Args:
-            origin (str): server which initiated the /send/ transaction. Will
-                be used to fetch missing events or state.
-            pdu (FrozenEvent): received PDU
-            get_missing (bool): True if we should fetch missing prev_events
-
-        Returns (Deferred): completes with None
-        """
-
-        # We reprocess pdus when we have seen them only as outliers
-        existing = yield self._get_persisted_pdu(
-            origin, pdu.event_id, do_auth=False
-        )
-
-        # FIXME: Currently we fetch an event again when we already have it
-        # if it has been marked as an outlier.
-
-        already_seen = (
-            existing and (
-                not existing.internal_metadata.is_outlier()
-                or pdu.internal_metadata.is_outlier()
-            )
-        )
-        if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
-            return
-
-        state = None
-
-        auth_chain = []
-
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-
-        fetch_state = False
-
-        # Get missing pdus if necessary.
-        if not pdu.internal_metadata.is_outlier():
-            # We only backfill backwards to the min depth.
-            min_depth = yield self.handler.get_min_depth_for_context(
-                pdu.room_id
-            )
-
-            logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
-            )
-
-            prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
-
-            if min_depth and pdu.depth < min_depth:
-                # This is so that we don't notify the user about this
-                # message, to work around the fact that some events will
-                # reference really really old events we really don't want to
-                # send to the clients.
-                pdu.internal_metadata.outlier = True
-            elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
-                    # If we're missing stuff, ensure we only fetch stuff one
-                    # at a time.
-                    logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
-                    )
-                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
-                        logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
-                        )
-
-                        yield self._get_missing_events_for_pdu(
-                            origin, pdu, prevs, min_depth
-                        )
-
-            prevs = {e_id for e_id, _ in pdu.prev_events}
-            seen = set(have_seen.keys())
-            if prevs - seen:
-                logger.info(
-                    "Still missing %d events for room %r: %r...",
-                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
-                )
-                fetch_state = True
-
-        if fetch_state:
-            # We need to get the state at this event, since we haven't
-            # processed all the prev events.
-            logger.debug(
-                "_handle_new_pdu getting state for %s",
-                pdu.room_id
-            )
-            try:
-                state, auth_chain = yield self.get_state_for_room(
-                    origin, pdu.room_id, pdu.event_id,
-                )
-            except:
-                logger.exception("Failed to get state for event: %s", pdu.event_id)
-
-        yield self.handler.on_receive_pdu(
-            origin,
-            pdu,
-            state=state,
-            auth_chain=auth_chain,
-        )
-
-    @defer.inlineCallbacks
-    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
-        """
-        Args:
-            origin (str): Origin of the pdu. Will be called to get the missing events
-            pdu: received pdu
-            prevs (str[]): List of event ids which we are missing
-            min_depth (int): Minimum depth of events to return.
-
-        Returns:
-            Deferred<dict(str, str?)>: updated have_seen dictionary
-        """
-        # We recalculate seen, since it may have changed.
-        have_seen = yield self.store.have_events(prevs)
-        seen = set(have_seen.keys())
-
-        if not prevs - seen:
-            # nothing left to do
-            defer.returnValue(have_seen)
-
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
-
-        # We add the prev events that we have seen to the latest
-        # list to ensure the remote server doesn't give them to us
-        latest = set(latest)
-        latest |= seen
-
-        logger.info(
-            "Missing %d events for room %r: %r...",
-            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
-        )
-
-        # XXX: we set timeout to 10s to help workaround
-        # https://github.com/matrix-org/synapse/issues/1733.
-        # The reason is to avoid holding the linearizer lock
-        # whilst processing inbound /send transactions, causing
-        # FDs to stack up and block other inbound transactions
-        # which empirically can currently take up to 30 minutes.
-        #
-        # N.B. this explicitly disables retry attempts.
-        #
-        # N.B. this also increases our chances of falling back to
-        # fetching fresh state for the room if the missing event
-        # can't be found, which slightly reduces our security.
-        # it may also increase our DAG extremity count for the room,
-        # causing additional state resolution?  See #1760.
-        # However, fetching state doesn't hold the linearizer lock
-        # apparently.
-        #
-        # see https://github.com/matrix-org/synapse/pull/1744
-
-        missing_events = yield self.get_missing_events(
-            origin,
-            pdu.room_id,
-            earliest_events_ids=list(latest),
-            latest_events=[pdu],
-            limit=10,
-            min_depth=min_depth,
-            timeout=10000,
-        )
-
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        missing_events.sort(key=lambda x: x.depth)
-
-        for e in missing_events:
-            yield self._handle_new_pdu(
-                origin,
-                e,
-                get_missing=False
-            )
-
-        have_seen = yield self.store.have_events(
-            [ev for ev, _ in pdu.prev_events]
-        )
-        defer.returnValue(have_seen)
+        yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
 
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name