From e70b4ce06920102e2460dfb65bc357e2d7e8b794 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 07:56:06 +0100 Subject: Logging improvements Some logging tweaks to help with debugging incoming federation transactions --- synapse/handlers/federation.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ccdc3bfa7..a70ae8c830 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -572,6 +572,10 @@ class FederationHandler(BaseHandler): }) seen_ids.add(e.event_id) + logger.info( + "[%s %s] persisting newly-received auth/state events %s", + room_id, event_id, [e["event"].event_id for e in event_infos] + ) yield self._handle_new_events(origin, event_infos) try: -- cgit 1.5.1 From 4a15a3e4d539dcea9a4a57e7cd800a926f2a17c3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 27 Sep 2018 11:25:34 +0100 Subject: Include eventid in log lines when processing incoming federation transactions (#3959) when processing incoming transactions, it can be hard to see what's going on, because we process a bunch of stuff in parallel, and because we may end up recursively working our way through a chain of three or four events. This commit creates a way to use logcontexts to add the relevant event ids to the log lines. --- changelog.d/3959.feature | 1 + synapse/federation/federation_server.py | 32 ++++++++-------- synapse/handlers/federation.py | 65 ++++++++++++++++++++------------- synapse/util/logcontext.py | 41 +++++++++++++++++++-- tests/test_federation.py | 28 ++++++++------ tests/util/test_logcontext.py | 5 +++ 6 files changed, 115 insertions(+), 57 deletions(-) create mode 100644 changelog.d/3959.feature (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/3959.feature b/changelog.d/3959.feature new file mode 100644 index 0000000000..b3a4f37a8d --- /dev/null +++ b/changelog.d/3959.feature @@ -0,0 +1 @@ +Include eventid in log lines when processing incoming federation transactions \ No newline at end of file diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9a571e4fc7..819e8f7331 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -46,6 +46,7 @@ from synapse.replication.http.federation import ( from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.util.logcontext import nested_logging_context from synapse.util.logutils import log_function # when processing incoming transactions, we try to handle multiple rooms in @@ -187,21 +188,22 @@ class FederationServer(FederationBase): for pdu in pdus_by_room[room_id]: event_id = pdu.event_id - try: - yield self._handle_received_pdu( - origin, pdu - ) - pdu_results[event_id] = {} - except FederationError as e: - logger.warn("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s: %s", - event_id, f.getTraceback().rstrip(), - ) + with nested_logging_context(event_id): + try: + yield self._handle_received_pdu( + origin, pdu + ) + pdu_results[event_id] = {} + except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + f = failure.Failure() + pdu_results[event_id] = {"error": str(e)} + logger.error( + "Failed to handle PDU %s: %s", + event_id, f.getTraceback().rstrip(), + ) yield concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ccdc3bfa7..993546387c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -339,14 +339,18 @@ class FederationHandler(BaseHandler): "[%s %s] Requesting state at missing prev_event %s", room_id, event_id, p, ) - state, got_auth_chain = ( - yield self.federation_client.get_state_for_room( - origin, room_id, p, + + with logcontext.nested_logging_context(p): + state, got_auth_chain = ( + yield self.federation_client.get_state_for_room( + origin, room_id, p, + ) ) - ) - auth_chains.update(got_auth_chain) - state_group = {(x.type, x.state_key): x.event_id for x in state} - state_groups.append(state_group) + auth_chains.update(got_auth_chain) + state_group = { + (x.type, x.state_key): x.event_id for x in state + } + state_groups.append(state_group) # Resolve any conflicting state def fetch(ev_ids): @@ -483,20 +487,21 @@ class FederationHandler(BaseHandler): "[%s %s] Handling received prev_event %s", room_id, event_id, ev.event_id, ) - try: - yield self.on_receive_pdu( - origin, - ev, - sent_to_us_directly=False, - ) - except FederationError as e: - if e.code == 403: - logger.warn( - "[%s %s] Received prev_event %s failed history check.", - room_id, event_id, ev.event_id, + with logcontext.nested_logging_context(ev.event_id): + try: + yield self.on_receive_pdu( + origin, + ev, + sent_to_us_directly=False, ) - else: - raise + except FederationError as e: + if e.code == 403: + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) + else: + raise @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): @@ -1135,7 +1140,8 @@ class FederationHandler(BaseHandler): try: logger.info("Processing queued PDU %s which was received " "while we were joining %s", p.event_id, p.room_id) - yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) + with logcontext.nested_logging_context(p.event_id): + yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) except Exception as e: logger.warn( "Error handling queued PDU %s from %s: %s", @@ -1581,15 +1587,22 @@ class FederationHandler(BaseHandler): Notifies about the events where appropriate. """ - contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background( - self._prep_event, + + @defer.inlineCallbacks + def prep(ev_info): + event = ev_info["event"] + with logcontext.nested_logging_context(suffix=event.event_id): + res = yield self._prep_event( origin, - ev_info["event"], + event, state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) + defer.returnValue(res) + + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.run_in_background(prep, ev_info) for ev_info in event_infos ], consumeErrors=True, )) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a0c2d37610..89224b26cc 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -200,7 +200,7 @@ class LoggingContext(object): sentinel = Sentinel() - def __init__(self, name=None, parent_context=None): + def __init__(self, name=None, parent_context=None, request=None): self.previous_context = LoggingContext.current_context() self.name = name @@ -218,6 +218,13 @@ class LoggingContext(object): self.parent_context = parent_context + if self.parent_context is not None: + self.parent_context.copy_to(self) + + if request is not None: + # the request param overrides the request from the parent context + self.request = request + def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -256,9 +263,6 @@ class LoggingContext(object): ) self.alive = True - if self.parent_context is not None: - self.parent_context.copy_to(self) - return self def __exit__(self, type, value, traceback): @@ -439,6 +443,35 @@ class PreserveLoggingContext(object): ) +def nested_logging_context(suffix, parent_context=None): + """Creates a new logging context as a child of another. + + The nested logging context will have a 'request' made up of the parent context's + request, plus the given suffix. + + CPU/db usage stats will be added to the parent context's on exit. + + Normal usage looks like: + + with nested_logging_context(suffix): + # ... do stuff + + Args: + suffix (str): suffix to add to the parent context's 'request'. + parent_context (LoggingContext|None): parent context. Will use the current context + if None. + + Returns: + LoggingContext: new logging context. + """ + if parent_context is None: + parent_context = LoggingContext.current_context() + return LoggingContext( + parent_context=parent_context, + request=parent_context.request + "-" + suffix, + ) + + def preserve_fn(f): """Function decorator which wraps the function with run_in_background""" def g(*args, **kwargs): diff --git a/tests/test_federation.py b/tests/test_federation.py index 2540604fcc..ff55c7a627 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed from synapse.events import FrozenEvent from synapse.types import Requester, UserID from synapse.util import Clock +from synapse.util.logcontext import LoggingContext from tests import unittest from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver @@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", lying_event, sent_to_us_directly=True - ) + with LoggingContext(request="lying_event"): + d = self.handler.on_receive_pdu( + "test.serv", lying_event, sent_to_us_directly=True + ) # Step the reactor, so the database fetches come back self.reactor.advance(1) @@ -209,11 +211,12 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", good_event, sent_to_us_directly=True - ) - self.reactor.advance(1) - self.assertEqual(self.successResultOf(d), None) + with LoggingContext(request="good_event"): + d = self.handler.on_receive_pdu( + "test.serv", good_event, sent_to_us_directly=True + ) + self.reactor.advance(1) + self.assertEqual(self.successResultOf(d), None) bad_event = FrozenEvent( { @@ -230,10 +233,11 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", bad_event, sent_to_us_directly=True - ) - self.reactor.advance(1) + with LoggingContext(request="bad_event"): + d = self.handler.on_receive_pdu( + "test.serv", bad_event, sent_to_us_directly=True + ) + self.reactor.advance(1) extrem = maybeDeferred( self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 4633db77b3..8adaee3c8d 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase): self.assertEqual(r, "bum") self._check_test_key("one") + def test_nested_logging_context(self): + with LoggingContext(request="foo"): + nested_context = logcontext.nested_logging_context(suffix="bar") + self.assertEqual(nested_context.request, "foo-bar") + # a function which returns a deferred which has been "called", but # which had a function which returned another incomplete deferred on -- cgit 1.5.1 From e3c159863d72ba1628394497bba45dd96b9cc1ac Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 08:09:07 +0100 Subject: Clarifications in FederationHandler * add some comments on things that look a bit bogus * rename this `state` variable to avoid confusion with the `state` used elsewhere in this function. (There was no actual conflict, but it was a confusing bit of spaghetti.) --- synapse/handlers/federation.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 128926e719..6793f9b6c6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -341,14 +341,23 @@ class FederationHandler(BaseHandler): ) with logcontext.nested_logging_context(p): - state, got_auth_chain = ( + # XXX if any of the missing prevs share missing state or auth + # events, we'll end up requesting those missing events for + # *each* missing prev, contributing to the hammering of /event + # as per https://github.com/matrix-org/synapse/issues/2164. + remote_state, got_auth_chain = ( yield self.federation_client.get_state_for_room( origin, room_id, p, ) ) + + # XXX hrm I'm not convinced that duplicate events will compare + # for equality, so I'm not sure this does what the author + # hoped. auth_chains.update(got_auth_chain) + state_group = { - (x.type, x.state_key): x.event_id for x in state + (x.type, x.state_key): x.event_id for x in remote_state } state_groups.append(state_group) -- cgit 1.5.1 From 28223841e05a77a44ec2c0b29d1e930c68974913 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 19:17:36 +0100 Subject: more comments --- synapse/federation/federation_client.py | 2 -- synapse/handlers/federation.py | 7 +++---- 2 files changed, 3 insertions(+), 6 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 8bf1ad0c1f..d05ed91d64 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -209,8 +209,6 @@ class FederationClient(FederationBase): Will attempt to get the PDU from each destination in the list until one succeeds. - This will persist the PDU locally upon receipt. - Args: destinations (list): Which home servers to query event_id (str): event to fetch diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6793f9b6c6..38bebbf598 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -341,10 +341,9 @@ class FederationHandler(BaseHandler): ) with logcontext.nested_logging_context(p): - # XXX if any of the missing prevs share missing state or auth - # events, we'll end up requesting those missing events for - # *each* missing prev, contributing to the hammering of /event - # as per https://github.com/matrix-org/synapse/issues/2164. + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. remote_state, got_auth_chain = ( yield self.federation_client.get_state_for_room( origin, room_id, p, -- cgit 1.5.1 From a215b698c43f4cfe8a2fdb9c160c8ecd1c1297c5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 09:52:56 +0100 Subject: Fix "unhashable type: 'list'" exception in federation handling get_state_groups returns a map from state_group_id to a list of FrozenEvents, so was very much the wrong thing to be putting as one of the entries in the list passed to resolve_events_with_factory (which expects maps from (event_type, state_key) to event id). We actually want get_state_groups_ids().values() rather than get_state_groups(). This fixes the main problem in #3923, but there are other problems with this bit of code which get discovered once you do so. --- synapse/handlers/federation.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 38bebbf598..2d6b8edec4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -106,7 +106,7 @@ class FederationHandler(BaseHandler): self.hs = hs - self.store = hs.get_datastore() + self.store = hs.get_datastore() # type: synapse.storage.DataStore self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname @@ -325,12 +325,17 @@ class FederationHandler(BaseHandler): # Calculate the state of the previous events, and # de-conflict them to find the current state. - state_groups = [] auth_chains = set() try: # Get the state of the events we know about - ours = yield self.store.get_state_groups(room_id, list(seen)) - state_groups.append(ours) + ours = yield self.store.get_state_groups_ids(room_id, seen) + + # state_maps is a list of mappings from (type, state_key) to event_id + # type: list[dict[tuple[str, str], str]] + state_maps = list(ours.values()) + + # we don't need this any more, let's delete it. + del ours # Ask the remote server for the states we don't # know about @@ -355,10 +360,10 @@ class FederationHandler(BaseHandler): # hoped. auth_chains.update(got_auth_chain) - state_group = { + remote_state_map = { (x.type, x.state_key): x.event_id for x in remote_state } - state_groups.append(state_group) + state_maps.append(remote_state_map) # Resolve any conflicting state def fetch(ev_ids): @@ -368,7 +373,7 @@ class FederationHandler(BaseHandler): room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_groups, {event_id: pdu}, fetch + room_version, state_maps, {event_id: pdu}, fetch, ) state = (yield self.store.get_events(state_map.values())).values() -- cgit 1.5.1 From bd61c82bdf97460a33080bcb6b2c836616a3b415 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 12:16:13 +0100 Subject: Include state from remote servers in pdu handling If we've fetched state events from remote servers in order to resolve the state for a new event, we need to actually pass those events into resolve_events_with_factory (so that it can do the state res) and then persist the ones we need - otherwise other bits of the codebase get confused about why we have state groups pointing to non-existent events. --- synapse/handlers/federation.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2d6b8edec4..cdad565d04 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -326,6 +326,9 @@ class FederationHandler(BaseHandler): # Calculate the state of the previous events, and # de-conflict them to find the current state. auth_chains = set() + event_map = { + event_id: pdu, + } try: # Get the state of the events we know about ours = yield self.store.get_state_groups_ids(room_id, seen) @@ -365,18 +368,30 @@ class FederationHandler(BaseHandler): } state_maps.append(remote_state_map) + for x in remote_state: + event_map[x.event_id] = x + # Resolve any conflicting state + @defer.inlineCallbacks def fetch(ev_ids): - return self.store.get_events( - ev_ids, get_prev_content=False, check_redacted=False + fetched = yield self.store.get_events( + ev_ids, get_prev_content=False, check_redacted=False, ) + # add any events we fetch here to the `event_map` so that we + # can use them to build the state event list below. + event_map.update(fetched) + defer.returnValue(fetched) room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_maps, {event_id: pdu}, fetch, + room_version, state_maps, event_map, fetch, ) - state = (yield self.store.get_events(state_map.values())).values() + # we need to give _process_received_pdu the actual state events + # rather than event ids, so generate that now. + state = [ + event_map[e] for e in six.itervalues(state_map) + ] auth_chain = list(auth_chains) except Exception: logger.warn( -- cgit 1.5.1 From 333bee27f53916bf5354a39a79aa468967730326 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 19:49:59 +0100 Subject: Include event when resolving state for missing prevs If we have a forward extremity for a room as `E`, and you receive `A`, `B`, s.t. `A -> B -> E`, and `B` also points to an unknown event `X`, then we need to do state res between `X` and `E`. When that happens, we need to make sure we include `X` in the state that goes into the state res alg. Fixes #3934. --- synapse/handlers/federation.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cdad565d04..d05b63673f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -323,8 +323,8 @@ class FederationHandler(BaseHandler): affected=pdu.event_id, ) - # Calculate the state of the previous events, and - # de-conflict them to find the current state. + # Calculate the state after each of the previous events, and + # resolve them to find the correct state at the current event. auth_chains = set() event_map = { event_id: pdu, @@ -358,6 +358,20 @@ class FederationHandler(BaseHandler): ) ) + # we want the state *after* p; get_state_for_room returns the + # state *before* p. + remote_event = yield self.federation_client.get_pdu( + [origin], p, outlier=True, + ) + + if remote_event is None: + raise Exception( + "Unable to get missing prev_event %s" % (p, ) + ) + + if remote_event.is_state(): + remote_state.append(remote_event) + # XXX hrm I'm not convinced that duplicate events will compare # for equality, so I'm not sure this does what the author # hoped. -- cgit 1.5.1