diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index c79ce44688..a48a7ac15f 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -292,8 +292,8 @@ class ReplicationLayer(object):
transaction = Transaction(**transaction_data)
for p in transaction.pdus:
- if "age" in p:
- p["age_ts"] = int(self.clock.time_msec()) - int(p["age"])
+ if "age_ts" in p:
+ p["age"] = int(self._clock.time_msec()) - int(p["age_ts"])
pdu_list = [Pdu(**p) for p in transaction.pdus]
@@ -602,8 +602,21 @@ class _TransactionQueue(object):
logger.debug("TX [%s] Sending transaction...", destination)
# Actually send the transaction
+
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def cb(transaction):
+ now = int(self._clock.time_msec())
+ if "pdus" in transaction:
+ for p in transaction["pdus"]:
+ if "age_ts" in p:
+ p["age"] = now - int(p["age_ts"])
+
+ return transaction
+
code, response = yield self.transport_layer.send_transaction(
- transaction
+ transaction,
+ on_send_callback=cb,
)
logger.debug("TX [%s] Sent transaction", destination)
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index 6e62ae7c74..afc777ec9e 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -144,7 +144,7 @@ class TransportLayer(object):
@defer.inlineCallbacks
@log_function
- def send_transaction(self, transaction):
+ def send_transaction(self, transaction, on_send_callback=None):
""" Sends the given Transaction to it's destination
Args:
@@ -165,10 +165,23 @@ class TransportLayer(object):
data = transaction.get_dict()
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def cb(destination, method, path_bytes, producer):
+ if not on_send_callback:
+ return
+
+ transaction = json.loads(producer.body)
+
+ new_transaction = on_send_callback(transaction)
+
+ producer.reset(new_transaction)
+
code, response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
- data=data
+ data=data,
+ on_send_callback=cb,
)
logger.debug(
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ece6318e00..eb11bfd4d5 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -122,7 +122,7 @@ class TwistedHttpClient(HttpClient):
self.hs = hs
@defer.inlineCallbacks
- def put_json(self, destination, path, data):
+ def put_json(self, destination, path, data, on_send_callback=None):
if destination in _destination_mappings:
destination = _destination_mappings[destination]
@@ -131,7 +131,8 @@ class TwistedHttpClient(HttpClient):
"PUT",
path.encode("ascii"),
producer=_JsonProducer(data),
- headers_dict={"Content-Type": ["application/json"]}
+ headers_dict={"Content-Type": ["application/json"]},
+ on_send_callback=on_send_callback,
)
logger.debug("Getting resp body")
@@ -218,7 +219,7 @@ class TwistedHttpClient(HttpClient):
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes, param_bytes=b"",
query_bytes=b"", producer=None, headers_dict={},
- retry_on_dns_fail=True):
+ retry_on_dns_fail=True, on_send_callback=None):
""" Creates and sends a request to the given url
"""
headers_dict[b"User-Agent"] = [b"Synapse"]
@@ -242,6 +243,9 @@ class TwistedHttpClient(HttpClient):
endpoint = self._getEndpoint(reactor, destination);
while True:
+ if on_send_callback:
+ on_send_callback(destination, method, path_bytes, producer)
+
try:
response = yield self.agent.request(
destination,
@@ -310,6 +314,9 @@ class _JsonProducer(object):
""" Used by the twisted http client to create the HTTP body from json
"""
def __init__(self, jsn):
+ self.reset(jsn)
+
+ def reset(self, jsn):
self.body = encode_canonical_json(jsn)
self.length = len(self.body)
|