summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <github@rvanderhoff.org.uk>2017-10-09 18:19:23 +0100
committerGitHub <noreply@github.com>2017-10-09 18:19:23 +0100
commit4d24becf7fb30214ad2494131b37d6fad5534433 (patch)
treeacd8595d9b4de8ed66894dbb29a9f51763dd4fa2
parentMerge pull request #2516 from matrix-org/rav/fix_fed_server_origin_check (diff)
parentfed server: refactor on_incoming_transaction (diff)
downloadsynapse-4d24becf7fb30214ad2494131b37d6fad5534433.tar.xz
Merge pull request #2517 from matrix-org/rav/fed_server_refactor
fed server: refactor on_incoming_transaction
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_server.py53
1 files changed, 29 insertions, 24 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e791a1266d..fa4ec2ad3c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -109,22 +109,11 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def on_incoming_transaction(self, transaction_data):
-        transaction = Transaction(**transaction_data)
-
-        received_pdus_counter.inc_by(len(transaction.pdus))
-
-        for p in transaction.pdus:
-            if "unsigned" in p:
-                unsigned = p["unsigned"]
-                if "age" in unsigned:
-                    p["age"] = unsigned["age"]
-            if "age" in p:
-                p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
-                del p["age"]
+        # keep this as early as possible to make the calculated origin ts as
+        # accurate as possible.
+        request_time = int(self._clock.time_msec())
 
-        pdu_list = [
-            self.event_from_pdu_json(p) for p in transaction.pdus
-        ]
+        transaction = Transaction(**transaction_data)
 
         logger.debug("[%s] Got transaction", transaction.transaction_id)
 
@@ -140,17 +129,35 @@ class FederationServer(FederationBase):
 
         logger.debug("[%s] Transaction is new", transaction.transaction_id)
 
-        results = []
+        received_pdus_counter.inc_by(len(transaction.pdus))
+
+        pdu_list = []
+
+        for p in transaction.pdus:
+            if "unsigned" in p:
+                unsigned = p["unsigned"]
+                if "age" in unsigned:
+                    p["age"] = unsigned["age"]
+            if "age" in p:
+                p["age_ts"] = request_time - int(p["age"])
+                del p["age"]
+
+            event = self.event_from_pdu_json(p)
+            pdu_list.append(event)
+
+        pdu_results = {}
 
         for pdu in pdu_list:
+            event_id = pdu.event_id
             try:
                 yield self._handle_received_pdu(transaction.origin, pdu)
-                results.append({})
+                pdu_results[event_id] = {}
             except FederationError as e:
+                logger.warn("Error handling PDU %s: %s", event_id, e)
                 self.send_failure(e, transaction.origin)
-                results.append({"error": str(e)})
+                pdu_results[event_id] = {"error": str(e)}
             except Exception as e:
-                results.append({"error": str(e)})
+                pdu_results[event_id] = {"error": str(e)}
                 logger.exception("Failed to handle PDU")
 
         if hasattr(transaction, "edus"):
@@ -164,14 +171,12 @@ class FederationServer(FederationBase):
             for failure in getattr(transaction, "pdu_failures", []):
                 logger.info("Got failure %r", failure)
 
-        logger.debug("Returning: %s", str(results))
-
         response = {
-            "pdus": dict(zip(
-                (p.event_id for p in pdu_list), results
-            )),
+            "pdus": pdu_results,
         }
 
+        logger.debug("Returning: %s", str(response))
+
         yield self.transaction_actions.set_response(
             transaction,
             200, response