summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py1026
1 files changed, 783 insertions, 243 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8e2f751c02..554ab59bf3 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,23 +18,33 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
+from itertools import chain
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Final,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+)
 
 import attr
 from immutabledict import immutabledict
 
-from synapse.api.constants import (
-    AccountDataTypes,
-    Direction,
-    EventContentFields,
-    EventTypes,
-    Membership,
-)
+from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
+from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.roommember import MemberSummary
 from synapse.types import (
+    DeviceListUpdates,
     JsonDict,
     PersistedEventPosition,
     Requester,
@@ -46,6 +56,7 @@ from synapse.types import (
 )
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 from synapse.types.state import StateFilter
+from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -56,6 +67,7 @@ logger = logging.getLogger(__name__)
 
 # The event types that clients should consider as new activity.
 DEFAULT_BUMP_EVENT_TYPES = {
+    EventTypes.Create,
     EventTypes.Message,
     EventTypes.Encrypted,
     EventTypes.Sticker,
@@ -65,32 +77,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
 }
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+    """
+    Attributes:
+        room_id: The room ID of the membership event
+        event_id: The event ID of the membership event
+        event_pos: The stream position of the membership event
+        membership: The membership state of the user in the room
+        sender: The person who sent the membership event
+        newly_joined: Whether the user newly joined the room during the given token
+            range and is still joined to the room at the end of this range.
+        newly_left: Whether the user newly left (or kicked) the room during the given
+            token range and is still "leave" at the end of this range.
+        is_dm: Whether this user considers this room as a direct-message (DM) room
+    """
+
+    room_id: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    event_id: Optional[str]
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set because `current_state_delta_stream` will note the position that the reset
+    # happened.
+    event_pos: PersistedEventPosition
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set to `LEAVE` because we can make that assumption based on the situaton (see
+    # `get_current_state_delta_membership_changes_for_user(...)`)
+    membership: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    sender: Optional[str]
+    newly_joined: bool
+    newly_left: bool
+    is_dm: bool
+
+    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+        return attr.evolve(self, **kwds)
+
+
 def filter_membership_for_sync(
-    *, membership: str, user_id: str, sender: Optional[str]
+    *, user_id: str, room_membership_for_user: _RoomMembershipForUser
 ) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
     otherwise False.
 
     Attributes:
-        membership: The membership state of the user in the room.
         user_id: The user ID that the membership applies to
-        sender: The person who sent the membership event
+        room_membership_for_user: Membership information for the user in the room
     """
 
-    # Everything except `Membership.LEAVE` because we want everything that's *still*
-    # relevant to the user. There are few more things to include in the sync response
-    # (newly_left) but those are handled separately.
+    membership = room_membership_for_user.membership
+    sender = room_membership_for_user.sender
+    newly_left = room_membership_for_user.newly_left
+
+    # We want to allow everything except rooms the user has left unless `newly_left`
+    # because we want everything that's *still* relevant to the user. We include
+    # `newly_left` rooms because the last event that the user should see is their own
+    # leave event.
     #
-    # This logic includes kicks (leave events where the sender is not the same user) and
-    # can be read as "anything that isn't a leave or a leave with a different sender".
+    # A leave != kick. This logic includes kicks (leave events where the sender is not
+    # the same user).
     #
-    # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
-    # happened that removed the user from the room, or the user was the last person
-    # locally to leave the room which caused the server to leave the room. In both
-    # cases, we can just remove the rooms since they are no longer relevant to the user.
-    # They could still be added back later if they are `newly_left`.
-    return membership != Membership.LEAVE or sender not in (user_id, None)
+    # When `sender=None`, it means that a state reset happened that removed the user
+    # from the room without a corresponding leave event. We can just remove the rooms
+    # since they are no longer relevant to the user but will still appear if they are
+    # `newly_left`.
+    return (
+        # Anything except leave events
+        membership != Membership.LEAVE
+        # Unless...
+        or newly_left
+        # Allow kicks
+        or (membership == Membership.LEAVE and sender not in (user_id, None))
+    )
 
 
 # We can't freeze this class because we want to update it in place with the
@@ -282,29 +341,9 @@ class StateValues:
     # `sender` in the timeline). We only give special meaning to this value when it's a
     # `state_key`.
     LAZY: Final = "$LAZY"
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _RoomMembershipForUser:
-    """
-    Attributes:
-        event_id: The event ID of the membership event
-        event_pos: The stream position of the membership event
-        membership: The membership state of the user in the room
-        sender: The person who sent the membership event
-        newly_joined: Whether the user newly joined the room during the given token
-            range
-    """
-
-    room_id: str
-    event_id: Optional[str]
-    event_pos: PersistedEventPosition
-    membership: str
-    sender: Optional[str]
-    newly_joined: bool
-
-    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
-        return attr.evolve(self, **kwds)
+    # Subsitute with the requester's user ID. Typically used by clients to get
+    # the user's membership.
+    ME: Final = "$ME"
 
 
 class SlidingSyncHandler:
@@ -316,6 +355,7 @@ class SlidingSyncHandler:
         self.notifier = hs.get_notifier()
         self.event_sources = hs.get_event_sources()
         self.relations_handler = hs.get_relations_handler()
+        self.device_handler = hs.get_device_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
@@ -344,10 +384,6 @@ class SlidingSyncHandler:
         # auth_blocking will occur)
         await self.auth_blocking.check_auth_blocking(requester=requester)
 
-        # TODO: If the To-Device extension is enabled and we have a `from_token`, delete
-        # any to-device messages before that token (since we now know that the device
-        # has received them). (see sync v2 for how to do this)
-
         # If we're working with a user-provided token, we need to make sure to wait for
         # this worker to catch up with the token so we don't skip past any incoming
         # events or future events if the user is nefariously, manually modifying the
@@ -425,18 +461,31 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        # Get all of the room IDs that the user should be able to see in the sync
+        # response
+        has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
+        has_room_subscriptions = (
+            sync_config.room_subscriptions is not None
+            and len(sync_config.room_subscriptions) > 0
+        )
+        if has_lists or has_room_subscriptions:
+            room_membership_for_user_map = (
+                await self.get_room_membership_for_user_at_to_token(
+                    user=sync_config.user,
+                    to_token=to_token,
+                    from_token=from_token,
+                )
+            )
+
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we're going to display and need to fetch more
         # info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
-        if sync_config.lists:
-            # Get all of the room IDs that the user should be able to see in the sync
-            # response
-            sync_room_map = await self.get_sync_room_ids_for_user(
-                sync_config.user,
-                from_token=from_token,
-                to_token=to_token,
+        if has_lists and sync_config.lists is not None:
+            sync_room_map = await self.filter_rooms_relevant_for_sync(
+                user=sync_config.user,
+                room_membership_for_user_map=room_membership_for_user_map,
             )
 
             for list_key, list_config in sync_config.lists.items():
@@ -464,9 +513,9 @@ class SlidingSyncHandler:
                 membership_state_keys = room_sync_config.required_state_map.get(
                     EventTypes.Member
                 )
+                # Also see `StateFilter.must_await_full_state(...)` for comparison
                 lazy_loading = (
                     membership_state_keys is not None
-                    and len(membership_state_keys) == 1
                     and StateValues.LAZY in membership_state_keys
                 )
 
@@ -524,51 +573,88 @@ class SlidingSyncHandler:
                     ops=ops,
                 )
 
-        # TODO: if (sync_config.room_subscriptions):
+        # Handle room subscriptions
+        if has_room_subscriptions and sync_config.room_subscriptions is not None:
+            for room_id, room_subscription in sync_config.room_subscriptions.items():
+                room_membership_for_user_at_to_token = (
+                    await self.check_room_subscription_allowed_for_user(
+                        room_id=room_id,
+                        room_membership_for_user_map=room_membership_for_user_map,
+                        to_token=to_token,
+                    )
+                )
+
+                # Skip this room if the user isn't allowed to see it
+                if not room_membership_for_user_at_to_token:
+                    continue
+
+                room_membership_for_user_map[room_id] = (
+                    room_membership_for_user_at_to_token
+                )
+
+                # Take the superset of the `RoomSyncConfig` for each room.
+                #
+                # Update our `relevant_room_map` with the room we're going to display
+                # and need to fetch more info about.
+                room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
+                existing_room_sync_config = relevant_room_map.get(room_id)
+                if existing_room_sync_config is not None:
+                    existing_room_sync_config.combine_room_sync_config(room_sync_config)
+                else:
+                    relevant_room_map[room_id] = room_sync_config
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
-        for room_id, room_sync_config in relevant_room_map.items():
+
+        @trace
+        @tag_args
+        async def handle_room(room_id: str) -> None:
             room_sync_result = await self.get_room_sync_data(
                 user=sync_config.user,
                 room_id=room_id,
-                room_sync_config=room_sync_config,
-                room_membership_for_user_at_to_token=sync_room_map[room_id],
+                room_sync_config=relevant_room_map[room_id],
+                room_membership_for_user_at_to_token=room_membership_for_user_map[
+                    room_id
+                ],
                 from_token=from_token,
                 to_token=to_token,
             )
 
             rooms[room_id] = room_sync_result
 
+        with start_active_span("sliding_sync.generate_room_entries"):
+            await concurrently_execute(handle_room, relevant_room_map, 10)
+
+        extensions = await self.get_extensions_response(
+            sync_config=sync_config,
+            from_token=from_token,
+            to_token=to_token,
+        )
+
         return SlidingSyncResult(
             next_pos=to_token,
             lists=lists,
             rooms=rooms,
-            extensions={},
+            extensions=extensions,
         )
 
-    async def get_sync_room_ids_for_user(
+    async def get_room_membership_for_user_at_to_token(
         self,
         user: UserID,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[StreamToken],
     ) -> Dict[str, _RoomMembershipForUser]:
         """
-        Fetch room IDs that should be listed for this user in the sync response (the
-        full room list that will be filtered, sorted, and sliced).
+        Fetch room IDs that the user has had membership in (the full room list including
+        long-lost left rooms that will be filtered, sorted, and sliced).
 
-        We're looking for rooms where the user has the following state in the token
-        range (> `from_token` and <= `to_token`):
+        We're looking for rooms where the user has had any sort of membership in the
+        token range (> `from_token` and <= `to_token`)
 
-        - `invite`, `join`, `knock`, `ban` membership events
-        - Kicks (`leave` membership events where `sender` is different from the
-          `user_id`/`state_key`)
-        - `newly_left` (rooms that were left during the given token range)
-        - In order for bans/kicks to not show up in sync, you need to `/forget` those
-          rooms. This doesn't modify the event itself though and only adds the
-          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
-          to tell when a room was forgotten at the moment so we can't factor it into the
-          from/to range.
+        In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+        doesn't modify the event itself though and only adds the `forgotten` flag to the
+        `room_memberships` table in Synapse. There isn't a way to tell when a room was
+        forgotten at the moment so we can't factor it into the token range.
 
         Args:
             user: User to fetch rooms for
@@ -576,8 +662,8 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
 
         Returns:
-            A dictionary of room IDs that should be listed in the sync response along
-            with membership information in that room at the time of `to_token`.
+            A dictionary of room IDs that the user has had membership in along with
+            membership information in that room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -588,9 +674,6 @@ class SlidingSyncHandler:
             # We want to fetch any kind of membership (joined and left rooms) in order
             # to get the `event_pos` of the latest room membership event for the
             # user.
-            #
-            # We will filter out the rooms that don't belong below (see
-            # `filter_membership_for_sync`)
             membership_list=Membership.LIST,
             excluded_rooms=self.rooms_to_exclude_globally,
         )
@@ -610,7 +693,10 @@ class SlidingSyncHandler:
                 event_pos=room_for_user.event_pos,
                 membership=room_for_user.membership,
                 sender=room_for_user.sender,
+                # We will update these fields below to be accurate
                 newly_joined=False,
+                newly_left=False,
+                is_dm=False,
             )
             for room_for_user in room_for_user_list
         }
@@ -635,10 +721,17 @@ class SlidingSyncHandler:
                 instance_to_max_stream_ordering_map[instance_name] = stream_ordering
 
         # Then assemble the `RoomStreamToken`
+        min_stream_pos = min(instance_to_max_stream_ordering_map.values())
         membership_snapshot_token = RoomStreamToken(
             # Minimum position in the `instance_map`
-            stream=min(instance_to_max_stream_ordering_map.values()),
-            instance_map=immutabledict(instance_to_max_stream_ordering_map),
+            stream=min_stream_pos,
+            instance_map=immutabledict(
+                {
+                    instance_name: stream_pos
+                    for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
+                    if stream_pos > min_stream_pos
+                }
+            ),
         )
 
         # Since we fetched the users room list at some point in time after the from/to
@@ -648,10 +741,9 @@ class SlidingSyncHandler:
         # - 1a) Remove rooms that the user joined after the `to_token`
         # - 1b) Add back rooms that the user left after the `to_token`
         # - 1c) Update room membership events to the point in time of the `to_token`
-        # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
-        # - 3) Figure out which rooms are `newly_joined`
-
-        # 1) -----------------------------------------------------
+        # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
+        # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
+        # - 4) Figure out which rooms are DM's
 
         # 1) Fetch membership changes that fall in the range from `to_token` up to
         # `membership_snapshot_token`
@@ -711,7 +803,10 @@ class SlidingSyncHandler:
                         event_pos=first_membership_change_after_to_token.prev_event_pos,
                         membership=first_membership_change_after_to_token.prev_membership,
                         sender=first_membership_change_after_to_token.prev_sender,
+                        # We will update these fields below to be accurate
                         newly_joined=False,
+                        newly_left=False,
+                        is_dm=False,
                     )
                 else:
                     # If we can't find the previous membership event, we shouldn't
@@ -719,22 +814,6 @@ class SlidingSyncHandler:
                     # exact membership state and shouldn't rely on the current snapshot.
                     sync_room_id_set.pop(room_id, None)
 
-        # Filter the rooms that that we have updated room membership events to the point
-        # in time of the `to_token` (from the "1)" fixups)
-        filtered_sync_room_id_set = {
-            room_id: room_membership_for_user
-            for room_id, room_membership_for_user in sync_room_id_set.items()
-            if filter_membership_for_sync(
-                membership=room_membership_for_user.membership,
-                user_id=user_id,
-                sender=room_membership_for_user.sender,
-            )
-        }
-
-        # 2) -----------------------------------------------------
-        # We fix-up newly_left rooms after the first fixup because it may have removed
-        # some left rooms that we can figure out are newly_left in the following code
-
         # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
         current_state_delta_membership_changes_in_from_to_range = []
         if from_token:
@@ -796,18 +875,40 @@ class SlidingSyncHandler:
             if last_membership_change_in_from_to_range.membership == Membership.JOIN:
                 possibly_newly_joined_room_ids.add(room_id)
 
-            # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
-            # include newly_left rooms because the last event that the user should see
-            # is their own leave event
+            # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
-                    room_id=room_id,
-                    event_id=last_membership_change_in_from_to_range.event_id,
-                    event_pos=last_membership_change_in_from_to_range.event_pos,
-                    membership=last_membership_change_in_from_to_range.membership,
-                    sender=last_membership_change_in_from_to_range.sender,
-                    newly_joined=False,
-                )
+                # 2) Mark this room as `newly_left`
+
+                # If we're seeing a membership change here, we should expect to already
+                # have it in our snapshot but if a state reset happens, it wouldn't have
+                # shown up in our snapshot but appear as a change here.
+                existing_sync_entry = sync_room_id_set.get(room_id)
+                if existing_sync_entry is not None:
+                    # Normal expected case
+                    sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
+                        newly_left=True
+                    )
+                else:
+                    # State reset!
+                    logger.warn(
+                        "State reset detected for room_id %s with %s who is no longer in the room",
+                        room_id,
+                        user_id,
+                    )
+                    # Even though a state reset happened which removed the person from
+                    # the room, we still add it the list so the user knows they left the
+                    # room. Downstream code can check for a state reset by looking for
+                    # `event_id=None and membership is not None`.
+                    sync_room_id_set[room_id] = _RoomMembershipForUser(
+                        room_id=room_id,
+                        event_id=last_membership_change_in_from_to_range.event_id,
+                        event_pos=last_membership_change_in_from_to_range.event_pos,
+                        membership=last_membership_change_in_from_to_range.membership,
+                        sender=last_membership_change_in_from_to_range.sender,
+                        newly_joined=False,
+                        newly_left=True,
+                        is_dm=False,
+                    )
 
         # 3) Figure out `newly_joined`
         for room_id in possibly_newly_joined_room_ids:
@@ -818,9 +919,9 @@ class SlidingSyncHandler:
             # also some non-join in the range, we know they `newly_joined`.
             if has_non_join_in_from_to_range:
                 # We found a `newly_joined` room (we left and joined within the token range)
-                filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
-                    room_id
-                ].copy_and_replace(newly_joined=True)
+                sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                    newly_joined=True
+                )
             else:
                 prev_event_id = first_membership_change_by_room_id_in_from_to_range[
                     room_id
@@ -832,7 +933,7 @@ class SlidingSyncHandler:
                 if prev_event_id is None:
                     # We found a `newly_joined` room (we are joining the room for the
                     # first time within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
                 # Last resort, we need to step back to the previous membership event
@@ -840,11 +941,150 @@ class SlidingSyncHandler:
                 elif prev_membership != Membership.JOIN:
                     # We found a `newly_joined` room (we left before the token range
                     # and joined within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
 
-        return filtered_sync_room_id_set
+        # 4) Figure out which rooms the user considers to be direct-message (DM) rooms
+        #
+        # We're using global account data (`m.direct`) instead of checking for
+        # `is_direct` on membership events because that property only appears for
+        # the invitee membership event (doesn't show up for the inviter).
+        #
+        # We're unable to take `to_token` into account for global account data since
+        # we only keep track of the latest account data for the user.
+        dm_map = await self.store.get_global_account_data_by_type_for_user(
+            user_id, AccountDataTypes.DIRECT
+        )
+
+        # Flatten out the map. Account data is set by the client so it needs to be
+        # scrutinized.
+        dm_room_id_set = set()
+        if isinstance(dm_map, dict):
+            for room_ids in dm_map.values():
+                # Account data should be a list of room IDs. Ignore anything else
+                if isinstance(room_ids, list):
+                    for room_id in room_ids:
+                        if isinstance(room_id, str):
+                            dm_room_id_set.add(room_id)
+
+        # 4) Fixup
+        for room_id in sync_room_id_set:
+            sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                is_dm=room_id in dm_room_id_set
+            )
+
+        return sync_room_id_set
+
+    async def filter_rooms_relevant_for_sync(
+        self,
+        user: UserID,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Filter room IDs that should/can be listed for this user in the sync response (the
+        full room list that will be further filtered, sorted, and sliced).
+
+        We're looking for rooms where the user has the following state in the token
+        range (> `from_token` and <= `to_token`):
+
+        - `invite`, `join`, `knock`, `ban` membership events
+        - Kicks (`leave` membership events where `sender` is different from the
+          `user_id`/`state_key`)
+        - `newly_left` (rooms that were left during the given token range)
+        - In order for bans/kicks to not show up in sync, you need to `/forget` those
+          rooms. This doesn't modify the event itself though and only adds the
+          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
+          to tell when a room was forgotten at the moment so we can't factor it into the
+          from/to range.
+
+        Args:
+            user: User that is syncing
+            room_membership_for_user_map: Room membership for the user
+
+        Returns:
+            A dictionary of room IDs that should be listed in the sync response along
+            with membership information in that room at the time of `to_token`.
+        """
+        user_id = user.to_string()
+
+        # Filter rooms to only what we're interested to sync with
+        filtered_sync_room_map = {
+            room_id: room_membership_for_user
+            for room_id, room_membership_for_user in room_membership_for_user_map.items()
+            if filter_membership_for_sync(
+                user_id=user_id,
+                room_membership_for_user=room_membership_for_user,
+            )
+        }
+
+        return filtered_sync_room_map
+
+    async def check_room_subscription_allowed_for_user(
+        self,
+        room_id: str,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+        to_token: StreamToken,
+    ) -> Optional[_RoomMembershipForUser]:
+        """
+        Check whether the user is allowed to see the room based on whether they have
+        ever had membership in the room or if the room is `world_readable`.
+
+        Similar to `check_user_in_room_or_world_readable(...)`
+
+        Args:
+            room_id: Room to check
+            room_membership_for_user_map: Room membership for the user at the time of
+                the `to_token` (<= `to_token`).
+            to_token: The token to fetch rooms up to.
+
+        Returns:
+            The room membership for the user if they are allowed to subscribe to the
+            room else `None`.
+        """
+
+        # We can first check if they are already allowed to see the room based
+        # on our previous work to assemble the `room_membership_for_user_map`.
+        #
+        # If they have had any membership in the room over time (up to the `to_token`),
+        # let them subscribe and see what they can.
+        existing_membership_for_user = room_membership_for_user_map.get(room_id)
+        if existing_membership_for_user is not None:
+            return existing_membership_for_user
+
+        # TODO: Handle `world_readable` rooms
+        return None
+
+        # If the room is `world_readable`, it doesn't matter whether they can join,
+        # everyone can see the room.
+        # not_in_room_membership_for_user = _RoomMembershipForUser(
+        #     room_id=room_id,
+        #     event_id=None,
+        #     event_pos=None,
+        #     membership=None,
+        #     sender=None,
+        #     newly_joined=False,
+        #     newly_left=False,
+        #     is_dm=False,
+        # )
+        # room_state = await self.get_current_state_at(
+        #     room_id=room_id,
+        #     room_membership_for_user_at_to_token=not_in_room_membership_for_user,
+        #     state_filter=StateFilter.from_types(
+        #         [(EventTypes.RoomHistoryVisibility, "")]
+        #     ),
+        #     to_token=to_token,
+        # )
+
+        # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
+        # if (
+        #     visibility_event is not None
+        #     and visibility_event.content.get("history_visibility")
+        #     == HistoryVisibility.WORLD_READABLE
+        # ):
+        #     return not_in_room_membership_for_user
+
+        # return None
 
     async def filter_rooms(
         self,
@@ -867,41 +1107,24 @@ class SlidingSyncHandler:
             A filtered dictionary of room IDs along with membership information in the
             room at the time of `to_token`.
         """
-        user_id = user.to_string()
-
-        # TODO: Apply filters
-
         filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
-            # We're using global account data (`m.direct`) instead of checking for
-            # `is_direct` on membership events because that property only appears for
-            # the invitee membership event (doesn't show up for the inviter). Account
-            # data is set by the client so it needs to be scrutinized.
-            #
-            # We're unable to take `to_token` into account for global account data since
-            # we only keep track of the latest account data for the user.
-            dm_map = await self.store.get_global_account_data_by_type_for_user(
-                user_id, AccountDataTypes.DIRECT
-            )
-
-            # Flatten out the map
-            dm_room_id_set = set()
-            if isinstance(dm_map, dict):
-                for room_ids in dm_map.values():
-                    # Account data should be a list of room IDs. Ignore anything else
-                    if isinstance(room_ids, list):
-                        for room_id in room_ids:
-                            if isinstance(room_id, str):
-                                dm_room_id_set.add(room_id)
-
             if filters.is_dm:
                 # Only DM rooms please
-                filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if sync_room_map[room_id].is_dm
+                }
             else:
                 # Only non-DM rooms please
-                filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if not sync_room_map[room_id].is_dm
+                }
 
         if filters.spaces:
             raise NotImplementedError()
@@ -953,11 +1176,15 @@ class SlidingSyncHandler:
         # provided in the list. `None` is a valid type for rooms which do not have a
         # room type.
         if filters.room_types is not None or filters.not_room_types is not None:
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                create_event = await self.store.get_create_event_for_room(room_id)
-                room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
+            room_to_type = await self.store.bulk_get_room_type(
+                {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    # We only know the room types for joined rooms
+                    if sync_room_map[room_id].membership == Membership.JOIN
+                }
+            )
+            for room_id, room_type in room_to_type.items():
                 if (
                     filters.room_types is not None
                     and room_type not in filters.room_types
@@ -1003,34 +1230,33 @@ class SlidingSyncHandler:
         # Assemble a map of room ID to the `stream_ordering` of the last activity that the
         # user should see in the room (<= `to_token`)
         last_activity_in_room_map: Dict[str, int] = {}
-        for room_id, room_for_user in sync_room_map.items():
-            # If they are fully-joined to the room, let's find the latest activity
-            # at/before the `to_token`.
-            if room_for_user.membership == Membership.JOIN:
-                last_event_result = (
-                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                        room_id, to_token.room_key
-                    )
-                )
-
-                # If the room has no events at/before the `to_token`, this is probably a
-                # mistake in the code that generates the `sync_room_map` since that should
-                # only give us rooms that the user had membership in during the token range.
-                assert last_event_result is not None
 
-                _, event_pos = last_event_result
-
-                last_activity_in_room_map[room_id] = event_pos.stream
-            else:
-                # Otherwise, if the user has left/been invited/knocked/been banned from
-                # a room, they shouldn't see anything past that point.
+        for room_id, room_for_user in sync_room_map.items():
+            if room_for_user.membership != Membership.JOIN:
+                # If the user has left/been invited/knocked/been banned from a
+                # room, they shouldn't see anything past that point.
                 #
-                # FIXME: It's possible that people should see beyond this point in
-                # invited/knocked cases if for example the room has
+                # FIXME: It's possible that people should see beyond this point
+                # in invited/knocked cases if for example the room has
                 # `invite`/`world_readable` history visibility, see
                 # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
                 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
 
+        # For fully-joined rooms, we find the latest activity at/before the
+        # `to_token`.
+        joined_room_positions = (
+            await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
+                [
+                    room_id
+                    for room_id, room_for_user in sync_room_map.items()
+                    if room_for_user.membership == Membership.JOIN
+                ],
+                to_token.room_key,
+            )
+        )
+
+        last_activity_in_room_map.update(joined_room_positions)
+
         return sorted(
             sync_room_map.values(),
             # Sort by the last activity (stream_ordering) in the room
@@ -1039,6 +1265,102 @@ class SlidingSyncHandler:
             reverse=True,
         )
 
+    async def get_current_state_ids_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[str]:
+        """
+        Get current state IDs for the user in the room according to their membership. This
+        will be the current state at the time of their LEAVE/BAN, otherwise will be the
+        current state <= to_token.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_token: Membership information for the user
+                in the room at the time of `to_token`.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids: StateMap[str]
+        # People shouldn't see past their leave/ban event
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: `get_state_ids_at(...)` doesn't take into account the "current state"
+            room_state_ids = await self.storage_controllers.state.get_state_ids_at(
+                room_id,
+                stream_position=to_token.copy_and_replace(
+                    StreamKeyType.ROOM,
+                    room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
+                ),
+                state_filter=state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+        # Otherwise, we can get the latest current state in the room
+        else:
+            room_state_ids = await self.storage_controllers.state.get_current_state_ids(
+                room_id,
+                state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+            # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
+
+        return room_state_ids
+
+    async def get_current_state_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[EventBase]:
+        """
+        Get current state for the user in the room according to their membership. This
+        will be the current state at the time of their LEAVE/BAN, otherwise will be the
+        current state <= to_token.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_token: Membership information for the user
+                in the room at the time of `to_token`.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=state_filter,
+            to_token=to_token,
+        )
+
+        event_map = await self.store.get_events(list(room_state_ids.values()))
+
+        state_map = {}
+        for key, event_id in room_state_ids.items():
+            event = event_map.get(event_id)
+            if event:
+                state_map[key] = event
+
+        return state_map
+
     async def get_room_sync_data(
         self,
         user: UserID,
@@ -1070,7 +1392,7 @@ class SlidingSyncHandler:
         # membership. Currently, we have to make all of these optional because
         # `invite`/`knock` rooms only have `stripped_state`. See
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
-        timeline_events: Optional[List[EventBase]] = None
+        timeline_events: List[EventBase] = []
         bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
         limited: Optional[bool] = None
         prev_batch_token: Optional[StreamToken] = None
@@ -1228,10 +1550,10 @@ class SlidingSyncHandler:
             stripped_state.append(strip_event(invite_or_knock_event))
 
         # TODO: Handle state resets. For example, if we see
-        # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
-        # `required_state` doesn't include it, we should indicate to the client that a
-        # state reset happened. Perhaps we should indicate this by setting `initial:
-        # True` and empty `required_state`.
+        # `room_membership_for_user_at_to_token.event_id=None and
+        # room_membership_for_user_at_to_token.membership is not None`, we should
+        # indicate to the client that a state reset happened. Perhaps we should indicate
+        # this by setting `initial: True` and empty `required_state`.
 
         # TODO: Since we can't determine whether we've already sent a room down this
         # Sliding Sync connection before (we plan to add this optimization in the
@@ -1239,7 +1561,45 @@ class SlidingSyncHandler:
         # updates.
         initial = True
 
-        # Fetch the required state for the room
+        # Check whether the room has a name set
+        name_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
+            to_token=to_token,
+        )
+        name_event_id = name_state_ids.get((EventTypes.Name, ""))
+
+        room_membership_summary: Mapping[str, MemberSummary]
+        empty_membership_summary = MemberSummary([], 0)
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: Figure out how to get the membership summary for left/banned rooms
+            room_membership_summary = {}
+        else:
+            room_membership_summary = await self.store.get_room_summary(room_id)
+            # TODO: Reverse/rewind back to the `to_token`
+
+        # `heroes` are required if the room name is not set.
+        #
+        # Note: When you're the first one on your server to be invited to a new room
+        # over federation, we only have access to some stripped state in
+        # `event.unsigned.invite_room_state` which currently doesn't include `heroes`,
+        # see https://github.com/matrix-org/matrix-spec/issues/380. This means that
+        # clients won't be able to calculate the room name when necessary and just a
+        # pitfall we have to deal with until that spec issue is resolved.
+        hero_user_ids: List[str] = []
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        if name_event_id is None:
+            hero_user_ids = extract_heroes_from_room_summary(
+                room_membership_summary, me=user.to_string()
+            )
+
+        # Fetch the `required_state` for the room
         #
         # No `required_state` for invite/knock rooms (just `stripped_state`)
         #
@@ -1247,13 +1607,13 @@ class SlidingSyncHandler:
         # of membership. Currently, we have to make this optional because
         # `invite`/`knock` rooms only have `stripped_state`. See
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
-        room_state: Optional[StateMap[EventBase]] = None
+        #
+        # Calculate the `StateFilter` based on the `required_state` for the room
+        required_state_filter = StateFilter.none()
         if room_membership_for_user_at_to_token.membership not in (
             Membership.INVITE,
             Membership.KNOCK,
         ):
-            # Calculate the `StateFilter` based on the `required_state` for the room
-            state_filter: Optional[StateFilter] = StateFilter.none()
             # If we have a double wildcard ("*", "*") in the `required_state`, we need
             # to fetch all state for the room
             #
@@ -1276,7 +1636,7 @@ class SlidingSyncHandler:
             if StateValues.WILDCARD in room_sync_config.required_state_map.get(
                 StateValues.WILDCARD, set()
             ):
-                state_filter = StateFilter.all()
+                required_state_filter = StateFilter.all()
             # TODO: `StateFilter` currently doesn't support wildcard event types. We're
             # currently working around this by returning all state to the client but it
             # would be nice to fetch less from the database and return just what the
@@ -1285,7 +1645,7 @@ class SlidingSyncHandler:
                 room_sync_config.required_state_map.get(StateValues.WILDCARD)
                 is not None
             ):
-                state_filter = StateFilter.all()
+                required_state_filter = StateFilter.all()
             else:
                 required_state_types: List[Tuple[str, Optional[str]]] = []
                 for (
@@ -1314,54 +1674,72 @@ class SlidingSyncHandler:
 
                             # FIXME: We probably also care about invite, ban, kick, targets, etc
                             # but the spec only mentions "senders".
+                        elif state_key == StateValues.ME:
+                            required_state_types.append((state_type, user.to_string()))
                         else:
                             required_state_types.append((state_type, state_key))
 
-                state_filter = StateFilter.from_types(required_state_types)
-
-            # We can skip fetching state if we don't need any
-            if state_filter != StateFilter.none():
-                # We can return all of the state that was requested if we're doing an
-                # initial sync
-                if initial:
-                    # People shouldn't see past their leave/ban event
-                    if room_membership_for_user_at_to_token.membership in (
-                        Membership.LEAVE,
-                        Membership.BAN,
-                    ):
-                        room_state = await self.storage_controllers.state.get_state_at(
-                            room_id,
-                            stream_position=to_token.copy_and_replace(
-                                StreamKeyType.ROOM,
-                                room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
-                            ),
-                            state_filter=state_filter,
-                            # Partially-stated rooms should have all state events except for
-                            # the membership events and since we've already excluded
-                            # partially-stated rooms unless `required_state` only has
-                            # `["m.room.member", "$LAZY"]` for membership, we should be able
-                            # to retrieve everything requested. Plus we don't want to block
-                            # the whole sync waiting for this one room.
-                            await_full_state=False,
-                        )
-                    # Otherwise, we can get the latest current state in the room
-                    else:
-                        room_state = await self.storage_controllers.state.get_current_state(
-                            room_id,
-                            state_filter,
-                            # Partially-stated rooms should have all state events except for
-                            # the membership events and since we've already excluded
-                            # partially-stated rooms unless `required_state` only has
-                            # `["m.room.member", "$LAZY"]` for membership, we should be able
-                            # to retrieve everything requested. Plus we don't want to block
-                            # the whole sync waiting for this one room.
-                            await_full_state=False,
-                        )
-                        # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
-                else:
-                    # TODO: Once we can figure out if we've sent a room down this connection before,
-                    # we can return updates instead of the full required state.
-                    raise NotImplementedError()
+                required_state_filter = StateFilter.from_types(required_state_types)
+
+        # We need this base set of info for the response so let's just fetch it along
+        # with the `required_state` for the room
+        meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
+            (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
+        ]
+        state_filter = StateFilter.all()
+        if required_state_filter != StateFilter.all():
+            state_filter = StateFilter(
+                types=StateFilter.from_types(
+                    chain(meta_room_state, required_state_filter.to_types())
+                ).types,
+                include_others=required_state_filter.include_others,
+            )
+
+        # We can return all of the state that was requested if this was the first
+        # time we've sent the room down this connection.
+        room_state: StateMap[EventBase] = {}
+        if initial:
+            room_state = await self.get_current_state_at(
+                room_id=room_id,
+                room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+                state_filter=state_filter,
+                to_token=to_token,
+            )
+        else:
+            # TODO: Once we can figure out if we've sent a room down this connection before,
+            # we can return updates instead of the full required state.
+            raise NotImplementedError()
+
+        required_room_state: StateMap[EventBase] = {}
+        if required_state_filter != StateFilter.none():
+            required_room_state = required_state_filter.filter_state(room_state)
+
+        # Find the room name and avatar from the state
+        room_name: Optional[str] = None
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        name_event = room_state.get((EventTypes.Name, ""))
+        if name_event is not None:
+            room_name = name_event.content.get("name")
+
+        room_avatar: Optional[str] = None
+        avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+        if avatar_event is not None:
+            room_avatar = avatar_event.content.get("url")
+
+        # Assemble heroes: extract the info from the state we just fetched
+        heroes: List[SlidingSyncResult.RoomResult.StrippedHero] = []
+        for hero_user_id in hero_user_ids:
+            member_event = room_state.get((EventTypes.Member, hero_user_id))
+            if member_event is not None:
+                heroes.append(
+                    SlidingSyncResult.RoomResult.StrippedHero(
+                        user_id=hero_user_id,
+                        display_name=member_event.content.get("displayname"),
+                        avatar_url=member_event.content.get("avatar_url"),
+                    )
+                )
 
         # Figure out the last bump event in the room
         last_bump_event_result = (
@@ -1378,16 +1756,12 @@ class SlidingSyncHandler:
             bump_stamp = bump_event_pos.stream
 
         return SlidingSyncResult.RoomResult(
-            # TODO: Dummy value
-            name=None,
-            # TODO: Dummy value
-            avatar=None,
-            # TODO: Dummy value
-            heroes=None,
-            # TODO: Dummy value
-            is_dm=False,
+            name=room_name,
+            avatar=room_avatar,
+            heroes=heroes,
+            is_dm=room_membership_for_user_at_to_token.is_dm,
             initial=initial,
-            required_state=list(room_state.values()) if room_state else None,
+            required_state=list(required_room_state.values()),
             timeline_events=timeline_events,
             bundled_aggregations=bundled_aggregations,
             stripped_state=stripped_state,
@@ -1395,12 +1769,178 @@ class SlidingSyncHandler:
             limited=limited,
             num_live=num_live,
             bump_stamp=bump_stamp,
-            # TODO: Dummy values
-            joined_count=0,
-            invited_count=0,
+            joined_count=room_membership_summary.get(
+                Membership.JOIN, empty_membership_summary
+            ).count,
+            invited_count=room_membership_summary.get(
+                Membership.INVITE, empty_membership_summary
+            ).count,
             # TODO: These are just dummy values. We could potentially just remove these
             # since notifications can only really be done correctly on the client anyway
             # (encrypted rooms).
             notification_count=0,
             highlight_count=0,
         )
+
+    async def get_extensions_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken],
+    ) -> SlidingSyncResult.Extensions:
+        """Handle extension requests.
+
+        Args:
+            sync_config: Sync configuration
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+
+        if sync_config.extensions is None:
+            return SlidingSyncResult.Extensions()
+
+        to_device_response = None
+        if sync_config.extensions.to_device is not None:
+            to_device_response = await self.get_to_device_extension_response(
+                sync_config=sync_config,
+                to_device_request=sync_config.extensions.to_device,
+                to_token=to_token,
+            )
+
+        e2ee_response = None
+        if sync_config.extensions.e2ee is not None:
+            e2ee_response = await self.get_e2ee_extension_response(
+                sync_config=sync_config,
+                e2ee_request=sync_config.extensions.e2ee,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
+        return SlidingSyncResult.Extensions(
+            to_device=to_device_response,
+            e2ee=e2ee_response,
+        )
+
+    async def get_to_device_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension,
+        to_token: StreamToken,
+    ) -> Optional[SlidingSyncResult.Extensions.ToDeviceExtension]:
+        """Handle to-device extension (MSC3885)
+
+        Args:
+            sync_config: Sync configuration
+            to_device_request: The to-device extension from the request
+            to_token: The point in the stream to sync up to.
+        """
+        user_id = sync_config.user.to_string()
+        device_id = sync_config.device_id
+
+        # Skip if the extension is not enabled
+        if not to_device_request.enabled:
+            return None
+
+        # Check that this request has a valid device ID (not all requests have
+        # to belong to a device, and so device_id is None)
+        if device_id is None:
+            return SlidingSyncResult.Extensions.ToDeviceExtension(
+                next_batch=f"{to_token.to_device_key}",
+                events=[],
+            )
+
+        since_stream_id = 0
+        if to_device_request.since is not None:
+            # We've already validated this is an int.
+            since_stream_id = int(to_device_request.since)
+
+            if to_token.to_device_key < since_stream_id:
+                # The since token is ahead of our current token, so we return an
+                # empty response.
+                logger.warning(
+                    "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r",
+                    since_stream_id,
+                    to_token.to_device_key,
+                )
+                return SlidingSyncResult.Extensions.ToDeviceExtension(
+                    next_batch=to_device_request.since,
+                    events=[],
+                )
+
+            # Delete everything before the given since token, as we know the
+            # device must have received them.
+            deleted = await self.store.delete_messages_for_device(
+                user_id=user_id,
+                device_id=device_id,
+                up_to_stream_id=since_stream_id,
+            )
+
+            logger.debug(
+                "Deleted %d to-device messages up to %d for %s",
+                deleted,
+                since_stream_id,
+                user_id,
+            )
+
+        messages, stream_id = await self.store.get_messages_for_device(
+            user_id=user_id,
+            device_id=device_id,
+            from_stream_id=since_stream_id,
+            to_stream_id=to_token.to_device_key,
+            limit=min(to_device_request.limit, 100),  # Limit to at most 100 events
+        )
+
+        return SlidingSyncResult.Extensions.ToDeviceExtension(
+            next_batch=f"{stream_id}",
+            events=messages,
+        )
+
+    async def get_e2ee_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
+        """Handle E2EE device extension (MSC3884)
+
+        Args:
+            sync_config: Sync configuration
+            e2ee_request: The e2ee extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        user_id = sync_config.user.to_string()
+        device_id = sync_config.device_id
+
+        # Skip if the extension is not enabled
+        if not e2ee_request.enabled:
+            return None
+
+        device_list_updates: Optional[DeviceListUpdates] = None
+        if from_token is not None:
+            # TODO: This should take into account the `from_token` and `to_token`
+            device_list_updates = await self.device_handler.get_user_ids_changed(
+                user_id=user_id,
+                from_token=from_token,
+            )
+
+        device_one_time_keys_count: Mapping[str, int] = {}
+        device_unused_fallback_key_types: Sequence[str] = []
+        if device_id:
+            # TODO: We should have a way to let clients differentiate between the states of:
+            #   * no change in OTK count since the provided since token
+            #   * the server has zero OTKs left for this device
+            #  Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
+            device_one_time_keys_count = await self.store.count_e2e_one_time_keys(
+                user_id, device_id
+            )
+            device_unused_fallback_key_types = (
+                await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
+            )
+
+        return SlidingSyncResult.Extensions.E2eeExtension(
+            device_list_updates=device_list_updates,
+            device_one_time_keys_count=device_one_time_keys_count,
+            device_unused_fallback_key_types=device_unused_fallback_key_types,
+        )