diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 1a50a2ec98..63d05f2531 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -19,7 +19,6 @@ from .room import (
)
from .room_member import RoomMemberHandler
from .message import MessageHandler
-from .events import EventStreamHandler, EventHandler
from .federation import FederationHandler
from .profile import ProfileHandler
from .directory import DirectoryHandler
@@ -53,8 +52,6 @@ class Handlers(object):
self.message_handler = MessageHandler(hs)
self.room_creation_handler = RoomCreationHandler(hs)
self.room_member_handler = RoomMemberHandler(hs)
- self.event_stream_handler = EventStreamHandler(hs)
- self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
self.directory_handler = DirectoryHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 051ccdb380..306686a384 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -16,7 +16,8 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes
-from synapse.appservice import ApplicationService
+from synapse.util.metrics import Measure
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
import logging
@@ -42,36 +43,73 @@ class ApplicationServicesHandler(object):
self.appservice_api = hs.get_application_service_api()
self.scheduler = hs.get_application_service_scheduler()
self.started_scheduler = False
+ self.clock = hs.get_clock()
+ self.notify_appservices = hs.config.notify_appservices
+
+ self.current_max = 0
+ self.is_processing = False
@defer.inlineCallbacks
- def notify_interested_services(self, event):
+ def notify_interested_services(self, current_id):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
prolonged length of time.
Args:
- event(Event): The event to push out to interested services.
+ current_id(int): The current maximum ID.
"""
- # Gather interested services
- services = yield self._get_services_for_event(event)
- if len(services) == 0:
- return # no services need notifying
-
- # Do we know this user exists? If not, poke the user query API for
- # all services which match that user regex. This needs to block as these
- # user queries need to be made BEFORE pushing the event.
- yield self._check_user_exists(event.sender)
- if event.type == EventTypes.Member:
- yield self._check_user_exists(event.state_key)
-
- if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
- self.started_scheduler = True
-
- # Fork off pushes to these services
- for service in services:
- self.scheduler.submit_event_for_as(service, event)
+ services = yield self.store.get_app_services()
+ if not services or not self.notify_appservices:
+ return
+
+ self.current_max = max(self.current_max, current_id)
+ if self.is_processing:
+ return
+
+ with Measure(self.clock, "notify_interested_services"):
+ self.is_processing = True
+ try:
+ upper_bound = self.current_max
+ limit = 100
+ while True:
+ upper_bound, events = yield self.store.get_new_events_for_appservice(
+ upper_bound, limit
+ )
+
+ if not events:
+ break
+
+ for event in events:
+ # Gather interested services
+ services = yield self._get_services_for_event(event)
+ if len(services) == 0:
+ continue # no services need notifying
+
+ # Do we know this user exists? If not, poke the user
+ # query API for all services which match that user regex.
+ # This needs to block as these user queries need to be
+ # made BEFORE pushing the event.
+ yield self._check_user_exists(event.sender)
+ if event.type == EventTypes.Member:
+ yield self._check_user_exists(event.state_key)
+
+ if not self.started_scheduler:
+ self.scheduler.start().addErrback(log_failure)
+ self.started_scheduler = True
+
+ # Fork off pushes to these services
+ for service in services:
+ preserve_fn(self.scheduler.submit_event_for_as)(
+ service, event
+ )
+
+ yield self.store.set_appservice_last_pos(upper_bound)
+
+ if len(events) < limit:
+ break
+ finally:
+ self.is_processing = False
@defer.inlineCallbacks
def query_user_exists(self, user_id):
@@ -104,11 +142,12 @@ class ApplicationServicesHandler(object):
association can be found.
"""
room_alias_str = room_alias.to_string()
- alias_query_services = yield self._get_services_for_event(
- event=None,
- restrict_to=ApplicationService.NS_ALIASES,
- alias_list=[room_alias_str]
- )
+ services = yield self.store.get_app_services()
+ alias_query_services = [
+ s for s in services if (
+ s.is_interested_in_alias(room_alias_str)
+ )
+ ]
for alias_service in alias_query_services:
is_known_alias = yield self.appservice_api.query_alias(
alias_service, room_alias_str
@@ -121,34 +160,35 @@ class ApplicationServicesHandler(object):
defer.returnValue(result)
@defer.inlineCallbacks
- def _get_services_for_event(self, event, restrict_to="", alias_list=None):
+ def query_3pe(self, kind, protocol, fields):
+ services = yield self._get_services_for_3pn(protocol)
+
+ results = yield preserve_context_over_deferred(defer.DeferredList([
+ preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
+ for service in services
+ ], consumeErrors=True))
+
+ ret = []
+ for (success, result) in results:
+ if success:
+ ret.extend(result)
+
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def _get_services_for_event(self, event):
"""Retrieve a list of application services interested in this event.
Args:
event(Event): The event to check. Can be None if alias_list is not.
- restrict_to(str): The namespace to restrict regex tests to.
- alias_list: A list of aliases to get services for. If None, this
- list is obtained from the database.
Returns:
list<ApplicationService>: A list of services interested in this
event based on the service regex.
"""
- member_list = None
- if hasattr(event, "room_id"):
- # We need to know the aliases associated with this event.room_id,
- # if any.
- if not alias_list:
- alias_list = yield self.store.get_aliases_for_room(
- event.room_id
- )
- # We need to know the members associated with this event.room_id,
- # if any.
- member_list = yield self.store.get_users_in_room(event.room_id)
-
services = yield self.store.get_app_services()
interested_list = [
s for s in services if (
- s.is_interested(event, restrict_to, alias_list, member_list)
+ yield s.is_interested(event, self.store)
)
]
defer.returnValue(interested_list)
@@ -164,6 +204,14 @@ class ApplicationServicesHandler(object):
defer.returnValue(interested_list)
@defer.inlineCallbacks
+ def _get_services_for_3pn(self, protocol):
+ services = yield self.store.get_app_services()
+ interested_list = [
+ s for s in services if s.is_interested_in_protocol(protocol)
+ ]
+ defer.returnValue(interested_list)
+
+ @defer.inlineCallbacks
def _is_unknown_user(self, user_id):
if not self.is_mine_id(user_id):
# we don't know if they are unknown or not since it isn't one of our
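
The rewritten notify_interested_services switches from per-event pushes to a stream-position model: callers only hand in the latest stream id, and whichever call finds is_processing unset drains the backlog in pages of 100, persisting its position after each page so a restart resumes where it left off. A stripped-down sketch of that control flow, with the Twisted machinery removed and the storage/scheduler calls stubbed out (names in the sketch are illustrative, not Synapse's API):

    class AppserviceNotifier(object):
        def __init__(self, store):
            self.store = store          # stub: get_new_events(upper, limit), set_last_pos(pos)
            self.current_max = 0        # highest stream id we have been told about
            self.is_processing = False  # True while one caller is draining the backlog

        def notify(self, current_id):
            # Every caller advances the high-water mark, but only one at a time
            # (per reactor tick, in the real handler) walks the backlog.
            self.current_max = max(self.current_max, current_id)
            if self.is_processing:
                return
            self.is_processing = True
            try:
                upper_bound = self.current_max
                limit = 100
                while True:
                    upper_bound, events = self.store.get_new_events(upper_bound, limit)
                    if not events:
                        break
                    for event in events:
                        self.push_to_interested_services(event)
                    # Record progress so a restart resumes from here.
                    self.store.set_last_pos(upper_bound)
                    if len(events) < limit:
                        break
            finally:
                self.is_processing = False

        def push_to_interested_services(self, event):
            pass  # stand-in for the per-service interest checks and scheduler submits
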
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2e138f328f..6986930c0d 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -70,11 +70,11 @@ class AuthHandler(BaseHandler):
self.ldap_uri = hs.config.ldap_uri
self.ldap_start_tls = hs.config.ldap_start_tls
self.ldap_base = hs.config.ldap_base
- self.ldap_filter = hs.config.ldap_filter
self.ldap_attributes = hs.config.ldap_attributes
if self.ldap_mode == LDAPMode.SEARCH:
self.ldap_bind_dn = hs.config.ldap_bind_dn
self.ldap_bind_password = hs.config.ldap_bind_password
+ self.ldap_filter = hs.config.ldap_filter
self.hs = hs # FIXME better possibility to access registrationHandler later?
self.device_handler = hs.get_device_handler()
@@ -660,7 +660,7 @@ class AuthHandler(BaseHandler):
else:
logger.warn(
"ldap registration failed: unexpected (%d!=1) amount of results",
- len(result)
+ len(conn.response)
)
defer.returnValue(False)
@@ -719,13 +719,14 @@ class AuthHandler(BaseHandler):
return macaroon.serialize()
def validate_short_term_login_token_and_get_user_id(self, login_token):
+ auth_api = self.hs.get_auth()
try:
macaroon = pymacaroons.Macaroon.deserialize(login_token)
- auth_api = self.hs.get_auth()
- auth_api.validate_macaroon(macaroon, "login", True)
- return self.get_user_from_macaroon(macaroon)
- except (pymacaroons.exceptions.MacaroonException, TypeError, ValueError):
- raise AuthError(401, "Invalid token", errcode=Codes.UNKNOWN_TOKEN)
+ user_id = auth_api.get_user_id_from_macaroon(macaroon)
+ auth_api.validate_macaroon(macaroon, "login", True, user_id)
+ return user_id
+ except Exception:
+ raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
def _generate_base_macaroon(self, user_id):
macaroon = pymacaroons.Macaroon(
@@ -736,21 +737,11 @@ class AuthHandler(BaseHandler):
macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
return macaroon
- def get_user_from_macaroon(self, macaroon):
- user_prefix = "user_id = "
- for caveat in macaroon.caveats:
- if caveat.caveat_id.startswith(user_prefix):
- return caveat.caveat_id[len(user_prefix):]
- raise AuthError(
- self.INVALID_TOKEN_HTTP_STATUS, "No user_id found in token",
- errcode=Codes.UNKNOWN_TOKEN
- )
-
@defer.inlineCallbacks
def set_password(self, user_id, newpassword, requester=None):
password_hash = self.hash(newpassword)
- except_access_token_ids = [requester.access_token_id] if requester else []
+ except_access_token_id = requester.access_token_id if requester else None
try:
yield self.store.user_set_password_hash(user_id, password_hash)
@@ -759,10 +750,10 @@ class AuthHandler(BaseHandler):
raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
raise e
yield self.store.user_delete_access_tokens(
- user_id, except_access_token_ids
+ user_id, except_access_token_id
)
yield self.hs.get_pusherpool().remove_pushers_by_user(
- user_id, except_access_token_ids
+ user_id, except_access_token_id
)
@defer.inlineCallbacks
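
AuthHandler.get_user_from_macaroon goes away because the shared Auth API now both extracts the user id and validates the macaroon against it, and any failure surfaces as a 403 rather than a 401. The caveat format itself is unchanged (it is still added in _generate_base_macaroon just above); a small self-contained illustration of writing and reading that caveat with pymacaroons, using made-up location, identifier and key values:

    import pymacaroons

    def make_login_token(user_id, secret_key):
        macaroon = pymacaroons.Macaroon(
            location="example.com", identifier="key0", key=secret_key,
        )
        macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
        return macaroon.serialize()

    def user_id_from_token(token):
        # The same prefix scan the deleted helper performed.
        macaroon = pymacaroons.Macaroon.deserialize(token)
        prefix = "user_id = "
        for caveat in macaroon.caveats:
            if caveat.caveat_id.startswith(prefix):
                return caveat.caveat_id[len(prefix):]
        raise ValueError("no user_id caveat in token")

    token = make_login_token("@alice:example.com", "not-a-real-key")
    assert user_id_from_token(token) == "@alice:example.com"
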
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 618cb53629..01a761715b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,7 +26,9 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+)
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@@ -249,7 +251,7 @@ class FederationHandler(BaseHandler):
if ev.type != EventTypes.Member:
continue
try:
- domain = UserID.from_string(ev.state_key).domain
+ domain = get_domain_from_id(ev.state_key)
except:
continue
@@ -274,7 +276,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def backfill(self, dest, room_id, limit, extremities=[]):
+ def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. This may return
@@ -284,9 +286,6 @@ class FederationHandler(BaseHandler):
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
- if not extremities:
- extremities = yield self.store.get_oldest_events_in_room(room_id)
-
events = yield self.replication_layer.backfill(
dest,
room_id,
@@ -364,9 +363,9 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch
)
- results = yield defer.gatherResults(
+ results = yield preserve_context_over_deferred(defer.gatherResults(
[
- self.replication_layer.get_pdu(
+ preserve_fn(self.replication_layer.get_pdu)(
[dest],
event_id,
outlier=True,
@@ -375,10 +374,10 @@ class FederationHandler(BaseHandler):
for event_id in missing_auth - failed_to_fetch
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
- auth_events.update({a.event_id: a for a in results})
+ )).addErrback(unwrapFirstError)
+ auth_events.update({a.event_id: a for a in results if a})
required_auth.update(
- a_id for event in results for a_id, _ in event.auth_events
+ a_id for event in results for a_id, _ in event.auth_events if event
)
missing_auth = required_auth - set(auth_events)
@@ -455,6 +454,10 @@ class FederationHandler(BaseHandler):
)
max_depth = sorted_extremeties_tuple[0][1]
+ # We don't want to specify too many extremities as it causes the backfill
+ # request URI to be too long.
+ extremities = dict(sorted_extremeties_tuple[:5])
+
if current_depth > max_depth:
logger.debug(
"Not backfilling as we don't need to. %d < %d",
@@ -551,10 +554,10 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
- states = yield defer.gatherResults([
- self.state_handler.resolve_state_groups(room_id, [e])
+ states = yield preserve_context_over_deferred(defer.gatherResults([
+ preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
for e in event_ids
- ])
+ ]))
states = dict(zip(event_ids, [s[1] for s in states]))
for e_id, _ in sorted_extremeties_tuple:
@@ -1093,16 +1096,17 @@ class FederationHandler(BaseHandler):
)
if event:
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
+ if self.hs.is_mine_id(event.event_id):
+ # FIXME: This is a temporary work around where we occasionally
+ # return events slightly differently than when they were
+ # originally signed
+ event.signatures.update(
+ compute_event_signature(
+ event,
+ self.hs.hostname,
+ self.hs.config.signing_key[0]
+ )
)
- )
if do_auth:
in_room = yield self.auth.check_host_in_room(
@@ -1112,6 +1116,12 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
+ events = yield self._filter_events_for_server(
+ origin, event.room_id, [event]
+ )
+
+ event = events[0]
+
defer.returnValue(event)
else:
defer.returnValue(None)
@@ -1158,9 +1168,9 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
- contexts = yield defer.gatherResults(
+ contexts = yield preserve_context_over_deferred(defer.gatherResults(
[
- self._prep_event(
+ preserve_fn(self._prep_event)(
origin,
ev_info["event"],
state=ev_info.get("state"),
@@ -1168,7 +1178,7 @@ class FederationHandler(BaseHandler):
)
for ev_info in event_infos
]
- )
+ ))
yield self.store.persist_events(
[
@@ -1452,9 +1462,9 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
- different_events = yield defer.gatherResults(
+ different_events = yield preserve_context_over_deferred(defer.gatherResults(
[
- self.store.get_event(
+ preserve_fn(self.store.get_event)(
d,
allow_none=True,
allow_rejected=False,
@@ -1463,7 +1473,7 @@ class FederationHandler(BaseHandler):
if d in have_events and not have_events[d]
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events)
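
Most of the federation.py changes are mechanical: every parallel defer.gatherResults fan-out is wrapped so Twisted logging contexts are neither lost nor leaked across the concurrent branches. The shape of the pattern, extracted into a standalone helper (fetch_one and item_ids are placeholders; the wrappers are the ones imported at the top of this file):

    from twisted.internet import defer

    from synapse.util import unwrapFirstError
    from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred

    @defer.inlineCallbacks
    def fetch_all(fetch_one, item_ids):
        # preserve_fn() runs each call under the caller's logging context and
        # detaches it afterwards; preserve_context_over_deferred() restores
        # that context when the combined Deferred fires, so the code after the
        # yield is still attributed to the right request.
        results = yield preserve_context_over_deferred(defer.gatherResults(
            [preserve_fn(fetch_one)(item_id) for item_id in item_ids],
            consumeErrors=True,
        )).addErrback(unwrapFirstError)
        defer.returnValue(results)
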
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dc76d34a52..4c3cd9d12e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,7 +28,8 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -502,15 +503,17 @@ class MessageHandler(BaseHandler):
lambda states: states[event.event_id]
)
- (messages, token), current_state = yield defer.gatherResults(
- [
- self.store.get_recent_events_for_room(
- event.room_id,
- limit=limit,
- end_token=room_end_token,
- ),
- deferred_room_state,
- ]
+ (messages, token), current_state = yield preserve_context_over_deferred(
+ defer.gatherResults(
+ [
+ preserve_fn(self.store.get_recent_events_for_room)(
+ event.room_id,
+ limit=limit,
+ end_token=room_end_token,
+ ),
+ deferred_room_state,
+ ]
+ )
).addErrback(unwrapFirstError)
messages = yield filter_events_for_client(
@@ -719,9 +722,9 @@ class MessageHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults(
[
- get_presence(),
- get_receipts(),
- self.store.get_recent_events_for_room(
+ preserve_fn(get_presence)(),
+ preserve_fn(get_receipts)(),
+ preserve_fn(self.store.get_recent_events_for_room)(
room_id,
limit=limit,
end_token=now_token.room_key,
@@ -755,6 +758,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret)
+ @measure_func("_create_new_client_event")
@defer.inlineCallbacks
def _create_new_client_event(self, builder, prev_event_ids=None):
if prev_event_ids:
@@ -806,6 +810,7 @@ class MessageHandler(BaseHandler):
(event, context,)
)
+ @measure_func("handle_new_client_event")
@defer.inlineCallbacks
def handle_new_client_event(
self,
@@ -934,7 +939,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
- self.notifier.on_new_room_event(
+ yield self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
@@ -944,6 +949,6 @@ class MessageHandler(BaseHandler):
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
- federation_handler.handle_new_event(
+ preserve_fn(federation_handler.handle_new_event)(
event, destinations=destinations,
)
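
message.py also picks up the measure_func decorator so event creation and the new-event path report per-call timing. As a rough stand-in for what such a decorator does (the real synapse.util.metrics version feeds the metrics registry and understands Deferreds; this sketch only logs wall-clock time):

    import functools
    import logging
    import time

    logger = logging.getLogger(__name__)

    def measure_func(name):
        def decorator(func):
            @functools.wraps(func)
            def measured(*args, **kwargs):
                start = time.time()
                try:
                    return func(*args, **kwargs)
                finally:
                    # The real helper records this in per-block metrics
                    # rather than logging it.
                    logger.debug("%s took %.3fs", name, time.time() - start)
            return measured
        return decorator

    @measure_func("_create_new_client_event")
    def _create_new_client_event(builder, prev_event_ids=None):
        return builder  # placeholder body
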
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 6b70fa3817..6a1fe76c88 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -503,7 +503,7 @@ class PresenceHandler(object):
defer.returnValue(states)
@defer.inlineCallbacks
- def _get_interested_parties(self, states):
+ def _get_interested_parties(self, states, calculate_remote_hosts=True):
"""Given a list of states return which entities (rooms, users, servers)
are interested in the given states.
@@ -526,14 +526,15 @@ class PresenceHandler(object):
users_to_states.setdefault(state.user_id, []).append(state)
hosts_to_states = {}
- for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
+ if calculate_remote_hosts:
+ for room_id, states in room_ids_to_states.items():
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+ if not local_states:
+ continue
- hosts = yield self.store.get_joined_hosts_for_room(room_id)
- for host in hosts:
- hosts_to_states.setdefault(host, []).extend(local_states)
+ hosts = yield self.store.get_joined_hosts_for_room(room_id)
+ for host in hosts:
+ hosts_to_states.setdefault(host, []).extend(local_states)
for user_id, states in users_to_states.items():
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
@@ -565,6 +566,16 @@ class PresenceHandler(object):
self._push_to_remotes(hosts_to_states)
+ @defer.inlineCallbacks
+ def notify_for_states(self, state, stream_id):
+ parties = yield self._get_interested_parties([state])
+ room_ids_to_states, users_to_states, hosts_to_states = parties
+
+ self.notifier.on_new_event(
+ "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+ users=[UserID.from_string(u) for u in users_to_states.keys()]
+ )
+
def _push_to_remotes(self, hosts_to_states):
"""Sends state updates to remote servers.
@@ -672,7 +683,7 @@ class PresenceHandler(object):
])
@defer.inlineCallbacks
- def set_state(self, target_user, state):
+ def set_state(self, target_user, state, ignore_status_msg=False):
"""Set the presence state of the user.
"""
status_msg = state.get("status_msg", None)
@@ -689,10 +700,13 @@ class PresenceHandler(object):
prev_state = yield self.current_state_for_user(user_id)
new_fields = {
- "state": presence,
- "status_msg": status_msg if presence != PresenceState.OFFLINE else None
+ "state": presence
}
+ if not ignore_status_msg:
+ msg = status_msg if presence != PresenceState.OFFLINE else None
+ new_fields["status_msg"] = msg
+
if presence == PresenceState.ONLINE:
new_fields["last_active_ts"] = self.clock.time_msec()
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8cec8fc4ed..8b17632fdc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -59,10 +59,13 @@ class RoomMemberHandler(BaseHandler):
prev_event_ids,
txn_id=None,
ratelimit=True,
+ content=None,
):
+ if content is None:
+ content = {}
msg_handler = self.hs.get_handlers().message_handler
- content = {"membership": membership}
+ content["membership"] = membership
if requester.is_guest:
content["kind"] = "guest"
@@ -140,8 +143,9 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
+ content=None,
):
- key = (target, room_id,)
+ key = (room_id,)
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
@@ -153,6 +157,7 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
+ content=content,
)
defer.returnValue(result)
@@ -168,7 +173,11 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
+ content=None,
):
+ if content is None:
+ content = {}
+
effective_membership_state = action
if action in ["kick", "unban"]:
effective_membership_state = "leave"
@@ -218,7 +227,7 @@ class RoomMemberHandler(BaseHandler):
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
- content = {"membership": Membership.JOIN}
+ content["membership"] = Membership.JOIN
profile = self.hs.get_handlers().profile_handler
content["displayname"] = yield profile.get_displayname(target)
@@ -272,6 +281,7 @@ class RoomMemberHandler(BaseHandler):
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
+ content=content,
)
@defer.inlineCallbacks
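
The member_linearizer key drops the target user, so all membership changes for a room now queue behind one another regardless of which user they affect, and the new optional content dict lets callers pass extra event content that is merged with the membership field rather than discarded. A minimal usage sketch of the per-room linearizer, with the actual membership logic stubbed out:

    from twisted.internet import defer

    from synapse.util.async import Linearizer  # the same helper RoomMemberHandler uses

    class MembershipQueue(object):
        def __init__(self):
            self.member_linearizer = Linearizer()

        @defer.inlineCallbacks
        def update_membership(self, requester, target, room_id, action, content=None):
            if content is None:
                content = {}
            # Keyed on the room alone: concurrent joins/leaves for the same
            # room are applied one at a time, even for different target users.
            key = (room_id,)
            with (yield self.member_linearizer.queue(key)):
                result = yield self._update_membership(
                    requester, target, room_id, action, content=content,
                )
            defer.returnValue(result)

        def _update_membership(self, requester, target, room_id, action, content):
            content["membership"] = action  # merged into, not replacing, caller fields
            return defer.succeed(content)   # stand-in for the real state change
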
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0ee4ebe504..c8dfd02e7b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -464,10 +464,10 @@ class SyncHandler(object):
else:
state = {}
- defer.returnValue({
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(state.values())
- })
+ defer.returnValue({
+ (e.type, e.state_key): e
+ for e in sync_config.filter_collection.filter_room_state(state.values())
+ })
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config):
@@ -485,9 +485,9 @@ class SyncHandler(object):
)
defer.returnValue(notifs)
- # There is no new information in this period, so your notification
- # count is whatever it was last time.
- defer.returnValue(None)
+ # There is no new information in this period, so your notification
+ # count is whatever it was last time.
+ defer.returnValue(None)
@defer.inlineCallbacks
def generate_sync_result(self, sync_config, since_token=None, full_state=False):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 5589296c09..46181984c0 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,9 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
from synapse.util.metrics import Measure
from synapse.types import UserID
@@ -169,13 +171,13 @@ class TypingHandler(object):
deferreds = []
for domain in domains:
if domain == self.server_name:
- self._push_update_local(
+ preserve_fn(self._push_update_local)(
room_id=room_id,
user_id=user_id,
typing=typing
)
else:
- deferreds.append(self.federation.send_edu(
+ deferreds.append(preserve_fn(self.federation.send_edu)(
destination=domain,
edu_type="m.typing",
content={
@@ -185,7 +187,9 @@ class TypingHandler(object):
},
))
- yield defer.DeferredList(deferreds, consumeErrors=True)
+ yield preserve_context_over_deferred(
+ defer.DeferredList(deferreds, consumeErrors=True)
+ )
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
|