summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-06-17 11:27:14 -0500
committerGitHub <noreply@github.com>2024-06-17 11:27:14 -0500
commite5b8a3e37f10168953124282c296821b9d9d81ad (patch)
tree0f3bea2d5067d6a5bd5be08c23fb5ad159a09577 /synapse/handlers
parentMerge branch 'release-v1.109' into develop (diff)
downloadsynapse-e5b8a3e37f10168953124282c296821b9d9d81ad.tar.xz
Add `stream_ordering` sort to Sliding Sync `/sync` (#17293)
Sort is no longer configurable and we always sort rooms by the `stream_ordering` of the last event in the room or the point where the user can see up to in cases of leave/ban/invite/knock.
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/sliding_sync.py158
-rw-r--r--synapse/handlers/sync.py10
3 files changed, 136 insertions, 34 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 721ef04f41..16d01efc67 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -201,7 +201,7 @@ class MessageHandler:
 
         if at_token:
             last_event_id = (
-                await self.store.get_last_event_in_room_before_stream_ordering(
+                await self.store.get_last_event_id_in_room_before_stream_ordering(
                     room_id,
                     end_token=at_token.room_key,
                 )
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 78fb66d6e2..b84cf67f7d 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,13 +18,20 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
 from immutabledict import immutabledict
 
 from synapse.api.constants import AccountDataTypes, Membership
 from synapse.events import EventBase
-from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import (
+    PersistedEventPosition,
+    Requester,
+    RoomStreamToken,
+    StreamToken,
+    UserID,
+)
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 
 if TYPE_CHECKING:
@@ -33,6 +40,27 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
+    """
+    Quick helper to convert an event to a `RoomsForUser` object.
+    """
+    # These fields should be present for all persisted events
+    assert event.internal_metadata.stream_ordering is not None
+    assert event.internal_metadata.instance_name is not None
+
+    return RoomsForUser(
+        room_id=event.room_id,
+        sender=event.sender,
+        membership=event.membership,
+        event_id=event.event_id,
+        event_pos=PersistedEventPosition(
+            event.internal_metadata.instance_name,
+            event.internal_metadata.stream_ordering,
+        ),
+        room_version_id=event.room_version.identifier,
+    )
+
+
 def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
@@ -169,26 +197,28 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
-        # Get all of the room IDs that the user should be able to see in the sync
-        # response
-        room_id_set = await self.get_sync_room_ids_for_user(
-            sync_config.user,
-            from_token=from_token,
-            to_token=to_token,
-        )
-
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         if sync_config.lists:
+            # Get all of the room IDs that the user should be able to see in the sync
+            # response
+            sync_room_map = await self.get_sync_room_ids_for_user(
+                sync_config.user,
+                from_token=from_token,
+                to_token=to_token,
+            )
+
             for list_key, list_config in sync_config.lists.items():
                 # Apply filters
-                filtered_room_ids = room_id_set
+                filtered_sync_room_map = sync_room_map
                 if list_config.filters is not None:
-                    filtered_room_ids = await self.filter_rooms(
-                        sync_config.user, room_id_set, list_config.filters, to_token
+                    filtered_sync_room_map = await self.filter_rooms(
+                        sync_config.user, sync_room_map, list_config.filters, to_token
                     )
-                # TODO: Apply sorts
-                sorted_room_ids = sorted(filtered_room_ids)
+
+                sorted_room_info = await self.sort_rooms(
+                    filtered_sync_room_map, to_token
+                )
 
                 ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
                 if list_config.ranges:
@@ -197,12 +227,17 @@ class SlidingSyncHandler:
                             SlidingSyncResult.SlidingWindowList.Operation(
                                 op=OperationType.SYNC,
                                 range=range,
-                                room_ids=sorted_room_ids[range[0] : range[1]],
+                                room_ids=[
+                                    room_id
+                                    for room_id, _ in sorted_room_info[
+                                        range[0] : range[1]
+                                    ]
+                                ],
                             )
                         )
 
                 lists[list_key] = SlidingSyncResult.SlidingWindowList(
-                    count=len(sorted_room_ids),
+                    count=len(sorted_room_info),
                     ops=ops,
                 )
 
@@ -219,7 +254,7 @@ class SlidingSyncHandler:
         user: UserID,
         to_token: StreamToken,
         from_token: Optional[StreamToken] = None,
-    ) -> AbstractSet[str]:
+    ) -> Dict[str, RoomsForUser]:
         """
         Fetch room IDs that should be listed for this user in the sync response (the
         full room list that will be filtered, sorted, and sliced).
@@ -237,11 +272,14 @@ class SlidingSyncHandler:
           to tell when a room was forgotten at the moment so we can't factor it into the
           from/to range.
 
-
         Args:
             user: User to fetch rooms for
             to_token: The token to fetch rooms up to.
             from_token: The point in the stream to sync from.
+
+        Returns:
+            A dictionary of room IDs that should be listed in the sync response along
+            with membership information in that room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -261,11 +299,11 @@ class SlidingSyncHandler:
 
         # If the user has never joined any rooms before, we can just return an empty list
         if not room_for_user_list:
-            return set()
+            return {}
 
         # Our working list of rooms that can show up in the sync response
         sync_room_id_set = {
-            room_for_user.room_id
+            room_for_user.room_id: room_for_user
             for room_for_user in room_for_user_list
             if filter_membership_for_sync(
                 membership=room_for_user.membership,
@@ -415,7 +453,9 @@ class SlidingSyncHandler:
                 not was_last_membership_already_included
                 and should_prev_membership_be_included
             ):
-                sync_room_id_set.add(room_id)
+                sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
+                    last_membership_change_after_to_token
+                )
             # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
             #
             # For example, if the last membership event after the `to_token` is a "join"
@@ -426,7 +466,7 @@ class SlidingSyncHandler:
                 was_last_membership_already_included
                 and not should_prev_membership_be_included
             ):
-                sync_room_id_set.discard(room_id)
+                del sync_room_id_set[room_id]
 
         # 2) -----------------------------------------------------
         # We fix-up newly_left rooms after the first fixup because it may have removed
@@ -461,25 +501,32 @@ class SlidingSyncHandler:
             # include newly_left rooms because the last event that the user should see
             # is their own leave event
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                sync_room_id_set.add(room_id)
+                sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
+                    last_membership_change_in_from_to_range
+                )
 
         return sync_room_id_set
 
     async def filter_rooms(
         self,
         user: UserID,
-        room_id_set: AbstractSet[str],
+        sync_room_map: Dict[str, RoomsForUser],
         filters: SlidingSyncConfig.SlidingSyncList.Filters,
         to_token: StreamToken,
-    ) -> AbstractSet[str]:
+    ) -> Dict[str, RoomsForUser]:
         """
         Filter rooms based on the sync request.
 
         Args:
             user: User to filter rooms for
-            room_id_set: Set of room IDs to filter down
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
             filters: Filters to apply
             to_token: We filter based on the state of the room at this token
+
+        Returns:
+            A filtered dictionary of room IDs along with membership information in the
+            room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -488,7 +535,7 @@ class SlidingSyncHandler:
         # TODO: Exclude partially stated rooms unless the `required_state` has
         # `["m.room.member", "$LAZY"]`
 
-        filtered_room_id_set = set(room_id_set)
+        filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
@@ -544,4 +591,57 @@ class SlidingSyncHandler:
         if filters.not_tags:
             raise NotImplementedError()
 
-        return filtered_room_id_set
+        # Assemble a new sync room map but only with the `filtered_room_id_set`
+        return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
+
+    async def sort_rooms(
+        self,
+        sync_room_map: Dict[str, RoomsForUser],
+        to_token: StreamToken,
+    ) -> List[Tuple[str, RoomsForUser]]:
+        """
+        Sort by `stream_ordering` of the last event that the user should see in the
+        room. `stream_ordering` is unique so we get a stable sort.
+
+        Args:
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
+            to_token: We sort based on the events in the room at this token (<= `to_token`)
+
+        Returns:
+            A sorted list of room IDs by `stream_ordering` along with membership information.
+        """
+
+        # Assemble a map of room ID to the `stream_ordering` of the last activity that the
+        # user should see in the room (<= `to_token`)
+        last_activity_in_room_map: Dict[str, int] = {}
+        for room_id, room_for_user in sync_room_map.items():
+            # If they are fully-joined to the room, let's find the latest activity
+            # at/before the `to_token`.
+            if room_for_user.membership == Membership.JOIN:
+                last_event_result = (
+                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
+                        room_id, to_token.room_key
+                    )
+                )
+
+                # If the room has no events at/before the `to_token`, this is probably a
+                # mistake in the code that generates the `sync_room_map` since that should
+                # only give us rooms that the user had membership in during the token range.
+                assert last_event_result is not None
+
+                _, event_pos = last_event_result
+
+                last_activity_in_room_map[room_id] = event_pos.stream
+            else:
+                # Otherwise, if the user has left/been invited/knocked/been banned from
+                # a room, they shouldn't see anything past that point.
+                last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
+
+        return sorted(
+            sync_room_map.items(),
+            # Sort by the last activity (stream_ordering) in the room
+            key=lambda room_info: last_activity_in_room_map[room_info[0]],
+            # We want descending order
+            reverse=True,
+        )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f1c69d9893..0a40d62c6a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1036,9 +1036,11 @@ class SyncHandler:
         # FIXME: This gets the state at the latest event before the stream ordering,
         # which might not be the same as the "current state" of the room at the time
         # of the stream token if there were multiple forward extremities at the time.
-        last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
-            room_id,
-            end_token=stream_position.room_key,
+        last_event_id = (
+            await self.store.get_last_event_id_in_room_before_stream_ordering(
+                room_id,
+                end_token=stream_position.room_key,
+            )
         )
 
         if last_event_id:
@@ -1519,7 +1521,7 @@ class SyncHandler:
             # We need to make sure the first event in our batch points to the
             # last event in the previous batch.
             last_event_id_prev_batch = (
-                await self.store.get_last_event_in_room_before_stream_ordering(
+                await self.store.get_last_event_id_in_room_before_stream_ordering(
                     room_id,
                     end_token=since_token.room_key,
                 )