diff options
author | Erik Johnston <erikj@element.io> | 2024-05-14 14:39:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-14 14:39:11 +0100 |
commit | ebe77381b0e32a063d615b79fb7cbd727222fc4c (patch) | |
tree | 4360938e32688609a688c3841fffe2e2aabd39c6 /synapse/replication/tcp/client.py | |
parent | Improve perf of sync device lists (#17191) (diff) | |
download | synapse-ebe77381b0e32a063d615b79fb7cbd727222fc4c.tar.xz |
Reduce pauses on large device list changes (#17192)
For large accounts waking up all the relevant notifier streams can cause pauses of the reactor.
Diffstat (limited to '')
-rw-r--r-- | synapse/replication/tcp/client.py | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index ba257d34e6..5e5387fdcb 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import ( ) from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID from synapse.util.async_helpers import Linearizer, timeout_deferred +from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -150,9 +151,15 @@ class ReplicationDataHandler: if row.entity.startswith("@") and not row.is_signature: room_ids = await self.store.get_rooms_for_user(row.entity) all_room_ids.update(room_ids) - self.notifier.on_new_event( - StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids - ) + + # `all_room_ids` can be large, so let's wake up those streams in batches + for batched_room_ids in batch_iter(all_room_ids, 100): + self.notifier.on_new_event( + StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids + ) + + # Yield to reactor so that we don't block. + await self._clock.sleep(0) elif stream_name == PushersStream.NAME: for row in rows: if row.deleted: |