diff --git a/synapse/notifier.py b/synapse/notifier.py
index eb56b26f21..a17352ef46 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,6 +28,7 @@ from typing import (
Union,
)
+import attr
from prometheus_client import Counter
from twisted.internet import defer
@@ -173,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
return bool(self.events)
+@attr.s(slots=True, frozen=True)
+class _PendingRoomEventEntry:
+ event_pos = attr.ib(type=PersistedEventPosition)
+ extra_users = attr.ib(type=Collection[UserID])
+
+ room_id = attr.ib(type=str)
+ type = attr.ib(type=str)
+ state_key = attr.ib(type=Optional[str])
+ membership = attr.ib(type=Optional[str])
+
+
class Notifier:
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -190,9 +202,7 @@ class Notifier:
self.storage = hs.get_storage()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
- self.pending_new_room_events = (
- []
- ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
+ self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry]
# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -255,7 +265,29 @@ class Notifier:
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
- """ Used by handlers to inform the notifier something has happened
+ """Unwraps event and calls `on_new_room_event_args`.
+ """
+ self.on_new_room_event_args(
+ event_pos=event_pos,
+ room_id=event.room_id,
+ event_type=event.type,
+ state_key=event.get("state_key"),
+ membership=event.content.get("membership"),
+ max_room_stream_token=max_room_stream_token,
+ extra_users=extra_users,
+ )
+
+ def on_new_room_event_args(
+ self,
+ room_id: str,
+ event_type: str,
+ state_key: Optional[str],
+ membership: Optional[str],
+ event_pos: PersistedEventPosition,
+ max_room_stream_token: RoomStreamToken,
+ extra_users: Collection[UserID] = [],
+ ):
+ """Used by handlers to inform the notifier something has happened
in the room, room event wise.
This triggers the notifier to wake up any listeners that are
@@ -266,7 +298,16 @@ class Notifier:
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ self.pending_new_room_events.append(
+ _PendingRoomEventEntry(
+ event_pos=event_pos,
+ extra_users=extra_users,
+ room_id=room_id,
+ type=event_type,
+ state_key=state_key,
+ membership=membership,
+ )
+ )
self._notify_pending_new_room_events(max_room_stream_token)
self.notify_replication()
@@ -284,18 +325,19 @@ class Notifier:
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]
- for event_pos, event, extra_users in pending:
- if event_pos.persisted_after(max_room_stream_token):
- self.pending_new_room_events.append((event_pos, event, extra_users))
+ for entry in pending:
+ if entry.event_pos.persisted_after(max_room_stream_token):
+ self.pending_new_room_events.append(entry)
else:
if (
- event.type == EventTypes.Member
- and event.membership == Membership.JOIN
+ entry.type == EventTypes.Member
+ and entry.membership == Membership.JOIN
+ and entry.state_key
):
- self._user_joined_room(event.state_key, event.room_id)
+ self._user_joined_room(entry.state_key, entry.room_id)
- users.update(extra_users)
- rooms.add(event.room_id)
+ users.update(entry.extra_users)
+ rooms.add(entry.room_id)
if users or rooms:
self.on_new_event(
|