summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
authorEmmanuel ROHEE <erohee@amdocs.com>2014-09-16 08:50:53 +0200
committerEmmanuel ROHEE <erohee@amdocs.com>2014-09-16 08:50:53 +0200
commitfaee41c303c51f2d0ba9392643e67a8d100fbfa9 (patch)
treea586ca2ca460de50eb37c301409ccd97f9f40abf /synapse/federation/replication.py
parentFilter room where the user has been banned (diff)
parentFix bug where we didn't always get 'prev_content' key (diff)
downloadsynapse-faee41c303c51f2d0ba9392643e67a8d100fbfa9.tar.xz
Merge remote-tracking branch 'origin/develop' into webclient_data_centralisation
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py31
1 files changed, 27 insertions, 4 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py

index e12510017f..96b82f00cb 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py
@@ -291,6 +291,13 @@ class ReplicationLayer(object): def on_incoming_transaction(self, transaction_data): transaction = Transaction(**transaction_data) + for p in transaction.pdus: + if "age" in p: + p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) + del p["age"] + + pdu_list = [Pdu(**p) for p in transaction.pdus] + logger.debug("[%s] Got transaction", transaction.transaction_id) response = yield self.transaction_actions.have_responded(transaction) @@ -303,8 +310,6 @@ class ReplicationLayer(object): logger.debug("[%s] Transacition is new", transaction.transaction_id) - pdu_list = [Pdu(**p) for p in transaction.pdus] - dl = [] for pdu in pdu_list: dl.append(self._handle_new_pdu(pdu)) @@ -405,9 +410,14 @@ class ReplicationLayer(object): """Returns a new Transaction containing the given PDUs suitable for transmission. """ + pdus = [p.get_dict() for p in pdu_list] + for p in pdus: + if "age_ts" in pdus: + p["age"] = int(self.clock.time_msec()) - p["age_ts"] + return Transaction( - pdus=[p.get_dict() for p in pdu_list], origin=self.server_name, + pdus=pdus, ts=int(self._clock.time_msec()), destination=None, ) @@ -593,8 +603,21 @@ class _TransactionQueue(object): logger.debug("TX [%s] Sending transaction...", destination) # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def cb(transaction): + now = int(self._clock.time_msec()) + if "pdus" in transaction: + for p in transaction["pdus"]: + if "age_ts" in p: + p["age"] = now - int(p["age_ts"]) + + return transaction + code, response = yield self.transport_layer.send_transaction( - transaction + transaction, + on_send_callback=cb, ) logger.debug("TX [%s] Sent transaction", destination)