summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/replication.py31
-rw-r--r--synapse/federation/transport.py17
2 files changed, 42 insertions, 6 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index e12510017f..96b82f00cb 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -291,6 +291,13 @@ class ReplicationLayer(object):
     def on_incoming_transaction(self, transaction_data):
         transaction = Transaction(**transaction_data)
 
+        for p in transaction.pdus:
+            if "age" in p:
+                p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
+                del p["age"]
+
+        pdu_list = [Pdu(**p) for p in transaction.pdus]
+
         logger.debug("[%s] Got transaction", transaction.transaction_id)
 
         response = yield self.transaction_actions.have_responded(transaction)
@@ -303,8 +310,6 @@ class ReplicationLayer(object):
 
         logger.debug("[%s] Transacition is new", transaction.transaction_id)
 
-        pdu_list = [Pdu(**p) for p in transaction.pdus]
-
         dl = []
         for pdu in pdu_list:
             dl.append(self._handle_new_pdu(pdu))
@@ -405,9 +410,14 @@ class ReplicationLayer(object):
         """Returns a new Transaction containing the given PDUs suitable for
         transmission.
         """
+        pdus = [p.get_dict() for p in pdu_list]
+        for p in pdus:
+            if "age_ts" in pdus:
+                p["age"] = int(self.clock.time_msec()) - p["age_ts"]
+
         return Transaction(
-            pdus=[p.get_dict() for p in pdu_list],
             origin=self.server_name,
+            pdus=pdus,
             ts=int(self._clock.time_msec()),
             destination=None,
         )
@@ -593,8 +603,21 @@ class _TransactionQueue(object):
             logger.debug("TX [%s] Sending transaction...", destination)
 
             # Actually send the transaction
+
+            # FIXME (erikj): This is a bit of a hack to make the Pdu age
+            # keys work
+            def cb(transaction):
+                now = int(self._clock.time_msec())
+                if "pdus" in transaction:
+                    for p in transaction["pdus"]:
+                        if "age_ts" in p:
+                            p["age"] = now - int(p["age_ts"])
+
+                return transaction
+
             code, response = yield self.transport_layer.send_transaction(
-                transaction
+                transaction,
+                on_send_callback=cb,
             )
 
             logger.debug("TX [%s] Sent transaction", destination)
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index 6e62ae7c74..afc777ec9e 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -144,7 +144,7 @@ class TransportLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def send_transaction(self, transaction):
+    def send_transaction(self, transaction, on_send_callback=None):
         """ Sends the given Transaction to it's destination
 
         Args:
@@ -165,10 +165,23 @@ class TransportLayer(object):
 
         data = transaction.get_dict()
 
+        # FIXME (erikj): This is a bit of a hack to make the Pdu age
+        # keys work
+        def cb(destination, method, path_bytes, producer):
+            if not on_send_callback:
+                return
+
+            transaction = json.loads(producer.body)
+
+            new_transaction = on_send_callback(transaction)
+
+            producer.reset(new_transaction)
+
         code, response = yield self.client.put_json(
             transaction.destination,
             path=PREFIX + "/send/%s/" % transaction.transaction_id,
-            data=data
+            data=data,
+            on_send_callback=cb,
         )
 
         logger.debug(