diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/_base.py | 10 | ||||
-rw-r--r-- | synapse/handlers/events.py | 11 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 50 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 20 | ||||
-rw-r--r-- | synapse/handlers/register.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room.py | 11 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 40 |
7 files changed, 57 insertions, 87 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 1423df6cf3..fa83d3e464 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -293,19 +293,11 @@ class BaseHandler(object): with PreserveLoggingContext(): # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - notify_d.addErrback(log_failure) - # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 5ad8f3779a..4933c31c19 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.types import UserID from synapse.events.utils import serialize_event +from synapse.util.logcontext import preserve_context_over_fn from ._base import BaseHandler @@ -29,11 +30,17 @@ logger = logging.getLogger(__name__) def started_user_eventstream(distributor, user): - return distributor.fire("started_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "started_user_eventstream", user + ) def stopped_user_eventstream(distributor, user): - return distributor.fire("stopped_user_eventstream", user) + return preserve_context_over_fn( + distributor.fire, + "stopped_user_eventstream", user + ) class EventStreamHandler(BaseHandler): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ce1e9d6c7..b78b0502d9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -221,19 +221,11 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: prev_state = context.current_state.get((event.type, event.state_key)) @@ -643,19 +635,11 @@ class FederationHandler(BaseHandler): ) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[joinee] ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -730,18 +714,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -811,19 +787,11 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[target_user], ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - defer.returnValue(event) @defer.inlineCallbacks @@ -948,18 +916,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - new_pdu = event destinations = set() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c21ff5c9..b61394f2b5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -378,9 +378,9 @@ class PresenceHandler(BaseHandler): was_polling = target_user in self._user_cachemap if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) + yield self.start_polling_presence(target_user, state=state) elif not now_online and was_polling: - self.stop_polling_presence(target_user) + yield self.stop_polling_presence(target_user) # TODO(paul): perform a presence push as part of start/stop poll so # we don't have to do this all the time @@ -394,7 +394,8 @@ class PresenceHandler(BaseHandler): if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: return - self.changed_presencelike_data(user, {"last_active": now}) + with PreserveLoggingContext(): + self.changed_presencelike_data(user, {"last_active": now}) def get_joined_rooms_for_user(self, user): """Get the list of rooms a user is joined to. @@ -466,11 +467,12 @@ class PresenceHandler(BaseHandler): local_user, room_ids=[room_id], add_to_cache=False ) - self.push_update_to_local_and_remote( - observed_user=local_user, - users_to_push=[user], - statuscache=statuscache, - ) + with PreserveLoggingContext(): + self.push_update_to_local_and_remote( + observed_user=local_user, + users_to_push=[user], + statuscache=statuscache, + ) @defer.inlineCallbacks def send_presence_invite(self, observer_user, observed_user): @@ -556,7 +558,7 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - self.start_polling_presence( + yield self.start_polling_presence( observer_user, target_user=observed_user ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 2660fd21a2..24c850ae9b 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -186,7 +186,7 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="" ) - registered_user(self.distributor, user) + yield registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index bfd7e44e9f..a8e3a9029c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,6 +25,7 @@ from synapse.api.constants import ( from synapse.api.errors import AuthError, StoreError, SynapseError, Codes from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor +from synapse.util.logcontext import preserve_context_over_fn from signedjson.sign import verify_signed_json from signedjson.key import decode_verify_key_bytes @@ -46,11 +47,17 @@ def collect_presencelike_data(distributor, user, content): def user_left_room(distributor, user, room_id): - return distributor.fire("user_left_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_left_room", user=user, room_id=room_id + ) def user_joined_room(distributor, user, room_id): - return distributor.fire("user_joined_room", user=user, room_id=room_id) + return preserve_context_over_fn( + distributor.fire, + "user_joined_room", user=user, room_id=room_id + ) class RoomCreationHandler(BaseHandler): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 72271f2626..3f1cda5b0b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -18,7 +18,7 @@ from ._base import BaseHandler from synapse.streams.config import PaginationConfig from synapse.api.constants import Membership, EventTypes from synapse.util import unwrapFirstError -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from twisted.internet import defer @@ -241,15 +241,16 @@ class SyncHandler(BaseHandler): deferreds = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync_deferred = self.full_state_sync_for_joined_room( - room_id=event.room_id, - sync_config=sync_config, - now_token=now_token, - timeline_since_token=timeline_since_token, - ephemeral_by_room=ephemeral_by_room, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + ephemeral_by_room=ephemeral_by_room, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(joined.append) deferreds.append(room_sync_deferred) elif event.membership == Membership.INVITE: @@ -262,15 +263,16 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync_deferred = self.full_state_sync_for_archived_room( - sync_config=sync_config, - room_id=event.room_id, - leave_event_id=event.event_id, - leave_token=leave_token, - timeline_since_token=timeline_since_token, - tags_by_room=tags_by_room, - account_data_by_room=account_data_by_room, - ) + with PreserveLoggingContext(LoggingContext.current_context()): + room_sync_deferred = self.full_state_sync_for_archived_room( + sync_config=sync_config, + room_id=event.room_id, + leave_event_id=event.event_id, + leave_token=leave_token, + timeline_since_token=timeline_since_token, + tags_by_room=tags_by_room, + account_data_by_room=account_data_by_room, + ) room_sync_deferred.addCallback(archived.append) deferreds.append(room_sync_deferred) |