summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-05-27 18:10:58 +0200
committerGitHub <noreply@github.com>2021-05-27 17:10:58 +0100
commitf828a70be331105c98ebfbe3738ef57d9d54df5b (patch)
tree8b0db82d8a37be8bff5b1e5dd3371f2ad40caa31
parentMerge tag 'v1.35.0rc2' into develop (diff)
downloadsynapse-f828a70be331105c98ebfbe3738ef57d9d54df5b.tar.xz
Limit the number of events sent over replication when persisting events. (#10082)
Diffstat (limited to '')
-rw-r--r--changelog.d/10082.bugfix1
-rw-r--r--synapse/handlers/federation.py17
2 files changed, 11 insertions, 7 deletions
diff --git a/changelog.d/10082.bugfix b/changelog.d/10082.bugfix
new file mode 100644
index 0000000000..b4f8bcc4fa
--- /dev/null
+++ b/changelog.d/10082.bugfix
@@ -0,0 +1 @@
+Fixed a bug causing replication requests to fail when receiving a lot of events via federation.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 678f6b7707..bf11315251 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -91,6 +91,7 @@ from synapse.types import (
     get_domain_from_id,
 )
 from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 from synapse.visibility import filter_events_for_server
@@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler):
         """
         instance = self.config.worker.events_shard_config.get_instance(room_id)
         if instance != self._instance_name:
-            result = await self._send_events(
-                instance_name=instance,
-                store=self.store,
-                room_id=room_id,
-                event_and_contexts=event_and_contexts,
-                backfilled=backfilled,
-            )
+            # Limit the number of events sent over federation.
+            for batch in batch_iter(event_and_contexts, 1000):
+                result = await self._send_events(
+                    instance_name=instance,
+                    store=self.store,
+                    room_id=room_id,
+                    event_and_contexts=batch,
+                    backfilled=backfilled,
+                )
             return result["max_stream_id"]
         else:
             assert self.storage.persistence