diff options
author | Patrick Cloke <patrickc@matrix.org> | 2022-07-29 10:32:21 -0400 |
---|---|---|
committer | Patrick Cloke <patrickc@matrix.org> | 2022-08-05 08:18:31 -0400 |
commit | fbd6727760c1170a7c7d01535579b5abb05b995f (patch) | |
tree | c31c27b6b80536c87acad726e6d8310e6c523a21 | |
parent | Add a thread ID to receipts. (diff) | |
download | synapse-fbd6727760c1170a7c7d01535579b5abb05b995f.tar.xz |
Send the thread ID over replication.
-rw-r--r-- | synapse/replication/tcp/client.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 12 |
3 files changed, 9 insertions, 6 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 1b2412405c..93d53d68db 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -423,7 +423,7 @@ class FederationSenderHandler: receipt.receipt_type, receipt.user_id, [receipt.event_id], - thread_id=None, # TODO + thread_id=receipt.thread_id, data=receipt.data, ) await self.federation_sender.send_read_receipt(receipt_info) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 398bebeaa6..e01155ad59 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -361,6 +361,7 @@ class ReceiptsStream(Stream): receipt_type: str user_id: str event_id: str + thread_id: Optional[str] data: dict NAME = "receipts" diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 757d3ceb4f..51347c1cf9 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -522,7 +522,7 @@ class ReceiptsWorkerStore(SQLBaseStore): async def get_all_updated_receipts( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool]: """Get updates for receipts replication stream. Args: @@ -549,9 +549,11 @@ class ReceiptsWorkerStore(SQLBaseStore): def get_all_updated_receipts_txn( txn: LoggingTransaction, - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[ + List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool + ]: sql = """ - SELECT stream_id, room_id, receipt_type, user_id, event_id, data + SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data FROM receipts_linearized WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC @@ -560,8 +562,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, (last_id, current_id, limit)) updates = cast( - List[Tuple[int, list]], - [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn], + List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], + [(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn], ) limited = False |