diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 788a49b8e8..c4993aa5ee 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -295,6 +295,10 @@ class ReplicationLayer(object):
transaction = Transaction(**transaction_data)
for p in transaction.pdus:
+ if "meta" in p:
+ meta = p["meta"]
+ if "age" in meta:
+ p["age"] = meta["age"]
if "age" in p:
p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
del p["age"]
@@ -414,14 +418,16 @@ class ReplicationLayer(object):
transmission.
"""
pdus = [p.get_dict() for p in pdu_list]
+ time_now = self._clock.time_msec()
for p in pdus:
- if "age_ts" in pdus:
- p["age"] = int(self.clock.time_msec()) - p["age_ts"]
-
+ if "age_ts" in p:
+ age = time_now - p["age_ts"]
+ p.setdefault("meta", {})["age"] = int(age)
+ del p["age_ts"]
return Transaction(
origin=self.server_name,
pdus=pdus,
- ts=int(self._clock.time_msec()),
+ ts=int(time_now),
destination=None,
)
@@ -589,7 +595,7 @@ class _TransactionQueue(object):
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
- ts=self._clock.time_msec(),
+ ts=int(self._clock.time_msec()),
transaction_id=str(self._next_txn_id),
origin=self.server_name,
destination=destination,
@@ -614,7 +620,9 @@ class _TransactionQueue(object):
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
- p["age"] = now - int(p["age_ts"])
+ meta = p.setdefault("meta", {})
+ meta["age"] = now - int(p["age_ts"])
+ del p["age_ts"]
return data
code, response = yield self.transport_layer.send_transaction(
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 6a43007837..c4a10a4123 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -68,11 +68,11 @@ class Pdu(JsonEncodedObject):
"signatures",
"is_state", # Below this are keys valid only for State Pdus.
"state_key",
- "power_level",
"prev_state_id",
"prev_state_origin",
"required_power_level",
"user_id",
+ "meta"
]
internal_keys = [
@@ -124,6 +124,10 @@ class Pdu(JsonEncodedObject):
if pdu_tuple:
d = copy.copy(pdu_tuple.pdu_entry._asdict())
+ for k in d.keys():
+ if d[k] is None:
+ del d[k]
+
d["content"] = json.loads(d["content_json"])
del d["content_json"]
|