summary refs log tree commit diff
path: root/synapse/storage/receipts.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/receipts.py')
-rw-r--r--synapse/storage/receipts.py45
1 files changed, 30 insertions, 15 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 4202a6b3dc..6b9d848eaa 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore):
         super(ReceiptsStore, self).__init__(hs)
 
         self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
         )
 
     @cached(num_args=2)
@@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=2)
     def get_receipts_for_user(self, user_id, receipt_type):
-        def f(txn):
-            sql = (
-                "SELECT room_id,event_id "
-                "FROM receipts_linearized "
-                "WHERE user_id = ? AND receipt_type = ? "
-            )
-            txn.execute(sql, (user_id, receipt_type))
-            return txn.fetchall()
+        rows = yield self._simple_select_list(
+            table="receipts_linearized",
+            keyvalues={
+                "user_id": user_id,
+                "receipt_type": receipt_type,
+            },
+            retcols=("room_id", "event_id"),
+            desc="get_receipts_for_user",
+        )
 
-        defer.returnValue(dict(
-            (yield self.runInteraction("get_receipts_for_user", f))
-        ))
+        defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
 
     @defer.inlineCallbacks
     def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
@@ -222,7 +221,7 @@ class ReceiptsStore(SQLBaseStore):
         defer.returnValue(results)
 
     def get_max_receipt_stream_id(self):
-        return self._receipts_id_gen.get_max_token(self)
+        return self._receipts_id_gen.get_max_token()
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
@@ -330,7 +329,7 @@ class ReceiptsStore(SQLBaseStore):
                 "insert_receipt_conv", graph_to_linear
             )
 
-        stream_id_manager = yield self._receipts_id_gen.get_next(self)
+        stream_id_manager = self._receipts_id_gen.get_next()
         with stream_id_manager as stream_id:
             have_persisted = yield self.runInteraction(
                 "insert_linearized_receipt",
@@ -347,7 +346,7 @@ class ReceiptsStore(SQLBaseStore):
             room_id, receipt_type, user_id, event_ids, data
         )
 
-        max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+        max_persisted_id = self._stream_id_gen.get_max_token()
 
         defer.returnValue((stream_id, max_persisted_id))
 
@@ -390,3 +389,19 @@ class ReceiptsStore(SQLBaseStore):
                 "data": json.dumps(data),
             }
         )
+
+    def get_all_updated_receipts(self, last_id, current_id, limit):
+        def get_all_updated_receipts_txn(txn):
+            sql = (
+                "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
+                " FROM receipts_linearized"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC"
+                " LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+
+            return txn.fetchall()
+        return self.runInteraction(
+            "get_all_updated_receipts", get_all_updated_receipts_txn
+        )