summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py8
-rw-r--r--synapse/handlers/federation.py28
-rw-r--r--synapse/handlers/message.py35
-rw-r--r--synapse/handlers/typing.py12
4 files changed, 47 insertions, 36 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index dd285452cd..306686a384 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 
 import logging
 
@@ -163,10 +163,10 @@ class ApplicationServicesHandler(object):
     def query_3pe(self, kind, protocol, fields):
         services = yield self._get_services_for_3pn(protocol)
 
-        results = yield defer.DeferredList([
-            self.appservice_api.query_3pe(service, kind, protocol, fields)
+        results = yield preserve_context_over_deferred(defer.DeferredList([
+            preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
             for service in services
-        ], consumeErrors=True)
+        ], consumeErrors=True))
 
         ret = []
         for (success, result) in results:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index fdc6cbfa00..01a761715b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,7 +26,9 @@ from synapse.api.errors import (
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import (
+    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+)
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.util.frozenutils import unfreeze
@@ -361,9 +363,9 @@ class FederationHandler(BaseHandler):
                     missing_auth - failed_to_fetch
                 )
 
-                results = yield defer.gatherResults(
+                results = yield preserve_context_over_deferred(defer.gatherResults(
                     [
-                        self.replication_layer.get_pdu(
+                        preserve_fn(self.replication_layer.get_pdu)(
                             [dest],
                             event_id,
                             outlier=True,
@@ -372,7 +374,7 @@ class FederationHandler(BaseHandler):
                         for event_id in missing_auth - failed_to_fetch
                     ],
                     consumeErrors=True
-                ).addErrback(unwrapFirstError)
+                )).addErrback(unwrapFirstError)
                 auth_events.update({a.event_id: a for a in results if a})
                 required_auth.update(
                     a_id for event in results for a_id, _ in event.auth_events if event
@@ -552,10 +554,10 @@ class FederationHandler(BaseHandler):
 
         event_ids = list(extremities.keys())
 
-        states = yield defer.gatherResults([
-            self.state_handler.resolve_state_groups(room_id, [e])
+        states = yield preserve_context_over_deferred(defer.gatherResults([
+            preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
             for e in event_ids
-        ])
+        ]))
         states = dict(zip(event_ids, [s[1] for s in states]))
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler):
         a bunch of outliers, but not a chunk of individual events that depend
         on each other for state calculations.
         """
-        contexts = yield defer.gatherResults(
+        contexts = yield preserve_context_over_deferred(defer.gatherResults(
             [
-                self._prep_event(
+                preserve_fn(self._prep_event)(
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
@@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler):
                 )
                 for ev_info in event_infos
             ]
-        )
+        ))
 
         yield self.store.persist_events(
             [
@@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler):
             # Do auth conflict res.
             logger.info("Different auth: %s", different_auth)
 
-            different_events = yield defer.gatherResults(
+            different_events = yield preserve_context_over_deferred(defer.gatherResults(
                 [
-                    self.store.get_event(
+                    preserve_fn(self.store.get_event)(
                         d,
                         allow_none=True,
                         allow_rejected=False,
@@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler):
                     if d in have_events and not have_events[d]
                 ],
                 consumeErrors=True
-            ).addErrback(unwrapFirstError)
+            )).addErrback(unwrapFirstError)
 
             if different_events:
                 local_view = dict(auth_events)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dc76d34a52..4c3cd9d12e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,7 +28,8 @@ from synapse.types import (
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -502,15 +503,17 @@ class MessageHandler(BaseHandler):
                         lambda states: states[event.event_id]
                     )
 
-                (messages, token), current_state = yield defer.gatherResults(
-                    [
-                        self.store.get_recent_events_for_room(
-                            event.room_id,
-                            limit=limit,
-                            end_token=room_end_token,
-                        ),
-                        deferred_room_state,
-                    ]
+                (messages, token), current_state = yield preserve_context_over_deferred(
+                    defer.gatherResults(
+                        [
+                            preserve_fn(self.store.get_recent_events_for_room)(
+                                event.room_id,
+                                limit=limit,
+                                end_token=room_end_token,
+                            ),
+                            deferred_room_state,
+                        ]
+                    )
                 ).addErrback(unwrapFirstError)
 
                 messages = yield filter_events_for_client(
@@ -719,9 +722,9 @@ class MessageHandler(BaseHandler):
 
         presence, receipts, (messages, token) = yield defer.gatherResults(
             [
-                get_presence(),
-                get_receipts(),
-                self.store.get_recent_events_for_room(
+                preserve_fn(get_presence)(),
+                preserve_fn(get_receipts)(),
+                preserve_fn(self.store.get_recent_events_for_room)(
                     room_id,
                     limit=limit,
                     end_token=now_token.room_key,
@@ -755,6 +758,7 @@ class MessageHandler(BaseHandler):
 
         defer.returnValue(ret)
 
+    @measure_func("_create_new_client_event")
     @defer.inlineCallbacks
     def _create_new_client_event(self, builder, prev_event_ids=None):
         if prev_event_ids:
@@ -806,6 +810,7 @@ class MessageHandler(BaseHandler):
             (event, context,)
         )
 
+    @measure_func("handle_new_client_event")
     @defer.inlineCallbacks
     def handle_new_client_event(
         self,
@@ -934,7 +939,7 @@ class MessageHandler(BaseHandler):
         @defer.inlineCallbacks
         def _notify():
             yield run_on_reactor()
-            self.notifier.on_new_room_event(
+            yield self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=extra_users
             )
@@ -944,6 +949,6 @@ class MessageHandler(BaseHandler):
         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)
 
-        federation_handler.handle_new_event(
+        preserve_fn(federation_handler.handle_new_event)(
             event, destinations=destinations,
         )
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 5589296c09..46181984c0 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,9 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
 from synapse.util.metrics import Measure
 from synapse.types import UserID
 
@@ -169,13 +171,13 @@ class TypingHandler(object):
         deferreds = []
         for domain in domains:
             if domain == self.server_name:
-                self._push_update_local(
+                preserve_fn(self._push_update_local)(
                     room_id=room_id,
                     user_id=user_id,
                     typing=typing
                 )
             else:
-                deferreds.append(self.federation.send_edu(
+                deferreds.append(preserve_fn(self.federation.send_edu)(
                     destination=domain,
                     edu_type="m.typing",
                     content={
@@ -185,7 +187,9 @@ class TypingHandler(object):
                     },
                 ))
 
-        yield defer.DeferredList(deferreds, consumeErrors=True)
+        yield preserve_context_over_deferred(
+            defer.DeferredList(deferreds, consumeErrors=True)
+        )
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):