diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 8 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 28 | ||||
-rw-r--r-- | synapse/handlers/message.py | 35 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 12 |
4 files changed, 47 insertions, 36 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 94d7817432..b440280b74 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -163,10 +163,10 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield defer.DeferredList([ - self.appservice_api.query_3pe(service, kind, protocol, fields) + results = yield preserve_context_over_deferred(defer.DeferredList([ + preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services - ], consumeErrors=True) + ], consumeErrors=True)) ret = [] for (success, result) in results: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fdc6cbfa00..01a761715b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,9 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -361,9 +363,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - self.replication_layer.get_pdu( + preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -372,7 +374,7 @@ class FederationHandler(BaseHandler): for event_id in missing_auth - failed_to_fetch ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results if a}) required_auth.update( a_id for event in results for a_id, _ in event.auth_events if event @@ -552,10 +554,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) - states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups(room_id, [e]) + states = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids - ]) + ])) states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: @@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield defer.gatherResults( + contexts = yield preserve_context_over_deferred(defer.gatherResults( [ - self._prep_event( + preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), @@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler): ) for ev_info in event_infos ] - ) + )) yield self.store.persist_events( [ @@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield defer.gatherResults( + different_events = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_event( + preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, @@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc76d34a52..4c3cd9d12e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -28,7 +28,8 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -502,15 +503,17 @@ class MessageHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=room_end_token, - ), - deferred_room_state, - ] + (messages, token), current_state = yield preserve_context_over_deferred( + defer.gatherResults( + [ + preserve_fn(self.store.get_recent_events_for_room)( + event.room_id, + limit=limit, + end_token=room_end_token, + ), + deferred_room_state, + ] + ) ).addErrback(unwrapFirstError) messages = yield filter_events_for_client( @@ -719,9 +722,9 @@ class MessageHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - get_presence(), - get_receipts(), - self.store.get_recent_events_for_room( + preserve_fn(get_presence)(), + preserve_fn(get_receipts)(), + preserve_fn(self.store.get_recent_events_for_room)( room_id, limit=limit, end_token=now_token.room_key, @@ -755,6 +758,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, prev_event_ids=None): if prev_event_ids: @@ -806,6 +810,7 @@ class MessageHandler(BaseHandler): (event, context,) ) + @measure_func("handle_new_client_event") @defer.inlineCallbacks def handle_new_client_event( self, @@ -934,7 +939,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( + yield self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) @@ -944,6 +949,6 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - federation_handler.handle_new_event( + preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5589296c09..46181984c0 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util.metrics import Measure from synapse.types import UserID @@ -169,13 +171,13 @@ class TypingHandler(object): deferreds = [] for domain in domains: if domain == self.server_name: - self._push_update_local( + preserve_fn(self._push_update_local)( room_id=room_id, user_id=user_id, typing=typing ) else: - deferreds.append(self.federation.send_edu( + deferreds.append(preserve_fn(self.federation.send_edu)( destination=domain, edu_type="m.typing", content={ @@ -185,7 +187,9 @@ class TypingHandler(object): }, )) - yield defer.DeferredList(deferreds, consumeErrors=True) + yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks def _recv_edu(self, origin, content): |