diff options
author | Richard van der Hoff <richard@matrix.org> | 2017-10-06 15:18:58 +0100 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2017-10-09 18:10:53 +0100 |
commit | ba5b9b80a56a449ffab44afaf4661d5b44277898 (patch) | |
tree | acd8595d9b4de8ed66894dbb29a9f51763dd4fa2 /synapse/federation/federation_server.py | |
parent | Merge pull request #2516 from matrix-org/rav/fix_fed_server_origin_check (diff) | |
download | synapse-ba5b9b80a56a449ffab44afaf4661d5b44277898.tar.xz |
fed server: refactor on_incoming_transaction
Move as much as possible to after the have_responded check, and reduce the number of times we iterate over the pdu list.
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 53 |
1 files changed, 29 insertions, 24 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e791a1266d..fa4ec2ad3c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -109,22 +109,11 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_incoming_transaction(self, transaction_data): - transaction = Transaction(**transaction_data) - - received_pdus_counter.inc_by(len(transaction.pdus)) - - for p in transaction.pdus: - if "unsigned" in p: - unsigned = p["unsigned"] - if "age" in unsigned: - p["age"] = unsigned["age"] - if "age" in p: - p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) - del p["age"] + # keep this as early as possible to make the calculated origin ts as + # accurate as possible. + request_time = int(self._clock.time_msec()) - pdu_list = [ - self.event_from_pdu_json(p) for p in transaction.pdus - ] + transaction = Transaction(**transaction_data) logger.debug("[%s] Got transaction", transaction.transaction_id) @@ -140,17 +129,35 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - results = [] + received_pdus_counter.inc_by(len(transaction.pdus)) + + pdu_list = [] + + for p in transaction.pdus: + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] + if "age" in p: + p["age_ts"] = request_time - int(p["age"]) + del p["age"] + + event = self.event_from_pdu_json(p) + pdu_list.append(event) + + pdu_results = {} for pdu in pdu_list: + event_id = pdu.event_id try: yield self._handle_received_pdu(transaction.origin, pdu) - results.append({}) + pdu_results[event_id] = {} except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) self.send_failure(e, transaction.origin) - results.append({"error": str(e)}) + pdu_results[event_id] = {"error": str(e)} except Exception as e: - results.append({"error": str(e)}) + pdu_results[event_id] = {"error": str(e)} logger.exception("Failed to handle PDU") if hasattr(transaction, "edus"): @@ -164,14 +171,12 @@ class FederationServer(FederationBase): for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) - logger.debug("Returning: %s", str(results)) - response = { - "pdus": dict(zip( - (p.event_id for p in pdu_list), results - )), + "pdus": pdu_results, } + logger.debug("Returning: %s", str(response)) + yield self.transaction_actions.set_response( transaction, 200, response |