diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e791a1266d..fa4ec2ad3c 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -109,22 +109,11 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
def on_incoming_transaction(self, transaction_data):
- transaction = Transaction(**transaction_data)
-
- received_pdus_counter.inc_by(len(transaction.pdus))
-
- for p in transaction.pdus:
- if "unsigned" in p:
- unsigned = p["unsigned"]
- if "age" in unsigned:
- p["age"] = unsigned["age"]
- if "age" in p:
- p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
- del p["age"]
+ # keep this as early as possible to make the calculated origin ts as
+ # accurate as possible.
+ request_time = int(self._clock.time_msec())
- pdu_list = [
- self.event_from_pdu_json(p) for p in transaction.pdus
- ]
+ transaction = Transaction(**transaction_data)
logger.debug("[%s] Got transaction", transaction.transaction_id)
@@ -140,17 +129,35 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id)
- results = []
+ received_pdus_counter.inc_by(len(transaction.pdus))
+
+ pdu_list = []
+
+ for p in transaction.pdus:
+ if "unsigned" in p:
+ unsigned = p["unsigned"]
+ if "age" in unsigned:
+ p["age"] = unsigned["age"]
+ if "age" in p:
+ p["age_ts"] = request_time - int(p["age"])
+ del p["age"]
+
+ event = self.event_from_pdu_json(p)
+ pdu_list.append(event)
+
+ pdu_results = {}
for pdu in pdu_list:
+ event_id = pdu.event_id
try:
yield self._handle_received_pdu(transaction.origin, pdu)
- results.append({})
+ pdu_results[event_id] = {}
except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
self.send_failure(e, transaction.origin)
- results.append({"error": str(e)})
+ pdu_results[event_id] = {"error": str(e)}
except Exception as e:
- results.append({"error": str(e)})
+ pdu_results[event_id] = {"error": str(e)}
logger.exception("Failed to handle PDU")
if hasattr(transaction, "edus"):
@@ -164,14 +171,12 @@ class FederationServer(FederationBase):
for failure in getattr(transaction, "pdu_failures", []):
logger.info("Got failure %r", failure)
- logger.debug("Returning: %s", str(results))
-
response = {
- "pdus": dict(zip(
- (p.event_id for p in pdu_list), results
- )),
+ "pdus": pdu_results,
}
+ logger.debug("Returning: %s", str(response))
+
yield self.transaction_actions.set_response(
transaction,
200, response
|