summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index ba257d34e6..5e5387fdcb 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import (
 )
 from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
+from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -150,9 +151,15 @@ class ReplicationDataHandler:
                 if row.entity.startswith("@") and not row.is_signature:
                     room_ids = await self.store.get_rooms_for_user(row.entity)
                     all_room_ids.update(room_ids)
-            self.notifier.on_new_event(
-                StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
-            )
+
+            # `all_room_ids` can be large, so let's wake up those streams in batches
+            for batched_room_ids in batch_iter(all_room_ids, 100):
+                self.notifier.on_new_event(
+                    StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids
+                )
+
+                # Yield to reactor so that we don't block.
+                await self._clock.sleep(0)
         elif stream_name == PushersStream.NAME:
             for row in rows:
                 if row.deleted: