diff --git a/changelog.d/17505.feature b/changelog.d/17505.feature
new file mode 100644
index 0000000000..ca0c2bd70f
--- /dev/null
+++ b/changelog.d/17505.feature
@@ -0,0 +1 @@
+Add typing notification extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index d04c76be2a..c776654d12 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -288,6 +288,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
explicit_room_id: Optional[str] = None,
to_key: Optional[MultiWriterStreamToken] = None,
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
+ """
+ Find read receipts for given rooms (> `from_token` and <= `to_token`)
+ """
+
if to_key is None:
to_key = self.get_current_key()
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 530e7b7b4e..8467766518 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -2284,11 +2284,24 @@ class SlidingSyncHandler:
from_token=from_token,
)
+ typing_response = None
+ if sync_config.extensions.typing is not None:
+ typing_response = await self.get_typing_extension_response(
+ sync_config=sync_config,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ actual_room_response_map=actual_room_response_map,
+ typing_request=sync_config.extensions.typing,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
account_data=account_data_response,
receipts=receipts_response,
+ typing=typing_response,
)
def find_relevant_room_ids_for_extension(
@@ -2615,6 +2628,8 @@ class SlidingSyncHandler:
room_id_to_receipt_map: Dict[str, JsonMapping] = {}
if len(relevant_room_ids) > 0:
+ # TODO: Take connection tracking into account so that when a room comes back
+ # into range we can send the receipts that were missed.
receipt_source = self.event_sources.sources.receipt
receipts, _ = await receipt_source.get_new_events(
user=sync_config.user,
@@ -2636,6 +2651,8 @@ class SlidingSyncHandler:
type = receipt["type"]
content = receipt["content"]
+ # For `inital: True` rooms, we only want to include receipts for events
+ # in the timeline.
room_result = actual_room_response_map.get(room_id)
if room_result is not None:
if room_result.initial:
@@ -2659,6 +2676,70 @@ class SlidingSyncHandler:
room_id_to_receipt_map=room_id_to_receipt_map,
)
+ async def get_typing_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ actual_room_ids: Set[str],
+ actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
+ typing_request: SlidingSyncConfig.Extensions.TypingExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.TypingExtension]:
+ """Handle Typing Notification extension (MSC3961)
+
+ Args:
+ sync_config: Sync configuration
+ actual_lists: Sliding window API. A map of list key to list results in the
+ Sliding Sync response.
+ actual_room_ids: The actual room IDs in the the Sliding Sync response.
+ actual_room_response_map: A map of room ID to room results in the the
+ Sliding Sync response.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ # Skip if the extension is not enabled
+ if not typing_request.enabled:
+ return None
+
+ relevant_room_ids = self.find_relevant_room_ids_for_extension(
+ requested_lists=typing_request.lists,
+ requested_room_ids=typing_request.rooms,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ )
+
+ room_id_to_typing_map: Dict[str, JsonMapping] = {}
+ if len(relevant_room_ids) > 0:
+ # Note: We don't need to take connection tracking into account for typing
+ # notifications because they'll get anything still relevant and hasn't timed
+ # out when the room comes into range. We consider the gap where the room
+ # fell out of range, as long enough for any typing notifications to have
+ # timed out (it's not worth the 30 seconds of data we may have missed).
+ typing_source = self.event_sources.sources.typing
+ typing_notifications, _ = await typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=(from_token.stream_token.typing_key if from_token else 0),
+ to_key=to_token.typing_key,
+ # This is a dummy value and isn't used in the function
+ limit=0,
+ room_ids=relevant_room_ids,
+ is_guest=False,
+ )
+
+ for typing_notification in typing_notifications:
+ # These fields should exist for every typing notification
+ room_id = typing_notification["room_id"]
+ type = typing_notification["type"]
+ content = typing_notification["content"]
+
+ room_id_to_typing_map[room_id] = {"type": type, "content": content}
+
+ return SlidingSyncResult.Extensions.TypingExtension(
+ room_id_to_typing_map=room_id_to_typing_map,
+ )
+
class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 4c87718337..8d693fee30 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -565,7 +565,12 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
+ to_key: Optional[int] = None,
) -> Tuple[List[JsonMapping], int]:
+ """
+ Find typing notifications for given rooms (> `from_token` and <= `to_token`)
+ """
+
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()
@@ -574,7 +579,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
for room_id in room_ids:
if room_id not in handler._room_serials:
continue
- if handler._room_serials[room_id] <= from_key:
+ if handler._room_serials[room_id] <= from_key or (
+ to_key is not None and handler._room_serials[room_id] > to_key
+ ):
continue
events.append(self._make_event_for(room_id))
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index c607d08de5..4f2c552af2 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -1152,10 +1152,14 @@ class SlidingSyncRestServlet(RestServlet):
if extensions.receipts is not None:
serialized_extensions["receipts"] = {
- # Same as the the top-level `account_data.events` field in Sync v2.
"rooms": extensions.receipts.room_id_to_receipt_map,
}
+ if extensions.typing is not None:
+ serialized_extensions["typing"] = {
+ "rooms": extensions.typing.room_id_to_typing_map,
+ }
+
return serialized_extensions
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index a6adf6c9ea..363f060bef 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -366,7 +366,8 @@ class SlidingSyncResult:
"""The Receipts extension (MSC3960)
Attributes:
- room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
+ room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
+ event (type, content)
"""
room_id_to_receipt_map: Mapping[str, JsonMapping]
@@ -374,14 +375,33 @@ class SlidingSyncResult:
def __bool__(self) -> bool:
return bool(self.room_id_to_receipt_map)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class TypingExtension:
+ """The Typing Notification extension (MSC3961)
+
+ Attributes:
+ room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
+ event (type, content)
+ """
+
+ room_id_to_typing_map: Mapping[str, JsonMapping]
+
+ def __bool__(self) -> bool:
+ return bool(self.room_id_to_typing_map)
+
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
+ typing: Optional[TypingExtension] = None
def __bool__(self) -> bool:
return bool(
- self.to_device or self.e2ee or self.account_data or self.receipts
+ self.to_device
+ or self.e2ee
+ or self.account_data
+ or self.receipts
+ or self.typing
)
next_pos: SlidingSyncStreamToken
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index 4e632e4492..93b537ab7b 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -359,10 +359,28 @@ class SlidingSyncBody(RequestBodyModel):
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
rooms: Optional[List[StrictStr]] = ["*"]
+ class TypingExtension(RequestBodyModel):
+ """The Typing Notification extension (MSC3961)
+
+ Attributes:
+ enabled
+ lists: List of list keys (from the Sliding Window API) to apply this
+ extension to.
+ rooms: List of room IDs (from the Room Subscription API) to apply this
+ extension to.
+ """
+
+ enabled: Optional[StrictBool] = False
+ # Process all lists defined in the Sliding Window API. (This is the default.)
+ lists: Optional[List[StrictStr]] = ["*"]
+ # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+ rooms: Optional[List[StrictStr]] = ["*"]
+
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
+ typing: Optional[TypingExtension] = None
conn_id: Optional[str]
diff --git a/tests/rest/client/sliding_sync/test_extension_typing.py b/tests/rest/client/sliding_sync/test_extension_typing.py
new file mode 100644
index 0000000000..7f523e0f10
--- /dev/null
+++ b/tests/rest/client/sliding_sync/test_extension_typing.py
@@ -0,0 +1,482 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+import logging
+
+from twisted.test.proto_helpers import MemoryReactor
+
+import synapse.rest.admin
+from synapse.api.constants import EduTypes
+from synapse.rest.client import login, room, sync
+from synapse.server import HomeServer
+from synapse.types import StreamKeyType
+from synapse.util import Clock
+
+from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
+from tests.server import TimedOutException
+
+logger = logging.getLogger(__name__)
+
+
+class SlidingSyncTypingExtensionTestCase(SlidingSyncBase):
+ """Tests for the typing notification sliding sync extension"""
+
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ sync.register_servlets,
+ ]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
+
+ def test_no_data_initial_sync(self) -> None:
+ """
+ Test that enabling the typing extension works during an intitial sync,
+ even if there is no-data.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Make an initial Sliding Sync request with the typing extension enabled
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "typing": {
+ "enabled": True,
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ self.assertIncludes(
+ response_body["extensions"]["typing"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
+
+ def test_no_data_incremental_sync(self) -> None:
+ """
+ Test that enabling typing extension works during an incremental sync, even
+ if there is no-data.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "typing": {
+ "enabled": True,
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Make an incremental Sliding Sync request with the typing extension enabled
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ self.assertIncludes(
+ response_body["extensions"]["typing"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
+
+ def test_typing_initial_sync(self) -> None:
+ """
+ On initial sync, we return all typing notifications for rooms that we request
+ and are being returned in the Sliding Sync response.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+ user3_id = self.register_user("user3", "pass")
+ user3_tok = self.login(user3_id, "pass")
+ user4_id = self.register_user("user4", "pass")
+ user4_tok = self.login(user4_id, "pass")
+
+ # Create a room
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.join(room_id1, user3_id, tok=user3_tok)
+ self.helper.join(room_id1, user4_id, tok=user4_tok)
+ # User1 starts typing in room1
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id1}/typing/{user1_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+ # User2 starts typing in room1
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id1}/typing/{user2_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user2_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Create another room
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
+ self.helper.join(room_id2, user3_id, tok=user3_tok)
+ self.helper.join(room_id2, user4_id, tok=user4_tok)
+ # User1 starts typing in room2
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id2}/typing/{user1_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+ # User2 starts typing in room2
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id2}/typing/{user2_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user2_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Make an initial Sliding Sync request with the typing extension enabled
+ sync_body = {
+ "lists": {},
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ }
+ },
+ "extensions": {
+ "typing": {
+ "enabled": True,
+ "rooms": [room_id1, room_id2],
+ }
+ },
+ }
+ response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+ # Even though we requested room2, we only expect room1 to show up because that's
+ # the only room in the Sliding Sync response (room2 is not one of our room
+ # subscriptions or in a sliding window list).
+ self.assertIncludes(
+ response_body["extensions"]["typing"].get("rooms").keys(),
+ {room_id1},
+ exact=True,
+ )
+ # Sanity check that it's the correct ephemeral event type
+ self.assertEqual(
+ response_body["extensions"]["typing"]["rooms"][room_id1]["type"],
+ EduTypes.TYPING,
+ )
+ # We can see user1 and user2 typing
+ self.assertIncludes(
+ set(
+ response_body["extensions"]["typing"]["rooms"][room_id1]["content"][
+ "user_ids"
+ ]
+ ),
+ {user1_id, user2_id},
+ exact=True,
+ )
+
+ def test_typing_incremental_sync(self) -> None:
+ """
+ On incremental sync, we return all typing notifications in the token range for a
+ given room but only for rooms that we request and are being returned in the
+ Sliding Sync response.
+ """
+
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+ user3_id = self.register_user("user3", "pass")
+ user3_tok = self.login(user3_id, "pass")
+
+ # Create room1
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.join(room_id1, user3_id, tok=user3_tok)
+ # User2 starts typing in room1
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id1}/typing/{user2_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user2_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Create room2
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
+ # User1 starts typing in room2 (before the `from_token`)
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id2}/typing/{user1_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Create room3
+ room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id3, user1_id, tok=user1_tok)
+ self.helper.join(room_id3, user3_id, tok=user3_tok)
+
+ # Create room4
+ room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id4, user1_id, tok=user1_tok)
+ self.helper.join(room_id4, user3_id, tok=user3_tok)
+ # User1 starts typing in room4 (before the `from_token`)
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id4}/typing/{user1_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Advance time so all of the typing notifications timeout before we make our
+ # Sliding Sync requests. Even though these are sent before the `from_token`, the
+ # typing code only keeps track of stream position of the latest typing
+ # notification so "old" typing notifications that are still "alive" (haven't
+ # timed out) can appear in the response.
+ self.reactor.advance(36)
+
+ sync_body = {
+ "lists": {},
+ "room_subscriptions": {
+ room_id1: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ room_id3: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ room_id4: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ "extensions": {
+ "typing": {
+ "enabled": True,
+ "rooms": [room_id1, room_id2, room_id3, room_id4],
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Add some more typing notifications after the `from_token`
+ #
+ # User1 starts typing in room1
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id1}/typing/{user1_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+ # User1 starts typing in room2
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id2}/typing/{user1_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+ # User3 starts typing in room3
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id3}/typing/{user3_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user3_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+ # No activity for room4 after the `from_token`
+
+ # Make an incremental Sliding Sync request with the typing extension enabled
+ response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+ # Even though we requested room2, we only expect rooms to show up if they are
+ # already in the Sliding Sync response. room4 doesn't show up because there is
+ # no activity after the `from_token`.
+ self.assertIncludes(
+ response_body["extensions"]["typing"].get("rooms").keys(),
+ {room_id1, room_id3},
+ exact=True,
+ )
+
+ # Check room1:
+ #
+ # Sanity check that it's the correct ephemeral event type
+ self.assertEqual(
+ response_body["extensions"]["typing"]["rooms"][room_id1]["type"],
+ EduTypes.TYPING,
+ )
+ # We only see that user1 is typing in room1 since the `from_token`
+ self.assertIncludes(
+ set(
+ response_body["extensions"]["typing"]["rooms"][room_id1]["content"][
+ "user_ids"
+ ]
+ ),
+ {user1_id},
+ exact=True,
+ )
+
+ # Check room3:
+ #
+ # Sanity check that it's the correct ephemeral event type
+ self.assertEqual(
+ response_body["extensions"]["typing"]["rooms"][room_id3]["type"],
+ EduTypes.TYPING,
+ )
+ # We only see that user3 is typing in room1 since the `from_token`
+ self.assertIncludes(
+ set(
+ response_body["extensions"]["typing"]["rooms"][room_id3]["content"][
+ "user_ids"
+ ]
+ ),
+ {user3_id},
+ exact=True,
+ )
+
+ def test_wait_for_new_data(self) -> None:
+ """
+ Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+ (Only applies to incremental syncs with a `timeout` specified)
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id, user1_id, tok=user1_tok)
+
+ sync_body = {
+ "lists": {},
+ "room_subscriptions": {
+ room_id: {
+ "required_state": [],
+ "timeline_limit": 0,
+ },
+ },
+ "extensions": {
+ "typing": {
+ "enabled": True,
+ "rooms": [room_id],
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Make an incremental Sliding Sync request with the typing extension enabled
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+ content=sync_body,
+ access_token=user1_tok,
+ await_result=False,
+ )
+ # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+ with self.assertRaises(TimedOutException):
+ channel.await_result(timeout_ms=5000)
+ # Bump the typing status to trigger new results
+ typing_channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id}/typing/{user2_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user2_tok,
+ )
+ self.assertEqual(typing_channel.code, 200, typing_channel.json_body)
+ # Should respond before the 10 second timeout
+ channel.await_result(timeout_ms=3000)
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We should see the new typing notification
+ self.assertIncludes(
+ channel.json_body.get("extensions", {})
+ .get("typing", {})
+ .get("rooms", {})
+ .keys(),
+ {room_id},
+ exact=True,
+ message=str(channel.json_body),
+ )
+ self.assertIncludes(
+ set(
+ channel.json_body["extensions"]["typing"]["rooms"][room_id]["content"][
+ "user_ids"
+ ]
+ ),
+ {user2_id},
+ exact=True,
+ )
+
+ def test_wait_for_new_data_timeout(self) -> None:
+ """
+ Test to make sure that the Sliding Sync request waits for new data to arrive but
+ no data ever arrives so we timeout. We're also making sure that the default data
+ from the typing extension doesn't trigger a false-positive for new data.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ sync_body = {
+ "lists": {},
+ "extensions": {
+ "typing": {
+ "enabled": True,
+ }
+ },
+ }
+ _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+ content=sync_body,
+ access_token=user1_tok,
+ await_result=False,
+ )
+ # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+ with self.assertRaises(TimedOutException):
+ channel.await_result(timeout_ms=5000)
+ # Wake-up `notifier.wait_for_events(...)` that will cause us test
+ # `SlidingSyncResult.__bool__` for new results.
+ self._bump_notifier_wait_for_events(
+ user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+ )
+ # Block for a little bit more to ensure we don't see any new results.
+ with self.assertRaises(TimedOutException):
+ channel.await_result(timeout_ms=4000)
+ # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+ # 5000 + 4000 + 1200 > 10000)
+ channel.await_result(timeout_ms=1200)
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ self.assertIncludes(
+ channel.json_body["extensions"]["typing"].get("rooms").keys(),
+ set(),
+ exact=True,
+ )
diff --git a/tests/rest/client/sliding_sync/test_extensions.py b/tests/rest/client/sliding_sync/test_extensions.py
index e42904b69b..68f6661334 100644
--- a/tests/rest/client/sliding_sync/test_extensions.py
+++ b/tests/rest/client/sliding_sync/test_extensions.py
@@ -12,8 +12,10 @@
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
import logging
+from typing import Literal
from parameterized import parameterized
+from typing_extensions import assert_never
from twisted.test.proto_helpers import MemoryReactor
@@ -48,11 +50,16 @@ class SlidingSyncExtensionsTestCase(SlidingSyncBase):
self.account_data_handler = hs.get_account_data_handler()
# Any extensions that use `lists`/`rooms` should be tested here
- @parameterized.expand([("account_data",), ("receipts",)])
- def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None:
+ @parameterized.expand([("account_data",), ("receipts",), ("typing",)])
+ def test_extensions_lists_rooms_relevant_rooms(
+ self,
+ extension_name: Literal["account_data", "receipts", "typing"],
+ ) -> None:
"""
With various extensions, test out requesting different variations of
`lists`/`rooms`.
+
+ Stresses `SlidingSyncHandler.find_relevant_room_ids_for_extension(...)`
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@@ -95,8 +102,17 @@ class SlidingSyncExtensionsTestCase(SlidingSyncBase):
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.json_body)
+ elif extension_name == "typing":
+ # Start a typing notification
+ channel = self.make_request(
+ "PUT",
+ f"/rooms/{room_id}/typing/{user1_id}",
+ b'{"typing": true, "timeout": 30000}',
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
else:
- raise AssertionError(f"Unknown extension name: {extension_name}")
+ assert_never(extension_name)
main_sync_body = {
"lists": {
|