diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index d04c76be2a..c776654d12 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -288,6 +288,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
explicit_room_id: Optional[str] = None,
to_key: Optional[MultiWriterStreamToken] = None,
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
+ """
+ Find read receipts for given rooms (> `from_token` and <= `to_token`)
+ """
+
if to_key is None:
to_key = self.get_current_key()
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 530e7b7b4e..8467766518 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -2284,11 +2284,24 @@ class SlidingSyncHandler:
from_token=from_token,
)
+ typing_response = None
+ if sync_config.extensions.typing is not None:
+ typing_response = await self.get_typing_extension_response(
+ sync_config=sync_config,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ actual_room_response_map=actual_room_response_map,
+ typing_request=sync_config.extensions.typing,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
account_data=account_data_response,
receipts=receipts_response,
+ typing=typing_response,
)
def find_relevant_room_ids_for_extension(
@@ -2615,6 +2628,8 @@ class SlidingSyncHandler:
room_id_to_receipt_map: Dict[str, JsonMapping] = {}
if len(relevant_room_ids) > 0:
+ # TODO: Take connection tracking into account so that when a room comes back
+ # into range we can send the receipts that were missed.
receipt_source = self.event_sources.sources.receipt
receipts, _ = await receipt_source.get_new_events(
user=sync_config.user,
@@ -2636,6 +2651,8 @@ class SlidingSyncHandler:
type = receipt["type"]
content = receipt["content"]
+ # For `inital: True` rooms, we only want to include receipts for events
+ # in the timeline.
room_result = actual_room_response_map.get(room_id)
if room_result is not None:
if room_result.initial:
@@ -2659,6 +2676,70 @@ class SlidingSyncHandler:
room_id_to_receipt_map=room_id_to_receipt_map,
)
+ async def get_typing_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ actual_room_ids: Set[str],
+ actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
+ typing_request: SlidingSyncConfig.Extensions.TypingExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.TypingExtension]:
+ """Handle Typing Notification extension (MSC3961)
+
+ Args:
+ sync_config: Sync configuration
+ actual_lists: Sliding window API. A map of list key to list results in the
+ Sliding Sync response.
+ actual_room_ids: The actual room IDs in the the Sliding Sync response.
+ actual_room_response_map: A map of room ID to room results in the the
+ Sliding Sync response.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ # Skip if the extension is not enabled
+ if not typing_request.enabled:
+ return None
+
+ relevant_room_ids = self.find_relevant_room_ids_for_extension(
+ requested_lists=typing_request.lists,
+ requested_room_ids=typing_request.rooms,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ )
+
+ room_id_to_typing_map: Dict[str, JsonMapping] = {}
+ if len(relevant_room_ids) > 0:
+ # Note: We don't need to take connection tracking into account for typing
+ # notifications because they'll get anything still relevant and hasn't timed
+ # out when the room comes into range. We consider the gap where the room
+ # fell out of range, as long enough for any typing notifications to have
+ # timed out (it's not worth the 30 seconds of data we may have missed).
+ typing_source = self.event_sources.sources.typing
+ typing_notifications, _ = await typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=(from_token.stream_token.typing_key if from_token else 0),
+ to_key=to_token.typing_key,
+ # This is a dummy value and isn't used in the function
+ limit=0,
+ room_ids=relevant_room_ids,
+ is_guest=False,
+ )
+
+ for typing_notification in typing_notifications:
+ # These fields should exist for every typing notification
+ room_id = typing_notification["room_id"]
+ type = typing_notification["type"]
+ content = typing_notification["content"]
+
+ room_id_to_typing_map[room_id] = {"type": type, "content": content}
+
+ return SlidingSyncResult.Extensions.TypingExtension(
+ room_id_to_typing_map=room_id_to_typing_map,
+ )
+
class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 4c87718337..8d693fee30 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -565,7 +565,12 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
+ to_key: Optional[int] = None,
) -> Tuple[List[JsonMapping], int]:
+ """
+ Find typing notifications for given rooms (> `from_token` and <= `to_token`)
+ """
+
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()
@@ -574,7 +579,9 @@ class TypingNotificationEventSource(EventSource[int, JsonMapping]):
for room_id in room_ids:
if room_id not in handler._room_serials:
continue
- if handler._room_serials[room_id] <= from_key:
+ if handler._room_serials[room_id] <= from_key or (
+ to_key is not None and handler._room_serials[room_id] > to_key
+ ):
continue
events.append(self._make_event_for(room_id))
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index c607d08de5..4f2c552af2 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -1152,10 +1152,14 @@ class SlidingSyncRestServlet(RestServlet):
if extensions.receipts is not None:
serialized_extensions["receipts"] = {
- # Same as the the top-level `account_data.events` field in Sync v2.
"rooms": extensions.receipts.room_id_to_receipt_map,
}
+ if extensions.typing is not None:
+ serialized_extensions["typing"] = {
+ "rooms": extensions.typing.room_id_to_typing_map,
+ }
+
return serialized_extensions
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index a6adf6c9ea..363f060bef 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -366,7 +366,8 @@ class SlidingSyncResult:
"""The Receipts extension (MSC3960)
Attributes:
- room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
+ room_id_to_receipt_map: Mapping from room_id to `m.receipt` ephemeral
+ event (type, content)
"""
room_id_to_receipt_map: Mapping[str, JsonMapping]
@@ -374,14 +375,33 @@ class SlidingSyncResult:
def __bool__(self) -> bool:
return bool(self.room_id_to_receipt_map)
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class TypingExtension:
+ """The Typing Notification extension (MSC3961)
+
+ Attributes:
+ room_id_to_typing_map: Mapping from room_id to `m.typing` ephemeral
+ event (type, content)
+ """
+
+ room_id_to_typing_map: Mapping[str, JsonMapping]
+
+ def __bool__(self) -> bool:
+ return bool(self.room_id_to_typing_map)
+
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
+ typing: Optional[TypingExtension] = None
def __bool__(self) -> bool:
return bool(
- self.to_device or self.e2ee or self.account_data or self.receipts
+ self.to_device
+ or self.e2ee
+ or self.account_data
+ or self.receipts
+ or self.typing
)
next_pos: SlidingSyncStreamToken
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index 4e632e4492..93b537ab7b 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -359,10 +359,28 @@ class SlidingSyncBody(RequestBodyModel):
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
rooms: Optional[List[StrictStr]] = ["*"]
+ class TypingExtension(RequestBodyModel):
+ """The Typing Notification extension (MSC3961)
+
+ Attributes:
+ enabled
+ lists: List of list keys (from the Sliding Window API) to apply this
+ extension to.
+ rooms: List of room IDs (from the Room Subscription API) to apply this
+ extension to.
+ """
+
+ enabled: Optional[StrictBool] = False
+ # Process all lists defined in the Sliding Window API. (This is the default.)
+ lists: Optional[List[StrictStr]] = ["*"]
+ # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+ rooms: Optional[List[StrictStr]] = ["*"]
+
to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
+ typing: Optional[TypingExtension] = None
conn_id: Optional[str]
|