summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py29
1 files changed, 15 insertions, 14 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 96b82f00cb..092411eaf9 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -159,7 +159,8 @@ class ReplicationLayer(object):
         return defer.succeed(None)
 
     @log_function
-    def make_query(self, destination, query_type, args):
+    def make_query(self, destination, query_type, args,
+                   retry_on_dns_fail=True):
         """Sends a federation Query to a remote homeserver of the given type
         and arguments.
 
@@ -174,7 +175,9 @@ class ReplicationLayer(object):
             a Deferred which will eventually yield a JSON object from the
             response
         """
-        return self.transport_layer.make_query(destination, query_type, args)
+        return self.transport_layer.make_query(
+            destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
+        )
 
     @defer.inlineCallbacks
     @log_function
@@ -316,7 +319,7 @@ class ReplicationLayer(object):
 
         if hasattr(transaction, "edus"):
             for edu in [Edu(**x) for x in transaction.edus]:
-                self.received_edu(edu.origin, edu.edu_type, edu.content)
+                self.received_edu(transaction.origin, edu.edu_type, edu.content)
 
         results = yield defer.DeferredList(dl)
 
@@ -418,7 +421,7 @@ class ReplicationLayer(object):
         return Transaction(
             origin=self.server_name,
             pdus=pdus,
-            ts=int(self._clock.time_msec()),
+            origin_server_ts=int(self._clock.time_msec()),
             destination=None,
         )
 
@@ -489,7 +492,6 @@ class _TransactionQueue(object):
     """
 
     def __init__(self, hs, transaction_actions, transport_layer):
-
         self.server_name = hs.hostname
         self.transaction_actions = transaction_actions
         self.transport_layer = transport_layer
@@ -587,8 +589,8 @@ class _TransactionQueue(object):
             logger.debug("TX [%s] Persisting transaction...", destination)
 
             transaction = Transaction.create_new(
-                ts=self._clock.time_msec(),
-                transaction_id=self._next_txn_id,
+                origin_server_ts=self._clock.time_msec(),
+                transaction_id=str(self._next_txn_id),
                 origin=self.server_name,
                 destination=destination,
                 pdus=pdus,
@@ -606,18 +608,17 @@ class _TransactionQueue(object):
 
             # FIXME (erikj): This is a bit of a hack to make the Pdu age
             # keys work
-            def cb(transaction):
+            def json_data_cb():
+                data = transaction.get_dict()
                 now = int(self._clock.time_msec())
-                if "pdus" in transaction:
-                    for p in transaction["pdus"]:
+                if "pdus" in data:
+                    for p in data["pdus"]:
                         if "age_ts" in p:
                             p["age"] = now - int(p["age_ts"])
-
-                return transaction
+                return data
 
             code, response = yield self.transport_layer.send_transaction(
-                transaction,
-                on_send_callback=cb,
+                transaction, json_data_cb
             )
 
             logger.debug("TX [%s] Sent transaction", destination)