summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2021-11-17 14:19:27 +0000
committerDavid Robertson <davidr@element.io>2021-11-17 14:19:27 +0000
commit077b74929f8f412395d1156e1b97eb16701059fa (patch)
tree25ecb8e93ec3ba275596bfb31efa45018645d47b /synapse/notifier.py
parentCorrect target of link to the modules page from the Password Auth Providers p... (diff)
parent1.47.0 (diff)
downloadsynapse-077b74929f8f412395d1156e1b97eb16701059fa.tar.xz
Merge remote-tracking branch 'origin/release-v1.47'
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py55
1 files changed, 22 insertions, 33 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1acd899fab..60e5409895 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -220,6 +220,8 @@ class Notifier:
         # down.
         self.remote_server_up_callbacks: List[Callable[[str], None]] = []
 
+        self._third_party_rules = hs.get_third_party_event_rules()
+
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
         self._pusher_pool = hs.get_pusherpool()
@@ -267,7 +269,7 @@ class Notifier:
         """
         self.replication_callbacks.append(cb)
 
-    def on_new_room_event(
+    async def on_new_room_event(
         self,
         event: EventBase,
         event_pos: PersistedEventPosition,
@@ -275,9 +277,10 @@ class Notifier:
         extra_users: Optional[Collection[UserID]] = None,
     ):
         """Unwraps event and calls `on_new_room_event_args`."""
-        self.on_new_room_event_args(
+        await self.on_new_room_event_args(
             event_pos=event_pos,
             room_id=event.room_id,
+            event_id=event.event_id,
             event_type=event.type,
             state_key=event.get("state_key"),
             membership=event.content.get("membership"),
@@ -285,9 +288,10 @@ class Notifier:
             extra_users=extra_users or [],
         )
 
-    def on_new_room_event_args(
+    async def on_new_room_event_args(
         self,
         room_id: str,
+        event_id: str,
         event_type: str,
         state_key: Optional[str],
         membership: Optional[str],
@@ -302,7 +306,10 @@ class Notifier:
         listening to the room, and any listeners for the users in the
         `extra_users` param.
 
-        The events can be peristed out of order. The notifier will wait
+        This also notifies modules listening on new events via the
+        `on_new_event` callback.
+
+        The events can be persisted out of order. The notifier will wait
         until all previous events have been persisted before notifying
         the client streams.
         """
@@ -318,6 +325,8 @@ class Notifier:
         )
         self._notify_pending_new_room_events(max_room_stream_token)
 
+        await self._third_party_rules.on_new_event(event_id)
+
         self.notify_replication()
 
     def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
@@ -374,29 +383,6 @@ class Notifier:
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    def _notify_app_services_ephemeral(
-        self,
-        stream_key: str,
-        new_token: Union[int, RoomStreamToken],
-        users: Optional[Collection[Union[str, UserID]]] = None,
-    ) -> None:
-        """Notify application services of ephemeral event activity.
-
-        Args:
-            stream_key: The stream the event came from.
-            new_token: The value of the new stream token.
-            users: The users that should be informed of the new event, if any.
-        """
-        try:
-            stream_token = None
-            if isinstance(new_token, int):
-                stream_token = new_token
-            self.appservice_handler.notify_interested_services_ephemeral(
-                stream_key, stream_token, users or []
-            )
-        except Exception:
-            logger.exception("Error notifying application services of event")
-
     def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
             self._pusher_pool.on_new_notifications(max_room_stream_token)
@@ -458,12 +444,15 @@ class Notifier:
 
             self.notify_replication()
 
-            # Notify appservices
-            self._notify_app_services_ephemeral(
-                stream_key,
-                new_token,
-                users,
-            )
+            # Notify appservices.
+            try:
+                self.appservice_handler.notify_interested_services_ephemeral(
+                    stream_key,
+                    new_token,
+                    users,
+                )
+            except Exception:
+                logger.exception("Error notifying application services of event")
 
     def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happened