From e7fd336a53a4ca489cdafc389b494d5477019dc0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 16:17:50 +0100 Subject: Fixup pusher pool notifications --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 43f2986f89..74d7ac8a67 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2970,7 +2970,7 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(max_stream_id) async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the -- cgit 1.5.1 From dc9dcdbd59d4f839c7a96780f7464460fae27851 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 16:19:22 +0100 Subject: Revert "Fixup pusher pool notifications" This reverts commit e7fd336a53a4ca489cdafc389b494d5477019dc0. --- synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 2 +- synapse/push/emailpusher.py | 2 +- synapse/push/httppusher.py | 2 +- synapse/push/pusherpool.py | 19 +++---------------- synapse/replication/tcp/client.py | 3 +-- tests/handlers/test_typing.py | 1 - 7 files changed, 8 insertions(+), 23 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 74d7ac8a67..43f2986f89 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2970,7 +2970,7 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - await self.pusher_pool.on_new_notifications(max_stream_id) + await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index d1556659e3..8a7b4916cd 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1145,7 +1145,7 @@ class EventCreationHandler: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - await self.pusher_pool.on_new_notifications(max_stream_id) + await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) def _notify(): try: diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 28bd8ab748..b7ea4438e0 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -91,7 +91,7 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, min_stream_ordering, max_stream_ordering): if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 26706bf3e1..f21fa9b659 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -114,7 +114,7 @@ class HttpPusher: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index fa8473bf8d..3c3262a88c 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -64,12 +64,6 @@ class PusherPool: self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() - # Record the last stream ID that we were poked about so we can get - # changes since then. We set this to the current max stream ID on - # startup as every individual pusher will have checked for changes on - # startup. - self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering() - # map from user id to app_id:pushkey to pusher self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] @@ -184,27 +178,20 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, max_stream_id): + async def on_new_notifications(self, min_stream_id, max_stream_id): if not self.pushers: # nothing to do here. return - if max_stream_id < self._last_room_stream_id_seen: - # Nothing to do - return - - prev_stream_id = self._last_room_stream_id_seen - self._last_room_stream_id_seen = max_stream_id - try: users_affected = await self.store.get_push_action_users_in_range( - prev_stream_id, max_stream_id + min_stream_id, max_stream_id ) for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(max_stream_id) + p.on_new_notifications(min_stream_id, max_stream_id) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index ccd3147dfd..d6ecf5b327 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -154,8 +154,7 @@ class ReplicationDataHandler: max_token = self.store.get_room_max_stream_ordering() self.notifier.on_new_room_event(event, token, max_token, extra_users) - max_token = self.store.get_room_max_stream_ordering() - await self.pusher_pool.on_new_notifications(max_token) + await self.pusher_pool.on_new_notifications(token, token) # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index f306a09bfa..ae6bc24f4c 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -80,7 +80,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): "get_user_directory_stream_pos", "get_current_state_deltas", "get_device_updates_by_remote", - "get_room_max_stream_ordering", ] ) -- cgit 1.5.1 From c9dbee50aefc22390f600a0219ca7fa1ae9acd88 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 Sep 2020 16:56:08 +0100 Subject: Fixup pusher pool notifications (#8287) `pusher_pool.on_new_notifications` expected a min and max stream ID, however that was not what we were passing in. Instead, let's just pass it the current max stream ID and have it track the last stream ID it got passed. I believe that it mostly worked as we called the function for every event. However, it would break for events that got persisted out of order, i.e, that were persisted but the max stream ID wasn't incremented as not all preceding events had finished persisting, and push for that event would be delayed until another event got pushed to the effected users. --- changelog.d/8287.bugfix | 1 + synapse/handlers/federation.py | 2 +- synapse/handlers/message.py | 2 +- synapse/push/emailpusher.py | 2 +- synapse/push/httppusher.py | 2 +- synapse/push/pusherpool.py | 19 ++++++++++++++++--- synapse/replication/tcp/client.py | 3 ++- tests/handlers/test_typing.py | 1 + 8 files changed, 24 insertions(+), 8 deletions(-) create mode 100644 changelog.d/8287.bugfix (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/8287.bugfix b/changelog.d/8287.bugfix new file mode 100644 index 0000000000..839781aa07 --- /dev/null +++ b/changelog.d/8287.bugfix @@ -0,0 +1 @@ +Fix edge case where push could get delayed for a user until a later event was pushed. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 43f2986f89..74d7ac8a67 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2970,7 +2970,7 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(max_stream_id) async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8a7b4916cd..d1556659e3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1145,7 +1145,7 @@ class EventCreationHandler: # If there's an expiry timestamp on the event, schedule its expiry. self._message_handler.maybe_schedule_expiry(event) - await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(max_stream_id) def _notify(): try: diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index b7ea4438e0..28bd8ab748 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -91,7 +91,7 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + def on_new_notifications(self, max_stream_ordering): if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index f21fa9b659..26706bf3e1 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -114,7 +114,7 @@ class HttpPusher: if should_check_for_notifs: self._start_processing() - def on_new_notifications(self, min_stream_ordering, max_stream_ordering): + def on_new_notifications(self, max_stream_ordering): self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering or 0 ) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 3c3262a88c..fa8473bf8d 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -64,6 +64,12 @@ class PusherPool: self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() + # Record the last stream ID that we were poked about so we can get + # changes since then. We set this to the current max stream ID on + # startup as every individual pusher will have checked for changes on + # startup. + self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering() + # map from user id to app_id:pushkey to pusher self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] @@ -178,20 +184,27 @@ class PusherPool: ) await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"]) - async def on_new_notifications(self, min_stream_id, max_stream_id): + async def on_new_notifications(self, max_stream_id): if not self.pushers: # nothing to do here. return + if max_stream_id < self._last_room_stream_id_seen: + # Nothing to do + return + + prev_stream_id = self._last_room_stream_id_seen + self._last_room_stream_id_seen = max_stream_id + try: users_affected = await self.store.get_push_action_users_in_range( - min_stream_id, max_stream_id + prev_stream_id, max_stream_id ) for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(min_stream_id, max_stream_id) + p.on_new_notifications(max_stream_id) except Exception: logger.exception("Exception in pusher on_new_notifications") diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index d6ecf5b327..ccd3147dfd 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -154,7 +154,8 @@ class ReplicationDataHandler: max_token = self.store.get_room_max_stream_ordering() self.notifier.on_new_room_event(event, token, max_token, extra_users) - await self.pusher_pool.on_new_notifications(token, token) + max_token = self.store.get_room_max_stream_ordering() + await self.pusher_pool.on_new_notifications(max_token) # Notify any waiting deferreds. The list is ordered by position so we # just iterate through the list until we reach a position that is diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index ae6bc24f4c..f306a09bfa 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -80,6 +80,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): "get_user_directory_stream_pos", "get_current_state_deltas", "get_device_updates_by_remote", + "get_room_max_stream_ordering", ] ) -- cgit 1.5.1 From 2ea1c682490a19e0e2df069702c1dbe419389fa4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 9 Sep 2020 12:22:00 -0400 Subject: Remove some unused distributor signals (#8216) Removes the `user_joined_room` and stops calling it since there are no observers. Also cleans-up some other unused signals and related code. --- changelog.d/8216.misc | 1 + synapse/handlers/events.py | 4 --- synapse/handlers/federation.py | 43 +---------------------------- synapse/handlers/room_member.py | 42 +++------------------------- synapse/handlers/room_member_worker.py | 9 ------ synapse/replication/http/membership.py | 10 +++---- synapse/util/distributor.py | 50 ++++++---------------------------- 7 files changed, 18 insertions(+), 141 deletions(-) create mode 100644 changelog.d/8216.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/8216.misc b/changelog.d/8216.misc new file mode 100644 index 0000000000..b38911b0e5 --- /dev/null +++ b/changelog.d/8216.misc @@ -0,0 +1 @@ +Simplify the distributor code to avoid unnecessary work. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index b05e32f457..fdce54c5c3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -39,10 +39,6 @@ class EventStreamHandler(BaseHandler): def __init__(self, hs: "HomeServer"): super(EventStreamHandler, self).__init__(hs) - self.distributor = hs.get_distributor() - self.distributor.declare("started_user_eventstream") - self.distributor.declare("stopped_user_eventstream") - self.clock = hs.get_clock() self.notifier = hs.get_notifier() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 74d7ac8a67..be9b0701a0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,7 +69,6 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ReplicationStoreRoomOnInviteRestServlet, ) -from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( @@ -80,7 +79,6 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server @@ -141,9 +139,6 @@ class FederationHandler(BaseHandler): self._replication = hs.get_replication_data_handler() self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) - self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( - hs - ) self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs ) @@ -704,31 +699,10 @@ class FederationHandler(BaseHandler): logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) try: - context = await self._handle_new_event(origin, event, state=state) + await self._handle_new_event(origin, event, state=state) except AuthError as e: raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) - if event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has acutally - # joined the room. Don't bother if the user is just - # changing their profile info. - newly_joined = True - - prev_state_ids = await context.get_prev_state_ids() - - prev_state_id = prev_state_ids.get((event.type, event.state_key)) - if prev_state_id: - prev_state = await self.store.get_event( - prev_state_id, allow_none=True - ) - if prev_state and prev_state.membership == Membership.JOIN: - newly_joined = False - - if newly_joined: - user = UserID.from_string(event.state_key) - await self.user_joined_room(user, room_id) - # For encrypted messages we check that we know about the sending device, # if we don't then we mark the device cache for that user as stale. if event.type == EventTypes.Encrypted: @@ -1550,11 +1524,6 @@ class FederationHandler(BaseHandler): event.signatures, ) - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.JOIN: - user = UserID.from_string(event.state_key) - await self.user_joined_room(user, event.room_id) - prev_state_ids = await context.get_prev_state_ids() state_ids = list(prev_state_ids.values()) @@ -2984,16 +2953,6 @@ class FederationHandler(BaseHandler): else: await self.store.clean_room_for_join(room_id) - async def user_joined_room(self, user: UserID, room_id: str) -> None: - """Called when a new user has joined the room - """ - if self.config.worker_app: - await self._notify_user_membership_change( - room_id=room_id, user_id=user.to_string(), change="joined" - ) - else: - user_joined_room(self.distributor, user, room_id) - async def get_room_complexity( self, remote_room_hosts: List[str], room_id: str ) -> Optional[dict]: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 32b7e323fa..100f335b80 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -40,7 +40,7 @@ from synapse.events.validator import EventValidator from synapse.storage.roommember import RoomsForUser from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID from synapse.util.async_helpers import Linearizer -from synapse.util.distributor import user_joined_room, user_left_room +from synapse.util.distributor import user_left_room from ._base import BaseHandler @@ -148,17 +148,6 @@ class RoomMemberHandler: """ raise NotImplementedError() - @abc.abstractmethod - async def _user_joined_room(self, target: UserID, room_id: str) -> None: - """Notifies distributor on master process that the user has joined the - room. - - Args: - target - room_id - """ - raise NotImplementedError() - @abc.abstractmethod async def _user_left_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has left the @@ -221,7 +210,6 @@ class RoomMemberHandler: prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) - newly_joined = False if event.membership == Membership.JOIN: newly_joined = True if prev_member_event_id: @@ -246,12 +234,7 @@ class RoomMemberHandler: requester, event, context, extra_users=[target], ratelimit=ratelimit, ) - if event.membership == Membership.JOIN and newly_joined: - # Only fire user_joined_room if the user has actually joined the - # room. Don't bother if the user is just changing their profile - # info. - await self._user_joined_room(target, room_id) - elif event.membership == Membership.LEAVE: + if event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = await self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: @@ -726,17 +709,7 @@ class RoomMemberHandler: (EventTypes.Member, event.state_key), None ) - if event.membership == Membership.JOIN: - # Only fire user_joined_room if the user has actually joined the - # room. Don't bother if the user is just changing their profile - # info. - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - newly_joined = prev_member_event.membership != Membership.JOIN - if newly_joined: - await self._user_joined_room(target_user, room_id) - elif event.membership == Membership.LEAVE: + if event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = await self.store.get_event(prev_member_event_id) if prev_member_event.membership == Membership.JOIN: @@ -1002,10 +975,9 @@ class RoomMemberHandler: class RoomMemberMasterHandler(RoomMemberHandler): def __init__(self, hs): - super(RoomMemberMasterHandler, self).__init__(hs) + super().__init__(hs) self.distributor = hs.get_distributor() - self.distributor.declare("user_joined_room") self.distributor.declare("user_left_room") async def _is_remote_room_too_complex( @@ -1085,7 +1057,6 @@ class RoomMemberMasterHandler(RoomMemberHandler): event_id, stream_id = await self.federation_handler.do_invite_join( remote_room_hosts, room_id, user.to_string(), content ) - await self._user_joined_room(user, room_id) # Check the room we just joined wasn't too large, if we didn't fetch the # complexity of it before. @@ -1228,11 +1199,6 @@ class RoomMemberMasterHandler(RoomMemberHandler): ) return event.event_id, stream_id - async def _user_joined_room(self, target: UserID, room_id: str) -> None: - """Implements RoomMemberHandler._user_joined_room - """ - user_joined_room(self.distributor, target, room_id) - async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room """ diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 897338fd54..e7f34737c6 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -57,8 +57,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler): content=content, ) - await self._user_joined_room(user, room_id) - return ret["event_id"], ret["stream_id"] async def remote_reject_invite( @@ -81,13 +79,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler): ) return ret["event_id"], ret["stream_id"] - async def _user_joined_room(self, target: UserID, room_id: str) -> None: - """Implements RoomMemberHandler._user_joined_room - """ - await self._notify_change_client( - user_id=target.to_string(), room_id=room_id, change="joined" - ) - async def _user_left_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_left_room """ diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 741329ab5f..08095fdf7d 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Optional from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint from synapse.types import JsonDict, Requester, UserID -from synapse.util.distributor import user_joined_room, user_left_room +from synapse.util.distributor import user_left_room if TYPE_CHECKING: from synapse.server import HomeServer @@ -181,9 +181,9 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): Args: room_id (str) user_id (str) - change (str): Either "joined" or "left" + change (str): "left" """ - assert change in ("joined", "left") + assert change == "left" return {} @@ -192,9 +192,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): user = UserID.from_string(user_id) - if change == "joined": - user_joined_room(self.distributor, user, room_id) - elif change == "left": + if change == "left": user_left_room(self.distributor, user, room_id) else: raise Exception("Unrecognized change: %r", change) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index a750261e77..f73e95393c 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -16,8 +16,6 @@ import inspect import logging from twisted.internet import defer -from twisted.internet.defer import Deferred, fail, succeed -from twisted.python import failure from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -29,11 +27,6 @@ def user_left_room(distributor, user, room_id): distributor.fire("user_left_room", user=user, room_id=room_id) -# XXX: this is no longer used. We should probably kill it. -def user_joined_room(distributor, user, room_id): - distributor.fire("user_joined_room", user=user, room_id=room_id) - - class Distributor: """A central dispatch point for loosely-connected pieces of code to register, observe, and fire signals. @@ -81,28 +74,6 @@ class Distributor: run_as_background_process(name, self.signals[name].fire, *args, **kwargs) -def maybeAwaitableDeferred(f, *args, **kw): - """ - Invoke a function that may or may not return a Deferred or an Awaitable. - - This is a modified version of twisted.internet.defer.maybeDeferred. - """ - try: - result = f(*args, **kw) - except Exception: - return fail(failure.Failure(captureVars=Deferred.debug)) - - if isinstance(result, Deferred): - return result - # Handle the additional case of an awaitable being returned. - elif inspect.isawaitable(result): - return defer.ensureDeferred(result) - elif isinstance(result, failure.Failure): - return fail(result) - else: - return succeed(result) - - class Signal: """A Signal is a dispatch point that stores a list of callables as observers of it. @@ -132,22 +103,17 @@ class Signal: Returns a Deferred that will complete when all the observers have completed.""" - def do(observer): - def eb(failure): + async def do(observer): + try: + result = observer(*args, **kwargs) + if inspect.isawaitable(result): + result = await result + return result + except Exception as e: logger.warning( - "%s signal observer %s failed: %r", - self.name, - observer, - failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject(), - ), + "%s signal observer %s failed: %r", self.name, observer, e, ) - return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb) - deferreds = [run_in_background(do, o) for o in self.observers] return make_deferred_yieldable( -- cgit 1.5.1