From fa1ce4d8adb307f9b325b4e8c125f78a673c4987 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Aug 2016 10:44:37 +0100 Subject: Don't print stack traces when failing to get remote keys --- synapse/crypto/keyring.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) (limited to 'synapse/crypto') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 5012c10ee8..7cd11cfae7 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -61,6 +61,10 @@ Attributes: """ +class KeyLookupError(ValueError): + pass + + class Keyring(object): def __init__(self, hs): self.store = hs.get_datastore() @@ -363,7 +367,7 @@ class Keyring(object): ) except Exception as e: logger.info( - "Unable to getting key %r for %r directly: %s %s", + "Unable to get key %r for %r directly: %s %s", key_ids, server_name, type(e).__name__, str(e.message), ) @@ -425,7 +429,7 @@ class Keyring(object): for response in responses: if (u"signatures" not in response or perspective_name not in response[u"signatures"]): - raise ValueError( + raise KeyLookupError( "Key response not signed by perspective server" " %r" % (perspective_name,) ) @@ -448,7 +452,7 @@ class Keyring(object): list(response[u"signatures"][perspective_name]), list(perspective_keys) ) - raise ValueError( + raise KeyLookupError( "Response not signed with a known key for perspective" " server %r" % (perspective_name,) ) @@ -491,10 +495,10 @@ class Keyring(object): if (u"signatures" not in response or server_name not in response[u"signatures"]): - raise ValueError("Key response not signed by remote server") + raise KeyLookupError("Key response not signed by remote server") if "tls_fingerprints" not in response: - raise ValueError("Key response missing TLS fingerprints") + raise KeyLookupError("Key response missing TLS fingerprints") certificate_bytes = crypto.dump_certificate( crypto.FILETYPE_ASN1, tls_certificate @@ -508,7 +512,7 @@ class Keyring(object): response_sha256_fingerprints.add(fingerprint[u"sha256"]) if sha256_fingerprint_b64 not in response_sha256_fingerprints: - raise ValueError("TLS certificate not allowed by fingerprints") + raise KeyLookupError("TLS certificate not allowed by fingerprints") response_keys = yield self.process_v2_response( from_server=server_name, @@ -560,14 +564,14 @@ class Keyring(object): server_name = response_json["server_name"] if only_from_server: if server_name != from_server: - raise ValueError( + raise KeyLookupError( "Expected a response for server %r not %r" % ( from_server, server_name ) ) for key_id in response_json["signatures"].get(server_name, {}): if key_id not in response_json["verify_keys"]: - raise ValueError( + raise KeyLookupError( "Key response must include verification keys for all" " signatures" ) @@ -635,15 +639,15 @@ class Keyring(object): if ("signatures" not in response or server_name not in response["signatures"]): - raise ValueError("Key response not signed by remote server") + raise KeyLookupError("Key response not signed by remote server") if "tls_certificate" not in response: - raise ValueError("Key response missing TLS certificate") + raise KeyLookupError("Key response missing TLS certificate") tls_certificate_b64 = response["tls_certificate"] if encode_base64(x509_certificate_bytes) != tls_certificate_b64: - raise ValueError("TLS certificate doesn't match") + raise KeyLookupError("TLS certificate doesn't match") # Cache the result in the datastore. @@ -659,7 +663,7 @@ class Keyring(object): for key_id in response["signatures"][server_name]: if key_id not in response["verify_keys"]: - raise ValueError( + raise KeyLookupError( "Key response must include verification keys for all" " signatures" ) -- cgit 1.4.1 From 2426c2f21a4c026806676de4ea88b70b9520659e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 17:38:15 +0100 Subject: Measure keyrings --- synapse/crypto/keyring.py | 70 ++++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 34 deletions(-) (limited to 'synapse/crypto') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 7cd11cfae7..0bae6fdeed 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -22,6 +22,7 @@ from synapse.util.logcontext import ( preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, preserve_fn ) +from synapse.util.metrics import Measure from twisted.internet import defer @@ -115,42 +116,43 @@ class Keyring(object): @defer.inlineCallbacks def handle_key_deferred(verify_request): - server_name = verify_request.server_name - try: - _, key_id, verify_key = yield verify_request.deferred - except IOError as e: - logger.warn( - "Got IOError when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), - ) - raise SynapseError( - 502, - "Error downloading keys for %s" % (server_name,), - Codes.UNAUTHORIZED, - ) - except Exception as e: - logger.exception( - "Got Exception when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), - ) - raise SynapseError( - 401, - "No key for %s with id %s" % (server_name, key_ids), - Codes.UNAUTHORIZED, - ) + with Measure(self.clock, "handle_key_deferred"): + server_name = verify_request.server_name + try: + _, key_id, verify_key = yield verify_request.deferred + except IOError as e: + logger.warn( + "Got IOError when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) + raise SynapseError( + 502, + "Error downloading keys for %s" % (server_name,), + Codes.UNAUTHORIZED, + ) + except Exception as e: + logger.exception( + "Got Exception when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) + raise SynapseError( + 401, + "No key for %s with id %s" % (server_name, key_ids), + Codes.UNAUTHORIZED, + ) - json_object = verify_request.json_object + json_object = verify_request.json_object - try: - verify_signed_json(json_object, server_name, verify_key) - except: - raise SynapseError( - 401, - "Invalid signature for server %s with key %s:%s" % ( - server_name, verify_key.alg, verify_key.version - ), - Codes.UNAUTHORIZED, - ) + try: + verify_signed_json(json_object, server_name, verify_key) + except: + raise SynapseError( + 401, + "Invalid signature for server %s with key %s:%s" % ( + server_name, verify_key.alg, verify_key.version + ), + Codes.UNAUTHORIZED, + ) server_to_deferred = { server_name: defer.Deferred() -- cgit 1.4.1 From 04fc8bbcb04673f73c0783bbcdfa3b54cbcb4615 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Aug 2016 17:56:44 +0100 Subject: Update keyring Measure --- synapse/crypto/keyring.py | 170 +++++++++++++++++++++++----------------------- 1 file changed, 85 insertions(+), 85 deletions(-) (limited to 'synapse/crypto') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 0bae6fdeed..1735ca9345 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -116,43 +116,42 @@ class Keyring(object): @defer.inlineCallbacks def handle_key_deferred(verify_request): - with Measure(self.clock, "handle_key_deferred"): - server_name = verify_request.server_name - try: - _, key_id, verify_key = yield verify_request.deferred - except IOError as e: - logger.warn( - "Got IOError when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), - ) - raise SynapseError( - 502, - "Error downloading keys for %s" % (server_name,), - Codes.UNAUTHORIZED, - ) - except Exception as e: - logger.exception( - "Got Exception when downloading keys for %s: %s %s", - server_name, type(e).__name__, str(e.message), - ) - raise SynapseError( - 401, - "No key for %s with id %s" % (server_name, key_ids), - Codes.UNAUTHORIZED, - ) + server_name = verify_request.server_name + try: + _, key_id, verify_key = yield verify_request.deferred + except IOError as e: + logger.warn( + "Got IOError when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) + raise SynapseError( + 502, + "Error downloading keys for %s" % (server_name,), + Codes.UNAUTHORIZED, + ) + except Exception as e: + logger.exception( + "Got Exception when downloading keys for %s: %s %s", + server_name, type(e).__name__, str(e.message), + ) + raise SynapseError( + 401, + "No key for %s with id %s" % (server_name, key_ids), + Codes.UNAUTHORIZED, + ) - json_object = verify_request.json_object + json_object = verify_request.json_object - try: - verify_signed_json(json_object, server_name, verify_key) - except: - raise SynapseError( - 401, - "Invalid signature for server %s with key %s:%s" % ( - server_name, verify_key.alg, verify_key.version - ), - Codes.UNAUTHORIZED, - ) + try: + verify_signed_json(json_object, server_name, verify_key) + except: + raise SynapseError( + 401, + "Invalid signature for server %s with key %s:%s" % ( + server_name, verify_key.alg, verify_key.version + ), + Codes.UNAUTHORIZED, + ) server_to_deferred = { server_name: defer.Deferred() @@ -245,59 +244,60 @@ class Keyring(object): @defer.inlineCallbacks def do_iterations(): - merged_results = {} - - missing_keys = {} - for verify_request in verify_requests: - missing_keys.setdefault(verify_request.server_name, set()).update( - verify_request.key_ids - ) + with Measure(self.clock, "get_server_verify_keys"): + merged_results = {} - for fn in key_fetch_fns: - results = yield fn(missing_keys.items()) - merged_results.update(results) - - # We now need to figure out which verify requests we have keys - # for and which we don't missing_keys = {} - requests_missing_keys = [] for verify_request in verify_requests: - server_name = verify_request.server_name - result_keys = merged_results[server_name] - - if verify_request.deferred.called: - # We've already called this deferred, which probably - # means that we've already found a key for it. - continue - - for key_id in verify_request.key_ids: - if key_id in result_keys: - with PreserveLoggingContext(): - verify_request.deferred.callback(( - server_name, - key_id, - result_keys[key_id], - )) - break - else: - # The else block is only reached if the loop above - # doesn't break. - missing_keys.setdefault(server_name, set()).update( - verify_request.key_ids - ) - requests_missing_keys.append(verify_request) - - if not missing_keys: - break - - for verify_request in requests_missing_keys.values(): - verify_request.deferred.errback(SynapseError( - 401, - "No key for %s with id %s" % ( - verify_request.server_name, verify_request.key_ids, - ), - Codes.UNAUTHORIZED, - )) + missing_keys.setdefault(verify_request.server_name, set()).update( + verify_request.key_ids + ) + + for fn in key_fetch_fns: + results = yield fn(missing_keys.items()) + merged_results.update(results) + + # We now need to figure out which verify requests we have keys + # for and which we don't + missing_keys = {} + requests_missing_keys = [] + for verify_request in verify_requests: + server_name = verify_request.server_name + result_keys = merged_results[server_name] + + if verify_request.deferred.called: + # We've already called this deferred, which probably + # means that we've already found a key for it. + continue + + for key_id in verify_request.key_ids: + if key_id in result_keys: + with PreserveLoggingContext(): + verify_request.deferred.callback(( + server_name, + key_id, + result_keys[key_id], + )) + break + else: + # The else block is only reached if the loop above + # doesn't break. + missing_keys.setdefault(server_name, set()).update( + verify_request.key_ids + ) + requests_missing_keys.append(verify_request) + + if not missing_keys: + break + + for verify_request in requests_missing_keys.values(): + verify_request.deferred.errback(SynapseError( + 401, + "No key for %s with id %s" % ( + verify_request.server_name, verify_request.key_ids, + ), + Codes.UNAUTHORIZED, + )) def on_err(err): for verify_request in verify_requests: -- cgit 1.4.1 From 921913935176f5bd3df5e5b960d87c94a2adb304 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Aug 2016 15:23:39 +0100 Subject: Preserve some logcontexts --- synapse/app/homeserver.py | 3 ++- synapse/appservice/scheduler.py | 6 +++--- synapse/crypto/keyring.py | 36 ++++++++++++++++---------------- synapse/federation/federation_base.py | 7 ++++--- synapse/federation/federation_client.py | 17 +++++++++------ synapse/handlers/appservice.py | 8 +++---- synapse/handlers/federation.py | 28 +++++++++++++------------ synapse/handlers/message.py | 35 ++++++++++++++++++------------- synapse/handlers/typing.py | 12 +++++++---- synapse/notifier.py | 7 ++++++- synapse/push/push_tools.py | 9 ++++---- synapse/push/pusherpool.py | 12 ++++++----- synapse/rest/client/v2_alpha/register.py | 3 +-- synapse/storage/events.py | 16 ++++++++------ synapse/storage/stream.py | 6 +++--- synapse/util/async.py | 9 ++++---- synapse/util/logcontext.py | 15 +++++++++---- synapse/visibility.py | 6 +++--- 18 files changed, 136 insertions(+), 99 deletions(-) (limited to 'synapse/crypto') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 54f35900f8..4493d3b847 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -50,7 +50,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, logcontext_tracer from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX @@ -449,6 +449,7 @@ def run(hs): # Uncomment to enable tracing of log context changes. # sys.settrace(logcontext_tracer) with LoggingContext("run"): + sys.settrace(logcontext_tracer) change_resource_limit(hs.config.soft_file_limit) if hs.config.gc_thresholds: gc.set_threshold(*hs.config.gc_thresholds) diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6450a12890..68a9de17b8 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -150,12 +150,12 @@ class _TransactionController(object): if service_is_up: sent = yield txn.send(self.as_api) if sent: - txn.complete(self.store) + yield txn.complete(self.store) else: - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) except Exception as e: logger.exception(e) - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1735ca9345..d7211ee9b3 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -308,15 +308,15 @@ class Keyring(object): @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_server_verify_keys( + preserve_fn(self.store.get_server_verify_keys)( server_name, key_ids ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(dict(res)) @@ -337,13 +337,13 @@ class Keyring(object): ) defer.returnValue({}) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(p_name, p_keys) + preserve_fn(get_key)(p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) union_of_keys = {} for result in results: @@ -383,13 +383,13 @@ class Keyring(object): defer.returnValue(keys) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(server_name, key_ids) + preserve_fn(get_key)(server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) merged = {} for result in results: @@ -466,9 +466,9 @@ class Keyring(object): for server_name, response_keys in processed_response.items(): keys.setdefault(server_name, {}).update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ - self.store_keys( + preserve_fn(self.store_keys)( server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -476,7 +476,7 @@ class Keyring(object): for server_name, response_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -524,7 +524,7 @@ class Keyring(object): keys.update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=key_server_name, @@ -534,7 +534,7 @@ class Keyring(object): for key_server_name, verify_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -600,7 +600,7 @@ class Keyring(object): response_keys.update(verify_keys) response_keys.update(old_verify_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_keys_json)( server_name=server_name, @@ -613,7 +613,7 @@ class Keyring(object): for key_id in updated_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) results[server_name] = response_keys @@ -702,7 +702,7 @@ class Keyring(object): A deferred that completes when the keys are stored. """ # TODO(markjh): Store whether the keys have expired. - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key @@ -710,4 +710,4 @@ class Keyring(object): for key_id, key in verify_keys.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index da2f5e8cfd..2339cc9034 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -23,6 +23,7 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError from synapse.util import unwrapFirstError +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -102,10 +103,10 @@ class FederationBase(object): warn, pdu ) - valid_pdus = yield defer.gatherResults( + valid_pdus = yield preserve_context_over_deferred(defer.gatherResults( deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if include_none: defer.returnValue(valid_pdus) @@ -129,7 +130,7 @@ class FederationBase(object): for pdu in pdus ] - deferreds = self.keyring.verify_json_objects_for_server([ + deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([ (p.origin, p.get_pdu_json()) for p in redacted_pdus ]) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9ba3151713..f2b3aceb49 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -27,6 +27,7 @@ from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.events import FrozenEvent import synapse.metrics @@ -225,10 +226,10 @@ class FederationClient(FederationBase): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield defer.gatherResults( + pdus[:] = yield preserve_context_over_deferred(defer.gatherResults( self._check_sigs_and_hashes(pdus), consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(pdus) @@ -457,14 +458,16 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id in batch ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for success, result in res: if success: signed_events.append(result) @@ -853,14 +856,16 @@ class FederationClient(FederationBase): return srvs deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id, depth in ordered_missing[:limit - len(signed_events)] ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for (result, val), (e_id, _) in zip(res, ordered_missing): if result and val: signed_events.append(val) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dd285452cd..306686a384 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -163,10 +163,10 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield defer.DeferredList([ - self.appservice_api.query_3pe(service, kind, protocol, fields) + results = yield preserve_context_over_deferred(defer.DeferredList([ + preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services - ], consumeErrors=True) + ], consumeErrors=True)) ret = [] for (success, result) in results: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 328f8f4842..fe3092b14b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,9 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -361,9 +363,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - self.replication_layer.get_pdu( + preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -372,7 +374,7 @@ class FederationHandler(BaseHandler): for event_id in missing_auth - failed_to_fetch ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) required_auth.update( a_id for event in results for a_id, _ in event.auth_events @@ -552,10 +554,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) - states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups(room_id, [e]) + states = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids - ]) + ])) states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: @@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield defer.gatherResults( + contexts = yield preserve_context_over_deferred(defer.gatherResults( [ - self._prep_event( + preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), @@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler): ) for ev_info in event_infos ] - ) + )) yield self.store.persist_events( [ @@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield defer.gatherResults( + different_events = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_event( + preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, @@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc76d34a52..4c3cd9d12e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -28,7 +28,8 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -502,15 +503,17 @@ class MessageHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=room_end_token, - ), - deferred_room_state, - ] + (messages, token), current_state = yield preserve_context_over_deferred( + defer.gatherResults( + [ + preserve_fn(self.store.get_recent_events_for_room)( + event.room_id, + limit=limit, + end_token=room_end_token, + ), + deferred_room_state, + ] + ) ).addErrback(unwrapFirstError) messages = yield filter_events_for_client( @@ -719,9 +722,9 @@ class MessageHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - get_presence(), - get_receipts(), - self.store.get_recent_events_for_room( + preserve_fn(get_presence)(), + preserve_fn(get_receipts)(), + preserve_fn(self.store.get_recent_events_for_room)( room_id, limit=limit, end_token=now_token.room_key, @@ -755,6 +758,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, prev_event_ids=None): if prev_event_ids: @@ -806,6 +810,7 @@ class MessageHandler(BaseHandler): (event, context,) ) + @measure_func("handle_new_client_event") @defer.inlineCallbacks def handle_new_client_event( self, @@ -934,7 +939,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( + yield self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) @@ -944,6 +949,6 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - federation_handler.handle_new_event( + preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5589296c09..46181984c0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util.metrics import Measure from synapse.types import UserID @@ -169,13 +171,13 @@ class TypingHandler(object): deferreds = [] for domain in domains: if domain == self.server_name: - self._push_update_local( + preserve_fn(self._push_update_local)( room_id=room_id, user_id=user_id, typing=typing ) else: - deferreds.append(self.federation.send_edu( + deferreds.append(preserve_fn(self.federation.send_edu)( destination=domain, edu_type="m.typing", content={ @@ -185,7 +187,9 @@ class TypingHandler(object): }, )) - yield defer.DeferredList(deferreds, consumeErrors=True) + yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index c48024096d..b86648f5e4 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -19,7 +19,7 @@ from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -174,6 +174,7 @@ class Notifier(object): lambda: len(self.user_to_user_stream), ) + @preserve_fn def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -195,6 +196,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous event to be persisted. @@ -212,6 +214,7 @@ class Notifier(object): else: self._on_new_room_event(event, room_stream_id, extra_users) + @preserve_fn def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. @@ -226,6 +229,7 @@ class Notifier(object): rooms=[event.room_id], ) + @preserve_fn def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. @@ -252,6 +256,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def on_new_replication_data(self): """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index d555a33e9a..becb8ef1ae 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -17,14 +17,15 @@ from twisted.internet import defer from synapse.util.presentable_names import ( calculate_room_name, name_from_member_event ) +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @defer.inlineCallbacks def get_badge_count(store, user_id): - invites, joins = yield defer.gatherResults([ - store.get_invited_rooms_for_user(user_id), - store.get_rooms_for_user(user_id), - ], consumeErrors=True) + invites, joins = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(store.get_invited_rooms_for_user)(user_id), + preserve_fn(store.get_rooms_for_user)(user_id), + ], consumeErrors=True)) my_receipts_by_room = yield store.get_receipts_for_user( user_id, "m.read", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 54c0f1b849..3837be523d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,7 @@ from twisted.internet import defer import pusher -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.util.async import run_on_reactor import logging @@ -130,10 +130,12 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_notifications(min_stream_id, max_stream_id) + preserve_fn(p.on_new_notifications)( + min_stream_id, max_stream_id + ) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_notifications") @@ -155,10 +157,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_receipts(min_stream_id, max_stream_id) + preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 943f5676a3..2121bd75ea 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -403,10 +403,9 @@ class RegisterRestServlet(RestServlet): # register the user's device device_id = params.get("device_id") initial_display_name = params.get("initial_device_display_name") - device_id = self.device_handler.check_device_registered( + return self.device_handler.check_device_registered( user_id, device_id, initial_display_name ) - return device_id @defer.inlineCallbacks def _do_guest_registration(self): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 97aef25321..57e5005285 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import preserve_fn, PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes @@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + d = preserve_fn(self._event_persist_queue.add_to_queue)( room_id, evs_ctxs, backfilled=backfilled, current_state=None, @@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore): for room_id in partitioned.keys(): self._maybe_start_persisting(room_id) - return defer.gatherResults(deferreds, consumeErrors=True) + return preserve_context_over_deferred( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks @log_function @@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield deferred + yield preserve_context_over_deferred(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], @@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore): for row in rows ], consumeErrors=True - ) + )) defer.returnValue({ e.event.event_id: e diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 862c5c3ea1..0577a0525b 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield defer.gatherResults([ + res = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ]) + ])) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/util/async.py b/synapse/util/async.py index c84b23ff46..347fb1e380 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return defer.gatherResults([ + return preserve_context_over_deferred(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) - ], consumeErrors=True).addErrback(unwrapFirstError) + ], consumeErrors=True)).addErrback(unwrapFirstError) class Linearizer(object): @@ -181,7 +181,8 @@ class Linearizer(object): self.key_to_defer[key] = new_defer if current_defer: - yield preserve_context_over_deferred(current_defer) + with PreserveLoggingContext(): + yield current_defer @contextmanager def _ctx_manager(): @@ -264,7 +265,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield defer.gatherResults(to_wait_on) + yield preserve_context_over_deferred(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 7a87045f87..6c83eb213d 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs): return res -def preserve_context_over_deferred(deferred): +def preserve_context_over_deferred(deferred, context=None): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. """ - current_context = LoggingContext.current_context() - d = _PreservingContextDeferred(current_context) + if context is None: + context = LoggingContext.current_context() + d = _PreservingContextDeferred(context) deferred.chainDeferred(d) return d @@ -316,7 +317,13 @@ def preserve_fn(f): def g(*args, **kwargs): with PreserveLoggingContext(current): - return f(*args, **kwargs) + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred): + return preserve_context_over_deferred( + res, context=LoggingContext.sentinel + ) + else: + return res return g diff --git a/synapse/visibility.py b/synapse/visibility.py index 948ad51772..cc12c0a23d 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership, EventTypes -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -55,12 +55,12 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): given events events ([synapse.events.EventBase]): list of events to filter """ - forgotten = yield defer.gatherResults([ + forgotten = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(store.who_forgot_in_room)( room_id, ) for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) + ], consumeErrors=True)) # Set of membership event_ids that have been forgotten event_id_forgotten = frozenset( -- cgit 1.4.1