diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py
new file mode 100644
index 0000000000..599c74429e
--- /dev/null
+++ b/synapse/handlers/sliding_sync/extensions.py
@@ -0,0 +1,660 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2023 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+
+import logging
+from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Sequence, Set
+
+from typing_extensions import assert_never
+
+from synapse.api.constants import AccountDataTypes
+from synapse.handlers.receipts import ReceiptEventSource
+from synapse.handlers.sliding_sync.types import (
+ HaveSentRoomFlag,
+ MutablePerConnectionState,
+ PerConnectionState,
+)
+from synapse.logging.opentracing import trace
+from synapse.types import (
+ DeviceListUpdates,
+ JsonMapping,
+ MultiWriterStreamToken,
+ SlidingSyncStreamToken,
+ StreamToken,
+)
+from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class SlidingSyncExtensionHandler:
+ """Handles the extensions to sliding sync."""
+
+ def __init__(self, hs: "HomeServer"):
+ self.store = hs.get_datastores().main
+ self.event_sources = hs.get_event_sources()
+ self.device_handler = hs.get_device_handler()
+ self.push_rules_handler = hs.get_push_rules_handler()
+
+ @trace
+ async def get_extensions_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ previous_connection_state: "PerConnectionState",
+ new_connection_state: "MutablePerConnectionState",
+ actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ actual_room_ids: Set[str],
+ actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> SlidingSyncResult.Extensions:
+ """Handle extension requests.
+
+ Args:
+ sync_config: Sync configuration
+ new_connection_state: Snapshot of the current per-connection state
+ new_per_connection_state: A mutable copy of the per-connection
+ state, used to record updates to the state during this request.
+ actual_lists: Sliding window API. A map of list key to list results in the
+ Sliding Sync response.
+ actual_room_ids: The actual room IDs in the the Sliding Sync response.
+ actual_room_response_map: A map of room ID to room results in the the
+ Sliding Sync response.
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+
+ if sync_config.extensions is None:
+ return SlidingSyncResult.Extensions()
+
+ to_device_response = None
+ if sync_config.extensions.to_device is not None:
+ to_device_response = await self.get_to_device_extension_response(
+ sync_config=sync_config,
+ to_device_request=sync_config.extensions.to_device,
+ to_token=to_token,
+ )
+
+ e2ee_response = None
+ if sync_config.extensions.e2ee is not None:
+ e2ee_response = await self.get_e2ee_extension_response(
+ sync_config=sync_config,
+ e2ee_request=sync_config.extensions.e2ee,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
+ account_data_response = None
+ if sync_config.extensions.account_data is not None:
+ account_data_response = await self.get_account_data_extension_response(
+ sync_config=sync_config,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ account_data_request=sync_config.extensions.account_data,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
+ receipts_response = None
+ if sync_config.extensions.receipts is not None:
+ receipts_response = await self.get_receipts_extension_response(
+ sync_config=sync_config,
+ previous_connection_state=previous_connection_state,
+ new_connection_state=new_connection_state,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ actual_room_response_map=actual_room_response_map,
+ receipts_request=sync_config.extensions.receipts,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
+ typing_response = None
+ if sync_config.extensions.typing is not None:
+ typing_response = await self.get_typing_extension_response(
+ sync_config=sync_config,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ actual_room_response_map=actual_room_response_map,
+ typing_request=sync_config.extensions.typing,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
+ return SlidingSyncResult.Extensions(
+ to_device=to_device_response,
+ e2ee=e2ee_response,
+ account_data=account_data_response,
+ receipts=receipts_response,
+ typing=typing_response,
+ )
+
+ def find_relevant_room_ids_for_extension(
+ self,
+ requested_lists: Optional[List[str]],
+ requested_room_ids: Optional[List[str]],
+ actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ actual_room_ids: Set[str],
+ ) -> Set[str]:
+ """
+ Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only
+ return results for rooms in the Sliding Sync response. This matches up the
+ requested rooms/lists with the actual lists/rooms in the Sliding Sync response.
+
+ {"lists": []} // Do not process any lists.
+ {"lists": ["rooms", "dms"]} // Process only a subset of lists.
+ {"lists": ["*"]} // Process all lists defined in the Sliding Window API. (This is the default.)
+
+ {"rooms": []} // Do not process any specific rooms.
+ {"rooms": ["!a:b", "!c:d"]} // Process only a subset of room subscriptions.
+ {"rooms": ["*"]} // Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+
+ Args:
+ requested_lists: The `lists` from the extension request.
+ requested_room_ids: The `rooms` from the extension request.
+ actual_lists: The actual lists from the Sliding Sync response.
+ actual_room_ids: The actual room subscriptions from the Sliding Sync request.
+ """
+
+ # We only want to include account data for rooms that are already in the sliding
+ # sync response AND that were requested in the account data request.
+ relevant_room_ids: Set[str] = set()
+
+ # See what rooms from the room subscriptions we should get account data for
+ if requested_room_ids is not None:
+ for room_id in requested_room_ids:
+ # A wildcard means we process all rooms from the room subscriptions
+ if room_id == "*":
+ relevant_room_ids.update(actual_room_ids)
+ break
+
+ if room_id in actual_room_ids:
+ relevant_room_ids.add(room_id)
+
+ # See what rooms from the sliding window lists we should get account data for
+ if requested_lists is not None:
+ for list_key in requested_lists:
+ # Just some typing because we share the variable name in multiple places
+ actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+ # A wildcard means we process rooms from all lists
+ if list_key == "*":
+ for actual_list in actual_lists.values():
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
+
+ break
+
+ actual_list = actual_lists.get(list_key)
+ if actual_list is not None:
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
+
+ return relevant_room_ids
+
+ @trace
+ async def get_to_device_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension,
+ to_token: StreamToken,
+ ) -> Optional[SlidingSyncResult.Extensions.ToDeviceExtension]:
+ """Handle to-device extension (MSC3885)
+
+ Args:
+ sync_config: Sync configuration
+ to_device_request: The to-device extension from the request
+ to_token: The point in the stream to sync up to.
+ """
+ user_id = sync_config.user.to_string()
+ device_id = sync_config.requester.device_id
+
+ # Skip if the extension is not enabled
+ if not to_device_request.enabled:
+ return None
+
+ # Check that this request has a valid device ID (not all requests have
+ # to belong to a device, and so device_id is None)
+ if device_id is None:
+ return SlidingSyncResult.Extensions.ToDeviceExtension(
+ next_batch=f"{to_token.to_device_key}",
+ events=[],
+ )
+
+ since_stream_id = 0
+ if to_device_request.since is not None:
+ # We've already validated this is an int.
+ since_stream_id = int(to_device_request.since)
+
+ if to_token.to_device_key < since_stream_id:
+ # The since token is ahead of our current token, so we return an
+ # empty response.
+ logger.warning(
+ "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r",
+ since_stream_id,
+ to_token.to_device_key,
+ )
+ return SlidingSyncResult.Extensions.ToDeviceExtension(
+ next_batch=to_device_request.since,
+ events=[],
+ )
+
+ # Delete everything before the given since token, as we know the
+ # device must have received them.
+ deleted = await self.store.delete_messages_for_device(
+ user_id=user_id,
+ device_id=device_id,
+ up_to_stream_id=since_stream_id,
+ )
+
+ logger.debug(
+ "Deleted %d to-device messages up to %d for %s",
+ deleted,
+ since_stream_id,
+ user_id,
+ )
+
+ messages, stream_id = await self.store.get_messages_for_device(
+ user_id=user_id,
+ device_id=device_id,
+ from_stream_id=since_stream_id,
+ to_stream_id=to_token.to_device_key,
+ limit=min(to_device_request.limit, 100), # Limit to at most 100 events
+ )
+
+ return SlidingSyncResult.Extensions.ToDeviceExtension(
+ next_batch=f"{stream_id}",
+ events=messages,
+ )
+
+ @trace
+ async def get_e2ee_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
+ """Handle E2EE device extension (MSC3884)
+
+ Args:
+ sync_config: Sync configuration
+ e2ee_request: The e2ee extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ user_id = sync_config.user.to_string()
+ device_id = sync_config.requester.device_id
+
+ # Skip if the extension is not enabled
+ if not e2ee_request.enabled:
+ return None
+
+ device_list_updates: Optional[DeviceListUpdates] = None
+ if from_token is not None:
+ # TODO: This should take into account the `from_token` and `to_token`
+ device_list_updates = await self.device_handler.get_user_ids_changed(
+ user_id=user_id,
+ from_token=from_token.stream_token,
+ )
+
+ device_one_time_keys_count: Mapping[str, int] = {}
+ device_unused_fallback_key_types: Sequence[str] = []
+ if device_id:
+ # TODO: We should have a way to let clients differentiate between the states of:
+ # * no change in OTK count since the provided since token
+ # * the server has zero OTKs left for this device
+ # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
+ device_one_time_keys_count = await self.store.count_e2e_one_time_keys(
+ user_id, device_id
+ )
+ device_unused_fallback_key_types = (
+ await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
+ )
+
+ return SlidingSyncResult.Extensions.E2eeExtension(
+ device_list_updates=device_list_updates,
+ device_one_time_keys_count=device_one_time_keys_count,
+ device_unused_fallback_key_types=device_unused_fallback_key_types,
+ )
+
+ @trace
+ async def get_account_data_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ actual_room_ids: Set[str],
+ account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]:
+ """Handle Account Data extension (MSC3959)
+
+ Args:
+ sync_config: Sync configuration
+ actual_lists: Sliding window API. A map of list key to list results in the
+ Sliding Sync response.
+ actual_room_ids: The actual room IDs in the the Sliding Sync response.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ user_id = sync_config.user.to_string()
+
+ # Skip if the extension is not enabled
+ if not account_data_request.enabled:
+ return None
+
+ global_account_data_map: Mapping[str, JsonMapping] = {}
+ if from_token is not None:
+ # TODO: This should take into account the `from_token` and `to_token`
+ global_account_data_map = (
+ await self.store.get_updated_global_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+
+ have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
+ user_id, from_token.stream_token.push_rules_key
+ )
+ if have_push_rules_changed:
+ global_account_data_map = dict(global_account_data_map)
+ # TODO: This should take into account the `from_token` and `to_token`
+ global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+ await self.push_rules_handler.push_rules_for_user(sync_config.user)
+ )
+ else:
+ # TODO: This should take into account the `to_token`
+ all_global_account_data = await self.store.get_global_account_data_for_user(
+ user_id
+ )
+
+ global_account_data_map = dict(all_global_account_data)
+ # TODO: This should take into account the `to_token`
+ global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+ await self.push_rules_handler.push_rules_for_user(sync_config.user)
+ )
+
+ # Fetch room account data
+ account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+ relevant_room_ids = self.find_relevant_room_ids_for_extension(
+ requested_lists=account_data_request.lists,
+ requested_room_ids=account_data_request.rooms,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ )
+ if len(relevant_room_ids) > 0:
+ if from_token is not None:
+ # TODO: This should take into account the `from_token` and `to_token`
+ account_data_by_room_map = (
+ await self.store.get_updated_room_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+ else:
+ # TODO: This should take into account the `to_token`
+ account_data_by_room_map = (
+ await self.store.get_room_account_data_for_user(user_id)
+ )
+
+ # Filter down to the relevant rooms
+ account_data_by_room_map = {
+ room_id: account_data_map
+ for room_id, account_data_map in account_data_by_room_map.items()
+ if room_id in relevant_room_ids
+ }
+
+ return SlidingSyncResult.Extensions.AccountDataExtension(
+ global_account_data_map=global_account_data_map,
+ account_data_by_room_map=account_data_by_room_map,
+ )
+
+ @trace
+ async def get_receipts_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ previous_connection_state: "PerConnectionState",
+ new_connection_state: "MutablePerConnectionState",
+ actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ actual_room_ids: Set[str],
+ actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
+ receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
+ """Handle Receipts extension (MSC3960)
+
+ Args:
+ sync_config: Sync configuration
+ previous_connection_state: The current per-connection state
+ new_connection_state: A mutable copy of the per-connection
+ state, used to record updates to the state.
+ actual_lists: Sliding window API. A map of list key to list results in the
+ Sliding Sync response.
+ actual_room_ids: The actual room IDs in the the Sliding Sync response.
+ actual_room_response_map: A map of room ID to room results in the the
+ Sliding Sync response.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ # Skip if the extension is not enabled
+ if not receipts_request.enabled:
+ return None
+
+ relevant_room_ids = self.find_relevant_room_ids_for_extension(
+ requested_lists=receipts_request.lists,
+ requested_room_ids=receipts_request.rooms,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ )
+
+ room_id_to_receipt_map: Dict[str, JsonMapping] = {}
+ if len(relevant_room_ids) > 0:
+ # We need to handle the different cases depending on if we have sent
+ # down receipts previously or not, so we split the relevant rooms
+ # up into different collections based on status.
+ live_rooms = set()
+ previously_rooms: Dict[str, MultiWriterStreamToken] = {}
+ initial_rooms = set()
+
+ for room_id in relevant_room_ids:
+ if not from_token:
+ initial_rooms.add(room_id)
+ continue
+
+ # If we're sending down the room from scratch again for some reason, we
+ # should always resend the receipts as well (regardless of if
+ # we've sent them down before). This is to mimic the behaviour
+ # of what happens on initial sync, where you get a chunk of
+ # timeline with all of the corresponding receipts for the events in the timeline.
+ room_result = actual_room_response_map.get(room_id)
+ if room_result is not None and room_result.initial:
+ initial_rooms.add(room_id)
+ continue
+
+ room_status = previous_connection_state.receipts.have_sent_room(room_id)
+ if room_status.status == HaveSentRoomFlag.LIVE:
+ live_rooms.add(room_id)
+ elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+ assert room_status.last_token is not None
+ previously_rooms[room_id] = room_status.last_token
+ elif room_status.status == HaveSentRoomFlag.NEVER:
+ initial_rooms.add(room_id)
+ else:
+ assert_never(room_status.status)
+
+ # The set of receipts that we fetched. Private receipts need to be
+ # filtered out before returning.
+ fetched_receipts = []
+
+ # For live rooms we just fetch all receipts in those rooms since the
+ # `since` token.
+ if live_rooms:
+ assert from_token is not None
+ receipts = await self.store.get_linearized_receipts_for_rooms(
+ room_ids=live_rooms,
+ from_key=from_token.stream_token.receipt_key,
+ to_key=to_token.receipt_key,
+ )
+ fetched_receipts.extend(receipts)
+
+ # For rooms we've previously sent down, but aren't up to date, we
+ # need to use the from token from the room status.
+ if previously_rooms:
+ for room_id, receipt_token in previously_rooms.items():
+ # TODO: Limit the number of receipts we're about to send down
+ # for the room, if its too many we should TODO
+ previously_receipts = (
+ await self.store.get_linearized_receipts_for_room(
+ room_id=room_id,
+ from_key=receipt_token,
+ to_key=to_token.receipt_key,
+ )
+ )
+ fetched_receipts.extend(previously_receipts)
+
+ # For rooms we haven't previously sent down, we could send all receipts
+ # from that room but we only want to include receipts for events
+ # in the timeline to avoid bloating and blowing up the sync response
+ # as the number of users in the room increases. (this behavior is part of the spec)
+ initial_rooms_and_event_ids = [
+ (room_id, event.event_id)
+ for room_id in initial_rooms
+ if room_id in actual_room_response_map
+ for event in actual_room_response_map[room_id].timeline_events
+ ]
+ if initial_rooms_and_event_ids:
+ initial_receipts = await self.store.get_linearized_receipts_for_events(
+ room_and_event_ids=initial_rooms_and_event_ids,
+ )
+ fetched_receipts.extend(initial_receipts)
+
+ fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
+ fetched_receipts, sync_config.user.to_string()
+ )
+
+ for receipt in fetched_receipts:
+ # These fields should exist for every receipt
+ room_id = receipt["room_id"]
+ type = receipt["type"]
+ content = receipt["content"]
+
+ room_id_to_receipt_map[room_id] = {"type": type, "content": content}
+
+ # Now we update the per-connection state to track which receipts we have
+ # and haven't sent down.
+ new_connection_state.receipts.record_sent_rooms(relevant_room_ids)
+
+ if from_token:
+ # Now find the set of rooms that may have receipts that we're not sending
+ # down. We only need to check rooms that we have previously returned
+ # receipts for (in `previous_connection_state`) because we only care about
+ # updating `LIVE` rooms to `PREVIOUSLY`. The `PREVIOUSLY` rooms will just
+ # stay pointing at their previous position so we don't need to waste time
+ # checking those and since we default to `NEVER`, rooms that were `NEVER`
+ # sent before don't need to be recorded as we'll handle them correctly when
+ # they come into range for the first time.
+ rooms_no_receipts = [
+ room_id
+ for room_id, room_status in previous_connection_state.receipts._statuses.items()
+ if room_status.status == HaveSentRoomFlag.LIVE
+ and room_id not in relevant_room_ids
+ ]
+ changed_rooms = await self.store.get_rooms_with_receipts_between(
+ rooms_no_receipts,
+ from_key=from_token.stream_token.receipt_key,
+ to_key=to_token.receipt_key,
+ )
+ new_connection_state.receipts.record_unsent_rooms(
+ changed_rooms, from_token.stream_token.receipt_key
+ )
+
+ return SlidingSyncResult.Extensions.ReceiptsExtension(
+ room_id_to_receipt_map=room_id_to_receipt_map,
+ )
+
+ async def get_typing_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ actual_room_ids: Set[str],
+ actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
+ typing_request: SlidingSyncConfig.Extensions.TypingExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.TypingExtension]:
+ """Handle Typing Notification extension (MSC3961)
+
+ Args:
+ sync_config: Sync configuration
+ actual_lists: Sliding window API. A map of list key to list results in the
+ Sliding Sync response.
+ actual_room_ids: The actual room IDs in the the Sliding Sync response.
+ actual_room_response_map: A map of room ID to room results in the the
+ Sliding Sync response.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ # Skip if the extension is not enabled
+ if not typing_request.enabled:
+ return None
+
+ relevant_room_ids = self.find_relevant_room_ids_for_extension(
+ requested_lists=typing_request.lists,
+ requested_room_ids=typing_request.rooms,
+ actual_lists=actual_lists,
+ actual_room_ids=actual_room_ids,
+ )
+
+ room_id_to_typing_map: Dict[str, JsonMapping] = {}
+ if len(relevant_room_ids) > 0:
+ # Note: We don't need to take connection tracking into account for typing
+ # notifications because they'll get anything still relevant and hasn't timed
+ # out when the room comes into range. We consider the gap where the room
+ # fell out of range, as long enough for any typing notifications to have
+ # timed out (it's not worth the 30 seconds of data we may have missed).
+ typing_source = self.event_sources.sources.typing
+ typing_notifications, _ = await typing_source.get_new_events(
+ user=sync_config.user,
+ from_key=(from_token.stream_token.typing_key if from_token else 0),
+ to_key=to_token.typing_key,
+ # This is a dummy value and isn't used in the function
+ limit=0,
+ room_ids=relevant_room_ids,
+ is_guest=False,
+ )
+
+ for typing_notification in typing_notifications:
+ # These fields should exist for every typing notification
+ room_id = typing_notification["room_id"]
+ type = typing_notification["type"]
+ content = typing_notification["content"]
+
+ room_id_to_typing_map[room_id] = {"type": type, "content": content}
+
+ return SlidingSyncResult.Extensions.TypingExtension(
+ room_id_to_typing_map=room_id_to_typing_map,
+ )
|