diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 96b82f00cb..092411eaf9 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -159,7 +159,8 @@ class ReplicationLayer(object):
return defer.succeed(None)
@log_function
- def make_query(self, destination, query_type, args):
+ def make_query(self, destination, query_type, args,
+ retry_on_dns_fail=True):
"""Sends a federation Query to a remote homeserver of the given type
and arguments.
@@ -174,7 +175,9 @@ class ReplicationLayer(object):
a Deferred which will eventually yield a JSON object from the
response
"""
- return self.transport_layer.make_query(destination, query_type, args)
+ return self.transport_layer.make_query(
+ destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
+ )
@defer.inlineCallbacks
@log_function
@@ -316,7 +319,7 @@ class ReplicationLayer(object):
if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]:
- self.received_edu(edu.origin, edu.edu_type, edu.content)
+ self.received_edu(transaction.origin, edu.edu_type, edu.content)
results = yield defer.DeferredList(dl)
@@ -418,7 +421,7 @@ class ReplicationLayer(object):
return Transaction(
origin=self.server_name,
pdus=pdus,
- ts=int(self._clock.time_msec()),
+ origin_server_ts=int(self._clock.time_msec()),
destination=None,
)
@@ -489,7 +492,6 @@ class _TransactionQueue(object):
"""
def __init__(self, hs, transaction_actions, transport_layer):
-
self.server_name = hs.hostname
self.transaction_actions = transaction_actions
self.transport_layer = transport_layer
@@ -587,8 +589,8 @@ class _TransactionQueue(object):
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
- ts=self._clock.time_msec(),
- transaction_id=self._next_txn_id,
+ origin_server_ts=self._clock.time_msec(),
+ transaction_id=str(self._next_txn_id),
origin=self.server_name,
destination=destination,
pdus=pdus,
@@ -606,18 +608,17 @@ class _TransactionQueue(object):
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
- def cb(transaction):
+ def json_data_cb():
+ data = transaction.get_dict()
now = int(self._clock.time_msec())
- if "pdus" in transaction:
- for p in transaction["pdus"]:
+ if "pdus" in data:
+ for p in data["pdus"]:
if "age_ts" in p:
p["age"] = now - int(p["age_ts"])
-
- return transaction
+ return data
code, response = yield self.transport_layer.send_transaction(
- transaction,
- on_send_callback=cb,
+ transaction, json_data_cb
)
logger.debug("TX [%s] Sent transaction", destination)
|