From ebe77381b0e32a063d615b79fb7cbd727222fc4c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2024 14:39:11 +0100 Subject: Reduce pauses on large device list changes (#17192) For large accounts waking up all the relevant notifier streams can cause pauses of the reactor. --- synapse/replication/tcp/client.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index ba257d34e6..5e5387fdcb 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import ( ) from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred +from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -150,9 +151,15 @@ class ReplicationDataHandler: if row.entity.startswith("@") and not row.is_signature: room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event( - StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids - ) + + # `all_room_ids` can be large, so let's wake up those streams in batches + for batched_room_ids in batch_iter(all_room_ids, 100): + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids + ) + + # Yield to reactor so that we don't block. + await self._clock.sleep(0) elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: -- cgit 1.4.1