summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2022-07-29 10:32:21 -0400
committerPatrick Cloke <patrickc@matrix.org>2022-08-05 08:18:31 -0400
commitfbd6727760c1170a7c7d01535579b5abb05b995f (patch)
treec31c27b6b80536c87acad726e6d8310e6c523a21
parentAdd a thread ID to receipts. (diff)
downloadsynapse-fbd6727760c1170a7c7d01535579b5abb05b995f.tar.xz
Send the thread ID over replication.
-rw-r--r--synapse/replication/tcp/client.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py1
-rw-r--r--synapse/storage/databases/main/receipts.py12
3 files changed, 9 insertions, 6 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 1b2412405c..93d53d68db 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -423,7 +423,7 @@ class FederationSenderHandler:
                 receipt.receipt_type,
                 receipt.user_id,
                 [receipt.event_id],
-                thread_id=None,  # TODO
+                thread_id=receipt.thread_id,
                 data=receipt.data,
             )
             await self.federation_sender.send_read_receipt(receipt_info)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 398bebeaa6..e01155ad59 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -361,6 +361,7 @@ class ReceiptsStream(Stream):
         receipt_type: str
         user_id: str
         event_id: str
+        thread_id: Optional[str]
         data: dict
 
     NAME = "receipts"
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 757d3ceb4f..51347c1cf9 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -522,7 +522,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
     async def get_all_updated_receipts(
         self, instance_name: str, last_id: int, current_id: int, limit: int
-    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+    ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool]:
         """Get updates for receipts replication stream.
 
         Args:
@@ -549,9 +549,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
         def get_all_updated_receipts_txn(
             txn: LoggingTransaction,
-        ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        ) -> Tuple[
+            List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool
+        ]:
             sql = """
-                SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+                SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
                 FROM receipts_linearized
                 WHERE ? < stream_id AND stream_id <= ?
                 ORDER BY stream_id ASC
@@ -560,8 +562,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
             txn.execute(sql, (last_id, current_id, limit))
 
             updates = cast(
-                List[Tuple[int, list]],
-                [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn],
+                List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]],
+                [(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn],
             )
 
             limited = False