diff options
author | Shay <hillerys@element.io> | 2022-10-05 10:12:48 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-05 10:12:48 -0700 |
commit | 7b7478e8b65cceb9e7362c6c1cb932b569a6f383 (patch) | |
tree | 46298955947645bfbc9a07ae47c1738037d903ef /synapse/handlers | |
parent | Complement test image: capture logs from nginx (#14063) (diff) | |
download | synapse-7b7478e8b65cceb9e7362c6c1cb932b569a6f383.tar.xz |
Batch up notifications after event persistence (#14033)
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation_event.py | 4 | ||||
-rw-r--r-- | synapse/handlers/message.py | 25 |
2 files changed, 14 insertions, 15 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 778d8869b3..da319943cc 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -2240,8 +2240,8 @@ class FederationEventHandler: event_pos = PersistedEventPosition( self._instance_name, event.internal_metadata.stream_ordering ) - await self._notifier.on_new_room_event( - event, event_pos, max_stream_token, extra_users=extra_users + await self._notifier.on_new_room_events( + [(event, event_pos)], max_stream_token, extra_users=extra_users ) if event.type == EventTypes.Member and event.membership == Membership.JOIN: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 00e7645ba5..da1acea275 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1872,6 +1872,7 @@ class EventCreationHandler: events_and_context, backfilled=backfilled ) + events_and_pos = [] for event in persisted_events: if self._ephemeral_events_enabled: # If there's an expiry timestamp on the event, schedule its expiry. @@ -1880,25 +1881,23 @@ class EventCreationHandler: stream_ordering = event.internal_metadata.stream_ordering assert stream_ordering is not None pos = PersistedEventPosition(self._instance_name, stream_ordering) - - async def _notify() -> None: - try: - await self.notifier.on_new_room_event( - event, pos, max_stream_token, extra_users=extra_users - ) - except Exception: - logger.exception( - "Error notifying about new room event %s", - event.event_id, - ) - - run_in_background(_notify) + events_and_pos.append((event, pos)) if event.type == EventTypes.Message: # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. run_in_background(self._bump_active_time, requester.user) + async def _notify() -> None: + try: + await self.notifier.on_new_room_events( + events_and_pos, max_stream_token, extra_users=extra_users + ) + except Exception: + logger.exception("Error notifying about new room events") + + run_in_background(_notify) + return persisted_events[-1] async def _maybe_kick_guest_users( |