summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py69
1 files changed, 51 insertions, 18 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ede014180c..6af2eeb75f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -43,6 +43,7 @@ from prometheus_client import Counter
 
 from synapse.api.constants import (
     AccountDataTypes,
+    Direction,
     EventContentFields,
     EventTypes,
     JoinRules,
@@ -64,6 +65,7 @@ from synapse.logging.opentracing import (
 )
 from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.stream import PaginateFunction
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
@@ -879,22 +881,49 @@ class SyncHandler:
                 since_key = since_token.room_key
 
             while limited and len(recents) < timeline_limit and max_repeat:
-                # If we have a since_key then we are trying to get any events
-                # that have happened since `since_key` up to `end_key`, so we
-                # can just use `get_room_events_stream_for_room`.
-                # Otherwise, we want to return the last N events in the room
-                # in topological ordering.
-                if since_key:
-                    events, end_key = await self.store.get_room_events_stream_for_room(
-                        room_id,
-                        limit=load_limit + 1,
-                        from_key=since_key,
-                        to_key=end_key,
-                    )
-                else:
-                    events, end_key = await self.store.get_recent_events_for_room(
-                        room_id, limit=load_limit + 1, end_token=end_key
-                    )
+                # For initial `/sync`, we want to view a historical section of the
+                # timeline; to fetch events by `topological_ordering` (best
+                # representation of the room DAG as others were seeing it at the time).
+                # This also aligns with the order that `/messages` returns events in.
+                #
+                # For incremental `/sync`, we want to get all updates for rooms since
+                # the last `/sync` (regardless if those updates arrived late or happened
+                # a while ago in the past); to fetch events by `stream_ordering` (in the
+                # order they were received by the server).
+                #
+                # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+                #
+                # FIXME: Using workaround for mypy,
+                # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+                # https://github.com/python/mypy/issues/17479
+                paginate_room_events_by_topological_ordering: PaginateFunction = (
+                    self.store.paginate_room_events_by_topological_ordering
+                )
+                paginate_room_events_by_stream_ordering: PaginateFunction = (
+                    self.store.paginate_room_events_by_stream_ordering
+                )
+                pagination_method: PaginateFunction = (
+                    # Use `topographical_ordering` for historical events
+                    paginate_room_events_by_topological_ordering
+                    if since_key is None
+                    # Use `stream_ordering` for updates
+                    else paginate_room_events_by_stream_ordering
+                )
+                events, end_key = await pagination_method(
+                    room_id=room_id,
+                    # The bounds are reversed so we can paginate backwards
+                    # (from newer to older events) starting at to_bound.
+                    # This ensures we fill the `limit` with the newest events first,
+                    from_key=end_key,
+                    to_key=since_key,
+                    direction=Direction.BACKWARDS,
+                    # We add one so we can determine if there are enough events to saturate
+                    # the limit or not (see `limited`)
+                    limit=load_limit + 1,
+                )
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events.reverse()
 
                 log_kv({"loaded_recents": len(events)})
 
@@ -2641,9 +2670,10 @@ class SyncHandler:
         # a "gap" in the timeline, as described by the spec for /sync.
         room_to_events = await self.store.get_room_events_stream_for_rooms(
             room_ids=sync_result_builder.joined_room_ids,
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
+            from_key=now_token.room_key,
+            to_key=since_token.room_key,
             limit=timeline_limit + 1,
+            direction=Direction.BACKWARDS,
         )
 
         # We loop through all room ids, even if there are no new events, in case
@@ -2654,6 +2684,9 @@ class SyncHandler:
             newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events.reverse()
 
                 prev_batch_token = now_token.copy_and_replace(
                     StreamKeyType.ROOM, start_key