diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 62 |
1 files changed, 39 insertions, 23 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 71f2370874..16f19c938e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -25,7 +25,6 @@ from typing import ( Set, Tuple, TypeVar, - Union, ) from prometheus_client import Counter @@ -187,7 +186,7 @@ class Notifier: self.store = hs.get_datastore() self.pending_new_room_events = ( [] - ) # type: List[Tuple[int, EventBase, Collection[Union[str, UserID]]]] + ) # type: List[Tuple[int, EventBase, Collection[UserID]]] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -198,6 +197,7 @@ class Notifier: self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self._pusher_pool = hs.get_pusherpool() self.federation_sender = None if hs.should_send_federation(): @@ -247,7 +247,7 @@ class Notifier: event: EventBase, room_stream_id: int, max_room_stream_id: int, - extra_users: Collection[Union[str, UserID]] = [], + extra_users: Collection[UserID] = [], ): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -274,47 +274,63 @@ class Notifier: """ pending = self.pending_new_room_events self.pending_new_room_events = [] + + users = set() # type: Set[UserID] + rooms = set() # type: Set[str] + for room_stream_id, event, extra_users in pending: if room_stream_id > max_room_stream_id: self.pending_new_room_events.append( (room_stream_id, event, extra_users) ) else: - self._on_new_room_event(event, room_stream_id, extra_users) + if ( + event.type == EventTypes.Member + and event.membership == Membership.JOIN + ): + self._user_joined_room(event.state_key, event.room_id) + + users.update(extra_users) + rooms.add(event.room_id) + + if users or rooms: + self.on_new_event("room_key", max_room_stream_id, users=users, rooms=rooms) + self._on_updated_room_token(max_room_stream_id) + + def _on_updated_room_token(self, max_room_stream_id: int): + """Poke services that might care that the room position has been + updated. + """ - def _on_new_room_event( - self, - event: EventBase, - room_stream_id: int, - extra_users: Collection[Union[str, UserID]] = [], - ): - """Notify any user streams that are interested in this room event""" # poke any interested application service. run_as_background_process( - "notify_app_services", self._notify_app_services, room_stream_id + "_notify_app_services", self._notify_app_services, max_room_stream_id ) - if self.federation_sender: - self.federation_sender.notify_new_events(room_stream_id) - - if event.type == EventTypes.Member and event.membership == Membership.JOIN: - self._user_joined_room(event.state_key, event.room_id) - - self.on_new_event( - "room_key", room_stream_id, users=extra_users, rooms=[event.room_id] + run_as_background_process( + "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_id ) - async def _notify_app_services(self, room_stream_id: int): + if self.federation_sender: + self.federation_sender.notify_new_events(max_room_stream_id) + + async def _notify_app_services(self, max_room_stream_id: int): try: - await self.appservice_handler.notify_interested_services(room_stream_id) + await self.appservice_handler.notify_interested_services(max_room_stream_id) except Exception: logger.exception("Error notifying application services of event") + async def _notify_pusher_pool(self, max_room_stream_id: int): + try: + await self._pusher_pool.on_new_notifications(max_room_stream_id) + except Exception: + logger.exception("Error pusher pool of event") + def on_new_event( self, stream_key: str, new_token: int, - users: Collection[Union[str, UserID]] = [], + users: Collection[UserID] = [], rooms: Collection[str] = [], ): """ Used to inform listeners that something has happened event wise. |