Limit the number of events sent over replication when persisting events. (#10082)
1 files changed, 10 insertions, 7 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 678f6b7707..bf11315251 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -91,6 +91,7 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler):
"""
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
- result = await self._send_events(
- instance_name=instance,
- store=self.store,
- room_id=room_id,
- event_and_contexts=event_and_contexts,
- backfilled=backfilled,
- )
+ # Limit the number of events sent over federation.
+ for batch in batch_iter(event_and_contexts, 1000):
+ result = await self._send_events(
+ instance_name=instance,
+ store=self.store,
+ room_id=room_id,
+ event_and_contexts=batch,
+ backfilled=backfilled,
+ )
return result["max_stream_id"]
else:
assert self.storage.persistence
|