diff options
author | Richard van der Hoff <github@rvanderhoff.org.uk> | 2017-10-10 10:20:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-10 10:20:51 +0100 |
commit | 426f8b0f66006492080a5a14b9486c071048dca3 (patch) | |
tree | 9a0b3ac2d5d2ae478813769aff7b7f6181202381 | |
parent | Merge pull request #2517 from matrix-org/rav/fed_server_refactor (diff) | |
parent | Fed server: use a linearizer for ongoing transactions (diff) | |
download | synapse-426f8b0f66006492080a5a14b9486c071048dca3.tar.xz |
Merge pull request #2518 from matrix-org/rav/linearize_incoming_transactions
Fed server: use a linearizer for ongoing transactions
-rw-r--r-- | synapse/federation/federation_server.py | 30 |
1 files changed, 29 insertions, 1 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fa4ec2ad3c..b2dffa2c3d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -53,6 +53,7 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self._server_linearizer = Linearizer("fed_server") + self._transaction_linearizer = Linearizer("fed_txn_handler") # We cache responses to state queries, as they take a while and often # come in waves. @@ -111,12 +112,39 @@ class FederationServer(FederationBase): def on_incoming_transaction(self, transaction_data): # keep this as early as possible to make the calculated origin ts as # accurate as possible. - request_time = int(self._clock.time_msec()) + request_time = self._clock.time_msec() transaction = Transaction(**transaction_data) + if not transaction.transaction_id: + raise Exception("Transaction missing transaction_id") + if not transaction.origin: + raise Exception("Transaction missing origin") + logger.debug("[%s] Got transaction", transaction.transaction_id) + # use a linearizer to ensure that we don't process the same transaction + # multiple times in parallel. + with (yield self._transaction_linearizer.queue( + (transaction.origin, transaction.transaction_id), + )): + result = yield self._handle_incoming_transaction( + transaction, request_time, + ) + + defer.returnValue(result) + + @defer.inlineCallbacks + def _handle_incoming_transaction(self, transaction, request_time): + """ Process an incoming transaction and return the HTTP response + + Args: + transaction (Transaction): incoming transaction + request_time (int): timestamp that the HTTP request arrived at + + Returns: + Deferred[(int, object)]: http response code and body + """ response = yield self.transaction_actions.have_responded(transaction) if response: |