summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/receipts.py24
-rw-r--r--synapse/storage/receipts.py39
-rw-r--r--synapse/storage/schema/delta/21/receipts.sql16
3 files changed, 42 insertions, 37 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 053ed84805..8a052f071b 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -36,6 +36,7 @@ class ReceiptsHandler(BaseHandler):
         self.federation.register_edu_handler(
             "m.receipt", self._received_remote_receipt
         )
+        self.clock = self.hs.get_clock()
 
         self._receipt_cache = None
 
@@ -51,6 +52,9 @@ class ReceiptsHandler(BaseHandler):
             "receipt_type": receipt_type,
             "user_id": user_id,
             "event_ids": [event_id],
+            "data": {
+                "ts": self.clock.time_msec()
+            }
         }
 
         is_new = yield self._handle_new_receipts([receipt])
@@ -65,12 +69,12 @@ class ReceiptsHandler(BaseHandler):
                 "room_id": room_id,
                 "receipt_type": receipt_type,
                 "user_id": user_id,
-                "event_ids": [event_id],
+                "event_ids": user_values["event_ids"],
+                "data": user_values.get("data", {}),
             }
             for room_id, room_values in content.items()
-            for event_id, ev_values in room_values.items()
-            for receipt_type, users in ev_values.items()
-            for user_id in users
+            for receipt_type, users in room_values.items()
+            for user_id, user_values in users.items()
         ]
 
         yield self._handle_new_receipts(receipts)
@@ -82,9 +86,10 @@ class ReceiptsHandler(BaseHandler):
             receipt_type = receipt["receipt_type"]
             user_id = receipt["user_id"]
             event_ids = receipt["event_ids"]
+            data = receipt["data"]
 
             res = yield self.store.insert_receipt(
-                room_id, receipt_type, user_id, event_ids,
+                room_id, receipt_type, user_id, event_ids, data
             )
 
             if not res:
@@ -108,6 +113,7 @@ class ReceiptsHandler(BaseHandler):
             receipt_type = receipt["receipt_type"]
             user_id = receipt["user_id"]
             event_ids = receipt["event_ids"]
+            data = receipt["data"]
 
             remotedomains = set()
 
@@ -124,10 +130,12 @@ class ReceiptsHandler(BaseHandler):
                     edu_type="m.receipt",
                     content={
                         room_id: {
-                            event_id: {
-                                receipt_type: [user_id]
+                            receipt_type: {
+                                user_id: {
+                                    "event_ids": event_ids,
+                                    "data": data,
+                                }
                             }
-                            for event_id in event_ids
                         },
                     },
                 )
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 503f68f858..c4e6b02bdf 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -21,6 +21,7 @@ from synapse.util import unwrapFirstError
 
 from blist import sorteddict
 import logging
+import ujson as json
 
 
 logger = logging.getLogger(__name__)
@@ -91,8 +92,8 @@ class ReceiptsStore(SQLBaseStore):
             content.setdefault(
                 row["event_id"], {}
             ).setdefault(
-                row["receipt_type"], []
-            ).append(row["user_id"])
+                row["receipt_type"], {}
+            )[row["user_id"]] = json.loads(row["data"])
 
         defer.returnValue([{
             "type": "m.receipt",
@@ -124,7 +125,7 @@ class ReceiptsStore(SQLBaseStore):
         defer.returnValue(result)
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
-                                      user_id, event_id, stream_id):
+                                      user_id, event_id, data, stream_id):
 
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
@@ -172,13 +173,14 @@ class ReceiptsStore(SQLBaseStore):
                 "receipt_type": receipt_type,
                 "user_id": user_id,
                 "event_id": event_id,
+                "data": json.dumps(data),
             }
         )
 
         return True
 
     @defer.inlineCallbacks
-    def insert_receipt(self, room_id, receipt_type, user_id, event_ids):
+    def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data):
         if not event_ids:
             return
 
@@ -214,6 +216,7 @@ class ReceiptsStore(SQLBaseStore):
                 "insert_linearized_receipt",
                 self.insert_linearized_receipt_txn,
                 room_id, receipt_type, user_id, linearized_event_id,
+                data,
                 stream_id=stream_id,
             )
 
@@ -221,22 +224,22 @@ class ReceiptsStore(SQLBaseStore):
                 defer.returnValue(None)
 
         yield self.insert_graph_receipt(
-            room_id, receipt_type, user_id, event_ids
+            room_id, receipt_type, user_id, event_ids, data
         )
 
         max_persisted_id = yield self._stream_id_gen.get_max_token(self)
         defer.returnValue((stream_id, max_persisted_id))
 
-    def insert_graph_receipt(self, room_id, receipt_type,
-                             user_id, event_ids):
+    def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids,
+                             data):
         return self.runInteraction(
             "insert_graph_receipt",
             self.insert_graph_receipt_txn,
-            room_id, receipt_type, user_id, event_ids,
+            room_id, receipt_type, user_id, event_ids, data
         )
 
     def insert_graph_receipt_txn(self, txn, room_id, receipt_type,
-                                 user_id, event_ids):
+                                 user_id, event_ids, data):
         self._simple_delete_txn(
             txn,
             table="receipts_graph",
@@ -246,18 +249,16 @@ class ReceiptsStore(SQLBaseStore):
                 "user_id": user_id,
             }
         )
-        self._simple_insert_many_txn(
+        self._simple_insert_txn(
             txn,
             table="receipts_graph",
-            values=[
-                {
-                    "room_id": room_id,
-                    "receipt_type": receipt_type,
-                    "user_id": user_id,
-                    "event_id": event_id,
-                }
-                for event_id in event_ids
-            ],
+            values={
+                "room_id": room_id,
+                "receipt_type": receipt_type,
+                "user_id": user_id,
+                "event_ids": json.dumps(event_ids),
+                "data": json.dumps(data),
+            }
         )
 
 
diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql
index ac7738e371..2f64d609fc 100644
--- a/synapse/storage/schema/delta/21/receipts.sql
+++ b/synapse/storage/schema/delta/21/receipts.sql
@@ -18,11 +18,9 @@ CREATE TABLE IF NOT EXISTS receipts_graph(
     room_id TEXT NOT NULL,
     receipt_type TEXT NOT NULL,
     user_id TEXT NOT NULL,
-    event_id TEXT NOT NULL
-);
-
-CREATE INDEX receipts_graph_room_tuple ON receipts_graph(
-  room_id, receipt_type, user_id
+    event_ids TEXT NOT NULL,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id)
 );
 
 CREATE TABLE IF NOT EXISTS receipts_linearized (
@@ -30,11 +28,9 @@ CREATE TABLE IF NOT EXISTS receipts_linearized (
     room_id TEXT NOT NULL,
     receipt_type TEXT NOT NULL,
     user_id TEXT NOT NULL,
-    event_id TEXT NOT NULL
-);
-
-CREATE INDEX receipts_linearized_room_tuple ON receipts_linearized(
-  room_id, receipt_type, user_id
+    event_id TEXT NOT NULL,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id)
 );
 
 CREATE INDEX receipts_linearized_id ON receipts_linearized(