diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 4202a6b3dc..6b9d848eaa 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore):
super(ReceiptsStore, self).__init__(hs)
self._receipts_stream_cache = StreamChangeCache(
- "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
)
@cached(num_args=2)
@@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=2)
def get_receipts_for_user(self, user_id, receipt_type):
- def f(txn):
- sql = (
- "SELECT room_id,event_id "
- "FROM receipts_linearized "
- "WHERE user_id = ? AND receipt_type = ? "
- )
- txn.execute(sql, (user_id, receipt_type))
- return txn.fetchall()
+ rows = yield self._simple_select_list(
+ table="receipts_linearized",
+ keyvalues={
+ "user_id": user_id,
+ "receipt_type": receipt_type,
+ },
+ retcols=("room_id", "event_id"),
+ desc="get_receipts_for_user",
+ )
- defer.returnValue(dict(
- (yield self.runInteraction("get_receipts_for_user", f))
- ))
+ defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
@defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
@@ -222,7 +221,7 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue(results)
def get_max_receipt_stream_id(self):
- return self._receipts_id_gen.get_max_token(self)
+ return self._receipts_id_gen.get_max_token()
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
@@ -330,7 +329,7 @@ class ReceiptsStore(SQLBaseStore):
"insert_receipt_conv", graph_to_linear
)
- stream_id_manager = yield self._receipts_id_gen.get_next(self)
+ stream_id_manager = self._receipts_id_gen.get_next()
with stream_id_manager as stream_id:
have_persisted = yield self.runInteraction(
"insert_linearized_receipt",
@@ -347,7 +346,7 @@ class ReceiptsStore(SQLBaseStore):
room_id, receipt_type, user_id, event_ids, data
)
- max_persisted_id = yield self._stream_id_gen.get_max_token(self)
+ max_persisted_id = self._stream_id_gen.get_max_token()
defer.returnValue((stream_id, max_persisted_id))
@@ -390,3 +389,19 @@ class ReceiptsStore(SQLBaseStore):
"data": json.dumps(data),
}
)
+
+ def get_all_updated_receipts(self, last_id, current_id, limit):
+ def get_all_updated_receipts_txn(txn):
+ sql = (
+ "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
+ " FROM receipts_linearized"
+ " WHERE ? < stream_id AND stream_id <= ?"
+ " ORDER BY stream_id ASC"
+ " LIMIT ?"
+ )
+ txn.execute(sql, (last_id, current_id, limit))
+
+ return txn.fetchall()
+ return self.runInteraction(
+ "get_all_updated_receipts", get_all_updated_receipts_txn
+ )
|