summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/transport/client.py7
-rw-r--r--synapse/handlers/federation.py17
-rw-r--r--synapse/http/matrixfederationclient.py14
-rw-r--r--synapse/http/site.py4
4 files changed, 33 insertions, 9 deletions
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index e93ab83f7f..5b4f5d17f7 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -35,6 +35,11 @@ from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
+# Send join responses can be huge, so we set a separate limit here. The response
+# is parsed in a streaming manner, which helps alleviate the issue of memory
+# usage a bit.
+MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024
+
 
 class TransportLayerClient:
     """Sends federation HTTP requests to other servers"""
@@ -261,6 +266,7 @@ class TransportLayerClient:
             path=path,
             data=content,
             parser=SendJoinParser(room_version, v1_api=True),
+            max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
         )
 
         return response
@@ -276,6 +282,7 @@ class TransportLayerClient:
             path=path,
             data=content,
             parser=SendJoinParser(room_version, v1_api=False),
+            max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
         )
 
         return response
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 678f6b7707..bf11315251 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -91,6 +91,7 @@ from synapse.types import (
     get_domain_from_id,
 )
 from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 from synapse.visibility import filter_events_for_server
@@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler):
         """
         instance = self.config.worker.events_shard_config.get_instance(room_id)
         if instance != self._instance_name:
-            result = await self._send_events(
-                instance_name=instance,
-                store=self.store,
-                room_id=room_id,
-                event_and_contexts=event_and_contexts,
-                backfilled=backfilled,
-            )
+            # Limit the number of events sent over federation.
+            for batch in batch_iter(event_and_contexts, 1000):
+                result = await self._send_events(
+                    instance_name=instance,
+                    store=self.store,
+                    room_id=room_id,
+                    event_and_contexts=batch,
+                    backfilled=backfilled,
+                )
             return result["max_stream_id"]
         else:
             assert self.storage.persistence
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index f5503b394b..1998990a14 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -205,6 +205,7 @@ async def _handle_response(
     response: IResponse,
     start_ms: int,
     parser: ByteParser[T],
+    max_response_size: Optional[int] = None,
 ) -> T:
     """
     Reads the body of a response with a timeout and sends it to a parser
@@ -216,15 +217,20 @@ async def _handle_response(
         response: response to the request
         start_ms: Timestamp when request was made
         parser: The parser for the response
+        max_response_size: The maximum size to read from the response, if None
+            uses the default.
 
     Returns:
         The parsed response
     """
 
+    if max_response_size is None:
+        max_response_size = MAX_RESPONSE_SIZE
+
     try:
         check_content_type_is(response.headers, parser.CONTENT_TYPE)
 
-        d = read_body_with_max_size(response, parser, MAX_RESPONSE_SIZE)
+        d = read_body_with_max_size(response, parser, max_response_size)
         d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 
         length = await make_deferred_yieldable(d)
@@ -735,6 +741,7 @@ class MatrixFederationHttpClient:
         backoff_on_404: bool = False,
         try_trailing_slash_on_400: bool = False,
         parser: Literal[None] = None,
+        max_response_size: Optional[int] = None,
     ) -> Union[JsonDict, list]:
         ...
 
@@ -752,6 +759,7 @@ class MatrixFederationHttpClient:
         backoff_on_404: bool = False,
         try_trailing_slash_on_400: bool = False,
         parser: Optional[ByteParser[T]] = None,
+        max_response_size: Optional[int] = None,
     ) -> T:
         ...
 
@@ -768,6 +776,7 @@ class MatrixFederationHttpClient:
         backoff_on_404: bool = False,
         try_trailing_slash_on_400: bool = False,
         parser: Optional[ByteParser] = None,
+        max_response_size: Optional[int] = None,
     ):
         """Sends the specified json data using PUT
 
@@ -803,6 +812,8 @@ class MatrixFederationHttpClient:
                 enabled.
             parser: The parser to use to decode the response. Defaults to
                 parsing as JSON.
+            max_response_size: The maximum size to read from the response, if None
+                uses the default.
 
         Returns:
             Succeeds when we get a 2xx HTTP response. The
@@ -853,6 +864,7 @@ class MatrixFederationHttpClient:
             response,
             start_ms,
             parser=parser,
+            max_response_size=max_response_size,
         )
 
         return body
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 671fd3fbcc..40754b7bea 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -105,8 +105,10 @@ class SynapseRequest(Request):
         assert self.content, "handleContentChunk() called before gotLength()"
         if self.content.tell() + len(data) > self._max_request_body_size:
             logger.warning(
-                "Aborting connection from %s because the request exceeds maximum size",
+                "Aborting connection from %s because the request exceeds maximum size: %s %s",
                 self.client,
+                self.get_method(),
+                self.get_redacted_uri(),
             )
             self.transport.abortConnection()
             return