diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 55 |
1 files changed, 22 insertions, 33 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 1acd899fab..60e5409895 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -220,6 +220,8 @@ class Notifier: # down. self.remote_server_up_callbacks: List[Callable[[str], None]] = [] + self._third_party_rules = hs.get_third_party_event_rules() + self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() self._pusher_pool = hs.get_pusherpool() @@ -267,7 +269,7 @@ class Notifier: """ self.replication_callbacks.append(cb) - def on_new_room_event( + async def on_new_room_event( self, event: EventBase, event_pos: PersistedEventPosition, @@ -275,9 +277,10 @@ class Notifier: extra_users: Optional[Collection[UserID]] = None, ): """Unwraps event and calls `on_new_room_event_args`.""" - self.on_new_room_event_args( + await self.on_new_room_event_args( event_pos=event_pos, room_id=event.room_id, + event_id=event.event_id, event_type=event.type, state_key=event.get("state_key"), membership=event.content.get("membership"), @@ -285,9 +288,10 @@ class Notifier: extra_users=extra_users or [], ) - def on_new_room_event_args( + async def on_new_room_event_args( self, room_id: str, + event_id: str, event_type: str, state_key: Optional[str], membership: Optional[str], @@ -302,7 +306,10 @@ class Notifier: listening to the room, and any listeners for the users in the `extra_users` param. - The events can be peristed out of order. The notifier will wait + This also notifies modules listening on new events via the + `on_new_event` callback. + + The events can be persisted out of order. The notifier will wait until all previous events have been persisted before notifying the client streams. """ @@ -318,6 +325,8 @@ class Notifier: ) self._notify_pending_new_room_events(max_room_stream_token) + await self._third_party_rules.on_new_event(event_id) + self.notify_replication() def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken): @@ -374,29 +383,6 @@ class Notifier: except Exception: logger.exception("Error notifying application services of event") - def _notify_app_services_ephemeral( - self, - stream_key: str, - new_token: Union[int, RoomStreamToken], - users: Optional[Collection[Union[str, UserID]]] = None, - ) -> None: - """Notify application services of ephemeral event activity. - - Args: - stream_key: The stream the event came from. - new_token: The value of the new stream token. - users: The users that should be informed of the new event, if any. - """ - try: - stream_token = None - if isinstance(new_token, int): - stream_token = new_token - self.appservice_handler.notify_interested_services_ephemeral( - stream_key, stream_token, users or [] - ) - except Exception: - logger.exception("Error notifying application services of event") - def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): try: self._pusher_pool.on_new_notifications(max_room_stream_token) @@ -458,12 +444,15 @@ class Notifier: self.notify_replication() - # Notify appservices - self._notify_app_services_ephemeral( - stream_key, - new_token, - users, - ) + # Notify appservices. + try: + self.appservice_handler.notify_interested_services_ephemeral( + stream_key, + new_token, + users, + ) + except Exception: + logger.exception("Error notifying application services of event") def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened |