diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index dd285452cd..b440280b74 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
import logging
@@ -163,10 +163,10 @@ class ApplicationServicesHandler(object):
def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)
- results = yield defer.DeferredList([
- self.appservice_api.query_3pe(service, kind, protocol, fields)
+ results = yield preserve_context_over_deferred(defer.DeferredList([
+ preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
for service in services
- ], consumeErrors=True)
+ ], consumeErrors=True))
ret = []
for (success, result) in results:
@@ -176,6 +176,16 @@ class ApplicationServicesHandler(object):
defer.returnValue(ret)
@defer.inlineCallbacks
+ def get_3pe_protocols(self):
+ services = yield self.store.get_app_services()
+ protocols = {}
+ for s in services:
+ for p in s.protocols:
+ protocols[p] = yield self.appservice_api.get_3pe_protocol(s, p)
+
+ defer.returnValue(protocols)
+
+ @defer.inlineCallbacks
def _get_services_for_event(self, event):
"""Retrieve a list of application services interested in this event.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index fdc6cbfa00..01a761715b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,7 +26,9 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+)
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@@ -361,9 +363,9 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch
)
- results = yield defer.gatherResults(
+ results = yield preserve_context_over_deferred(defer.gatherResults(
[
- self.replication_layer.get_pdu(
+ preserve_fn(self.replication_layer.get_pdu)(
[dest],
event_id,
outlier=True,
@@ -372,7 +374,7 @@ class FederationHandler(BaseHandler):
for event_id in missing_auth - failed_to_fetch
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results if a})
required_auth.update(
a_id for event in results for a_id, _ in event.auth_events if event
@@ -552,10 +554,10 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
- states = yield defer.gatherResults([
- self.state_handler.resolve_state_groups(room_id, [e])
+ states = yield preserve_context_over_deferred(defer.gatherResults([
+ preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
for e in event_ids
- ])
+ ]))
states = dict(zip(event_ids, [s[1] for s in states]))
for e_id, _ in sorted_extremeties_tuple:
@@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
- contexts = yield defer.gatherResults(
+ contexts = yield preserve_context_over_deferred(defer.gatherResults(
[
- self._prep_event(
+ preserve_fn(self._prep_event)(
origin,
ev_info["event"],
state=ev_info.get("state"),
@@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler):
)
for ev_info in event_infos
]
- )
+ ))
yield self.store.persist_events(
[
@@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
- different_events = yield defer.gatherResults(
+ different_events = yield preserve_context_over_deferred(defer.gatherResults(
[
- self.store.get_event(
+ preserve_fn(self.store.get_event)(
d,
allow_none=True,
allow_rejected=False,
@@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler):
if d in have_events and not have_events[d]
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dc76d34a52..4c3cd9d12e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,7 +28,8 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -502,15 +503,17 @@ class MessageHandler(BaseHandler):
lambda states: states[event.event_id]
)
- (messages, token), current_state = yield defer.gatherResults(
- [
- self.store.get_recent_events_for_room(
- event.room_id,
- limit=limit,
- end_token=room_end_token,
- ),
- deferred_room_state,
- ]
+ (messages, token), current_state = yield preserve_context_over_deferred(
+ defer.gatherResults(
+ [
+ preserve_fn(self.store.get_recent_events_for_room)(
+ event.room_id,
+ limit=limit,
+ end_token=room_end_token,
+ ),
+ deferred_room_state,
+ ]
+ )
).addErrback(unwrapFirstError)
messages = yield filter_events_for_client(
@@ -719,9 +722,9 @@ class MessageHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults(
[
- get_presence(),
- get_receipts(),
- self.store.get_recent_events_for_room(
+ preserve_fn(get_presence)(),
+ preserve_fn(get_receipts)(),
+ preserve_fn(self.store.get_recent_events_for_room)(
room_id,
limit=limit,
end_token=now_token.room_key,
@@ -755,6 +758,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret)
+ @measure_func("_create_new_client_event")
@defer.inlineCallbacks
def _create_new_client_event(self, builder, prev_event_ids=None):
if prev_event_ids:
@@ -806,6 +810,7 @@ class MessageHandler(BaseHandler):
(event, context,)
)
+ @measure_func("handle_new_client_event")
@defer.inlineCallbacks
def handle_new_client_event(
self,
@@ -934,7 +939,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
- self.notifier.on_new_room_event(
+ yield self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
@@ -944,6 +949,6 @@ class MessageHandler(BaseHandler):
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
- federation_handler.handle_new_event(
+ preserve_fn(federation_handler.handle_new_event)(
event, destinations=destinations,
)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 5589296c09..46181984c0 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,9 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
from synapse.util.metrics import Measure
from synapse.types import UserID
@@ -169,13 +171,13 @@ class TypingHandler(object):
deferreds = []
for domain in domains:
if domain == self.server_name:
- self._push_update_local(
+ preserve_fn(self._push_update_local)(
room_id=room_id,
user_id=user_id,
typing=typing
)
else:
- deferreds.append(self.federation.send_edu(
+ deferreds.append(preserve_fn(self.federation.send_edu)(
destination=domain,
edu_type="m.typing",
content={
@@ -185,7 +187,9 @@ class TypingHandler(object):
},
))
- yield defer.DeferredList(deferreds, consumeErrors=True)
+ yield preserve_context_over_deferred(
+ defer.DeferredList(deferreds, consumeErrors=True)
+ )
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
|