summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py89
1 files changed, 56 insertions, 33 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index dfb096e589..a8fd3ef886 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -42,7 +42,7 @@ from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.streams.config import PaginationConfig
-from synapse.types import Collection, StreamToken, UserID
+from synapse.types import Collection, RoomStreamToken, StreamToken, UserID
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -68,7 +68,7 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int:
     return n
 
 
-class _NotificationListener(object):
+class _NotificationListener:
     """ This represents a single client connection to the events stream.
     The events stream handler will have yielded to the deferred, so to
     notify the handler it is sufficient to resolve the deferred.
@@ -80,7 +80,7 @@ class _NotificationListener(object):
         self.deferred = deferred
 
 
-class _NotifierUserStream(object):
+class _NotifierUserStream:
     """This represents a user connected to the event stream.
     It tracks the most recent stream token for that user.
     At a given point a user may have a number of streams listening for
@@ -112,7 +112,9 @@ class _NotifierUserStream(object):
         with PreserveLoggingContext():
             self.notify_deferred = ObservableDeferred(defer.Deferred())
 
-    def notify(self, stream_key: str, stream_id: int, time_now_ms: int):
+    def notify(
+        self, stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int,
+    ):
         """Notify any listeners for this user of a new event from an
         event source.
         Args:
@@ -162,13 +164,11 @@ class _NotifierUserStream(object):
 
 
 class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
-    def __nonzero__(self):
+    def __bool__(self):
         return bool(self.events)
 
-    __bool__ = __nonzero__  # python3
-
 
-class Notifier(object):
+class Notifier:
     """ This class is responsible for notifying any listeners when there are
     new events available for it.
 
@@ -187,7 +187,7 @@ class Notifier(object):
         self.store = hs.get_datastore()
         self.pending_new_room_events = (
             []
-        )  # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]]
+        )  # type: List[Tuple[int, EventBase, Collection[UserID]]]
 
         # Called when there are new things to stream over replication
         self.replication_callbacks = []  # type: List[Callable[[], None]]
@@ -198,6 +198,7 @@ class Notifier(object):
 
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
+        self._pusher_pool = hs.get_pusherpool()
 
         self.federation_sender = None
         if hs.should_send_federation():
@@ -247,7 +248,7 @@ class Notifier(object):
         event: EventBase,
         room_stream_id: int,
         max_room_stream_id: int,
-        extra_users: Collection[Union[str, UserID]] = [],
+        extra_users: Collection[UserID] = [],
     ):
         """ Used by handlers to inform the notifier something has happened
         in the room, room event wise.
@@ -274,47 +275,68 @@ class Notifier(object):
         """
         pending = self.pending_new_room_events
         self.pending_new_room_events = []
+
+        users = set()  # type: Set[UserID]
+        rooms = set()  # type: Set[str]
+
         for room_stream_id, event, extra_users in pending:
             if room_stream_id > max_room_stream_id:
                 self.pending_new_room_events.append(
                     (room_stream_id, event, extra_users)
                 )
             else:
-                self._on_new_room_event(event, room_stream_id, extra_users)
+                if (
+                    event.type == EventTypes.Member
+                    and event.membership == Membership.JOIN
+                ):
+                    self._user_joined_room(event.state_key, event.room_id)
+
+                users.update(extra_users)
+                rooms.add(event.room_id)
+
+        if users or rooms:
+            self.on_new_event(
+                "room_key",
+                RoomStreamToken(None, max_room_stream_id),
+                users=users,
+                rooms=rooms,
+            )
+            self._on_updated_room_token(max_room_stream_id)
+
+    def _on_updated_room_token(self, max_room_stream_id: int):
+        """Poke services that might care that the room position has been
+        updated.
+        """
 
-    def _on_new_room_event(
-        self,
-        event: EventBase,
-        room_stream_id: int,
-        extra_users: Collection[Union[str, UserID]] = [],
-    ):
-        """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         run_as_background_process(
-            "notify_app_services", self._notify_app_services, room_stream_id
+            "_notify_app_services", self._notify_app_services, max_room_stream_id
         )
 
-        if self.federation_sender:
-            self.federation_sender.notify_new_events(room_stream_id)
-
-        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
-            self._user_joined_room(event.state_key, event.room_id)
-
-        self.on_new_event(
-            "room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
+        run_as_background_process(
+            "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id
         )
 
-    async def _notify_app_services(self, room_stream_id: int):
+        if self.federation_sender:
+            self.federation_sender.notify_new_events(max_room_stream_id)
+
+    async def _notify_app_services(self, max_room_stream_id: int):
         try:
-            await self.appservice_handler.notify_interested_services(room_stream_id)
+            await self.appservice_handler.notify_interested_services(max_room_stream_id)
         except Exception:
             logger.exception("Error notifying application services of event")
 
+    async def _notify_pusher_pool(self, max_room_stream_id: int):
+        try:
+            await self._pusher_pool.on_new_notifications(max_room_stream_id)
+        except Exception:
+            logger.exception("Error pusher pool of event")
+
     def on_new_event(
         self,
         stream_key: str,
-        new_token: int,
-        users: Collection[Union[str, UserID]] = [],
+        new_token: Union[int, RoomStreamToken],
+        users: Collection[UserID] = [],
         rooms: Collection[str] = [],
     ):
         """ Used to inform listeners that something has happened event wise.
@@ -432,8 +454,9 @@ class Notifier(object):
         If explicit_room_id is set, that room will be polled for events only if
         it is world readable or the user has joined the room.
         """
-        from_token = pagination_config.from_token
-        if not from_token:
+        if pagination_config.from_token:
+            from_token = pagination_config.from_token
+        else:
             from_token = self.event_sources.get_current_token()
 
         limit = pagination_config.limit