From 921913935176f5bd3df5e5b960d87c94a2adb304 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Aug 2016 15:23:39 +0100 Subject: Preserve some logcontexts --- synapse/handlers/appservice.py | 8 ++++---- synapse/handlers/federation.py | 28 +++++++++++++++------------- synapse/handlers/message.py | 35 ++++++++++++++++++++--------------- synapse/handlers/typing.py | 12 ++++++++---- 4 files changed, 47 insertions(+), 36 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dd285452cd..306686a384 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -163,10 +163,10 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield defer.DeferredList([ - self.appservice_api.query_3pe(service, kind, protocol, fields) + results = yield preserve_context_over_deferred(defer.DeferredList([ + preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services - ], consumeErrors=True) + ], consumeErrors=True)) ret = [] for (success, result) in results: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 328f8f4842..fe3092b14b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,9 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -361,9 +363,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - self.replication_layer.get_pdu( + preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -372,7 +374,7 @@ class FederationHandler(BaseHandler): for event_id in missing_auth - failed_to_fetch ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) required_auth.update( a_id for event in results for a_id, _ in event.auth_events @@ -552,10 +554,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) - states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups(room_id, [e]) + states = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids - ]) + ])) states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: @@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield defer.gatherResults( + contexts = yield preserve_context_over_deferred(defer.gatherResults( [ - self._prep_event( + preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), @@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler): ) for ev_info in event_infos ] - ) + )) yield self.store.persist_events( [ @@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield defer.gatherResults( + different_events = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_event( + preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, @@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc76d34a52..4c3cd9d12e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -28,7 +28,8 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -502,15 +503,17 @@ class MessageHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=room_end_token, - ), - deferred_room_state, - ] + (messages, token), current_state = yield preserve_context_over_deferred( + defer.gatherResults( + [ + preserve_fn(self.store.get_recent_events_for_room)( + event.room_id, + limit=limit, + end_token=room_end_token, + ), + deferred_room_state, + ] + ) ).addErrback(unwrapFirstError) messages = yield filter_events_for_client( @@ -719,9 +722,9 @@ class MessageHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - get_presence(), - get_receipts(), - self.store.get_recent_events_for_room( + preserve_fn(get_presence)(), + preserve_fn(get_receipts)(), + preserve_fn(self.store.get_recent_events_for_room)( room_id, limit=limit, end_token=now_token.room_key, @@ -755,6 +758,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, prev_event_ids=None): if prev_event_ids: @@ -806,6 +810,7 @@ class MessageHandler(BaseHandler): (event, context,) ) + @measure_func("handle_new_client_event") @defer.inlineCallbacks def handle_new_client_event( self, @@ -934,7 +939,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( + yield self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) @@ -944,6 +949,6 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - federation_handler.handle_new_event( + preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5589296c09..46181984c0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util.metrics import Measure from synapse.types import UserID @@ -169,13 +171,13 @@ class TypingHandler(object): deferreds = [] for domain in domains: if domain == self.server_name: - self._push_update_local( + preserve_fn(self._push_update_local)( room_id=room_id, user_id=user_id, typing=typing ) else: - deferreds.append(self.federation.send_edu( + deferreds.append(preserve_fn(self.federation.send_edu)( destination=domain, edu_type="m.typing", content={ @@ -185,7 +187,9 @@ class TypingHandler(object): }, )) - yield defer.DeferredList(deferreds, consumeErrors=True) + yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks def _recv_edu(self, origin, content): -- cgit 1.5.1 From 9899824b851bc1e1ccaa0da8cb2dc5bf783014e8 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 24 Aug 2016 12:33:01 +0100 Subject: Initial hack at the 3PN protocols metadata lookup API --- synapse/appservice/__init__.py | 2 ++ synapse/handlers/appservice.py | 8 +++++++ synapse/rest/client/v2_alpha/thirdparty.py | 34 ++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) (limited to 'synapse/handlers') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index bde9b51b2e..126a10efb7 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -88,6 +88,8 @@ class ApplicationService(object): self.sender = sender self.namespaces = self._check_namespaces(namespaces) self.id = id + + # .protocols is a publicly visible field if protocols: self.protocols = set(protocols) else: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dd285452cd..6273ff524c 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -175,6 +175,14 @@ class ApplicationServicesHandler(object): defer.returnValue(ret) + @defer.inlineCallbacks + def get_3pe_protocols(self): + services = yield self.store.get_app_services() + protocols = set() + for s in services: + protocols.update(s.protocols) + defer.returnValue(protocols) + @defer.inlineCallbacks def _get_services_for_event(self, event): """Retrieve a list of application services interested in this event. diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index 8821e101bf..2decadb001 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -25,6 +25,39 @@ from ._base import client_v2_patterns logger = logging.getLogger(__name__) +class ThirdPartyProtocolsServlet(RestServlet): + PATTERNS = client_v2_patterns("/thirdparty/protocols", releases=()) + + META = { + # TODO(paul): Declare kinds of metadata in here + } + + def __init__(self, hs): + super(ThirdPartyProtocolsServlet, self).__init__() + + self.auth = hs.get_auth() + self.appservice_handler = hs.get_application_service_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + yield self.auth.get_user_by_req(request) + + protocols = yield self.appservice_handler.get_3pe_protocols() + + result = {} + # TODO(paul): Eventually this kind of metadata wants to come from the + # ASes themselves + for protocol in protocols: + if protocol in self.META: + result[protocol] = self.META[protocol] + else: + # We don't know any metadata for it, but we'd best at least + # still declare that we know it exists + result[protocol] = {} + + defer.returnValue((200, result)) + + class ThirdPartyUserServlet(RestServlet): PATTERNS = client_v2_patterns("/thirdparty/user(/(?P[^/]+))?$", releases=()) @@ -74,5 +107,6 @@ class ThirdPartyLocationServlet(RestServlet): def register_servlets(hs, http_server): + ThirdPartyProtocolsServlet(hs).register(http_server) ThirdPartyUserServlet(hs).register(http_server) ThirdPartyLocationServlet(hs).register(http_server) -- cgit 1.5.1 From 8e1ed09dff902c84beb9dff59305aa78e8772d81 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 24 Aug 2016 13:01:53 +0100 Subject: Move static knowledge of protocol metadata into AS handler; cache the result --- synapse/handlers/appservice.py | 26 ++++++++++++++++++++++++-- synapse/rest/client/v2_alpha/thirdparty.py | 21 +-------------------- 2 files changed, 25 insertions(+), 22 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 6273ff524c..1c5c5ec302 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -37,6 +37,13 @@ def log_failure(failure): class ApplicationServicesHandler(object): + PROTOCOL_META = { + # TODO(paul): Declare kinds of metadata in here + "gitter": { + "user_fields": ["username"], + } + } + def __init__(self, hs): self.store = hs.get_datastore() self.is_mine_id = hs.is_mine_id @@ -48,6 +55,7 @@ class ApplicationServicesHandler(object): self.current_max = 0 self.is_processing = False + self.supported_protocols = None @defer.inlineCallbacks def notify_interested_services(self, current_id): @@ -177,10 +185,24 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def get_3pe_protocols(self): + # The set of supported AS protocols and the metadata about them is + # effectively static during the lifetime of a homeserver process. We + # can look this up once and just cache it. + if self.supported_protocols: + defer.returnValue(self.supported_protocols) + services = yield self.store.get_app_services() - protocols = set() + protocols = {} for s in services: - protocols.update(s.protocols) + for p in s.protocols: + if p in self.PROTOCOL_META: + protocols[p] = self.PROTOCOL_META[p] + else: + # We don't know any metadata for it, but we'd best at least + # still declare that we know it exists + protocols[p] = {} + + self.supported_protocols = protocols defer.returnValue(protocols) @defer.inlineCallbacks diff --git a/synapse/rest/client/v2_alpha/thirdparty.py b/synapse/rest/client/v2_alpha/thirdparty.py index ae7901551b..bbc3e9b962 100644 --- a/synapse/rest/client/v2_alpha/thirdparty.py +++ b/synapse/rest/client/v2_alpha/thirdparty.py @@ -28,13 +28,6 @@ logger = logging.getLogger(__name__) class ThirdPartyProtocolsServlet(RestServlet): PATTERNS = client_v2_patterns("/thirdparty/protocols", releases=()) - META = { - # TODO(paul): Declare kinds of metadata in here - "gitter": { - "user_fields": ["username"], - } - } - def __init__(self, hs): super(ThirdPartyProtocolsServlet, self).__init__() @@ -46,19 +39,7 @@ class ThirdPartyProtocolsServlet(RestServlet): yield self.auth.get_user_by_req(request) protocols = yield self.appservice_handler.get_3pe_protocols() - - result = {} - # TODO(paul): Eventually this kind of metadata wants to come from the - # ASes themselves - for protocol in protocols: - if protocol in self.META: - result[protocol] = self.META[protocol] - else: - # We don't know any metadata for it, but we'd best at least - # still declare that we know it exists - result[protocol] = {} - - defer.returnValue((200, result)) + defer.returnValue((200, protocols)) class ThirdPartyUserServlet(RestServlet): -- cgit 1.5.1 From 5474824975b5372665b0921960a0c3887a49efdb Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 25 Aug 2016 15:10:06 +0100 Subject: Actually query over AS API for 3PE lookup metadata --- synapse/appservice/api.py | 20 ++++++++++++++++++++ synapse/handlers/appservice.py | 14 +------------- 2 files changed, 21 insertions(+), 13 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 07ec33b9a1..1de0e7240f 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -52,6 +52,13 @@ class ApplicationServiceApi(SimpleHttpClient): pushing. """ + PROTOCOL_META = { + # TODO(paul): Declare kinds of metadata in here + "gitter": { + "user_fields": ["username"], + } + } + def __init__(self, hs): super(ApplicationServiceApi, self).__init__(hs) self.clock = hs.get_clock() @@ -131,6 +138,19 @@ class ApplicationServiceApi(SimpleHttpClient): logger.warning("query_3pe to %s threw exception %s", uri, ex) defer.returnValue([]) + @defer.inlineCallbacks + def get_3pe_protocol(self, service, protocol): + # TODO: cache + uri = "%s/thirdparty/protocol/%s" % (service.url, urllib.quote(protocol)) + try: + response = yield self.get_json(uri, {}) + defer.returnValue(response) + except Exception as ex: + logger.warning("query_3pe_protocol to %s threw exception %s", + uri, ex + ) + defer.returnValue({}) + @defer.inlineCallbacks def push_bulk(self, service, events, txn_id=None): events = self._serialize(events) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 1c5c5ec302..efb9da798d 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -37,13 +37,6 @@ def log_failure(failure): class ApplicationServicesHandler(object): - PROTOCOL_META = { - # TODO(paul): Declare kinds of metadata in here - "gitter": { - "user_fields": ["username"], - } - } - def __init__(self, hs): self.store = hs.get_datastore() self.is_mine_id = hs.is_mine_id @@ -195,12 +188,7 @@ class ApplicationServicesHandler(object): protocols = {} for s in services: for p in s.protocols: - if p in self.PROTOCOL_META: - protocols[p] = self.PROTOCOL_META[p] - else: - # We don't know any metadata for it, but we'd best at least - # still declare that we know it exists - protocols[p] = {} + protocols[p] = yield self.appservice_api.get_3pe_protocol(s, p) self.supported_protocols = protocols defer.returnValue(protocols) -- cgit 1.5.1 From c435bfee9c2536b23bd2fbd0a590231a4bd2e050 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Thu, 25 Aug 2016 15:57:07 +0100 Subject: Don't need toplevel cache on 3PE lookup metadata any more --- synapse/handlers/appservice.py | 8 -------- 1 file changed, 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index efb9da798d..94d7817432 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -48,7 +48,6 @@ class ApplicationServicesHandler(object): self.current_max = 0 self.is_processing = False - self.supported_protocols = None @defer.inlineCallbacks def notify_interested_services(self, current_id): @@ -178,19 +177,12 @@ class ApplicationServicesHandler(object): @defer.inlineCallbacks def get_3pe_protocols(self): - # The set of supported AS protocols and the metadata about them is - # effectively static during the lifetime of a homeserver process. We - # can look this up once and just cache it. - if self.supported_protocols: - defer.returnValue(self.supported_protocols) - services = yield self.store.get_app_services() protocols = {} for s in services: for p in s.protocols: protocols[p] = yield self.appservice_api.get_3pe_protocol(s, p) - self.supported_protocols = protocols defer.returnValue(protocols) @defer.inlineCallbacks -- cgit 1.5.1