diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 053ed84805..8a052f071b 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -36,6 +36,7 @@ class ReceiptsHandler(BaseHandler):
self.federation.register_edu_handler(
"m.receipt", self._received_remote_receipt
)
+ self.clock = self.hs.get_clock()
self._receipt_cache = None
@@ -51,6 +52,9 @@ class ReceiptsHandler(BaseHandler):
"receipt_type": receipt_type,
"user_id": user_id,
"event_ids": [event_id],
+ "data": {
+ "ts": self.clock.time_msec()
+ }
}
is_new = yield self._handle_new_receipts([receipt])
@@ -65,12 +69,12 @@ class ReceiptsHandler(BaseHandler):
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
- "event_ids": [event_id],
+ "event_ids": user_values["event_ids"],
+ "data": user_values.get("data", {}),
}
for room_id, room_values in content.items()
- for event_id, ev_values in room_values.items()
- for receipt_type, users in ev_values.items()
- for user_id in users
+ for receipt_type, users in room_values.items()
+ for user_id, user_values in users.items()
]
yield self._handle_new_receipts(receipts)
@@ -82,9 +86,10 @@ class ReceiptsHandler(BaseHandler):
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
+ data = receipt["data"]
res = yield self.store.insert_receipt(
- room_id, receipt_type, user_id, event_ids,
+ room_id, receipt_type, user_id, event_ids, data
)
if not res:
@@ -108,6 +113,7 @@ class ReceiptsHandler(BaseHandler):
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
+ data = receipt["data"]
remotedomains = set()
@@ -124,10 +130,12 @@ class ReceiptsHandler(BaseHandler):
edu_type="m.receipt",
content={
room_id: {
- event_id: {
- receipt_type: [user_id]
+ receipt_type: {
+ user_id: {
+ "event_ids": event_ids,
+ "data": data,
+ }
}
- for event_id in event_ids
},
},
)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 503f68f858..c4e6b02bdf 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -21,6 +21,7 @@ from synapse.util import unwrapFirstError
from blist import sorteddict
import logging
+import ujson as json
logger = logging.getLogger(__name__)
@@ -91,8 +92,8 @@ class ReceiptsStore(SQLBaseStore):
content.setdefault(
row["event_id"], {}
).setdefault(
- row["receipt_type"], []
- ).append(row["user_id"])
+ row["receipt_type"], {}
+ )[row["user_id"]] = json.loads(row["data"])
defer.returnValue([{
"type": "m.receipt",
@@ -124,7 +125,7 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue(result)
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
- user_id, event_id, stream_id):
+ user_id, event_id, data, stream_id):
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
@@ -172,13 +173,14 @@ class ReceiptsStore(SQLBaseStore):
"receipt_type": receipt_type,
"user_id": user_id,
"event_id": event_id,
+ "data": json.dumps(data),
}
)
return True
@defer.inlineCallbacks
- def insert_receipt(self, room_id, receipt_type, user_id, event_ids):
+ def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data):
if not event_ids:
return
@@ -214,6 +216,7 @@ class ReceiptsStore(SQLBaseStore):
"insert_linearized_receipt",
self.insert_linearized_receipt_txn,
room_id, receipt_type, user_id, linearized_event_id,
+ data,
stream_id=stream_id,
)
@@ -221,22 +224,22 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue(None)
yield self.insert_graph_receipt(
- room_id, receipt_type, user_id, event_ids
+ room_id, receipt_type, user_id, event_ids, data
)
max_persisted_id = yield self._stream_id_gen.get_max_token(self)
defer.returnValue((stream_id, max_persisted_id))
- def insert_graph_receipt(self, room_id, receipt_type,
- user_id, event_ids):
+ def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids,
+ data):
return self.runInteraction(
"insert_graph_receipt",
self.insert_graph_receipt_txn,
- room_id, receipt_type, user_id, event_ids,
+ room_id, receipt_type, user_id, event_ids, data
)
def insert_graph_receipt_txn(self, txn, room_id, receipt_type,
- user_id, event_ids):
+ user_id, event_ids, data):
self._simple_delete_txn(
txn,
table="receipts_graph",
@@ -246,18 +249,16 @@ class ReceiptsStore(SQLBaseStore):
"user_id": user_id,
}
)
- self._simple_insert_many_txn(
+ self._simple_insert_txn(
txn,
table="receipts_graph",
- values=[
- {
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
- "event_id": event_id,
- }
- for event_id in event_ids
- ],
+ values={
+ "room_id": room_id,
+ "receipt_type": receipt_type,
+ "user_id": user_id,
+ "event_ids": json.dumps(event_ids),
+ "data": json.dumps(data),
+ }
)
diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql
index ac7738e371..2f64d609fc 100644
--- a/synapse/storage/schema/delta/21/receipts.sql
+++ b/synapse/storage/schema/delta/21/receipts.sql
@@ -18,11 +18,9 @@ CREATE TABLE IF NOT EXISTS receipts_graph(
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
- event_id TEXT NOT NULL
-);
-
-CREATE INDEX receipts_graph_room_tuple ON receipts_graph(
- room_id, receipt_type, user_id
+ event_ids TEXT NOT NULL,
+ data TEXT NOT NULL,
+ CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id)
);
CREATE TABLE IF NOT EXISTS receipts_linearized (
@@ -30,11 +28,9 @@ CREATE TABLE IF NOT EXISTS receipts_linearized (
room_id TEXT NOT NULL,
receipt_type TEXT NOT NULL,
user_id TEXT NOT NULL,
- event_id TEXT NOT NULL
-);
-
-CREATE INDEX receipts_linearized_room_tuple ON receipts_linearized(
- room_id, receipt_type, user_id
+ event_id TEXT NOT NULL,
+ data TEXT NOT NULL,
+ CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id)
);
CREATE INDEX receipts_linearized_id ON receipts_linearized(
|