diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2021-05-27 18:10:58 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-27 17:10:58 +0100 |
commit | f828a70be331105c98ebfbe3738ef57d9d54df5b (patch) | |
tree | 8b0db82d8a37be8bff5b1e5dd3371f2ad40caa31 | |
parent | Merge tag 'v1.35.0rc2' into develop (diff) | |
download | synapse-f828a70be331105c98ebfbe3738ef57d9d54df5b.tar.xz |
Limit the number of events sent over replication when persisting events. (#10082)
Diffstat (limited to '')
-rw-r--r-- | changelog.d/10082.bugfix | 1 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 17 |
2 files changed, 11 insertions, 7 deletions
diff --git a/changelog.d/10082.bugfix b/changelog.d/10082.bugfix new file mode 100644 index 0000000000..b4f8bcc4fa --- /dev/null +++ b/changelog.d/10082.bugfix @@ -0,0 +1 @@ +Fixed a bug causing replication requests to fail when receiving a lot of events via federation. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 678f6b7707..bf11315251 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -91,6 +91,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server @@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler): """ instance = self.config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: - result = await self._send_events( - instance_name=instance, - store=self.store, - room_id=room_id, - event_and_contexts=event_and_contexts, - backfilled=backfilled, - ) + # Limit the number of events sent over federation. + for batch in batch_iter(event_and_contexts, 1000): + result = await self._send_events( + instance_name=instance, + store=self.store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) return result["max_stream_id"] else: assert self.storage.persistence |