From ab18441573dc14cea1fe4082b2a89b9d392a4b9f Mon Sep 17 00:00:00 2001 From: Šimon Brandner Date: Fri, 5 Aug 2022 17:09:33 +0200 Subject: Support stable identifiers for MSC2285: private read receipts. (#13273) This adds support for the stable identifiers of MSC2285 while continuing to support the unstable identifiers behind the configuration flag. These will be removed in a future version. --- synapse/api/constants.py | 3 +- synapse/config/experimental.py | 2 +- synapse/handlers/initial_sync.py | 11 +-- synapse/handlers/receipts.py | 36 ++++++--- synapse/replication/tcp/client.py | 5 +- synapse/rest/client/notifications.py | 7 +- synapse/rest/client/read_marker.py | 8 +- synapse/rest/client/receipts.py | 10 ++- synapse/rest/client/versions.py | 1 + .../storage/databases/main/event_push_actions.py | 85 ++++++++++++++++++---- 10 files changed, 126 insertions(+), 42 deletions(-) (limited to 'synapse') diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 789859e69e..1d46fb0e43 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -257,7 +257,8 @@ class GuestAccess: class ReceiptTypes: READ: Final = "m.read" - READ_PRIVATE: Final = "org.matrix.msc2285.read.private" + READ_PRIVATE: Final = "m.read.private" + UNSTABLE_READ_PRIVATE: Final = "org.matrix.msc2285.read.private" FULLY_READ: Final = "m.fully_read" diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index c2ecd977cd..7d17c958bb 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -32,7 +32,7 @@ class ExperimentalConfig(Config): # MSC2716 (importing historical messages) self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False) - # MSC2285 (private read receipts) + # MSC2285 (unstable private read receipts) self.msc2285_enabled: bool = experimental.get("msc2285_enabled", False) # MSC3244 (room version capabilities) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 85b472f250..6484e47e5f 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -143,8 +143,8 @@ class InitialSyncHandler: joined_rooms, to_key=int(now_token.receipt_key), ) - if self.hs.config.experimental.msc2285_enabled: - receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id) + + receipt = ReceiptEventSource.filter_out_private_receipts(receipt, user_id) tags_by_room = await self.store.get_tags_for_user(user_id) @@ -456,11 +456,8 @@ class InitialSyncHandler: ) if not receipts: return [] - if self.hs.config.experimental.msc2285_enabled: - receipts = ReceiptEventSource.filter_out_private_receipts( - receipts, user_id - ) - return receipts + + return ReceiptEventSource.filter_out_private_receipts(receipts, user_id) presence, receipts, (messages, token) = await make_deferred_yieldable( gather_results( diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 43d2882b0a..d4a866b346 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -163,7 +163,10 @@ class ReceiptsHandler: if not is_new: return - if self.federation_sender and receipt_type != ReceiptTypes.READ_PRIVATE: + if self.federation_sender and receipt_type not in ( + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ): await self.federation_sender.send_read_receipt(receipt) @@ -203,24 +206,38 @@ class ReceiptEventSource(EventSource[int, JsonDict]): for event_id, orig_event_content in room.get("content", {}).items(): event_content = orig_event_content # If there are private read receipts, additional logic is necessary. - if ReceiptTypes.READ_PRIVATE in event_content: + if ( + ReceiptTypes.READ_PRIVATE in event_content + or ReceiptTypes.UNSTABLE_READ_PRIVATE in event_content + ): # Make a copy without private read receipts to avoid leaking # other user's private read receipts.. event_content = { receipt_type: receipt_value for receipt_type, receipt_value in event_content.items() - if receipt_type != ReceiptTypes.READ_PRIVATE + if receipt_type + not in ( + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ) } # Copy the current user's private read receipt from the # original content, if it exists. - user_private_read_receipt = orig_event_content[ - ReceiptTypes.READ_PRIVATE - ].get(user_id, None) + user_private_read_receipt = orig_event_content.get( + ReceiptTypes.READ_PRIVATE, {} + ).get(user_id, None) if user_private_read_receipt: event_content[ReceiptTypes.READ_PRIVATE] = { user_id: user_private_read_receipt } + user_unstable_private_read_receipt = orig_event_content.get( + ReceiptTypes.UNSTABLE_READ_PRIVATE, {} + ).get(user_id, None) + if user_unstable_private_read_receipt: + event_content[ReceiptTypes.UNSTABLE_READ_PRIVATE] = { + user_id: user_unstable_private_read_receipt + } # Include the event if there is at least one non-private read # receipt or the current user has a private read receipt. @@ -256,10 +273,9 @@ class ReceiptEventSource(EventSource[int, JsonDict]): room_ids, from_key=from_key, to_key=to_key ) - if self.config.experimental.msc2285_enabled: - events = ReceiptEventSource.filter_out_private_receipts( - events, user.to_string() - ) + events = ReceiptEventSource.filter_out_private_receipts( + events, user.to_string() + ) return events, to_key diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index e4f2201c92..1ed7230e32 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -416,7 +416,10 @@ class FederationSenderHandler: if not self._is_mine_id(receipt.user_id): continue # Private read receipts never get sent over federation. - if receipt.receipt_type == ReceiptTypes.READ_PRIVATE: + if receipt.receipt_type in ( + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ): continue receipt_info = ReadReceipt( receipt.room_id, diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index 24bc7c9095..a73322a6a4 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -58,7 +58,12 @@ class NotificationsServlet(RestServlet): ) receipts_by_room = await self.store.get_receipts_for_user_with_orderings( - user_id, [ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE] + user_id, + [ + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ], ) notif_event_ids = [pa.event_id for pa in push_actions] diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py index 8896f2df50..aaad8b233f 100644 --- a/synapse/rest/client/read_marker.py +++ b/synapse/rest/client/read_marker.py @@ -40,9 +40,13 @@ class ReadMarkerRestServlet(RestServlet): self.read_marker_handler = hs.get_read_marker_handler() self.presence_handler = hs.get_presence_handler() - self._known_receipt_types = {ReceiptTypes.READ, ReceiptTypes.FULLY_READ} + self._known_receipt_types = { + ReceiptTypes.READ, + ReceiptTypes.FULLY_READ, + ReceiptTypes.READ_PRIVATE, + } if hs.config.experimental.msc2285_enabled: - self._known_receipt_types.add(ReceiptTypes.READ_PRIVATE) + self._known_receipt_types.add(ReceiptTypes.UNSTABLE_READ_PRIVATE) async def on_POST( self, request: SynapseRequest, room_id: str diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py index 409bfd43c1..c6108fc5eb 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py @@ -44,11 +44,13 @@ class ReceiptRestServlet(RestServlet): self.read_marker_handler = hs.get_read_marker_handler() self.presence_handler = hs.get_presence_handler() - self._known_receipt_types = {ReceiptTypes.READ} + self._known_receipt_types = { + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.FULLY_READ, + } if hs.config.experimental.msc2285_enabled: - self._known_receipt_types.update( - (ReceiptTypes.READ_PRIVATE, ReceiptTypes.FULLY_READ) - ) + self._known_receipt_types.add(ReceiptTypes.UNSTABLE_READ_PRIVATE) async def on_POST( self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 0366986755..c9a830cbac 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -94,6 +94,7 @@ class VersionsRestServlet(RestServlet): # Supports the busy presence state described in MSC3026. "org.matrix.msc3026.busy_presence": self.config.experimental.msc3026_enabled, # Supports receiving private read receipts as per MSC2285 + "org.matrix.msc2285.stable": True, # TODO: Remove when MSC2285 becomes a part of the spec "org.matrix.msc2285": self.config.experimental.msc2285_enabled, # Supports filtering of /publicRooms by room type as per MSC3827 "org.matrix.msc3827.stable": True, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 5db70f9a60..161aad0f89 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -80,7 +80,7 @@ import attr from synapse.api.constants import ReceiptTypes from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -259,7 +259,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas txn, user_id, room_id, - receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + receipt_types=( + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ), ) stream_ordering = None @@ -448,6 +452,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas The list will be ordered by ascending stream_ordering. The list will have between 0~limit entries. """ + # find rooms that have a read receipt in them and return the next # push actions def get_after_receipt( @@ -455,7 +460,18 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas ) -> List[Tuple[str, str, int, str, bool]]: # find rooms that have a read receipt in them and return the next # push actions - sql = """ + + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + ( + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ), + ) + + sql = f""" SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight FROM ( @@ -463,10 +479,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas MAX(stream_ordering) as stream_ordering FROM events INNER JOIN receipts_linearized USING (room_id, event_id) - WHERE receipt_type = 'm.read' AND user_id = ? + WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id ) AS rl, - event_push_actions AS ep + event_push_actions AS ep WHERE ep.room_id = rl.room_id AND ep.stream_ordering > rl.stream_ordering @@ -476,7 +492,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas AND ep.notif = 1 ORDER BY ep.stream_ordering ASC LIMIT ? """ - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] + args.extend( + (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) + ) txn.execute(sql, args) return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) @@ -490,7 +508,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas def get_no_receipt( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool]]: - sql = """ + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + ( + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ), + ) + + sql = f""" SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight FROM event_push_actions AS ep @@ -498,7 +526,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas WHERE ep.room_id NOT IN ( SELECT room_id FROM receipts_linearized - WHERE receipt_type = 'm.read' AND user_id = ? + WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id ) AND ep.user_id = ? @@ -507,7 +535,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas AND ep.notif = 1 ORDER BY ep.stream_ordering ASC LIMIT ? """ - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] + args.extend( + (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) + ) txn.execute(sql, args) return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) @@ -557,12 +587,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas The list will be ordered by descending received_ts. The list will have between 0~limit entries. """ + # find rooms that have a read receipt in them and return the most recent # push actions def get_after_receipt( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool, int]]: - sql = """ + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + ( + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ), + ) + + sql = f""" SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight, e.received_ts FROM ( @@ -570,7 +611,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas MAX(stream_ordering) as stream_ordering FROM events INNER JOIN receipts_linearized USING (room_id, event_id) - WHERE receipt_type = 'm.read' AND user_id = ? + WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id ) AS rl, event_push_actions AS ep @@ -584,7 +625,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas AND ep.notif = 1 ORDER BY ep.stream_ordering DESC LIMIT ? """ - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] + args.extend( + (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) + ) txn.execute(sql, args) return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) @@ -598,7 +641,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas def get_no_receipt( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool, int]]: - sql = """ + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + ( + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ReceiptTypes.UNSTABLE_READ_PRIVATE, + ), + ) + + sql = f""" SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight, e.received_ts FROM event_push_actions AS ep @@ -606,7 +659,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas WHERE ep.room_id NOT IN ( SELECT room_id FROM receipts_linearized - WHERE receipt_type = 'm.read' AND user_id = ? + WHERE {receipt_types_clause} AND user_id = ? GROUP BY room_id ) AND ep.user_id = ? @@ -615,7 +668,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas AND ep.notif = 1 ORDER BY ep.stream_ordering DESC LIMIT ? """ - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] + args.extend( + (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) + ) txn.execute(sql, args) return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) -- cgit 1.4.1