diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 84977e7e57..a8dd038b0b 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -66,6 +66,8 @@ class ReplicationLayer(object):
hs, self.transaction_actions, transport_layer
)
+ self.keyring = hs.get_keyring()
+
self.handler = None
self.edu_handlers = {}
self.query_handlers = {}
@@ -291,6 +293,10 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
def on_incoming_transaction(self, transaction_data):
+ yield self.keyring.verify_json_for_server(
+ transaction_data["origin"], transaction_data
+ )
+
transaction = Transaction(**transaction_data)
for p in transaction.pdus:
@@ -590,7 +596,7 @@ class _TransactionQueue(object):
transaction = Transaction.create_new(
ts=self._clock.time_msec(),
- transaction_id=self._next_txn_id,
+ transaction_id=str(self._next_txn_id),
origin=self.server_name,
destination=destination,
pdus=pdus,
@@ -611,20 +617,18 @@ class _TransactionQueue(object):
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
- def cb(transaction):
+ def json_data_cb():
+ data = transaction.get_dict()
now = int(self._clock.time_msec())
- if "pdus" in transaction:
- for p in transaction["pdus"]:
+ if "pdus" in data:
+ for p in data["pdus"]:
if "age_ts" in p:
p["age"] = now - int(p["age_ts"])
-
- transaction = sign_json(transaction, server_name, signing_key)
-
- return transaction
+ data = sign_json(data, server_name, signing_key)
+ return data
code, response = yield self.transport_layer.send_transaction(
- transaction,
- on_send_callback=cb,
+ transaction, json_data_cb
)
logger.debug("TX [%s] Sent transaction", destination)
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index afc777ec9e..5d595b7433 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -144,7 +144,7 @@ class TransportLayer(object):
@defer.inlineCallbacks
@log_function
- def send_transaction(self, transaction, on_send_callback=None):
+ def send_transaction(self, transaction, json_data_callback=None):
""" Sends the given Transaction to it's destination
Args:
@@ -163,24 +163,26 @@ class TransportLayer(object):
if transaction.destination == self.server_name:
raise RuntimeError("Transport layer cannot send to itself!")
- data = transaction.get_dict()
+ if json_data_callback is None:
+ def json_data_callback():
+ return transaction.get_dict()
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def cb(destination, method, path_bytes, producer):
- if not on_send_callback:
- return
+ json_data = json_data_callback()
+ del json_data["destination"]
+ del json_data["transaction_id"]
+ producer.reset(json_data)
- transaction = json.loads(producer.body)
-
- new_transaction = on_send_callback(transaction)
-
- producer.reset(new_transaction)
+ json_data = transaction.get_dict()
+ del json_data["destination"]
+ del json_data["transaction_id"]
code, response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
- data=data,
+ data=json_data,
on_send_callback=cb,
)
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 622fe66a8f..1ca123d1bf 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -186,9 +186,6 @@ class Transaction(JsonEncodedObject):
"previous_ids",
"pdus",
"edus",
- ]
-
- internal_keys = [
"transaction_id",
"destination",
]
|