summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-04-30 15:17:50 +0100
committerErik Johnston <erik@matrix.org>2021-04-30 15:17:50 +0100
commit49da5e9ec4441db17fd9c3a29456afb04f82bb4d (patch)
tree4d3b360fd2c1ec2ab57f0a5ce0dda8ee93fb79f5
parentLog memory usage (diff)
downloadsynapse-49da5e9ec4441db17fd9c3a29456afb04f82bb4d.tar.xz
Chunk _check_sigs_and_hash_and_fetch
-rw-r--r--synapse/federation/federation_client.py26
1 files changed, 16 insertions, 10 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 93f7b96e36..3153240f91 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -34,7 +34,6 @@ from typing import (
 
 import attr
 import ijson
-from synapse.logging.context import get_thread_resource_usage
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -57,11 +56,16 @@ from synapse.api.room_versions import (
 )
 from synapse.events import EventBase, builder
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
-from synapse.logging.context import make_deferred_yieldable, preserve_fn
+from synapse.logging.context import (
+    get_thread_resource_usage,
+    make_deferred_yieldable,
+    preserve_fn,
+)
 from synapse.logging.utils import log_function
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.iterutils import batch_iter
 from synapse.util.retryutils import NotRetryingDestination
 
 if TYPE_CHECKING:
@@ -705,8 +709,6 @@ class FederationClient(FederationBase):
 
             logger.info("Parsed auth chain: %d", len(auth_chain))
 
-            pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
-
             create_event = None
             for e in state:
                 if (e.type, e.state_key) == (EventTypes.Create, ""):
@@ -730,12 +732,16 @@ class FederationClient(FederationBase):
                     % (create_room_version,)
                 )
 
-            valid_pdus = await self._check_sigs_and_hash_and_fetch(
-                destination,
-                list(pdus.values()),
-                outlier=True,
-                room_version=room_version,
-            )
+            valid_pdus = []
+
+            for chunk in batch_iter(itertools.chain(state, auth_chain), 1000):
+                new_valid_pdus = await self._check_sigs_and_hash_and_fetch(
+                    destination,
+                    chunk,
+                    outlier=True,
+                    room_version=room_version,
+                )
+                valid_pdus.extend(new_valid_pdus)
 
             logger.info("_check_sigs_and_hash_and_fetch done")