diff --git a/synapse/notifier.py b/synapse/notifier.py
index 2e993411b9..a17352ef46 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,6 +28,7 @@ from typing import (
Union,
)
+import attr
from prometheus_client import Counter
from twisted.internet import defer
@@ -40,7 +41,6 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import (
Collection,
@@ -174,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
return bool(self.events)
+@attr.s(slots=True, frozen=True)
+class _PendingRoomEventEntry:
+ event_pos = attr.ib(type=PersistedEventPosition)
+ extra_users = attr.ib(type=Collection[UserID])
+
+ room_id = attr.ib(type=str)
+ type = attr.ib(type=str)
+ state_key = attr.ib(type=Optional[str])
+ membership = attr.ib(type=Optional[str])
+
+
class Notifier:
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -191,9 +202,7 @@ class Notifier:
self.storage = hs.get_storage()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
- self.pending_new_room_events = (
- []
- ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
+ self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry]
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -256,7 +265,29 @@ class Notifier:
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
- """ Used by handlers to inform the notifier something has happened
+ """Unwraps event and calls `on_new_room_event_args`.
+ """
+ self.on_new_room_event_args(
+ event_pos=event_pos,
+ room_id=event.room_id,
+ event_type=event.type,
+ state_key=event.get("state_key"),
+ membership=event.content.get("membership"),
+ max_room_stream_token=max_room_stream_token,
+ extra_users=extra_users,
+ )
+
+ def on_new_room_event_args(
+ self,
+ room_id: str,
+ event_type: str,
+ state_key: Optional[str],
+ membership: Optional[str],
+ event_pos: PersistedEventPosition,
+ max_room_stream_token: RoomStreamToken,
+ extra_users: Collection[UserID] = [],
+ ):
+ """Used by handlers to inform the notifier something has happened
in the room, room event wise.
This triggers the notifier to wake up any listeners that are
@@ -267,7 +298,16 @@ class Notifier:
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ self.pending_new_room_events.append(
+ _PendingRoomEventEntry(
+ event_pos=event_pos,
+ extra_users=extra_users,
+ room_id=room_id,
+ type=event_type,
+ state_key=state_key,
+ membership=membership,
+ )
+ )
self._notify_pending_new_room_events(max_room_stream_token)
self.notify_replication()
@@ -285,18 +325,19 @@ class Notifier:
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]
- for event_pos, event, extra_users in pending:
- if event_pos.persisted_after(max_room_stream_token):
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ for entry in pending:
+ if entry.event_pos.persisted_after(max_room_stream_token):
+ self.pending_new_room_events.append(entry)
else:
if (
- event.type == EventTypes.Member
- and event.membership == Membership.JOIN
+ entry.type == EventTypes.Member
+ and entry.membership == Membership.JOIN
+ and entry.state_key
):
- self._user_joined_room(event.state_key, event.room_id)
+ self._user_joined_room(entry.state_key, entry.room_id)
- users.update(extra_users)
- rooms.add(event.room_id)
+ users.update(entry.extra_users)
+ rooms.add(entry.room_id)
if users or rooms:
self.on_new_event(
@@ -310,44 +351,37 @@ class Notifier:
"""
# poke any interested application service.
- run_as_background_process(
- "_notify_app_services", self._notify_app_services, max_room_stream_token
- )
-
- run_as_background_process(
- "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
- )
+ self._notify_app_services(max_room_stream_token)
+ self._notify_pusher_pool(max_room_stream_token)
if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_token)
- async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
+ def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
- await self.appservice_handler.notify_interested_services(
- max_room_stream_token
- )
+ self.appservice_handler.notify_interested_services(max_room_stream_token)
except Exception:
logger.exception("Error notifying application services of event")
- async def _notify_app_services_ephemeral(
+ def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
- users: Collection[UserID] = [],
+ users: Collection[Union[str, UserID]] = [],
):
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
- await self.appservice_handler.notify_interested_services_ephemeral(
+ self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users
)
except Exception:
logger.exception("Error notifying application services of event")
- async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
+ def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
- await self._pusher_pool.on_new_notifications(max_room_stream_token)
+ self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error pusher pool of event")
@@ -384,16 +418,12 @@ class Notifier:
self.notify_replication()
# Notify appservices
- run_as_background_process(
- "_notify_app_services_ephemeral",
- self._notify_app_services_ephemeral,
- stream_key,
- new_token,
- users,
+ self._notify_app_services_ephemeral(
+ stream_key, new_token, users,
)
def on_new_replication_data(self) -> None:
- """Used to inform replication listeners that something has happend
+ """Used to inform replication listeners that something has happened
without waking up any of the normal user event streams"""
self.notify_replication()
|