diff --git a/synapse/notifier.py b/synapse/notifier.py
index c42bb8266a..26b97cf766 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -294,35 +294,31 @@ class Notifier:
"""
self._new_join_in_room_callbacks.append(cb)
- async def on_new_room_event(
+ async def on_new_room_events(
self,
- event: EventBase,
- event_pos: PersistedEventPosition,
+ events_and_pos: List[Tuple[EventBase, PersistedEventPosition]],
max_room_stream_token: RoomStreamToken,
extra_users: Optional[Collection[UserID]] = None,
) -> None:
- """Unwraps event and calls `on_new_room_event_args`."""
- await self.on_new_room_event_args(
- event_pos=event_pos,
- room_id=event.room_id,
- event_id=event.event_id,
- event_type=event.type,
- state_key=event.get("state_key"),
- membership=event.content.get("membership"),
- max_room_stream_token=max_room_stream_token,
- extra_users=extra_users or [],
- )
+ """Creates a _PendingRoomEventEntry for each of the listed events and calls
+ notify_new_room_events with the results."""
+ event_entries = []
+ for event, pos in events_and_pos:
+ entry = self.create_pending_room_event_entry(
+ pos,
+ extra_users,
+ event.room_id,
+ event.type,
+ event.get("state_key"),
+ event.content.get("membership"),
+ )
+ event_entries.append((entry, event.event_id))
+ await self.notify_new_room_events(event_entries, max_room_stream_token)
- async def on_new_room_event_args(
+ async def notify_new_room_events(
self,
- room_id: str,
- event_id: str,
- event_type: str,
- state_key: Optional[str],
- membership: Optional[str],
- event_pos: PersistedEventPosition,
+ event_entries: List[Tuple[_PendingRoomEventEntry, str]],
max_room_stream_token: RoomStreamToken,
- extra_users: Optional[Collection[UserID]] = None,
) -> None:
"""Used by handlers to inform the notifier something has happened
in the room, room event wise.
@@ -338,22 +334,33 @@ class Notifier:
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append(
- _PendingRoomEventEntry(
- event_pos=event_pos,
- extra_users=extra_users or [],
- room_id=room_id,
- type=event_type,
- state_key=state_key,
- membership=membership,
- )
- )
- self._notify_pending_new_room_events(max_room_stream_token)
+ for event_entry, event_id in event_entries:
+ self.pending_new_room_events.append(event_entry)
+ await self._third_party_rules.on_new_event(event_id)
- await self._third_party_rules.on_new_event(event_id)
+ self._notify_pending_new_room_events(max_room_stream_token)
self.notify_replication()
+ def create_pending_room_event_entry(
+ self,
+ event_pos: PersistedEventPosition,
+ extra_users: Optional[Collection[UserID]],
+ room_id: str,
+ event_type: str,
+ state_key: Optional[str],
+ membership: Optional[str],
+ ) -> _PendingRoomEventEntry:
+ """Creates and returns a _PendingRoomEventEntry"""
+ return _PendingRoomEventEntry(
+ event_pos=event_pos,
+ extra_users=extra_users or [],
+ room_id=room_id,
+ type=event_type,
+ state_key=state_key,
+ membership=membership,
+ )
+
def _notify_pending_new_room_events(
self, max_room_stream_token: RoomStreamToken
) -> None:
|