diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1acd899fab..60e5409895 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -220,6 +220,8 @@ class Notifier:
# down.
self.remote_server_up_callbacks: List[Callable[[str], None]] = []
+ self._third_party_rules = hs.get_third_party_event_rules()
+
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@@ -267,7 +269,7 @@ class Notifier:
"""
self.replication_callbacks.append(cb)
- def on_new_room_event(
+ async def on_new_room_event(
self,
event: EventBase,
event_pos: PersistedEventPosition,
@@ -275,9 +277,10 @@ class Notifier:
extra_users: Optional[Collection[UserID]] = None,
):
"""Unwraps event and calls `on_new_room_event_args`."""
- self.on_new_room_event_args(
+ await self.on_new_room_event_args(
event_pos=event_pos,
room_id=event.room_id,
+ event_id=event.event_id,
event_type=event.type,
state_key=event.get("state_key"),
membership=event.content.get("membership"),
@@ -285,9 +288,10 @@ class Notifier:
extra_users=extra_users or [],
)
- def on_new_room_event_args(
+ async def on_new_room_event_args(
self,
room_id: str,
+ event_id: str,
event_type: str,
state_key: Optional[str],
membership: Optional[str],
@@ -302,7 +306,10 @@ class Notifier:
listening to the room, and any listeners for the users in the
`extra_users` param.
- The events can be peristed out of order. The notifier will wait
+ This also notifies modules listening on new events via the
+ `on_new_event` callback.
+
+ The events can be persisted out of order. The notifier will wait
until all previous events have been persisted before notifying
the client streams.
"""
@@ -318,6 +325,8 @@ class Notifier:
)
self._notify_pending_new_room_events(max_room_stream_token)
+ await self._third_party_rules.on_new_event(event_id)
+
self.notify_replication()
def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
@@ -374,29 +383,6 @@ class Notifier:
except Exception:
logger.exception("Error notifying application services of event")
- def _notify_app_services_ephemeral(
- self,
- stream_key: str,
- new_token: Union[int, RoomStreamToken],
- users: Optional[Collection[Union[str, UserID]]] = None,
- ) -> None:
- """Notify application services of ephemeral event activity.
-
- Args:
- stream_key: The stream the event came from.
- new_token: The value of the new stream token.
- users: The users that should be informed of the new event, if any.
- """
- try:
- stream_token = None
- if isinstance(new_token, int):
- stream_token = new_token
- self.appservice_handler.notify_interested_services_ephemeral(
- stream_key, stream_token, users or []
- )
- except Exception:
- logger.exception("Error notifying application services of event")
-
def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
self._pusher_pool.on_new_notifications(max_room_stream_token)
@@ -458,12 +444,15 @@ class Notifier:
self.notify_replication()
- # Notify appservices
- self._notify_app_services_ephemeral(
- stream_key,
- new_token,
- users,
- )
+ # Notify appservices.
+ try:
+ self.appservice_handler.notify_interested_services_ephemeral(
+ stream_key,
+ new_token,
+ users,
+ )
+ except Exception:
+ logger.exception("Error notifying application services of event")
def on_new_replication_data(self) -> None:
"""Used to inform replication listeners that something has happened
|