summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_server.py53
1 files changed, 34 insertions, 19 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index b2dffa2c3d..f00d59e701 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -12,14 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 from twisted.internet import defer
 
 from .federation_base import FederationBase
 from .units import Transaction, Edu
 
-from synapse.util.async import Linearizer
+from synapse.util import async
 from synapse.util.logutils import log_function
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.events import FrozenEvent
@@ -33,6 +31,9 @@ from synapse.crypto.event_signing import compute_event_signature
 import simplejson as json
 import logging
 
+# when processing incoming transactions, we try to handle multiple rooms in
+# parallel, up to this limit.
+TRANSACTION_CONCURRENCY_LIMIT = 10
 
 logger = logging.getLogger(__name__)
 
@@ -52,8 +53,8 @@ class FederationServer(FederationBase):
 
         self.auth = hs.get_auth()
 
-        self._server_linearizer = Linearizer("fed_server")
-        self._transaction_linearizer = Linearizer("fed_txn_handler")
+        self._server_linearizer = async.Linearizer("fed_server")
+        self._transaction_linearizer = async.Linearizer("fed_txn_handler")
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
@@ -159,7 +160,7 @@ class FederationServer(FederationBase):
 
         received_pdus_counter.inc_by(len(transaction.pdus))
 
-        pdu_list = []
+        pdus_by_room = {}
 
         for p in transaction.pdus:
             if "unsigned" in p:
@@ -171,22 +172,36 @@ class FederationServer(FederationBase):
                 del p["age"]
 
             event = self.event_from_pdu_json(p)
-            pdu_list.append(event)
+            room_id = event.room_id
+            pdus_by_room.setdefault(room_id, []).append(event)
 
         pdu_results = {}
 
-        for pdu in pdu_list:
-            event_id = pdu.event_id
-            try:
-                yield self._handle_received_pdu(transaction.origin, pdu)
-                pdu_results[event_id] = {}
-            except FederationError as e:
-                logger.warn("Error handling PDU %s: %s", event_id, e)
-                self.send_failure(e, transaction.origin)
-                pdu_results[event_id] = {"error": str(e)}
-            except Exception as e:
-                pdu_results[event_id] = {"error": str(e)}
-                logger.exception("Failed to handle PDU")
+        # we can process different rooms in parallel (which is useful if they
+        # require callouts to other servers to fetch missing events), but
+        # impose a limit to avoid going too crazy with ram/cpu.
+        @defer.inlineCallbacks
+        def process_pdus_for_room(room_id):
+            logger.debug("Processing PDUs for %s", room_id)
+            for pdu in pdus_by_room[room_id]:
+                event_id = pdu.event_id
+                try:
+                    yield self._handle_received_pdu(
+                        transaction.origin, pdu
+                    )
+                    pdu_results[event_id] = {}
+                except FederationError as e:
+                    logger.warn("Error handling PDU %s: %s", event_id, e)
+                    self.send_failure(e, transaction.origin)
+                    pdu_results[event_id] = {"error": str(e)}
+                except Exception as e:
+                    pdu_results[event_id] = {"error": str(e)}
+                    logger.exception("Failed to handle PDU %s", event_id)
+
+        yield async.concurrently_execute(
+            process_pdus_for_room, pdus_by_room.keys(),
+            TRANSACTION_CONCURRENCY_LIMIT,
+        )
 
         if hasattr(transaction, "edus"):
             for edu in (Edu(**x) for x in transaction.edus):