summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-07-30 12:49:55 -0500
committerGitHub <noreply@github.com>2024-07-30 12:49:55 -0500
commitb221f0b84b984d236ea11383cc21f6d07ca3c2ec (patch)
treeaa1ddc9638a46f057f4b138a0483ae81385f1dd9 /synapse
parentMerge branch 'master' into develop (diff)
downloadsynapse-b221f0b84b984d236ea11383cc21f6d07ca3c2ec.tar.xz
Sliding Sync: Add receipts extension (MSC3960) (#17489)
[MSC3960](https://github.com/matrix-org/matrix-spec-proposals/pull/3960): Receipts extension

Based on
[MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575):
Sliding Sync
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/receipts.py4
-rw-r--r--synapse/handlers/sliding_sync.py268
-rw-r--r--synapse/rest/client/sync.py6
-rw-r--r--synapse/types/handlers/__init__.py20
-rw-r--r--synapse/types/rest/client/__init__.py18
5 files changed, 253 insertions, 63 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 8674a8fcdd..d04c76be2a 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -286,8 +286,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
+        to_key: Optional[MultiWriterStreamToken] = None,
     ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
-        to_key = self.get_current_key()
+        if to_key is None:
+            to_key = self.get_current_key()
 
         if from_key == to_key:
             return [], to_key
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 73414dbf69..7a734f6712 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -49,6 +49,7 @@ from synapse.types import (
     DeviceListUpdates,
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
@@ -493,8 +494,7 @@ class SlidingSyncHandler:
 
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
-        # Keep track of the rooms that we're going to display and need to fetch more
-        # info about
+        # Keep track of the rooms that we can display and need to fetch more info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
         if has_lists and sync_config.lists is not None:
             sync_room_map = await self.filter_rooms_relevant_for_sync(
@@ -622,6 +622,8 @@ class SlidingSyncHandler:
 
         # Filter out rooms that haven't received updates and we've sent down
         # previously.
+        # Keep track of the rooms that we're going to display and need to fetch more info about
+        relevant_rooms_to_send_map = relevant_room_map
         if from_token:
             rooms_should_send = set()
 
@@ -659,7 +661,7 @@ class SlidingSyncHandler:
                 relevant_room_map.keys(), from_token.stream_token.room_key
             )
             rooms_should_send.update(rooms_that_have_updates)
-            relevant_room_map = {
+            relevant_rooms_to_send_map = {
                 room_id: room_sync_config
                 for room_id, room_sync_config in relevant_room_map.items()
                 if room_id in rooms_should_send
@@ -671,7 +673,7 @@ class SlidingSyncHandler:
             room_sync_result = await self.get_room_sync_data(
                 sync_config=sync_config,
                 room_id=room_id,
-                room_sync_config=relevant_room_map[room_id],
+                room_sync_config=relevant_rooms_to_send_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
                     room_id
                 ],
@@ -683,13 +685,20 @@ class SlidingSyncHandler:
             if room_sync_result or not from_token:
                 rooms[room_id] = room_sync_result
 
-        if relevant_room_map:
+        if relevant_rooms_to_send_map:
             with start_active_span("sliding_sync.generate_room_entries"):
-                await concurrently_execute(handle_room, relevant_room_map, 10)
+                await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
-            lists=lists,
+            actual_lists=lists,
+            # We're purposely using `relevant_room_map` instead of
+            # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could
+            # send regardless of whether they have an event update or not. The
+            # extensions care about more than just normal events in the rooms (like
+            # account data, read receipts, typing indicators, to-device messages, etc).
+            actual_room_ids=set(relevant_room_map.keys()),
+            actual_room_response_map=rooms,
             from_token=from_token,
             to_token=to_token,
         )
@@ -698,7 +707,7 @@ class SlidingSyncHandler:
             connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
                 from_token=from_token,
-                sent_room_ids=relevant_room_map.keys(),
+                sent_room_ids=relevant_rooms_to_send_map.keys(),
                 # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
                 unsent_room_ids=[],
             )
@@ -1902,7 +1911,9 @@ class SlidingSyncHandler:
     async def get_extensions_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
@@ -1910,7 +1921,11 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
         """
@@ -1939,18 +1954,103 @@ class SlidingSyncHandler:
         if sync_config.extensions.account_data is not None:
             account_data_response = await self.get_account_data_extension_response(
                 sync_config=sync_config,
-                lists=lists,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
                 account_data_request=sync_config.extensions.account_data,
                 to_token=to_token,
                 from_token=from_token,
             )
 
+        receipts_response = None
+        if sync_config.extensions.receipts is not None:
+            receipts_response = await self.get_receipts_extension_response(
+                sync_config=sync_config,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
+                actual_room_response_map=actual_room_response_map,
+                receipts_request=sync_config.extensions.receipts,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
         return SlidingSyncResult.Extensions(
             to_device=to_device_response,
             e2ee=e2ee_response,
             account_data=account_data_response,
+            receipts=receipts_response,
         )
 
+    def find_relevant_room_ids_for_extension(
+        self,
+        requested_lists: Optional[List[str]],
+        requested_room_ids: Optional[List[str]],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+    ) -> Set[str]:
+        """
+        Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only
+        return results for rooms in the Sliding Sync response. This matches up the
+        requested rooms/lists with the actual lists/rooms in the Sliding Sync response.
+
+        {"lists": []}                    // Do not process any lists.
+        {"lists": ["rooms", "dms"]}      // Process only a subset of lists.
+        {"lists": ["*"]}                 // Process all lists defined in the Sliding Window API. (This is the default.)
+
+        {"rooms": []}                    // Do not process any specific rooms.
+        {"rooms": ["!a:b", "!c:d"]}      // Process only a subset of room subscriptions.
+        {"rooms": ["*"]}                 // Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+
+        Args:
+            requested_lists: The `lists` from the extension request.
+            requested_room_ids: The `rooms` from the extension request.
+            actual_lists: The actual lists from the Sliding Sync response.
+            actual_room_ids: The actual room subscriptions from the Sliding Sync request.
+        """
+
+        # We only want to include account data for rooms that are already in the sliding
+        # sync response AND that were requested in the account data request.
+        relevant_room_ids: Set[str] = set()
+
+        # See what rooms from the room subscriptions we should get account data for
+        if requested_room_ids is not None:
+            for room_id in requested_room_ids:
+                # A wildcard means we process all rooms from the room subscriptions
+                if room_id == "*":
+                    relevant_room_ids.update(actual_room_ids)
+                    break
+
+                if room_id in actual_room_ids:
+                    relevant_room_ids.add(room_id)
+
+        # See what rooms from the sliding window lists we should get account data for
+        if requested_lists is not None:
+            for list_key in requested_lists:
+                # Just some typing because we share the variable name in multiple places
+                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+                # A wildcard means we process rooms from all lists
+                if list_key == "*":
+                    for actual_list in actual_lists.values():
+                        # We only expect a single SYNC operation for any list
+                        assert len(actual_list.ops) == 1
+                        sync_op = actual_list.ops[0]
+                        assert sync_op.op == OperationType.SYNC
+
+                        relevant_room_ids.update(sync_op.room_ids)
+
+                    break
+
+                actual_list = actual_lists.get(list_key)
+                if actual_list is not None:
+                    # We only expect a single SYNC operation for any list
+                    assert len(actual_list.ops) == 1
+                    sync_op = actual_list.ops[0]
+                    assert sync_op.op == OperationType.SYNC
+
+                    relevant_room_ids.update(sync_op.room_ids)
+
+        return relevant_room_ids
+
     @trace
     async def get_to_device_extension_response(
         self,
@@ -2081,7 +2181,8 @@ class SlidingSyncHandler:
     async def get_account_data_extension_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
         account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
@@ -2090,7 +2191,9 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
             account_data_request: The account_data extension from the request
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
@@ -2103,6 +2206,7 @@ class SlidingSyncHandler:
 
         global_account_data_map: Mapping[str, JsonMapping] = {}
         if from_token is not None:
+            # TODO: This should take into account the `from_token` and `to_token`
             global_account_data_map = (
                 await self.store.get_updated_global_account_data_for_user(
                     user_id, from_token.stream_token.account_data_key
@@ -2114,76 +2218,40 @@ class SlidingSyncHandler:
             )
             if have_push_rules_changed:
                 global_account_data_map = dict(global_account_data_map)
+                # TODO: This should take into account the `from_token` and `to_token`
                 global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                     await self.push_rules_handler.push_rules_for_user(sync_config.user)
                 )
         else:
+            # TODO: This should take into account the `to_token`
             all_global_account_data = await self.store.get_global_account_data_for_user(
                 user_id
             )
 
             global_account_data_map = dict(all_global_account_data)
+            # TODO: This should take into account the  `to_token`
             global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                 await self.push_rules_handler.push_rules_for_user(sync_config.user)
             )
 
-        # We only want to include account data for rooms that are already in the sliding
-        # sync response AND that were requested in the account data request.
-        relevant_room_ids: Set[str] = set()
-
-        # See what rooms from the room subscriptions we should get account data for
-        if (
-            account_data_request.rooms is not None
-            and sync_config.room_subscriptions is not None
-        ):
-            actual_room_ids = sync_config.room_subscriptions.keys()
-
-            for room_id in account_data_request.rooms:
-                # A wildcard means we process all rooms from the room subscriptions
-                if room_id == "*":
-                    relevant_room_ids.update(sync_config.room_subscriptions.keys())
-                    break
-
-                if room_id in actual_room_ids:
-                    relevant_room_ids.add(room_id)
-
-        # See what rooms from the sliding window lists we should get account data for
-        if account_data_request.lists is not None:
-            for list_key in account_data_request.lists:
-                # Just some typing because we share the variable name in multiple places
-                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
-
-                # A wildcard means we process rooms from all lists
-                if list_key == "*":
-                    for actual_list in lists.values():
-                        # We only expect a single SYNC operation for any list
-                        assert len(actual_list.ops) == 1
-                        sync_op = actual_list.ops[0]
-                        assert sync_op.op == OperationType.SYNC
-
-                        relevant_room_ids.update(sync_op.room_ids)
-
-                    break
-
-                actual_list = lists.get(list_key)
-                if actual_list is not None:
-                    # We only expect a single SYNC operation for any list
-                    assert len(actual_list.ops) == 1
-                    sync_op = actual_list.ops[0]
-                    assert sync_op.op == OperationType.SYNC
-
-                    relevant_room_ids.update(sync_op.room_ids)
-
         # Fetch room account data
         account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=account_data_request.lists,
+            requested_room_ids=account_data_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
         if len(relevant_room_ids) > 0:
             if from_token is not None:
+                # TODO: This should take into account the `from_token` and `to_token`
                 account_data_by_room_map = (
                     await self.store.get_updated_room_account_data_for_user(
                         user_id, from_token.stream_token.account_data_key
                     )
                 )
             else:
+                # TODO: This should take into account the `to_token`
                 account_data_by_room_map = (
                     await self.store.get_room_account_data_for_user(user_id)
                 )
@@ -2200,6 +2268,86 @@ class SlidingSyncHandler:
             account_data_by_room_map=account_data_by_room_map,
         )
 
+    async def get_receipts_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
+        receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
+        to_token: StreamToken,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
+        """Handle Receipts extension (MSC3960)
+
+        Args:
+            sync_config: Sync configuration
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
+            account_data_request: The account_data extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        # Skip if the extension is not enabled
+        if not receipts_request.enabled:
+            return None
+
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=receipts_request.lists,
+            requested_room_ids=receipts_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
+
+        room_id_to_receipt_map: Dict[str, JsonMapping] = {}
+        if len(relevant_room_ids) > 0:
+            receipt_source = self.event_sources.sources.receipt
+            receipts, _ = await receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=(
+                    from_token.stream_token.receipt_key
+                    if from_token
+                    else MultiWriterStreamToken(stream=0)
+                ),
+                to_key=to_token.receipt_key,
+                # This is a dummy value and isn't used in the function
+                limit=0,
+                room_ids=relevant_room_ids,
+                is_guest=False,
+            )
+
+            for receipt in receipts:
+                # These fields should exist for every receipt
+                room_id = receipt["room_id"]
+                type = receipt["type"]
+                content = receipt["content"]
+
+                room_result = actual_room_response_map.get(room_id)
+                if room_result is not None:
+                    if room_result.initial:
+                        # TODO: In the future, it would be good to fetch less receipts
+                        # out of the database in the first place but we would need to
+                        # add a new `event_id` index to `receipts_linearized`.
+                        relevant_event_ids = [
+                            event.event_id for event in room_result.timeline_events
+                        ]
+
+                        assert isinstance(content, dict)
+                        content = {
+                            event_id: content_value
+                            for event_id, content_value in content.items()
+                            if event_id in relevant_event_ids
+                        }
+
+                room_id_to_receipt_map[room_id] = {"type": type, "content": content}
+
+        return SlidingSyncResult.Extensions.ReceiptsExtension(
+            room_id_to_receipt_map=room_id_to_receipt_map,
+        )
+
 
 class HaveSentRoomFlag(Enum):
     """Flag for whether we have sent the room down a sliding sync connection.
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index ccfce6bd53..c607d08de5 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -1150,6 +1150,12 @@ class SlidingSyncRestServlet(RestServlet):
                 },
             }
 
+        if extensions.receipts is not None:
+            serialized_extensions["receipts"] = {
+                # Same as the the top-level `account_data.events` field in Sync v2.
+                "rooms": extensions.receipts.room_id_to_receipt_map,
+            }
+
         return serialized_extensions
 
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index f26cc0e903..2f7e92665c 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -152,7 +152,7 @@ class SlidingSyncResult:
     Attributes:
         next_pos: The next position token in the sliding window to request (next_batch).
         lists: Sliding window API. A map of list key to list results.
-        rooms: Room subscription API. A map of room ID to room subscription to room results.
+        rooms: Room subscription API. A map of room ID to room results.
         extensions: Extensions API. A map of extension key to extension results.
     """
 
@@ -361,12 +361,28 @@ class SlidingSyncResult:
                     self.global_account_data_map or self.account_data_by_room_map
                 )
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class ReceiptsExtension:
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
+            """
+
+            room_id_to_receipt_map: Mapping[str, JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.room_id_to_receipt_map)
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
         def __bool__(self) -> bool:
-            return bool(self.to_device or self.e2ee or self.account_data)
+            return bool(
+                self.to_device or self.e2ee or self.account_data or self.receipts
+            )
 
     next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index dfe3b1e0f7..4e632e4492 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -342,9 +342,27 @@ class SlidingSyncBody(RequestBodyModel):
             # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
             rooms: Optional[List[StrictStr]] = ["*"]
 
+        class ReceiptsExtension(RequestBodyModel):
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                enabled
+                lists: List of list keys (from the Sliding Window API) to apply this
+                    extension to.
+                rooms: List of room IDs (from the Room Subscription API) to apply this
+                    extension to.
+            """
+
+            enabled: Optional[StrictBool] = False
+            # Process all lists defined in the Sliding Window API. (This is the default.)
+            lists: Optional[List[StrictStr]] = ["*"]
+            # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+            rooms: Optional[List[StrictStr]] = ["*"]
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
     conn_id: Optional[str]