diff options
author | Richard van der Hoff <richard@matrix.org> | 2017-03-09 16:20:13 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2017-03-09 16:20:13 +0000 |
commit | 29235901b81b344fc28ff9f59c36257afecf0265 (patch) | |
tree | 9561901ebaf994d613b4d051b3d62d1dbab48c8d /synapse/federation/federation_server.py | |
parent | Move sig check out of _handle_new_pdu (diff) | |
download | synapse-29235901b81b344fc28ff9f59c36257afecf0265.tar.xz |
Move FederationServer._handle_new_pdu to FederationHandler
Unfortunately this significantly increases the size of the already-rather-big FederationHandler, but the code fits more naturally here, and it paves the way for the tighter integration that I need between handling incoming PDUs and doing the join dance. Other than renaming the existing `FederationHandler.on_receive_pdu` to `_process_received_pdu` to make way for it, this just consists of the move, and replacing `self.handler` with `self` and `self` with `self.replication_layer`.
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 194 |
1 files changed, 1 insertions, 193 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d8df6555d1..510a176821 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -52,7 +52,6 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() - self._room_pdu_linearizer = Linearizer("fed_room_pdu") self._server_linearizer = Linearizer("fed_server") # We cache responses to state queries, as they take a while and often @@ -518,198 +517,7 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) - yield self._handle_new_pdu(origin, pdu, get_missing=True) - - @defer.inlineCallbacks - @log_function - def _handle_new_pdu(self, origin, pdu, get_missing=True): - """ Process a PDU received via a federation /send/ transaction, or - via backfill of missing prev_events - - Args: - origin (str): server which initiated the /send/ transaction. Will - be used to fetch missing events or state. - pdu (FrozenEvent): received PDU - get_missing (bool): True if we should fetch missing prev_events - - Returns (Deferred): completes with None - """ - - # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu( - origin, pdu.event_id, do_auth=False - ) - - # FIXME: Currently we fetch an event again when we already have it - # if it has been marked as an outlier. - - already_seen = ( - existing and ( - not existing.internal_metadata.is_outlier() - or pdu.internal_metadata.is_outlier() - ) - ) - if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) - return - - state = None - - auth_chain = [] - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - - fetch_state = False - - # Get missing pdus if necessary. - if not pdu.internal_metadata.is_outlier(): - # We only backfill backwards to the min depth. - min_depth = yield self.handler.get_min_depth_for_context( - pdu.room_id - ) - - logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth - ) - - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - - if min_depth and pdu.depth < min_depth: - # This is so that we don't notify the user about this - # message, to work around the fact that some events will - # reference really really old events we really don't want to - # send to the clients. - pdu.internal_metadata.outlier = True - elif min_depth and pdu.depth > min_depth: - if get_missing and prevs - seen: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. - logger.info( - "Acquiring lock for room %r to fetch %d missing events: %r...", - pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], - ) - with (yield self._room_pdu_linearizer.queue(pdu.room_id)): - logger.info( - "Acquired lock for room %r to fetch %d missing events", - pdu.room_id, len(prevs - seen), - ) - - yield self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth - ) - - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - if prevs - seen: - logger.info( - "Still missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - fetch_state = True - - if fetch_state: - # We need to get the state at this event, since we haven't - # processed all the prev events. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - try: - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - except: - logger.exception("Failed to get state for event: %s", pdu.event_id) - - yield self.handler.on_receive_pdu( - origin, - pdu, - state=state, - auth_chain=auth_chain, - ) - - @defer.inlineCallbacks - def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): - """ - Args: - origin (str): Origin of the pdu. Will be called to get the missing events - pdu: received pdu - prevs (str[]): List of event ids which we are missing - min_depth (int): Minimum depth of events to return. - - Returns: - Deferred<dict(str, str?)>: updated have_seen dictionary - """ - # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) - - if not prevs - seen: - # nothing left to do - defer.returnValue(have_seen) - - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - - # XXX: we set timeout to 10s to help workaround - # https://github.com/matrix-org/synapse/issues/1733. - # The reason is to avoid holding the linearizer lock - # whilst processing inbound /send transactions, causing - # FDs to stack up and block other inbound transactions - # which empirically can currently take up to 30 minutes. - # - # N.B. this explicitly disables retry attempts. - # - # N.B. this also increases our chances of falling back to - # fetching fresh state for the room if the missing event - # can't be found, which slightly reduces our security. - # it may also increase our DAG extremity count for the room, - # causing additional state resolution? See #1760. - # However, fetching state doesn't hold the linearizer lock - # apparently. - # - # see https://github.com/matrix-org/synapse/pull/1744 - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - timeout=10000, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - defer.returnValue(have_seen) + yield self.handler.on_receive_pdu(origin, pdu, get_missing=True) def __str__(self): return "<ReplicationLayer(%s)>" % self.server_name |