diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ec35784c5f..b44e862493 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -197,8 +197,14 @@ class AdminHandler:
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
- events, _ = await self._store.paginate_room_events(
- room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
+ events, _ = (
+ await self._store.paginate_room_events_by_topological_ordering(
+ room_id=room_id,
+ from_key=from_key,
+ to_key=to_key,
+ limit=100,
+ direction=Direction.FORWARDS,
+ )
)
if not events:
break
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 872c85fbad..6fd7afa280 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -507,13 +507,15 @@ class PaginationHandler:
# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
+ events, next_key = (
+ await self.store.paginate_room_events_by_topological_ordering(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
)
if pagin_config.direction == Direction.BACKWARDS:
@@ -582,13 +584,15 @@ class PaginationHandler:
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
+ events, next_key = (
+ await self.store.paginate_room_events_by_topological_ordering(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
)
else:
# Otherwise, we can backfill in the background for eventual
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 262d9f4044..2c6e672ede 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1750,7 +1750,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
from_key=from_key,
to_key=to_key,
limit=limit or 10,
- order="ASC",
+ direction=Direction.FORWARDS,
)
events = list(room_events)
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 1db96ad41c..0fe66c8bd2 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -64,7 +64,10 @@ from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
)
-from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.databases.main.stream import (
+ CurrentStateDeltaMembership,
+ PaginateFunction,
+)
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
@@ -1863,10 +1866,13 @@ class SlidingSyncHandler:
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
+ #
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
+ #
+ # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
from_bound = None
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
@@ -1927,7 +1933,36 @@ class SlidingSyncHandler:
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
- timeline_events, new_room_key = await self.store.paginate_room_events(
+ # For initial `/sync` (and other historical scenarios mentioned above), we
+ # want to view a historical section of the timeline; to fetch events by
+ # `topological_ordering` (best representation of the room DAG as others were
+ # seeing it at the time). This also aligns with the order that `/messages`
+ # returns events in.
+ #
+ # For incremental `/sync`, we want to get all updates for rooms since
+ # the last `/sync` (regardless if those updates arrived late or happened
+ # a while ago in the past); to fetch events by `stream_ordering` (in the
+ # order they were received by the server).
+ #
+ # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+ #
+ # FIXME: Using workaround for mypy,
+ # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+ # https://github.com/python/mypy/issues/17479
+ paginate_room_events_by_topological_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_topological_ordering
+ )
+ paginate_room_events_by_stream_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_stream_ordering
+ )
+ pagination_method: PaginateFunction = (
+ # Use `topographical_ordering` for historical events
+ paginate_room_events_by_topological_ordering
+ if from_bound is None
+ # Use `stream_ordering` for updates
+ else paginate_room_events_by_stream_ordering
+ )
+ timeline_events, new_room_key = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@@ -1938,7 +1973,6 @@ class SlidingSyncHandler:
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
- event_filter=None,
)
# We want to return the events in ascending order (the last event is the
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ede014180c..6af2eeb75f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -43,6 +43,7 @@ from prometheus_client import Counter
from synapse.api.constants import (
AccountDataTypes,
+ Direction,
EventContentFields,
EventTypes,
JoinRules,
@@ -64,6 +65,7 @@ from synapse.logging.opentracing import (
)
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
@@ -879,22 +881,49 @@ class SyncHandler:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
- # If we have a since_key then we are trying to get any events
- # that have happened since `since_key` up to `end_key`, so we
- # can just use `get_room_events_stream_for_room`.
- # Otherwise, we want to return the last N events in the room
- # in topological ordering.
- if since_key:
- events, end_key = await self.store.get_room_events_stream_for_room(
- room_id,
- limit=load_limit + 1,
- from_key=since_key,
- to_key=end_key,
- )
- else:
- events, end_key = await self.store.get_recent_events_for_room(
- room_id, limit=load_limit + 1, end_token=end_key
- )
+ # For initial `/sync`, we want to view a historical section of the
+ # timeline; to fetch events by `topological_ordering` (best
+ # representation of the room DAG as others were seeing it at the time).
+ # This also aligns with the order that `/messages` returns events in.
+ #
+ # For incremental `/sync`, we want to get all updates for rooms since
+ # the last `/sync` (regardless if those updates arrived late or happened
+ # a while ago in the past); to fetch events by `stream_ordering` (in the
+ # order they were received by the server).
+ #
+ # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+ #
+ # FIXME: Using workaround for mypy,
+ # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+ # https://github.com/python/mypy/issues/17479
+ paginate_room_events_by_topological_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_topological_ordering
+ )
+ paginate_room_events_by_stream_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_stream_ordering
+ )
+ pagination_method: PaginateFunction = (
+ # Use `topographical_ordering` for historical events
+ paginate_room_events_by_topological_ordering
+ if since_key is None
+ # Use `stream_ordering` for updates
+ else paginate_room_events_by_stream_ordering
+ )
+ events, end_key = await pagination_method(
+ room_id=room_id,
+ # The bounds are reversed so we can paginate backwards
+ # (from newer to older events) starting at to_bound.
+ # This ensures we fill the `limit` with the newest events first,
+ from_key=end_key,
+ to_key=since_key,
+ direction=Direction.BACKWARDS,
+ # We add one so we can determine if there are enough events to saturate
+ # the limit or not (see `limited`)
+ limit=load_limit + 1,
+ )
+ # We want to return the events in ascending order (the last event is the
+ # most recent).
+ events.reverse()
log_kv({"loaded_recents": len(events)})
@@ -2641,9 +2670,10 @@ class SyncHandler:
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
- from_key=since_token.room_key,
- to_key=now_token.room_key,
+ from_key=now_token.room_key,
+ to_key=since_token.room_key,
limit=timeline_limit + 1,
+ direction=Direction.BACKWARDS,
)
# We loop through all room ids, even if there are no new events, in case
@@ -2654,6 +2684,9 @@ class SyncHandler:
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
+ # We want to return the events in ascending order (the last event is the
+ # most recent).
+ events.reverse()
prev_batch_token = now_token.copy_and_replace(
StreamKeyType.ROOM, start_key
|