summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py68
1 files changed, 55 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index eb56b26f21..a17352ef46 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,6 +28,7 @@ from typing import (
     Union,
 )
 
+import attr
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -173,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
         return bool(self.events)
 
 
+@attr.s(slots=True, frozen=True)
+class _PendingRoomEventEntry:
+    event_pos = attr.ib(type=PersistedEventPosition)
+    extra_users = attr.ib(type=Collection[UserID])
+
+    room_id = attr.ib(type=str)
+    type = attr.ib(type=str)
+    state_key = attr.ib(type=Optional[str])
+    membership = attr.ib(type=Optional[str])
+
+
 class Notifier:
     """ This class is responsible for notifying any listeners when there are
     new events available for it.
@@ -190,9 +202,7 @@ class Notifier:
         self.storage = hs.get_storage()
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
-        self.pending_new_room_events = (
-            []
-        )  # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
+        self.pending_new_room_events = []  # type: List[_PendingRoomEventEntry]
 
         # Called when there are new things to stream over replication
         self.replication_callbacks = []  # type: List[Callable[[], None]]
@@ -255,7 +265,29 @@ class Notifier:
         max_room_stream_token: RoomStreamToken,
         extra_users: Collection[UserID] = [],
     ):
-        """ Used by handlers to inform the notifier something has happened
+        """Unwraps event and calls `on_new_room_event_args`.
+        """
+        self.on_new_room_event_args(
+            event_pos=event_pos,
+            room_id=event.room_id,
+            event_type=event.type,
+            state_key=event.get("state_key"),
+            membership=event.content.get("membership"),
+            max_room_stream_token=max_room_stream_token,
+            extra_users=extra_users,
+        )
+
+    def on_new_room_event_args(
+        self,
+        room_id: str,
+        event_type: str,
+        state_key: Optional[str],
+        membership: Optional[str],
+        event_pos: PersistedEventPosition,
+        max_room_stream_token: RoomStreamToken,
+        extra_users: Collection[UserID] = [],
+    ):
+        """Used by handlers to inform the notifier something has happened
         in the room, room event wise.
 
         This triggers the notifier to wake up any listeners that are
@@ -266,7 +298,16 @@ class Notifier:
         until all previous events have been persisted before notifying
         the client streams.
         """
-        self.pending_new_room_events.append((event_pos, event, extra_users))
+        self.pending_new_room_events.append(
+            _PendingRoomEventEntry(
+                event_pos=event_pos,
+                extra_users=extra_users,
+                room_id=room_id,
+                type=event_type,
+                state_key=state_key,
+                membership=membership,
+            )
+        )
         self._notify_pending_new_room_events(max_room_stream_token)
 
         self.notify_replication()
@@ -284,18 +325,19 @@ class Notifier:
         users = set()  # type: Set[UserID]
         rooms = set()  # type: Set[str]
 
-        for event_pos, event, extra_users in pending:
-            if event_pos.persisted_after(max_room_stream_token):
-                self.pending_new_room_events.append((event_pos, event, extra_users))
+        for entry in pending:
+            if entry.event_pos.persisted_after(max_room_stream_token):
+                self.pending_new_room_events.append(entry)
             else:
                 if (
-                    event.type == EventTypes.Member
-                    and event.membership == Membership.JOIN
+                    entry.type == EventTypes.Member
+                    and entry.membership == Membership.JOIN
+                    and entry.state_key
                 ):
-                    self._user_joined_room(event.state_key, event.room_id)
+                    self._user_joined_room(entry.state_key, entry.room_id)
 
-                users.update(extra_users)
-                rooms.add(event.room_id)
+                users.update(entry.extra_users)
+                rooms.add(entry.room_id)
 
         if users or rooms:
             self.on_new_event(