summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2017-10-06 15:31:58 +0100
committerRichard van der Hoff <richard@matrix.org>2017-10-09 18:30:10 +0100
commit4c7c4d4061ae298ce6df445c888b91d3e5791164 (patch)
tree9a0b3ac2d5d2ae478813769aff7b7f6181202381 /synapse/federation/federation_server.py
parentfed server: refactor on_incoming_transaction (diff)
downloadsynapse-4c7c4d4061ae298ce6df445c888b91d3e5791164.tar.xz
Fed server: use a linearizer for ongoing transactions
We don't want to process the same transaction multiple times concurrently, so
use a linearizer.
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py30
1 files changed, 29 insertions, 1 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index fa4ec2ad3c..b2dffa2c3d 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -53,6 +53,7 @@ class FederationServer(FederationBase):
         self.auth = hs.get_auth()
 
         self._server_linearizer = Linearizer("fed_server")
+        self._transaction_linearizer = Linearizer("fed_txn_handler")
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
@@ -111,12 +112,39 @@ class FederationServer(FederationBase):
     def on_incoming_transaction(self, transaction_data):
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
-        request_time = int(self._clock.time_msec())
+        request_time = self._clock.time_msec()
 
         transaction = Transaction(**transaction_data)
 
+        if not transaction.transaction_id:
+            raise Exception("Transaction missing transaction_id")
+        if not transaction.origin:
+            raise Exception("Transaction missing origin")
+
         logger.debug("[%s] Got transaction", transaction.transaction_id)
 
+        # use a linearizer to ensure that we don't process the same transaction
+        # multiple times in parallel.
+        with (yield self._transaction_linearizer.queue(
+                (transaction.origin, transaction.transaction_id),
+        )):
+            result = yield self._handle_incoming_transaction(
+                transaction, request_time,
+            )
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _handle_incoming_transaction(self, transaction, request_time):
+        """ Process an incoming transaction and return the HTTP response
+
+        Args:
+            transaction (Transaction): incoming transaction
+            request_time (int): timestamp that the HTTP request arrived at
+
+        Returns:
+            Deferred[(int, object)]: http response code and body
+        """
         response = yield self.transaction_actions.have_responded(transaction)
 
         if response: