diff --git a/changelog.d/10082.bugfix b/changelog.d/10082.bugfix
new file mode 100644
index 0000000000..b4f8bcc4fa
--- /dev/null
+++ b/changelog.d/10082.bugfix
@@ -0,0 +1 @@
+Fixed a bug causing replication requests to fail when receiving a lot of events via federation.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 678f6b7707..bf11315251 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -91,6 +91,7 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
@@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler):
"""
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
- result = await self._send_events(
- instance_name=instance,
- store=self.store,
- room_id=room_id,
- event_and_contexts=event_and_contexts,
- backfilled=backfilled,
- )
+ # Limit the number of events sent over federation.
+ for batch in batch_iter(event_and_contexts, 1000):
+ result = await self._send_events(
+ instance_name=instance,
+ store=self.store,
+ room_id=room_id,
+ event_and_contexts=batch,
+ backfilled=backfilled,
+ )
return result["max_stream_id"]
else:
assert self.storage.persistence
|