diff options
author | Richard van der Hoff <richard@matrix.org> | 2017-03-09 16:20:13 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2017-03-09 16:20:13 +0000 |
commit | 29235901b81b344fc28ff9f59c36257afecf0265 (patch) | |
tree | 9561901ebaf994d613b4d051b3d62d1dbab48c8d /synapse | |
parent | Move sig check out of _handle_new_pdu (diff) | |
download | synapse-29235901b81b344fc28ff9f59c36257afecf0265.tar.xz |
Move FederationServer._handle_new_pdu to FederationHandler
Unfortunately this significantly increases the size of the already-rather-big FederationHandler, but the code fits more naturally here, and it paves the way for the tighter integration that I need between handling incoming PDUs and doing the join dance. Other than renaming the existing `FederationHandler.on_receive_pdu` to `_process_received_pdu` to make way for it, this just consists of the move, and replacing `self.handler` with `self` and `self` with `self.replication_layer`.
Diffstat (limited to '')
-rw-r--r-- | synapse/federation/federation_server.py | 194 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 202 |
2 files changed, 198 insertions, 198 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d8df6555d1..510a176821 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -52,7 +52,6 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() - self._room_pdu_linearizer = Linearizer("fed_room_pdu") self._server_linearizer = Linearizer("fed_server") # We cache responses to state queries, as they take a while and often @@ -518,198 +517,7 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) - yield self._handle_new_pdu(origin, pdu, get_missing=True) - - @defer.inlineCallbacks - @log_function - def _handle_new_pdu(self, origin, pdu, get_missing=True): - """ Process a PDU received via a federation /send/ transaction, or - via backfill of missing prev_events - - Args: - origin (str): server which initiated the /send/ transaction. Will - be used to fetch missing events or state. - pdu (FrozenEvent): received PDU - get_missing (bool): True if we should fetch missing prev_events - - Returns (Deferred): completes with None - """ - - # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu( - origin, pdu.event_id, do_auth=False - ) - - # FIXME: Currently we fetch an event again when we already have it - # if it has been marked as an outlier. - - already_seen = ( - existing and ( - not existing.internal_metadata.is_outlier() - or pdu.internal_metadata.is_outlier() - ) - ) - if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) - return - - state = None - - auth_chain = [] - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - - fetch_state = False - - # Get missing pdus if necessary. - if not pdu.internal_metadata.is_outlier(): - # We only backfill backwards to the min depth. - min_depth = yield self.handler.get_min_depth_for_context( - pdu.room_id - ) - - logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth - ) - - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - - if min_depth and pdu.depth < min_depth: - # This is so that we don't notify the user about this - # message, to work around the fact that some events will - # reference really really old events we really don't want to - # send to the clients. - pdu.internal_metadata.outlier = True - elif min_depth and pdu.depth > min_depth: - if get_missing and prevs - seen: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. - logger.info( - "Acquiring lock for room %r to fetch %d missing events: %r...", - pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], - ) - with (yield self._room_pdu_linearizer.queue(pdu.room_id)): - logger.info( - "Acquired lock for room %r to fetch %d missing events", - pdu.room_id, len(prevs - seen), - ) - - yield self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth - ) - - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - if prevs - seen: - logger.info( - "Still missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - fetch_state = True - - if fetch_state: - # We need to get the state at this event, since we haven't - # processed all the prev events. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - try: - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - except: - logger.exception("Failed to get state for event: %s", pdu.event_id) - - yield self.handler.on_receive_pdu( - origin, - pdu, - state=state, - auth_chain=auth_chain, - ) - - @defer.inlineCallbacks - def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): - """ - Args: - origin (str): Origin of the pdu. Will be called to get the missing events - pdu: received pdu - prevs (str[]): List of event ids which we are missing - min_depth (int): Minimum depth of events to return. - - Returns: - Deferred<dict(str, str?)>: updated have_seen dictionary - """ - # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) - - if not prevs - seen: - # nothing left to do - defer.returnValue(have_seen) - - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - - # XXX: we set timeout to 10s to help workaround - # https://github.com/matrix-org/synapse/issues/1733. - # The reason is to avoid holding the linearizer lock - # whilst processing inbound /send transactions, causing - # FDs to stack up and block other inbound transactions - # which empirically can currently take up to 30 minutes. - # - # N.B. this explicitly disables retry attempts. - # - # N.B. this also increases our chances of falling back to - # fetching fresh state for the room if the missing event - # can't be found, which slightly reduces our security. - # it may also increase our DAG extremity count for the room, - # causing additional state resolution? See #1760. - # However, fetching state doesn't hold the linearizer lock - # apparently. - # - # see https://github.com/matrix-org/synapse/pull/1744 - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - timeout=10000, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - defer.returnValue(have_seen) + yield self.handler.on_receive_pdu(origin, pdu, get_missing=True) def __str__(self): return "<ReplicationLayer(%s)>" % self.server_name diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ed0fa51e7f..d0c2b4d6ed 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -31,7 +31,7 @@ from synapse.util.logcontext import ( ) from synapse.util.metrics import measure_func from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor +from synapse.util.async import run_on_reactor, Linearizer from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, @@ -79,12 +79,204 @@ class FederationHandler(BaseHandler): # When joining a room we need to queue any events for that room up self.room_queues = {} + self._room_pdu_linearizer = Linearizer("fed_room_pdu") + + @defer.inlineCallbacks + @log_function + def on_receive_pdu(self, origin, pdu, get_missing=True): + """ Process a PDU received via a federation /send/ transaction, or + via backfill of missing prev_events + + Args: + origin (str): server which initiated the /send/ transaction. Will + be used to fetch missing events or state. + pdu (FrozenEvent): received PDU + get_missing (bool): True if we should fetch missing prev_events + + Returns (Deferred): completes with None + """ + + # We reprocess pdus when we have seen them only as outliers + existing = yield self.get_persisted_pdu( + origin, pdu.event_id, do_auth=False + ) + + # FIXME: Currently we fetch an event again when we already have it + # if it has been marked as an outlier. + + already_seen = ( + existing and ( + not existing.internal_metadata.is_outlier() + or pdu.internal_metadata.is_outlier() + ) + ) + if already_seen: + logger.debug("Already seen pdu %s", pdu.event_id) + return + + state = None + + auth_chain = [] + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + + fetch_state = False + + # Get missing pdus if necessary. + if not pdu.internal_metadata.is_outlier(): + # We only backfill backwards to the min depth. + min_depth = yield self.get_min_depth_for_context( + pdu.room_id + ) + + logger.debug( + "_handle_new_pdu min_depth for %s: %d", + pdu.room_id, min_depth + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + + if min_depth and pdu.depth < min_depth: + # This is so that we don't notify the user about this + # message, to work around the fact that some events will + # reference really really old events we really don't want to + # send to the clients. + pdu.internal_metadata.outlier = True + elif min_depth and pdu.depth > min_depth: + if get_missing and prevs - seen: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + logger.info( + "Acquiring lock for room %r to fetch %d missing events: %r...", + pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], + ) + with (yield self._room_pdu_linearizer.queue(pdu.room_id)): + logger.info( + "Acquired lock for room %r to fetch %d missing events", + pdu.room_id, len(prevs - seen), + ) + + yield self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + logger.info( + "Still missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + fetch_state = True + + if fetch_state: + # We need to get the state at this event, since we haven't + # processed all the prev events. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + try: + state, auth_chain = yield self.replication_layer.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) + except: + logger.exception("Failed to get state for event: %s", pdu.event_id) + + yield self._process_received_pdu( + origin, + pdu, + state=state, + auth_chain=auth_chain, + ) + + @defer.inlineCallbacks + def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): + """ + Args: + origin (str): Origin of the pdu. Will be called to get the missing events + pdu: received pdu + prevs (str[]): List of event ids which we are missing + min_depth (int): Minimum depth of events to return. + + Returns: + Deferred<dict(str, str?)>: updated have_seen dictionary + """ + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) + + if not prevs - seen: + # nothing left to do + defer.returnValue(have_seen) + + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + # XXX: we set timeout to 10s to help workaround + # https://github.com/matrix-org/synapse/issues/1733. + # The reason is to avoid holding the linearizer lock + # whilst processing inbound /send transactions, causing + # FDs to stack up and block other inbound transactions + # which empirically can currently take up to 30 minutes. + # + # N.B. this explicitly disables retry attempts. + # + # N.B. this also increases our chances of falling back to + # fetching fresh state for the room if the missing event + # can't be found, which slightly reduces our security. + # it may also increase our DAG extremity count for the room, + # causing additional state resolution? See #1760. + # However, fetching state doesn't hold the linearizer lock + # apparently. + # + # see https://github.com/matrix-org/synapse/pull/1744 + + missing_events = yield self.replication_layer.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + timeout=10000, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self.on_receive_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + defer.returnValue(have_seen) @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): - """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it through the StateHandler. + def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None): + """ Called when we have a new pdu. We need to do auth checks and put it + through the StateHandler. auth_chain and state are None if we already have the necessary state and prev_events in the db @@ -738,7 +930,7 @@ class FederationHandler(BaseHandler): continue try: - self.on_receive_pdu(origin, p) + self._process_received_pdu(origin, p) except: logger.exception("Couldn't handle pdu") |