From 5bd9369a62c6d6cf677e9eef7d58096449542cdf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Sep 2014 13:26:05 +0100 Subject: Correctly handle the 'age' key in events and pdus --- synapse/federation/replication.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'synapse/federation/replication.py') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e12510017f..c79ce44688 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -291,6 +291,12 @@ class ReplicationLayer(object): def on_incoming_transaction(self, transaction_data): transaction = Transaction(**transaction_data) + for p in transaction.pdus: + if "age" in p: + p["age_ts"] = int(self.clock.time_msec()) - int(p["age"]) + + pdu_list = [Pdu(**p) for p in transaction.pdus] + logger.debug("[%s] Got transaction", transaction.transaction_id) response = yield self.transaction_actions.have_responded(transaction) @@ -303,8 +309,6 @@ class ReplicationLayer(object): logger.debug("[%s] Transacition is new", transaction.transaction_id) - pdu_list = [Pdu(**p) for p in transaction.pdus] - dl = [] for pdu in pdu_list: dl.append(self._handle_new_pdu(pdu)) @@ -405,9 +409,14 @@ class ReplicationLayer(object): """Returns a new Transaction containing the given PDUs suitable for transmission. """ + pdus = [p.get_dict() for p in pdu_list] + for p in pdus: + if "age_ts" in pdus: + p["age"] = int(self.clock.time_msec()) - p["age_ts"] + return Transaction( - pdus=[p.get_dict() for p in pdu_list], origin=self.server_name, + pdus=pdus, ts=int(self._clock.time_msec()), destination=None, ) -- cgit 1.4.1 From 6ac0b4ade86d1bdb59c01ff8edff6b149cf1981e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Sep 2014 14:54:25 +0100 Subject: Fix 'age' key to update on retries --- synapse/federation/replication.py | 19 ++++++++++++++++--- synapse/federation/transport.py | 17 +++++++++++++++-- synapse/http/client.py | 13 ++++++++++--- 3 files changed, 41 insertions(+), 8 deletions(-) (limited to 'synapse/federation/replication.py') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c79ce44688..a48a7ac15f 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -292,8 +292,8 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) for p in transaction.pdus: - if "age" in p: - p["age_ts"] = int(self.clock.time_msec()) - int(p["age"]) + if "age_ts" in p: + p["age"] = int(self._clock.time_msec()) - int(p["age_ts"]) pdu_list = [Pdu(**p) for p in transaction.pdus] @@ -602,8 +602,21 @@ class _TransactionQueue(object): logger.debug("TX [%s] Sending transaction...", destination) # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def cb(transaction): + now = int(self._clock.time_msec()) + if "pdus" in transaction: + for p in transaction["pdus"]: + if "age_ts" in p: + p["age"] = now - int(p["age_ts"]) + + return transaction + code, response = yield self.transport_layer.send_transaction( - transaction + transaction, + on_send_callback=cb, ) logger.debug("TX [%s] Sent transaction", destination) diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 6e62ae7c74..afc777ec9e 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -144,7 +144,7 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function - def send_transaction(self, transaction): + def send_transaction(self, transaction, on_send_callback=None): """ Sends the given Transaction to it's destination Args: @@ -165,10 +165,23 @@ class TransportLayer(object): data = transaction.get_dict() + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def cb(destination, method, path_bytes, producer): + if not on_send_callback: + return + + transaction = json.loads(producer.body) + + new_transaction = on_send_callback(transaction) + + producer.reset(new_transaction) + code, response = yield self.client.put_json( transaction.destination, path=PREFIX + "/send/%s/" % transaction.transaction_id, - data=data + data=data, + on_send_callback=cb, ) logger.debug( diff --git a/synapse/http/client.py b/synapse/http/client.py index ece6318e00..eb11bfd4d5 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -122,7 +122,7 @@ class TwistedHttpClient(HttpClient): self.hs = hs @defer.inlineCallbacks - def put_json(self, destination, path, data): + def put_json(self, destination, path, data, on_send_callback=None): if destination in _destination_mappings: destination = _destination_mappings[destination] @@ -131,7 +131,8 @@ class TwistedHttpClient(HttpClient): "PUT", path.encode("ascii"), producer=_JsonProducer(data), - headers_dict={"Content-Type": ["application/json"]} + headers_dict={"Content-Type": ["application/json"]}, + on_send_callback=on_send_callback, ) logger.debug("Getting resp body") @@ -218,7 +219,7 @@ class TwistedHttpClient(HttpClient): @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, param_bytes=b"", query_bytes=b"", producer=None, headers_dict={}, - retry_on_dns_fail=True): + retry_on_dns_fail=True, on_send_callback=None): """ Creates and sends a request to the given url """ headers_dict[b"User-Agent"] = [b"Synapse"] @@ -242,6 +243,9 @@ class TwistedHttpClient(HttpClient): endpoint = self._getEndpoint(reactor, destination); while True: + if on_send_callback: + on_send_callback(destination, method, path_bytes, producer) + try: response = yield self.agent.request( destination, @@ -310,6 +314,9 @@ class _JsonProducer(object): """ Used by the twisted http client to create the HTTP body from json """ def __init__(self, jsn): + self.reset(jsn) + + def reset(self, jsn): self.body = encode_canonical_json(jsn) self.length = len(self.body) -- cgit 1.4.1 From 40d2f38abe604525fb03622995c377904f1ea3dd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Sep 2014 16:55:39 +0100 Subject: Fix bug where we incorrectly calculated 'age_ts' from 'age' key rather than the reverse. Don't transmit age_ts to clients for now. --- synapse/api/events/__init__.py | 1 + synapse/federation/replication.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/federation/replication.py') diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py index 72c493db57..add81ec3e6 100644 --- a/synapse/api/events/__init__.py +++ b/synapse/api/events/__init__.py @@ -25,6 +25,7 @@ def serialize_event(hs, e): d = e.get_dict() if "age_ts" in d: d["age"] = int(hs.get_clock().time_msec()) - d["age_ts"] + del d["age_ts"] return d diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a48a7ac15f..96b82f00cb 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -292,8 +292,9 @@ class ReplicationLayer(object): transaction = Transaction(**transaction_data) for p in transaction.pdus: - if "age_ts" in p: - p["age"] = int(self._clock.time_msec()) - int(p["age_ts"]) + if "age" in p: + p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) + del p["age"] pdu_list = [Pdu(**p) for p in transaction.pdus] -- cgit 1.4.1