summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorShay <hillerys@element.io>2022-10-05 10:12:48 -0700
committerGitHub <noreply@github.com>2022-10-05 10:12:48 -0700
commit7b7478e8b65cceb9e7362c6c1cb932b569a6f383 (patch)
tree46298955947645bfbc9a07ae47c1738037d903ef /synapse/notifier.py
parentComplement test image: capture logs from nginx (#14063) (diff)
downloadsynapse-7b7478e8b65cceb9e7362c6c1cb932b569a6f383.tar.xz
Batch up notifications after event persistence (#14033)
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py75
1 files changed, 41 insertions, 34 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c42bb8266a..26b97cf766 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -294,35 +294,31 @@ class Notifier:
         """
         self._new_join_in_room_callbacks.append(cb)
 
-    async def on_new_room_event(
+    async def on_new_room_events(
         self,
-        event: EventBase,
-        event_pos: PersistedEventPosition,
+        events_and_pos: List[Tuple[EventBase, PersistedEventPosition]],
         max_room_stream_token: RoomStreamToken,
         extra_users: Optional[Collection[UserID]] = None,
     ) -> None:
-        """Unwraps event and calls `on_new_room_event_args`."""
-        await self.on_new_room_event_args(
-            event_pos=event_pos,
-            room_id=event.room_id,
-            event_id=event.event_id,
-            event_type=event.type,
-            state_key=event.get("state_key"),
-            membership=event.content.get("membership"),
-            max_room_stream_token=max_room_stream_token,
-            extra_users=extra_users or [],
-        )
+        """Creates a _PendingRoomEventEntry for each of the listed events and calls
+        notify_new_room_events with the results."""
+        event_entries = []
+        for event, pos in events_and_pos:
+            entry = self.create_pending_room_event_entry(
+                pos,
+                extra_users,
+                event.room_id,
+                event.type,
+                event.get("state_key"),
+                event.content.get("membership"),
+            )
+            event_entries.append((entry, event.event_id))
+        await self.notify_new_room_events(event_entries, max_room_stream_token)
 
-    async def on_new_room_event_args(
+    async def notify_new_room_events(
         self,
-        room_id: str,
-        event_id: str,
-        event_type: str,
-        state_key: Optional[str],
-        membership: Optional[str],
-        event_pos: PersistedEventPosition,
+        event_entries: List[Tuple[_PendingRoomEventEntry, str]],
         max_room_stream_token: RoomStreamToken,
-        extra_users: Optional[Collection[UserID]] = None,
     ) -> None:
         """Used by handlers to inform the notifier something has happened
         in the room, room event wise.
@@ -338,22 +334,33 @@ class Notifier:
         until all previous events have been persisted before notifying
         the client streams.
         """
-        self.pending_new_room_events.append(
-            _PendingRoomEventEntry(
-                event_pos=event_pos,
-                extra_users=extra_users or [],
-                room_id=room_id,
-                type=event_type,
-                state_key=state_key,
-                membership=membership,
-            )
-        )
-        self._notify_pending_new_room_events(max_room_stream_token)
+        for event_entry, event_id in event_entries:
+            self.pending_new_room_events.append(event_entry)
+            await self._third_party_rules.on_new_event(event_id)
 
-        await self._third_party_rules.on_new_event(event_id)
+        self._notify_pending_new_room_events(max_room_stream_token)
 
         self.notify_replication()
 
+    def create_pending_room_event_entry(
+        self,
+        event_pos: PersistedEventPosition,
+        extra_users: Optional[Collection[UserID]],
+        room_id: str,
+        event_type: str,
+        state_key: Optional[str],
+        membership: Optional[str],
+    ) -> _PendingRoomEventEntry:
+        """Creates and returns a _PendingRoomEventEntry"""
+        return _PendingRoomEventEntry(
+            event_pos=event_pos,
+            extra_users=extra_users or [],
+            room_id=room_id,
+            type=event_type,
+            state_key=state_key,
+            membership=membership,
+        )
+
     def _notify_pending_new_room_events(
         self, max_room_stream_token: RoomStreamToken
     ) -> None: