summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-06-17 11:27:14 -0500
committerGitHub <noreply@github.com>2024-06-17 11:27:14 -0500
commite5b8a3e37f10168953124282c296821b9d9d81ad (patch)
tree0f3bea2d5067d6a5bd5be08c23fb5ad159a09577 /synapse
parentMerge branch 'release-v1.109' into develop (diff)
downloadsynapse-e5b8a3e37f10168953124282c296821b9d9d81ad.tar.xz
Add `stream_ordering` sort to Sliding Sync `/sync` (#17293)
Sort is no longer configurable and we always sort rooms by the `stream_ordering` of the last event in the room or the point where the user can see up to in cases of leave/ban/invite/knock.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/sliding_sync.py158
-rw-r--r--synapse/handlers/sync.py10
-rw-r--r--synapse/storage/databases/main/stream.py42
-rw-r--r--synapse/types/rest/client/__init__.py24
5 files changed, 173 insertions, 63 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 721ef04f41..16d01efc67 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -201,7 +201,7 @@ class MessageHandler:
 
         if at_token:
             last_event_id = (
-                await self.store.get_last_event_in_room_before_stream_ordering(
+                await self.store.get_last_event_id_in_room_before_stream_ordering(
                     room_id,
                     end_token=at_token.room_key,
                 )
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 78fb66d6e2..b84cf67f7d 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,13 +18,20 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
 from immutabledict import immutabledict
 
 from synapse.api.constants import AccountDataTypes, Membership
 from synapse.events import EventBase
-from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import (
+    PersistedEventPosition,
+    Requester,
+    RoomStreamToken,
+    StreamToken,
+    UserID,
+)
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 
 if TYPE_CHECKING:
@@ -33,6 +40,27 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
+    """
+    Quick helper to convert an event to a `RoomsForUser` object.
+    """
+    # These fields should be present for all persisted events
+    assert event.internal_metadata.stream_ordering is not None
+    assert event.internal_metadata.instance_name is not None
+
+    return RoomsForUser(
+        room_id=event.room_id,
+        sender=event.sender,
+        membership=event.membership,
+        event_id=event.event_id,
+        event_pos=PersistedEventPosition(
+            event.internal_metadata.instance_name,
+            event.internal_metadata.stream_ordering,
+        ),
+        room_version_id=event.room_version.identifier,
+    )
+
+
 def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
@@ -169,26 +197,28 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
-        # Get all of the room IDs that the user should be able to see in the sync
-        # response
-        room_id_set = await self.get_sync_room_ids_for_user(
-            sync_config.user,
-            from_token=from_token,
-            to_token=to_token,
-        )
-
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         if sync_config.lists:
+            # Get all of the room IDs that the user should be able to see in the sync
+            # response
+            sync_room_map = await self.get_sync_room_ids_for_user(
+                sync_config.user,
+                from_token=from_token,
+                to_token=to_token,
+            )
+
             for list_key, list_config in sync_config.lists.items():
                 # Apply filters
-                filtered_room_ids = room_id_set
+                filtered_sync_room_map = sync_room_map
                 if list_config.filters is not None:
-                    filtered_room_ids = await self.filter_rooms(
-                        sync_config.user, room_id_set, list_config.filters, to_token
+                    filtered_sync_room_map = await self.filter_rooms(
+                        sync_config.user, sync_room_map, list_config.filters, to_token
                     )
-                # TODO: Apply sorts
-                sorted_room_ids = sorted(filtered_room_ids)
+
+                sorted_room_info = await self.sort_rooms(
+                    filtered_sync_room_map, to_token
+                )
 
                 ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
                 if list_config.ranges:
@@ -197,12 +227,17 @@ class SlidingSyncHandler:
                             SlidingSyncResult.SlidingWindowList.Operation(
                                 op=OperationType.SYNC,
                                 range=range,
-                                room_ids=sorted_room_ids[range[0] : range[1]],
+                                room_ids=[
+                                    room_id
+                                    for room_id, _ in sorted_room_info[
+                                        range[0] : range[1]
+                                    ]
+                                ],
                             )
                         )
 
                 lists[list_key] = SlidingSyncResult.SlidingWindowList(
-                    count=len(sorted_room_ids),
+                    count=len(sorted_room_info),
                     ops=ops,
                 )
 
@@ -219,7 +254,7 @@ class SlidingSyncHandler:
         user: UserID,
         to_token: StreamToken,
         from_token: Optional[StreamToken] = None,
-    ) -> AbstractSet[str]:
+    ) -> Dict[str, RoomsForUser]:
         """
         Fetch room IDs that should be listed for this user in the sync response (the
         full room list that will be filtered, sorted, and sliced).
@@ -237,11 +272,14 @@ class SlidingSyncHandler:
           to tell when a room was forgotten at the moment so we can't factor it into the
           from/to range.
 
-
         Args:
             user: User to fetch rooms for
             to_token: The token to fetch rooms up to.
             from_token: The point in the stream to sync from.
+
+        Returns:
+            A dictionary of room IDs that should be listed in the sync response along
+            with membership information in that room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -261,11 +299,11 @@ class SlidingSyncHandler:
 
         # If the user has never joined any rooms before, we can just return an empty list
         if not room_for_user_list:
-            return set()
+            return {}
 
         # Our working list of rooms that can show up in the sync response
         sync_room_id_set = {
-            room_for_user.room_id
+            room_for_user.room_id: room_for_user
             for room_for_user in room_for_user_list
             if filter_membership_for_sync(
                 membership=room_for_user.membership,
@@ -415,7 +453,9 @@ class SlidingSyncHandler:
                 not was_last_membership_already_included
                 and should_prev_membership_be_included
             ):
-                sync_room_id_set.add(room_id)
+                sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
+                    last_membership_change_after_to_token
+                )
             # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
             #
             # For example, if the last membership event after the `to_token` is a "join"
@@ -426,7 +466,7 @@ class SlidingSyncHandler:
                 was_last_membership_already_included
                 and not should_prev_membership_be_included
             ):
-                sync_room_id_set.discard(room_id)
+                del sync_room_id_set[room_id]
 
         # 2) -----------------------------------------------------
         # We fix-up newly_left rooms after the first fixup because it may have removed
@@ -461,25 +501,32 @@ class SlidingSyncHandler:
             # include newly_left rooms because the last event that the user should see
             # is their own leave event
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                sync_room_id_set.add(room_id)
+                sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
+                    last_membership_change_in_from_to_range
+                )
 
         return sync_room_id_set
 
     async def filter_rooms(
         self,
         user: UserID,
-        room_id_set: AbstractSet[str],
+        sync_room_map: Dict[str, RoomsForUser],
         filters: SlidingSyncConfig.SlidingSyncList.Filters,
         to_token: StreamToken,
-    ) -> AbstractSet[str]:
+    ) -> Dict[str, RoomsForUser]:
         """
         Filter rooms based on the sync request.
 
         Args:
             user: User to filter rooms for
-            room_id_set: Set of room IDs to filter down
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
             filters: Filters to apply
             to_token: We filter based on the state of the room at this token
+
+        Returns:
+            A filtered dictionary of room IDs along with membership information in the
+            room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -488,7 +535,7 @@ class SlidingSyncHandler:
         # TODO: Exclude partially stated rooms unless the `required_state` has
         # `["m.room.member", "$LAZY"]`
 
-        filtered_room_id_set = set(room_id_set)
+        filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
@@ -544,4 +591,57 @@ class SlidingSyncHandler:
         if filters.not_tags:
             raise NotImplementedError()
 
-        return filtered_room_id_set
+        # Assemble a new sync room map but only with the `filtered_room_id_set`
+        return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
+
+    async def sort_rooms(
+        self,
+        sync_room_map: Dict[str, RoomsForUser],
+        to_token: StreamToken,
+    ) -> List[Tuple[str, RoomsForUser]]:
+        """
+        Sort by `stream_ordering` of the last event that the user should see in the
+        room. `stream_ordering` is unique so we get a stable sort.
+
+        Args:
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
+            to_token: We sort based on the events in the room at this token (<= `to_token`)
+
+        Returns:
+            A sorted list of room IDs by `stream_ordering` along with membership information.
+        """
+
+        # Assemble a map of room ID to the `stream_ordering` of the last activity that the
+        # user should see in the room (<= `to_token`)
+        last_activity_in_room_map: Dict[str, int] = {}
+        for room_id, room_for_user in sync_room_map.items():
+            # If they are fully-joined to the room, let's find the latest activity
+            # at/before the `to_token`.
+            if room_for_user.membership == Membership.JOIN:
+                last_event_result = (
+                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
+                        room_id, to_token.room_key
+                    )
+                )
+
+                # If the room has no events at/before the `to_token`, this is probably a
+                # mistake in the code that generates the `sync_room_map` since that should
+                # only give us rooms that the user had membership in during the token range.
+                assert last_event_result is not None
+
+                _, event_pos = last_event_result
+
+                last_activity_in_room_map[room_id] = event_pos.stream
+            else:
+                # Otherwise, if the user has left/been invited/knocked/been banned from
+                # a room, they shouldn't see anything past that point.
+                last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
+
+        return sorted(
+            sync_room_map.items(),
+            # Sort by the last activity (stream_ordering) in the room
+            key=lambda room_info: last_activity_in_room_map[room_info[0]],
+            # We want descending order
+            reverse=True,
+        )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f1c69d9893..0a40d62c6a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1036,9 +1036,11 @@ class SyncHandler:
         # FIXME: This gets the state at the latest event before the stream ordering,
         # which might not be the same as the "current state" of the room at the time
         # of the stream token if there were multiple forward extremities at the time.
-        last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
-            room_id,
-            end_token=stream_position.room_key,
+        last_event_id = (
+            await self.store.get_last_event_id_in_room_before_stream_ordering(
+                room_id,
+                end_token=stream_position.room_key,
+            )
         )
 
         if last_event_id:
@@ -1519,7 +1521,7 @@ class SyncHandler:
             # We need to make sure the first event in our batch points to the
             # last event in the previous batch.
             last_event_id_prev_batch = (
-                await self.store.get_last_event_in_room_before_stream_ordering(
+                await self.store.get_last_event_id_in_room_before_stream_ordering(
                     room_id,
                     end_token=since_token.room_key,
                 )
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 61373f0bfb..ff0d723684 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -895,7 +895,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "get_room_event_before_stream_ordering", _f
         )
 
-    async def get_last_event_in_room_before_stream_ordering(
+    async def get_last_event_id_in_room_before_stream_ordering(
         self,
         room_id: str,
         end_token: RoomStreamToken,
@@ -910,10 +910,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             The ID of the most recent event, or None if there are no events in the room
             before this stream ordering.
         """
+        last_event_result = (
+            await self.get_last_event_pos_in_room_before_stream_ordering(
+                room_id, end_token
+            )
+        )
+
+        if last_event_result:
+            return last_event_result[0]
+
+        return None
+
+    async def get_last_event_pos_in_room_before_stream_ordering(
+        self,
+        room_id: str,
+        end_token: RoomStreamToken,
+    ) -> Optional[Tuple[str, PersistedEventPosition]]:
+        """
+        Returns the ID and event position of the last event in a room at or before a
+        stream ordering.
+
+        Args:
+            room_id
+            end_token: The token used to stream from
+
+        Returns:
+            The ID of the most recent event and it's position, or None if there are no
+            events in the room before this stream ordering.
+        """
 
-        def get_last_event_in_room_before_stream_ordering_txn(
+        def get_last_event_pos_in_room_before_stream_ordering_txn(
             txn: LoggingTransaction,
-        ) -> Optional[str]:
+        ) -> Optional[Tuple[str, PersistedEventPosition]]:
             # We're looking for the closest event at or before the token. We need to
             # handle the fact that the stream token can be a vector clock (with an
             # `instance_map`) and events can be persisted on different instances
@@ -975,13 +1003,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                     topological_ordering=topological_ordering,
                     stream_ordering=stream_ordering,
                 ):
-                    return event_id
+                    return event_id, PersistedEventPosition(
+                        instance_name, stream_ordering
+                    )
 
             return None
 
         return await self.db_pool.runInteraction(
-            "get_last_event_in_room_before_stream_ordering",
-            get_last_event_in_room_before_stream_ordering_txn,
+            "get_last_event_pos_in_room_before_stream_ordering",
+            get_last_event_pos_in_room_before_stream_ordering_txn,
         )
 
     async def get_current_room_stream_token_for_room_id(
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index ec83d0daa6..e2c79c4106 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -175,22 +175,8 @@ class SlidingSyncBody(RequestBodyModel):
             ranges: Sliding window ranges. If this field is missing, no sliding window
                 is used and all rooms are returned in this list. Integers are
                 *inclusive*.
-            sort: How the list should be sorted on the server. The first value is
-                applied first, then tiebreaks are performed with each subsequent sort
-                listed.
-
-                    FIXME: Furthermore, it's not currently defined how servers should behave
-                    if they encounter a filter or sort operation they do not recognise. If
-                    the server rejects the request with an HTTP 400 then that will break
-                    backwards compatibility with new clients vs old servers. However, the
-                    client would be otherwise unaware that only some of the sort/filter
-                    operations have taken effect. We may need to include a "warnings"
-                    section to indicate which sort/filter operations are unrecognised,
-                    allowing for some form of graceful degradation of service.
-                    -- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions
-
             slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
-                sliding windows). When true, the `ranges` and `sort` fields are ignored.
+                sliding windows). When true, the `ranges` field is ignored.
             required_state: Required state for each room returned. An array of event
                 type and state key tuples. Elements in this array are ORd together to
                 produce the final set of state events to return.
@@ -229,12 +215,6 @@ class SlidingSyncBody(RequestBodyModel):
                 `user_id` and optionally `avatar_url` and `displayname`) for the users used
                 to calculate the room name.
             filters: Filters to apply to the list before sorting.
-            bump_event_types: Allowlist of event types which should be considered recent activity
-                when sorting `by_recency`. By omitting event types from this field,
-                clients can ensure that uninteresting events (e.g. a profile rename) do
-                not cause a room to jump to the top of its list(s). Empty or omitted
-                `bump_event_types` have no effect—all events in a room will be
-                considered recent activity.
         """
 
         class Filters(RequestBodyModel):
@@ -300,11 +280,9 @@ class SlidingSyncBody(RequestBodyModel):
             ranges: Optional[List[Tuple[int, int]]] = None
         else:
             ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None  # type: ignore[valid-type]
-        sort: Optional[List[StrictStr]] = None
         slow_get_all_rooms: Optional[StrictBool] = False
         include_heroes: Optional[StrictBool] = False
         filters: Optional[Filters] = None
-        bump_event_types: Optional[List[StrictStr]] = None
 
     class RoomSubscription(CommonRoomParameters):
         pass