summary refs log tree commit diff
path: root/synapse/storage/databases/main/receipts.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/receipts.py')
-rw-r--r--synapse/storage/databases/main/receipts.py72
1 files changed, 40 insertions, 32 deletions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3bab1024ea..b2645ab43c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -313,25 +313,25 @@ class ReceiptsWorkerStore(SQLBaseStore):
     ) -> Sequence[JsonMapping]:
         """See get_linearized_receipts_for_room"""
 
-        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]:
             if from_key:
                 sql = (
-                    "SELECT * FROM receipts_linearized WHERE"
+                    "SELECT receipt_type, user_id, event_id, data"
+                    " FROM receipts_linearized WHERE"
                     " room_id = ? AND stream_id > ? AND stream_id <= ?"
                 )
 
                 txn.execute(sql, (room_id, from_key, to_key))
             else:
                 sql = (
-                    "SELECT * FROM receipts_linearized WHERE"
+                    "SELECT receipt_type, user_id, event_id, data"
+                    " FROM receipts_linearized WHERE"
                     " room_id = ? AND stream_id <= ?"
                 )
 
                 txn.execute(sql, (room_id, to_key))
 
-            rows = self.db_pool.cursor_to_dict(txn)
-
-            return rows
+            return cast(List[Tuple[str, str, str, str]], txn.fetchall())
 
         rows = await self.db_pool.runInteraction("get_linearized_receipts_for_room", f)
 
@@ -339,10 +339,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
             return []
 
         content: JsonDict = {}
-        for row in rows:
-            content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[
-                row["user_id"]
-            ] = db_to_json(row["data"])
+        for receipt_type, user_id, event_id, data in rows:
+            content.setdefault(event_id, {}).setdefault(receipt_type, {})[
+                user_id
+            ] = db_to_json(data)
 
         return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}]
 
@@ -357,10 +357,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
         if not room_ids:
             return {}
 
-        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def f(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[str, str, str, str, Optional[str], str]]:
             if from_key:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, thread_id, data
+                    FROM receipts_linearized WHERE
                     stream_id > ? AND stream_id <= ? AND
                 """
                 clause, args = make_in_list_sql_clause(
@@ -370,7 +373,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 txn.execute(sql + clause, [from_key, to_key] + list(args))
             else:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, thread_id, data
+                    FROM receipts_linearized WHERE
                     stream_id <= ? AND
                 """
 
@@ -380,29 +384,31 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
                 txn.execute(sql + clause, [to_key] + list(args))
 
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(
+                List[Tuple[str, str, str, str, Optional[str], str]], txn.fetchall()
+            )
 
         txn_results = await self.db_pool.runInteraction(
             "_get_linearized_receipts_for_rooms", f
         )
 
         results: JsonDict = {}
-        for row in txn_results:
+        for room_id, receipt_type, user_id, event_id, thread_id, data in txn_results:
             # We want a single event per room, since we want to batch the
             # receipts by room, event and type.
             room_event = results.setdefault(
-                row["room_id"],
-                {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
+                room_id,
+                {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}},
             )
 
             # The content is of the form:
             # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
-            event_entry = room_event["content"].setdefault(row["event_id"], {})
-            receipt_type = event_entry.setdefault(row["receipt_type"], {})
+            event_entry = room_event["content"].setdefault(event_id, {})
+            receipt_type_dict = event_entry.setdefault(receipt_type, {})
 
-            receipt_type[row["user_id"]] = db_to_json(row["data"])
-            if row["thread_id"]:
-                receipt_type[row["user_id"]]["thread_id"] = row["thread_id"]
+            receipt_type_dict[user_id] = db_to_json(data)
+            if thread_id:
+                receipt_type_dict[user_id]["thread_id"] = thread_id
 
         results = {
             room_id: [results[room_id]] if room_id in results else []
@@ -428,10 +434,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
             A dictionary of roomids to a list of receipts.
         """
 
-        def f(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+        def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]:
             if from_key:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, data
+                    FROM receipts_linearized WHERE
                     stream_id > ? AND stream_id <= ?
                     ORDER BY stream_id DESC
                     LIMIT 100
@@ -439,7 +446,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 txn.execute(sql, [from_key, to_key])
             else:
                 sql = """
-                    SELECT * FROM receipts_linearized WHERE
+                    SELECT room_id, receipt_type, user_id, event_id, data
+                    FROM receipts_linearized WHERE
                     stream_id <= ?
                     ORDER BY stream_id DESC
                     LIMIT 100
@@ -447,27 +455,27 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
                 txn.execute(sql, [to_key])
 
-            return self.db_pool.cursor_to_dict(txn)
+            return cast(List[Tuple[str, str, str, str, str]], txn.fetchall())
 
         txn_results = await self.db_pool.runInteraction(
             "get_linearized_receipts_for_all_rooms", f
         )
 
         results: JsonDict = {}
-        for row in txn_results:
+        for room_id, receipt_type, user_id, event_id, data in txn_results:
             # We want a single event per room, since we want to batch the
             # receipts by room, event and type.
             room_event = results.setdefault(
-                row["room_id"],
-                {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}},
+                room_id,
+                {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}},
             )
 
             # The content is of the form:
             # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
-            event_entry = room_event["content"].setdefault(row["event_id"], {})
-            receipt_type = event_entry.setdefault(row["receipt_type"], {})
+            event_entry = room_event["content"].setdefault(event_id, {})
+            receipt_type_dict = event_entry.setdefault(receipt_type, {})
 
-            receipt_type[row["user_id"]] = db_to_json(row["data"])
+            receipt_type_dict[user_id] = db_to_json(data)
 
         return results