author     Erik Johnston <erikj@element.io>    2024-05-14 14:39:11 +0100
committer  GitHub <noreply@github.com>         2024-05-14 14:39:11 +0100
commit     ebe77381b0e32a063d615b79fb7cbd727222fc4c (patch)
tree       4360938e32688609a688c3841fffe2e2aabd39c6
parent     Improve perf of sync device lists (#17191) (diff)
Reduce pauses on large device list changes (#17192)
For large accounts, waking up all the relevant notifier streams can cause
pauses of the reactor.
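
The sketch below illustrates the batch-and-yield pattern this commit applies. It is a minimal, self-contained illustration, not Synapse's actual code: batch_iter here is a local stand-in mirroring synapse.util.iterutils.batch_iter, wake_streams_in_batches is a hypothetical helper name, and asyncio.sleep(0) stands in for the Twisted-based `await self._clock.sleep(0)` used in the real change.

# Minimal sketch of the batch-and-yield pattern (illustrative only).
import asyncio
from itertools import islice
from typing import Callable, Iterable, Iterator, Set, Tuple, TypeVar

T = TypeVar("T")

def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Yield tuples of at most `size` items from `iterable`;
    # stand-in for synapse.util.iterutils.batch_iter.
    it = iter(iterable)
    return iter(lambda: tuple(islice(it, size)), ())

async def wake_streams_in_batches(
    all_room_ids: Set[str],
    notify: Callable[[Tuple[str, ...]], None],
) -> None:
    # Notifying every room in one call can hold the event loop for a long
    # time on large accounts; batching and yielding keeps it responsive.
    for batched_room_ids in batch_iter(all_room_ids, 100):
        notify(batched_room_ids)
        # Yield control back to the event loop between batches
        # (the commit does the equivalent with `await self._clock.sleep(0)`).
        await asyncio.sleep(0)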
-rw-r--r--  changelog.d/17192.misc             |  1
-rw-r--r--  synapse/replication/tcp/client.py  | 13
2 files changed, 11 insertions, 3 deletions
diff --git a/changelog.d/17192.misc b/changelog.d/17192.misc
new file mode 100644

index 0000000000..25e157a50a
--- /dev/null
+++ b/changelog.d/17192.misc
@@ -0,0 +1 @@
+Improve performance by fixing a reactor pause.
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index ba257d34e6..5e5387fdcb 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import (
 )
 from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
+from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -150,9 +151,15 @@ class ReplicationDataHandler:
                 if row.entity.startswith("@") and not row.is_signature:
                     room_ids = await self.store.get_rooms_for_user(row.entity)
                     all_room_ids.update(room_ids)
-            self.notifier.on_new_event(
-                StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
-            )
+
+            # `all_room_ids` can be large, so let's wake up those streams in batches
+            for batched_room_ids in batch_iter(all_room_ids, 100):
+                self.notifier.on_new_event(
+                    StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids
+                )
+
+                # Yield to reactor so that we don't block.
+                await self._clock.sleep(0)
         elif stream_name == PushersStream.NAME:
             for row in rows:
                 if row.deleted: