diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 778d8869b3..da319943cc 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -2240,8 +2240,8 @@ class FederationEventHandler:
event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
- await self._notifier.on_new_room_event(
- event, event_pos, max_stream_token, extra_users=extra_users
+ await self._notifier.on_new_room_events(
+ [(event, event_pos)], max_stream_token, extra_users=extra_users
)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 00e7645ba5..da1acea275 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1872,6 +1872,7 @@ class EventCreationHandler:
events_and_context, backfilled=backfilled
)
+ events_and_pos = []
for event in persisted_events:
if self._ephemeral_events_enabled:
# If there's an expiry timestamp on the event, schedule its expiry.
@@ -1880,25 +1881,23 @@ class EventCreationHandler:
stream_ordering = event.internal_metadata.stream_ordering
assert stream_ordering is not None
pos = PersistedEventPosition(self._instance_name, stream_ordering)
-
- async def _notify() -> None:
- try:
- await self.notifier.on_new_room_event(
- event, pos, max_stream_token, extra_users=extra_users
- )
- except Exception:
- logger.exception(
- "Error notifying about new room event %s",
- event.event_id,
- )
-
- run_in_background(_notify)
+ events_and_pos.append((event, pos))
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
+ async def _notify() -> None:
+ try:
+ await self.notifier.on_new_room_events(
+ events_and_pos, max_stream_token, extra_users=extra_users
+ )
+ except Exception:
+ logger.exception("Error notifying about new room events")
+
+ run_in_background(_notify)
+
return persisted_events[-1]
async def _maybe_kick_guest_users(
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c42bb8266a..26b97cf766 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -294,35 +294,31 @@ class Notifier:
"""
self._new_join_in_room_callbacks.append(cb)
- async def on_new_room_event(
+ async def on_new_room_events(
self,
- event: EventBase,
- event_pos: PersistedEventPosition,
+ events_and_pos: List[Tuple[EventBase, PersistedEventPosition]],
max_room_stream_token: RoomStreamToken,
extra_users: Optional[Collection[UserID]] = None,
) -> None:
- """Unwraps event and calls `on_new_room_event_args`."""
- await self.on_new_room_event_args(
- event_pos=event_pos,
- room_id=event.room_id,
- event_id=event.event_id,
- event_type=event.type,
- state_key=event.get("state_key"),
- membership=event.content.get("membership"),
- max_room_stream_token=max_room_stream_token,
- extra_users=extra_users or [],
- )
+ """Creates a _PendingRoomEventEntry for each of the listed events and calls
+ notify_new_room_events with the results."""
+ event_entries = []
+ for event, pos in events_and_pos:
+ entry = self.create_pending_room_event_entry(
+ pos,
+ extra_users,
+ event.room_id,
+ event.type,
+ event.get("state_key"),
+ event.content.get("membership"),
+ )
+ event_entries.append((entry, event.event_id))
+ await self.notify_new_room_events(event_entries, max_room_stream_token)
- async def on_new_room_event_args(
+ async def notify_new_room_events(
self,
- room_id: str,
- event_id: str,
- event_type: str,
- state_key: Optional[str],
- membership: Optional[str],
- event_pos: PersistedEventPosition,
+ event_entries: List[Tuple[_PendingRoomEventEntry, str]],
max_room_stream_token: RoomStreamToken,
- extra_users: Optional[Collection[UserID]] = None,
) -> None:
"""Used by handlers to inform the notifier something has happened
in the room, room event wise.
@@ -338,22 +334,33 @@ class Notifier:
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append(
- _PendingRoomEventEntry(
- event_pos=event_pos,
- extra_users=extra_users or [],
- room_id=room_id,
- type=event_type,
- state_key=state_key,
- membership=membership,
- )
- )
- self._notify_pending_new_room_events(max_room_stream_token)
+ for event_entry, event_id in event_entries:
+ self.pending_new_room_events.append(event_entry)
+ await self._third_party_rules.on_new_event(event_id)
- await self._third_party_rules.on_new_event(event_id)
+ self._notify_pending_new_room_events(max_room_stream_token)
self.notify_replication()
+ def create_pending_room_event_entry(
+ self,
+ event_pos: PersistedEventPosition,
+ extra_users: Optional[Collection[UserID]],
+ room_id: str,
+ event_type: str,
+ state_key: Optional[str],
+ membership: Optional[str],
+ ) -> _PendingRoomEventEntry:
+ """Creates and returns a _PendingRoomEventEntry"""
+ return _PendingRoomEventEntry(
+ event_pos=event_pos,
+ extra_users=extra_users or [],
+ room_id=room_id,
+ type=event_type,
+ state_key=state_key,
+ membership=membership,
+ )
+
def _notify_pending_new_room_events(
self, max_room_stream_token: RoomStreamToken
) -> None:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b2522f98ca..18252a2958 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -210,15 +210,16 @@ class ReplicationDataHandler:
max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
- await self.notifier.on_new_room_event_args(
- event_pos=event_pos,
- max_room_stream_token=max_token,
- extra_users=extra_users,
- room_id=row.data.room_id,
- event_id=row.data.event_id,
- event_type=row.data.type,
- state_key=row.data.state_key,
- membership=row.data.membership,
+ event_entry = self.notifier.create_pending_room_event_entry(
+ event_pos,
+ extra_users,
+ row.data.room_id,
+ row.data.type,
+ row.data.state_key,
+ row.data.membership,
+ )
+ await self.notifier.notify_new_room_events(
+ [(event_entry, row.data.event_id)], max_token
)
# If this event is a join, make a note of it so we have an accurate
|