summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/admin.py10
-rw-r--r--synapse/handlers/pagination.py32
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/sliding_sync.py646
-rw-r--r--synapse/handlers/sync.py69
5 files changed, 480 insertions, 279 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ec35784c5f..b44e862493 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -197,8 +197,14 @@ class AdminHandler:
             # events that we have and then filtering, this isn't the most
             # efficient method perhaps but it does guarantee we get everything.
             while True:
-                events, _ = await self._store.paginate_room_events(
-                    room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
+                events, _ = (
+                    await self._store.paginate_room_events_by_topological_ordering(
+                        room_id=room_id,
+                        from_key=from_key,
+                        to_key=to_key,
+                        limit=100,
+                        direction=Direction.FORWARDS,
+                    )
                 )
                 if not events:
                     break
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 872c85fbad..6fd7afa280 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -507,13 +507,15 @@ class PaginationHandler:
 
         # Initially fetch the events from the database. With any luck, we can return
         # these without blocking on backfill (handled below).
-        events, next_key = await self.store.paginate_room_events(
-            room_id=room_id,
-            from_key=from_token.room_key,
-            to_key=to_room_key,
-            direction=pagin_config.direction,
-            limit=pagin_config.limit,
-            event_filter=event_filter,
+        events, next_key = (
+            await self.store.paginate_room_events_by_topological_ordering(
+                room_id=room_id,
+                from_key=from_token.room_key,
+                to_key=to_room_key,
+                direction=pagin_config.direction,
+                limit=pagin_config.limit,
+                event_filter=event_filter,
+            )
         )
 
         if pagin_config.direction == Direction.BACKWARDS:
@@ -582,13 +584,15 @@ class PaginationHandler:
                 # If we did backfill something, refetch the events from the database to
                 # catch anything new that might have been added since we last fetched.
                 if did_backfill:
-                    events, next_key = await self.store.paginate_room_events(
-                        room_id=room_id,
-                        from_key=from_token.room_key,
-                        to_key=to_room_key,
-                        direction=pagin_config.direction,
-                        limit=pagin_config.limit,
-                        event_filter=event_filter,
+                    events, next_key = (
+                        await self.store.paginate_room_events_by_topological_ordering(
+                            room_id=room_id,
+                            from_key=from_token.room_key,
+                            to_key=to_room_key,
+                            direction=pagin_config.direction,
+                            limit=pagin_config.limit,
+                            event_filter=event_filter,
+                        )
                     )
             else:
                 # Otherwise, we can backfill in the background for eventual
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 262d9f4044..2c6e672ede 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1750,7 +1750,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
                 from_key=from_key,
                 to_key=to_key,
                 limit=limit or 10,
-                order="ASC",
+                direction=Direction.FORWARDS,
             )
 
             events = list(room_events)
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 1936471345..18a96843be 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -51,13 +51,23 @@ from synapse.api.errors import SlidingSyncUnknownPosition
 from synapse.events import EventBase, StrippedStateEvent
 from synapse.events.utils import parse_stripped_state_event, strip_event
 from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.state import (
     ROOM_UNKNOWN_SENTINEL,
     Sentinel as StateSentinel,
 )
-from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.databases.main.stream import (
+    CurrentStateDeltaMembership,
+    PaginateFunction,
+)
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
@@ -533,126 +543,153 @@ class SlidingSyncHandler:
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we can display and need to fetch more info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
+        # The set of room IDs of all rooms that could appear in any list. These
+        # include rooms that are outside the list ranges.
+        all_rooms: Set[str] = set()
         if has_lists and sync_config.lists is not None:
-            sync_room_map = await self.filter_rooms_relevant_for_sync(
-                user=sync_config.user,
-                room_membership_for_user_map=room_membership_for_user_map,
-            )
+            with start_active_span("assemble_sliding_window_lists"):
+                sync_room_map = await self.filter_rooms_relevant_for_sync(
+                    user=sync_config.user,
+                    room_membership_for_user_map=room_membership_for_user_map,
+                )
+
+                for list_key, list_config in sync_config.lists.items():
+                    # Apply filters
+                    filtered_sync_room_map = sync_room_map
+                    if list_config.filters is not None:
+                        filtered_sync_room_map = await self.filter_rooms(
+                            sync_config.user,
+                            sync_room_map,
+                            list_config.filters,
+                            to_token,
+                        )
 
-            for list_key, list_config in sync_config.lists.items():
-                # Apply filters
-                filtered_sync_room_map = sync_room_map
-                if list_config.filters is not None:
-                    filtered_sync_room_map = await self.filter_rooms(
-                        sync_config.user, sync_room_map, list_config.filters, to_token
+                    # Find which rooms are partially stated and may need to be filtered out
+                    # depending on the `required_state` requested (see below).
+                    partial_state_room_map = (
+                        await self.store.is_partial_state_room_batched(
+                            filtered_sync_room_map.keys()
+                        )
                     )
 
-                # Sort the list
-                sorted_room_info = await self.sort_rooms(
-                    filtered_sync_room_map, to_token
-                )
+                    # Since creating the `RoomSyncConfig` takes some work, let's just do it
+                    # once and make a copy whenever we need it.
+                    room_sync_config = RoomSyncConfig.from_room_config(list_config)
+                    membership_state_keys = room_sync_config.required_state_map.get(
+                        EventTypes.Member
+                    )
+                    # Also see `StateFilter.must_await_full_state(...)` for comparison
+                    lazy_loading = (
+                        membership_state_keys is not None
+                        and StateValues.LAZY in membership_state_keys
+                    )
 
-                # Find which rooms are partially stated and may need to be filtered out
-                # depending on the `required_state` requested (see below).
-                partial_state_room_map = await self.store.is_partial_state_room_batched(
-                    filtered_sync_room_map.keys()
-                )
+                    if not lazy_loading:
+                        # Exclude partially-stated rooms unless the `required_state`
+                        # only has `["m.room.member", "$LAZY"]` for membership
+                        # (lazy-loading room members).
+                        filtered_sync_room_map = {
+                            room_id: room
+                            for room_id, room in filtered_sync_room_map.items()
+                            if not partial_state_room_map.get(room_id)
+                        }
 
-                # Since creating the `RoomSyncConfig` takes some work, let's just do it
-                # once and make a copy whenever we need it.
-                room_sync_config = RoomSyncConfig.from_room_config(list_config)
-                membership_state_keys = room_sync_config.required_state_map.get(
-                    EventTypes.Member
-                )
-                # Also see `StateFilter.must_await_full_state(...)` for comparison
-                lazy_loading = (
-                    membership_state_keys is not None
-                    and StateValues.LAZY in membership_state_keys
-                )
+                    all_rooms.update(filtered_sync_room_map)
 
-                ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
-                if list_config.ranges:
-                    for range in list_config.ranges:
-                        room_ids_in_list: List[str] = []
-
-                        # We're going to loop through the sorted list of rooms starting
-                        # at the range start index and keep adding rooms until we fill
-                        # up the range or run out of rooms.
-                        #
-                        # Both sides of range are inclusive so we `+ 1`
-                        max_num_rooms = range[1] - range[0] + 1
-                        for room_membership in sorted_room_info[range[0] :]:
-                            room_id = room_membership.room_id
-
-                            if len(room_ids_in_list) >= max_num_rooms:
-                                break
-
-                            # Exclude partially-stated rooms unless the `required_state`
-                            # only has `["m.room.member", "$LAZY"]` for membership
-                            # (lazy-loading room members).
-                            if partial_state_room_map.get(room_id) and not lazy_loading:
-                                continue
-
-                            # Take the superset of the `RoomSyncConfig` for each room.
+                    # Sort the list
+                    sorted_room_info = await self.sort_rooms(
+                        filtered_sync_room_map, to_token
+                    )
+
+                    ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
+                    if list_config.ranges:
+                        for range in list_config.ranges:
+                            room_ids_in_list: List[str] = []
+
+                            # We're going to loop through the sorted list of rooms starting
+                            # at the range start index and keep adding rooms until we fill
+                            # up the range or run out of rooms.
                             #
-                            # Update our `relevant_room_map` with the room we're going
-                            # to display and need to fetch more info about.
-                            existing_room_sync_config = relevant_room_map.get(room_id)
-                            if existing_room_sync_config is not None:
-                                existing_room_sync_config.combine_room_sync_config(
-                                    room_sync_config
+                            # Both sides of range are inclusive so we `+ 1`
+                            max_num_rooms = range[1] - range[0] + 1
+                            for room_membership in sorted_room_info[range[0] :]:
+                                room_id = room_membership.room_id
+
+                                if len(room_ids_in_list) >= max_num_rooms:
+                                    break
+
+                                # Take the superset of the `RoomSyncConfig` for each room.
+                                #
+                                # Update our `relevant_room_map` with the room we're going
+                                # to display and need to fetch more info about.
+                                existing_room_sync_config = relevant_room_map.get(
+                                    room_id
                                 )
-                            else:
-                                # Make a copy so if we modify it later, it doesn't
-                                # affect all references.
-                                relevant_room_map[room_id] = (
-                                    room_sync_config.deep_copy()
+                                if existing_room_sync_config is not None:
+                                    existing_room_sync_config.combine_room_sync_config(
+                                        room_sync_config
+                                    )
+                                else:
+                                    # Make a copy so if we modify it later, it doesn't
+                                    # affect all references.
+                                    relevant_room_map[room_id] = (
+                                        room_sync_config.deep_copy()
+                                    )
+
+                                room_ids_in_list.append(room_id)
+
+                            ops.append(
+                                SlidingSyncResult.SlidingWindowList.Operation(
+                                    op=OperationType.SYNC,
+                                    range=range,
+                                    room_ids=room_ids_in_list,
                                 )
-
-                            room_ids_in_list.append(room_id)
-
-                        ops.append(
-                            SlidingSyncResult.SlidingWindowList.Operation(
-                                op=OperationType.SYNC,
-                                range=range,
-                                room_ids=room_ids_in_list,
                             )
-                        )
 
-                lists[list_key] = SlidingSyncResult.SlidingWindowList(
-                    count=len(sorted_room_info),
-                    ops=ops,
-                )
+                    lists[list_key] = SlidingSyncResult.SlidingWindowList(
+                        count=len(sorted_room_info),
+                        ops=ops,
+                    )
 
         # Handle room subscriptions
         if has_room_subscriptions and sync_config.room_subscriptions is not None:
-            for room_id, room_subscription in sync_config.room_subscriptions.items():
-                room_membership_for_user_at_to_token = (
-                    await self.check_room_subscription_allowed_for_user(
-                        room_id=room_id,
-                        room_membership_for_user_map=room_membership_for_user_map,
-                        to_token=to_token,
+            with start_active_span("assemble_room_subscriptions"):
+                for (
+                    room_id,
+                    room_subscription,
+                ) in sync_config.room_subscriptions.items():
+                    room_membership_for_user_at_to_token = (
+                        await self.check_room_subscription_allowed_for_user(
+                            room_id=room_id,
+                            room_membership_for_user_map=room_membership_for_user_map,
+                            to_token=to_token,
+                        )
                     )
-                )
 
-                # Skip this room if the user isn't allowed to see it
-                if not room_membership_for_user_at_to_token:
-                    continue
+                    # Skip this room if the user isn't allowed to see it
+                    if not room_membership_for_user_at_to_token:
+                        continue
 
-                room_membership_for_user_map[room_id] = (
-                    room_membership_for_user_at_to_token
-                )
+                    all_rooms.add(room_id)
 
-                # Take the superset of the `RoomSyncConfig` for each room.
-                #
-                # Update our `relevant_room_map` with the room we're going to display
-                # and need to fetch more info about.
-                room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
-                existing_room_sync_config = relevant_room_map.get(room_id)
-                if existing_room_sync_config is not None:
-                    existing_room_sync_config.combine_room_sync_config(room_sync_config)
-                else:
-                    relevant_room_map[room_id] = room_sync_config
+                    room_membership_for_user_map[room_id] = (
+                        room_membership_for_user_at_to_token
+                    )
+
+                    # Take the superset of the `RoomSyncConfig` for each room.
+                    #
+                    # Update our `relevant_room_map` with the room we're going to display
+                    # and need to fetch more info about.
+                    room_sync_config = RoomSyncConfig.from_room_config(
+                        room_subscription
+                    )
+                    existing_room_sync_config = relevant_room_map.get(room_id)
+                    if existing_room_sync_config is not None:
+                        existing_room_sync_config.combine_room_sync_config(
+                            room_sync_config
+                        )
+                    else:
+                        relevant_room_map[room_id] = room_sync_config
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -661,48 +698,49 @@ class SlidingSyncHandler:
         # previously.
         # Keep track of the rooms that we're going to display and need to fetch more info about
         relevant_rooms_to_send_map = relevant_room_map
-        if from_token:
-            rooms_should_send = set()
-
-            # First we check if there are rooms that match a list/room
-            # subscription and have updates we need to send (i.e. either because
-            # we haven't sent the room down, or we have but there are missing
-            # updates).
-            for room_id in relevant_room_map:
-                status = await self.connection_store.have_sent_room(
-                    sync_config,
-                    from_token.connection_position,
-                    room_id,
-                )
-                if (
-                    # The room was never sent down before so the client needs to know
-                    # about it regardless of any updates.
-                    status.status == HaveSentRoomFlag.NEVER
-                    # `PREVIOUSLY` literally means the "room was sent down before *AND*
-                    # there are updates we haven't sent down" so we already know this
-                    # room has updates.
-                    or status.status == HaveSentRoomFlag.PREVIOUSLY
-                ):
-                    rooms_should_send.add(room_id)
-                elif status.status == HaveSentRoomFlag.LIVE:
-                    # We know that we've sent all updates up until `from_token`,
-                    # so we just need to check if there have been updates since
-                    # then.
-                    pass
-                else:
-                    assert_never(status.status)
+        with start_active_span("filter_relevant_rooms_to_send"):
+            if from_token:
+                rooms_should_send = set()
+
+                # First we check if there are rooms that match a list/room
+                # subscription and have updates we need to send (i.e. either because
+                # we haven't sent the room down, or we have but there are missing
+                # updates).
+                for room_id in relevant_room_map:
+                    status = await self.connection_store.have_sent_room(
+                        sync_config,
+                        from_token.connection_position,
+                        room_id,
+                    )
+                    if (
+                        # The room was never sent down before so the client needs to know
+                        # about it regardless of any updates.
+                        status.status == HaveSentRoomFlag.NEVER
+                        # `PREVIOUSLY` literally means the "room was sent down before *AND*
+                        # there are updates we haven't sent down" so we already know this
+                        # room has updates.
+                        or status.status == HaveSentRoomFlag.PREVIOUSLY
+                    ):
+                        rooms_should_send.add(room_id)
+                    elif status.status == HaveSentRoomFlag.LIVE:
+                        # We know that we've sent all updates up until `from_token`,
+                        # so we just need to check if there have been updates since
+                        # then.
+                        pass
+                    else:
+                        assert_never(status.status)
 
-            # We only need to check for new events since any state changes
-            # will also come down as new events.
-            rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
-                relevant_room_map.keys(), from_token.stream_token.room_key
-            )
-            rooms_should_send.update(rooms_that_have_updates)
-            relevant_rooms_to_send_map = {
-                room_id: room_sync_config
-                for room_id, room_sync_config in relevant_room_map.items()
-                if room_id in rooms_should_send
-            }
+                # We only need to check for new events since any state changes
+                # will also come down as new events.
+                rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+                    relevant_room_map.keys(), from_token.stream_token.room_key
+                )
+                rooms_should_send.update(rooms_that_have_updates)
+                relevant_rooms_to_send_map = {
+                    room_id: room_sync_config
+                    for room_id, room_sync_config in relevant_room_map.items()
+                    if room_id in rooms_should_send
+                }
 
         @trace
         @tag_args
@@ -741,12 +779,40 @@ class SlidingSyncHandler:
         )
 
         if has_lists or has_room_subscriptions:
+            # We now calculate if any rooms outside the range have had updates,
+            # which we are not sending down.
+            #
+            # We *must* record rooms that have had updates, but it is also fine
+            # to record rooms as having updates even if there might not actually
+            # be anything new for the user (e.g. due to event filters, events
+            # having happened after the user left, etc).
+            unsent_room_ids = []
+            if from_token:
+                # The set of rooms that the client (may) care about, but aren't
+                # in any list range (or subscribed to).
+                missing_rooms = all_rooms - relevant_room_map.keys()
+
+                # We now just go and try fetching any events in the above rooms
+                # to see if anything has happened since the `from_token`.
+                #
+                # TODO: Replace this with something faster. When we land the
+                # sliding sync tables that record the most recent event
+                # positions we can use that.
+                missing_event_map_by_room = (
+                    await self.store.get_room_events_stream_for_rooms(
+                        room_ids=missing_rooms,
+                        from_key=to_token.room_key,
+                        to_key=from_token.stream_token.room_key,
+                        limit=1,
+                    )
+                )
+                unsent_room_ids = list(missing_event_map_by_room)
+
             connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
                 from_token=from_token,
                 sent_room_ids=relevant_rooms_to_send_map.keys(),
-                # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
-                unsent_room_ids=[],
+                unsent_room_ids=unsent_room_ids,
             )
         elif from_token:
             connection_position = from_token.connection_position
@@ -754,13 +820,20 @@ class SlidingSyncHandler:
             # Initial sync without a `from_token` starts at `0`
             connection_position = 0
 
-        return SlidingSyncResult(
+        sliding_sync_result = SlidingSyncResult(
             next_pos=SlidingSyncStreamToken(to_token, connection_position),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
         )
 
+        # Make it easy to find traces for syncs that aren't empty
+        set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result))
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id)
+
+        return sliding_sync_result
+
+    @trace
     async def get_room_membership_for_user_at_to_token(
         self,
         user: UserID,
@@ -1099,6 +1172,7 @@ class SlidingSyncHandler:
 
         return sync_room_id_set
 
+    @trace
     async def filter_rooms_relevant_for_sync(
         self,
         user: UserID,
@@ -1209,6 +1283,7 @@ class SlidingSyncHandler:
 
         # return None
 
+    @trace
     async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
         self,
         room_ids: StrCollection,
@@ -1299,6 +1374,7 @@ class SlidingSyncHandler:
 
         return room_id_to_stripped_state_map
 
+    @trace
     async def _bulk_get_partial_current_state_content_for_rooms(
         self,
         content_type: Literal[
@@ -1498,125 +1574,132 @@ class SlidingSyncHandler:
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
-            if filters.is_dm:
-                # Only DM rooms please
-                filtered_room_id_set = {
-                    room_id
-                    for room_id in filtered_room_id_set
-                    if sync_room_map[room_id].is_dm
-                }
-            else:
-                # Only non-DM rooms please
-                filtered_room_id_set = {
-                    room_id
-                    for room_id in filtered_room_id_set
-                    if not sync_room_map[room_id].is_dm
-                }
+            with start_active_span("filters.is_dm"):
+                if filters.is_dm:
+                    # Only DM rooms please
+                    filtered_room_id_set = {
+                        room_id
+                        for room_id in filtered_room_id_set
+                        if sync_room_map[room_id].is_dm
+                    }
+                else:
+                    # Only non-DM rooms please
+                    filtered_room_id_set = {
+                        room_id
+                        for room_id in filtered_room_id_set
+                        if not sync_room_map[room_id].is_dm
+                    }
 
         if filters.spaces is not None:
-            raise NotImplementedError()
+            with start_active_span("filters.spaces"):
+                raise NotImplementedError()
 
         # Filter for encrypted rooms
         if filters.is_encrypted is not None:
-            room_id_to_encryption = (
-                await self._bulk_get_partial_current_state_content_for_rooms(
-                    content_type="room_encryption",
-                    room_ids=filtered_room_id_set,
-                    to_token=to_token,
-                    sync_room_map=sync_room_map,
-                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+            with start_active_span("filters.is_encrypted"):
+                room_id_to_encryption = (
+                    await self._bulk_get_partial_current_state_content_for_rooms(
+                        content_type="room_encryption",
+                        room_ids=filtered_room_id_set,
+                        to_token=to_token,
+                        sync_room_map=sync_room_map,
+                        room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                    )
                 )
-            )
 
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL)
+                # Make a copy so we don't run into an error: `Set changed size during
+                # iteration`, when we filter out and remove items
+                for room_id in filtered_room_id_set.copy():
+                    encryption = room_id_to_encryption.get(
+                        room_id, ROOM_UNKNOWN_SENTINEL
+                    )
 
-                # Just remove rooms if we can't determine their encryption status
-                if encryption is ROOM_UNKNOWN_SENTINEL:
-                    filtered_room_id_set.remove(room_id)
-                    continue
+                    # Just remove rooms if we can't determine their encryption status
+                    if encryption is ROOM_UNKNOWN_SENTINEL:
+                        filtered_room_id_set.remove(room_id)
+                        continue
 
-                # If we're looking for encrypted rooms, filter out rooms that are not
-                # encrypted and vice versa
-                is_encrypted = encryption is not None
-                if (filters.is_encrypted and not is_encrypted) or (
-                    not filters.is_encrypted and is_encrypted
-                ):
-                    filtered_room_id_set.remove(room_id)
+                    # If we're looking for encrypted rooms, filter out rooms that are not
+                    # encrypted and vice versa
+                    is_encrypted = encryption is not None
+                    if (filters.is_encrypted and not is_encrypted) or (
+                        not filters.is_encrypted and is_encrypted
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
         # Filter for rooms that the user has been invited to
         if filters.is_invite is not None:
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                room_for_user = sync_room_map[room_id]
-                # If we're looking for invite rooms, filter out rooms that the user is
-                # not invited to and vice versa
-                if (
-                    filters.is_invite and room_for_user.membership != Membership.INVITE
-                ) or (
-                    not filters.is_invite
-                    and room_for_user.membership == Membership.INVITE
-                ):
-                    filtered_room_id_set.remove(room_id)
+            with start_active_span("filters.is_invite"):
+                # Make a copy so we don't run into an error: `Set changed size during
+                # iteration`, when we filter out and remove items
+                for room_id in filtered_room_id_set.copy():
+                    room_for_user = sync_room_map[room_id]
+                    # If we're looking for invite rooms, filter out rooms that the user is
+                    # not invited to and vice versa
+                    if (
+                        filters.is_invite
+                        and room_for_user.membership != Membership.INVITE
+                    ) or (
+                        not filters.is_invite
+                        and room_for_user.membership == Membership.INVITE
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
         # Filter by room type (space vs room, etc). A room must match one of the types
         # provided in the list. `None` is a valid type for rooms which do not have a
         # room type.
         if filters.room_types is not None or filters.not_room_types is not None:
-            room_id_to_type = (
-                await self._bulk_get_partial_current_state_content_for_rooms(
-                    content_type="room_type",
-                    room_ids=filtered_room_id_set,
-                    to_token=to_token,
-                    sync_room_map=sync_room_map,
-                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+            with start_active_span("filters.room_types"):
+                room_id_to_type = (
+                    await self._bulk_get_partial_current_state_content_for_rooms(
+                        content_type="room_type",
+                        room_ids=filtered_room_id_set,
+                        to_token=to_token,
+                        sync_room_map=sync_room_map,
+                        room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                    )
                 )
-            )
 
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
+                # Make a copy so we don't run into an error: `Set changed size during
+                # iteration`, when we filter out and remove items
+                for room_id in filtered_room_id_set.copy():
+                    room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
 
-                # Just remove rooms if we can't determine their type
-                if room_type is ROOM_UNKNOWN_SENTINEL:
-                    filtered_room_id_set.remove(room_id)
-                    continue
+                    # Just remove rooms if we can't determine their type
+                    if room_type is ROOM_UNKNOWN_SENTINEL:
+                        filtered_room_id_set.remove(room_id)
+                        continue
 
-                if (
-                    filters.room_types is not None
-                    and room_type not in filters.room_types
-                ):
-                    filtered_room_id_set.remove(room_id)
+                    if (
+                        filters.room_types is not None
+                        and room_type not in filters.room_types
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
-                if (
-                    filters.not_room_types is not None
-                    and room_type in filters.not_room_types
-                ):
-                    filtered_room_id_set.remove(room_id)
+                    if (
+                        filters.not_room_types is not None
+                        and room_type in filters.not_room_types
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
         if filters.room_name_like is not None:
-            # TODO: The room name is a bit more sensitive to leak than the
-            # create/encryption event. Maybe we should consider a better way to fetch
-            # historical state before implementing this.
-            #
-            # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
-            #     content_type="room_name",
-            #     room_ids=filtered_room_id_set,
-            #     to_token=to_token,
-            #     sync_room_map=sync_room_map,
-            #     room_id_to_stripped_state_map=room_id_to_stripped_state_map,
-            # )
-            raise NotImplementedError()
-
-        if filters.tags is not None:
-            raise NotImplementedError()
-
-        if filters.not_tags is not None:
-            raise NotImplementedError()
+            with start_active_span("filters.room_name_like"):
+                # TODO: The room name is a bit more sensitive to leak than the
+                # create/encryption event. Maybe we should consider a better way to fetch
+                # historical state before implementing this.
+                #
+                # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
+                #     content_type="room_name",
+                #     room_ids=filtered_room_id_set,
+                #     to_token=to_token,
+                #     sync_room_map=sync_room_map,
+                #     room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                # )
+                raise NotImplementedError()
+
+        if filters.tags is not None or filters.not_tags is not None:
+            with start_active_span("filters.tags"):
+                raise NotImplementedError()
 
         # Assemble a new sync room map but only with the `filtered_room_id_set`
         return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
@@ -1678,6 +1761,7 @@ class SlidingSyncHandler:
             reverse=True,
         )
 
+    @trace
     async def get_current_state_ids_at(
         self,
         room_id: str,
@@ -1742,6 +1826,7 @@ class SlidingSyncHandler:
 
         return state_ids
 
+    @trace
     async def get_current_state_at(
         self,
         room_id: str,
@@ -1803,15 +1888,27 @@ class SlidingSyncHandler:
         """
         user = sync_config.user
 
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "membership",
+            room_membership_for_user_at_to_token.membership,
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "timeline_limit",
+            room_sync_config.timeline_limit,
+        )
+
         # Determine whether we should limit the timeline to the token range.
         #
         # We should return historical messages (before token range) in the
         # following cases because we want clients to be able to show a basic
         # screen of information:
+        #
         #  - Initial sync (because no `from_token` to limit us anyway)
         #  - When users `newly_joined`
         #  - For an incremental sync where we haven't sent it down this
         #    connection before
+        #
+        # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
         from_bound = None
         initial = True
         if from_token and not room_membership_for_user_at_to_token.newly_joined:
@@ -1872,7 +1969,36 @@ class SlidingSyncHandler:
                     room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
                 )
 
-            timeline_events, new_room_key = await self.store.paginate_room_events(
+            # For initial `/sync` (and other historical scenarios mentioned above), we
+            # want to view a historical section of the timeline; to fetch events by
+            # `topological_ordering` (best representation of the room DAG as others were
+            # seeing it at the time). This also aligns with the order that `/messages`
+            # returns events in.
+            #
+            # For incremental `/sync`, we want to get all updates for rooms since
+            # the last `/sync` (regardless if those updates arrived late or happened
+            # a while ago in the past); to fetch events by `stream_ordering` (in the
+            # order they were received by the server).
+            #
+            # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+            #
+            # FIXME: Using workaround for mypy,
+            # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+            # https://github.com/python/mypy/issues/17479
+            paginate_room_events_by_topological_ordering: PaginateFunction = (
+                self.store.paginate_room_events_by_topological_ordering
+            )
+            paginate_room_events_by_stream_ordering: PaginateFunction = (
+                self.store.paginate_room_events_by_stream_ordering
+            )
+            pagination_method: PaginateFunction = (
+                # Use `topographical_ordering` for historical events
+                paginate_room_events_by_topological_ordering
+                if from_bound is None
+                # Use `stream_ordering` for updates
+                else paginate_room_events_by_stream_ordering
+            )
+            timeline_events, new_room_key = await pagination_method(
                 room_id=room_id,
                 # The bounds are reversed so we can paginate backwards
                 # (from newer to older events) starting at to_bound.
@@ -1883,7 +2009,6 @@ class SlidingSyncHandler:
                 # We add one so we can determine if there are enough events to saturate
                 # the limit or not (see `limited`)
                 limit=room_sync_config.timeline_limit + 1,
-                event_filter=None,
             )
 
             # We want to return the events in ascending order (the last event is the
@@ -2070,6 +2195,10 @@ class SlidingSyncHandler:
             if StateValues.WILDCARD in room_sync_config.required_state_map.get(
                 StateValues.WILDCARD, set()
             ):
+                set_tag(
+                    SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard",
+                    True,
+                )
                 required_state_filter = StateFilter.all()
             # TODO: `StateFilter` currently doesn't support wildcard event types. We're
             # currently working around this by returning all state to the client but it
@@ -2079,6 +2208,10 @@ class SlidingSyncHandler:
                 room_sync_config.required_state_map.get(StateValues.WILDCARD)
                 is not None
             ):
+                set_tag(
+                    SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard_event_type",
+                    True,
+                )
                 required_state_filter = StateFilter.all()
             else:
                 required_state_types: List[Tuple[str, Optional[str]]] = []
@@ -2086,8 +2219,12 @@ class SlidingSyncHandler:
                     state_type,
                     state_key_set,
                 ) in room_sync_config.required_state_map.items():
+                    num_wild_state_keys = 0
+                    lazy_load_room_members = False
+                    num_others = 0
                     for state_key in state_key_set:
                         if state_key == StateValues.WILDCARD:
+                            num_wild_state_keys += 1
                             # `None` is a wildcard in the `StateFilter`
                             required_state_types.append((state_type, None))
                         # We need to fetch all relevant people when we're lazy-loading membership
@@ -2095,6 +2232,7 @@ class SlidingSyncHandler:
                             state_type == EventTypes.Member
                             and state_key == StateValues.LAZY
                         ):
+                            lazy_load_room_members = True
                             # Everyone in the timeline is relevant
                             timeline_membership: Set[str] = set()
                             if timeline_events is not None:
@@ -2109,10 +2247,26 @@ class SlidingSyncHandler:
                             # FIXME: We probably also care about invite, ban, kick, targets, etc
                             # but the spec only mentions "senders".
                         elif state_key == StateValues.ME:
+                            num_others += 1
                             required_state_types.append((state_type, user.to_string()))
                         else:
+                            num_others += 1
                             required_state_types.append((state_type, state_key))
 
+                    set_tag(
+                        SynapseTags.FUNC_ARG_PREFIX
+                        + "required_state_wildcard_state_key_count",
+                        num_wild_state_keys,
+                    )
+                    set_tag(
+                        SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy",
+                        lazy_load_room_members,
+                    )
+                    set_tag(
+                        SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count",
+                        num_others,
+                    )
+
                 required_state_filter = StateFilter.from_types(required_state_types)
 
         # We need this base set of info for the response so let's just fetch it along
@@ -2208,6 +2362,8 @@ class SlidingSyncHandler:
             if new_bump_event_pos.stream > 0:
                 bump_stamp = new_bump_event_pos.stream
 
+        set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
+
         return SlidingSyncResult.RoomResult(
             name=room_name,
             avatar=room_avatar,
@@ -2863,6 +3019,7 @@ class SlidingSyncConnectionStore:
 
         return room_status
 
+    @trace
     async def record_rooms(
         self,
         sync_config: SlidingSyncConfig,
@@ -2938,6 +3095,7 @@ class SlidingSyncConnectionStore:
 
         return new_store_token
 
+    @trace
     async def mark_token_seen(
         self,
         sync_config: SlidingSyncConfig,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ede014180c..6af2eeb75f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -43,6 +43,7 @@ from prometheus_client import Counter
 
 from synapse.api.constants import (
     AccountDataTypes,
+    Direction,
     EventContentFields,
     EventTypes,
     JoinRules,
@@ -64,6 +65,7 @@ from synapse.logging.opentracing import (
 )
 from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.stream import PaginateFunction
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
@@ -879,22 +881,49 @@ class SyncHandler:
                 since_key = since_token.room_key
 
             while limited and len(recents) < timeline_limit and max_repeat:
-                # If we have a since_key then we are trying to get any events
-                # that have happened since `since_key` up to `end_key`, so we
-                # can just use `get_room_events_stream_for_room`.
-                # Otherwise, we want to return the last N events in the room
-                # in topological ordering.
-                if since_key:
-                    events, end_key = await self.store.get_room_events_stream_for_room(
-                        room_id,
-                        limit=load_limit + 1,
-                        from_key=since_key,
-                        to_key=end_key,
-                    )
-                else:
-                    events, end_key = await self.store.get_recent_events_for_room(
-                        room_id, limit=load_limit + 1, end_token=end_key
-                    )
+                # For initial `/sync`, we want to view a historical section of the
+                # timeline; to fetch events by `topological_ordering` (best
+                # representation of the room DAG as others were seeing it at the time).
+                # This also aligns with the order that `/messages` returns events in.
+                #
+                # For incremental `/sync`, we want to get all updates for rooms since
+                # the last `/sync` (regardless if those updates arrived late or happened
+                # a while ago in the past); to fetch events by `stream_ordering` (in the
+                # order they were received by the server).
+                #
+                # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+                #
+                # FIXME: Using workaround for mypy,
+                # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+                # https://github.com/python/mypy/issues/17479
+                paginate_room_events_by_topological_ordering: PaginateFunction = (
+                    self.store.paginate_room_events_by_topological_ordering
+                )
+                paginate_room_events_by_stream_ordering: PaginateFunction = (
+                    self.store.paginate_room_events_by_stream_ordering
+                )
+                pagination_method: PaginateFunction = (
+                    # Use `topographical_ordering` for historical events
+                    paginate_room_events_by_topological_ordering
+                    if since_key is None
+                    # Use `stream_ordering` for updates
+                    else paginate_room_events_by_stream_ordering
+                )
+                events, end_key = await pagination_method(
+                    room_id=room_id,
+                    # The bounds are reversed so we can paginate backwards
+                    # (from newer to older events) starting at to_bound.
+                    # This ensures we fill the `limit` with the newest events first,
+                    from_key=end_key,
+                    to_key=since_key,
+                    direction=Direction.BACKWARDS,
+                    # We add one so we can determine if there are enough events to saturate
+                    # the limit or not (see `limited`)
+                    limit=load_limit + 1,
+                )
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events.reverse()
 
                 log_kv({"loaded_recents": len(events)})
 
@@ -2641,9 +2670,10 @@ class SyncHandler:
         # a "gap" in the timeline, as described by the spec for /sync.
         room_to_events = await self.store.get_room_events_stream_for_rooms(
             room_ids=sync_result_builder.joined_room_ids,
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
+            from_key=now_token.room_key,
+            to_key=since_token.room_key,
             limit=timeline_limit + 1,
+            direction=Direction.BACKWARDS,
         )
 
         # We loop through all room ids, even if there are no new events, in case
@@ -2654,6 +2684,9 @@ class SyncHandler:
             newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events.reverse()
 
                 prev_batch_token = now_token.copy_and_replace(
                     StreamKeyType.ROOM, start_key