diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index 5f300de108..72c493db57 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -17,6 +17,18 @@ from synapse.api.errors import SynapseError, Codes
from synapse.util.jsonobject import JsonEncodedObject
+def serialize_event(hs, e):
+ # FIXME(erikj): To handle the case of presence events and the like
+ if not isinstance(e, SynapseEvent):
+ return e
+
+ d = e.get_dict()
+ if "age_ts" in d:
+ d["age"] = int(hs.get_clock().time_msec()) - d["age_ts"]
+
+ return d
+
+
class SynapseEvent(JsonEncodedObject):
"""Base class for Synapse events. These are JSON objects which must abide
@@ -43,6 +55,7 @@ class SynapseEvent(JsonEncodedObject):
"content", # HTTP body, JSON
"state_key",
"required_power_level",
+ "age_ts",
]
internal_keys = [
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index 5e38cdbc44..d3d96d73eb 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -59,6 +59,14 @@ class EventFactory(object):
if "ts" not in kwargs:
kwargs["ts"] = int(self.clock.time_msec())
+ # The "age" key is a delta timestamp that should be converted into an
+ # absolute timestamp the minute we see it.
+ if "age" in kwargs:
+ kwargs["age_ts"] = int(self.clock.time_msec()) - int(kwargs["age"])
+ del kwargs["age"]
+ elif "age_ts" not in kwargs:
+ kwargs["age_ts"] = int(self.clock.time_msec())
+
if etype in self._event_list:
handler = self._event_list[etype]
else:
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index e12510017f..c79ce44688 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -291,6 +291,12 @@ class ReplicationLayer(object):
def on_incoming_transaction(self, transaction_data):
transaction = Transaction(**transaction_data)
+ for p in transaction.pdus:
+ if "age" in p:
+ p["age_ts"] = int(self.clock.time_msec()) - int(p["age"])
+
+ pdu_list = [Pdu(**p) for p in transaction.pdus]
+
logger.debug("[%s] Got transaction", transaction.transaction_id)
response = yield self.transaction_actions.have_responded(transaction)
@@ -303,8 +309,6 @@ class ReplicationLayer(object):
logger.debug("[%s] Transacition is new", transaction.transaction_id)
- pdu_list = [Pdu(**p) for p in transaction.pdus]
-
dl = []
for pdu in pdu_list:
dl.append(self._handle_new_pdu(pdu))
@@ -405,9 +409,14 @@ class ReplicationLayer(object):
"""Returns a new Transaction containing the given PDUs suitable for
transmission.
"""
+ pdus = [p.get_dict() for p in pdu_list]
+ for p in pdus:
+ if "age_ts" in pdus:
+ p["age"] = int(self.clock.time_msec()) - p["age_ts"]
+
return Transaction(
- pdus=[p.get_dict() for p in pdu_list],
origin=self.server_name,
+ pdus=pdus,
ts=int(self._clock.time_msec()),
destination=None,
)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index fd24a11fb8..93dcd40324 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -15,7 +15,6 @@
from twisted.internet import defer
-from synapse.api.events import SynapseEvent
from synapse.util.logutils import log_function
from ._base import BaseHandler
@@ -71,10 +70,7 @@ class EventStreamHandler(BaseHandler):
auth_user, room_ids, pagin_config, timeout
)
- chunks = [
- e.get_dict() if isinstance(e, SynapseEvent) else e
- for e in events
- ]
+ chunks = [self.hs.serialize_event(e) for e in events]
chunk = {
"chunk": chunks,
@@ -92,7 +88,9 @@ class EventStreamHandler(BaseHandler):
# 10 seconds of grace to allow the client to reconnect again
# before we think they're gone
def _later():
- logger.debug("_later stopped_user_eventstream %s", auth_user)
+ logger.debug(
+ "_later stopped_user_eventstream %s", auth_user
+ )
self.distributor.fire(
"stopped_user_eventstream", auth_user
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 87fc04478b..b63863e5b2 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -124,7 +124,7 @@ class MessageHandler(BaseHandler):
)
chunk = {
- "chunk": [e.get_dict() for e in events],
+ "chunk": [self.hs.serialize_event(e) for e in events],
"start": pagin_config.from_token.to_string(),
"end": next_token.to_string(),
}
@@ -296,7 +296,7 @@ class MessageHandler(BaseHandler):
end_token = now_token.copy_and_replace("room_key", token[1])
d["messages"] = {
- "chunk": [m.get_dict() for m in messages],
+ "chunk": [self.hs.serialize_event(m) for m in messages],
"start": start_token.to_string(),
"end": end_token.to_string(),
}
@@ -304,7 +304,7 @@ class MessageHandler(BaseHandler):
current_state = yield self.store.get_current_state(
event.room_id
)
- d["state"] = [c.get_dict() for c in current_state]
+ d["state"] = [self.hs.serialize_event(c) for c in current_state]
except:
logger.exception("Failed to get snapshot")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 310cb46fe7..5bc1280432 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -335,7 +335,7 @@ class RoomMemberHandler(BaseHandler):
member_list = yield self.store.get_room_members(room_id=room_id)
event_list = [
- entry.get_dict()
+ self.hs.serialize_event(entry)
for entry in member_list
]
chunk_data = {
diff --git a/synapse/rest/events.py b/synapse/rest/events.py
index 7fde143200..097195d7cc 100644
--- a/synapse/rest/events.py
+++ b/synapse/rest/events.py
@@ -59,7 +59,7 @@ class EventRestServlet(RestServlet):
event = yield handler.get_event(auth_user, event_id)
if event:
- defer.returnValue((200, event.get_dict()))
+ defer.returnValue((200, self.hs.serialize_event(event)))
else:
defer.returnValue((404, "Event not found."))
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index cef700c81c..ecb1e346d9 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -378,7 +378,7 @@ class RoomTriggerBackfill(RestServlet):
handler = self.handlers.federation_handler
events = yield handler.backfill(remote_server, room_id, limit)
- res = [event.get_dict() for event in events]
+ res = [self.hs.serialize_event(event) for event in events]
defer.returnValue((200, res))
diff --git a/synapse/server.py b/synapse/server.py
index 83368ea5a7..7c185537aa 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -20,6 +20,7 @@
# Imports required for the default HomeServer() implementation
from synapse.federation import initialize_http_replication
+from synapse.api.events import serialize_event
from synapse.api.events.factory import EventFactory
from synapse.notifier import Notifier
from synapse.api.auth import Auth
@@ -138,6 +139,9 @@ class BaseHomeServer(object):
object."""
return RoomID.from_string(s, hs=self)
+ def serialize_event(self, e):
+ return serialize_event(self, e)
+
# Build magic accessors for every dependency
for depname in BaseHomeServer.DEPENDENCIES:
BaseHomeServer._make_dependency_method(depname)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 8deaaf93bd..cf88bfc22b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -315,6 +315,10 @@ class SQLBaseStore(object):
d["content"] = json.loads(d["content"])
del d["unrecognized_keys"]
+ if "age_ts" not in d:
+ # For compatibility
+ d["age_ts"] = d["ts"] if "ts" in d else 0
+
return self.event_factory.create_event(
etype=d["type"],
**d
|