diff --git a/Cargo.lock b/Cargo.lock
index d50ce87d17..ce5520436d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "bytes"
-version = "1.6.1"
+version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952"
+checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
[[package]]
name = "cfg-if"
@@ -444,9 +444,9 @@ dependencies = [
[[package]]
name = "regex"
-version = "1.10.5"
+version = "1.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f"
+checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
dependencies = [
"aho-corasick",
"memchr",
diff --git a/changelog.d/17510.bugfix b/changelog.d/17510.bugfix
new file mode 100644
index 0000000000..3170c284bd
--- /dev/null
+++ b/changelog.d/17510.bugfix
@@ -0,0 +1 @@
+Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17514.misc b/changelog.d/17514.misc
new file mode 100644
index 0000000000..fc3cc37915
--- /dev/null
+++ b/changelog.d/17514.misc
@@ -0,0 +1 @@
+Add more tracing to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17515.doc b/changelog.d/17515.doc
new file mode 100644
index 0000000000..c2dbe24e9d
--- /dev/null
+++ b/changelog.d/17515.doc
@@ -0,0 +1,3 @@
+Clarify default behaviour of the
+[`auto_accept_invites.worker_to_run_on`](https://element-hq.github.io/synapse/develop/usage/configuration/config_documentation.html#auto-accept-invites)
+option.
\ No newline at end of file
diff --git a/changelog.d/17531.misc b/changelog.d/17531.misc
new file mode 100644
index 0000000000..25b7b36a72
--- /dev/null
+++ b/changelog.d/17531.misc
@@ -0,0 +1 @@
+Fix up a comment in the sliding sync implementation.
diff --git a/changelog.d/17535.bugfix b/changelog.d/17535.bugfix
new file mode 100644
index 0000000000..c5b5da0485
--- /dev/null
+++ b/changelog.d/17535.bugfix
@@ -0,0 +1 @@
+Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately.
diff --git a/docs/development/room-dag-concepts.md b/docs/development/room-dag-concepts.md
index 76709487f8..35b667831c 100644
--- a/docs/development/room-dag-concepts.md
+++ b/docs/development/room-dag-concepts.md
@@ -21,8 +21,10 @@ incrementing integer, but backfilled events start with `stream_ordering=-1` and
---
- - `/sync` returns things in the order they arrive at the server (`stream_ordering`).
- - `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
+ - Incremental `/sync?since=xxx` returns things in the order they arrive at the server
+ (`stream_ordering`).
+ - Initial `/sync`, `/messages` (and `/backfill` in the federation API) return them in
+ the order determined by the event graph `(topological_ordering, stream_ordering)`.
The general idea is that, if you're following a room in real-time (i.e.
`/sync`), you probably want to see the messages as they arrive at your server,
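
A hedged illustration of the two orderings described above, using made-up event tuples rather than Synapse code: an event that arrives late over federation has a high `stream_ordering` (it reached this server recently) but a low `topological_ordering` (it was sent a while ago in the DAG), so the two orders disagree.

```python
# Hypothetical (event_id, topological_ordering, stream_ordering) triples.
events = [
    ("$old_local", 1, 1),
    ("$late_federated", 2, 5),  # sent long ago remotely, arrived just now
    ("$new_local", 3, 4),
]

# Incremental /sync: order of arrival at this server.
by_arrival = sorted(events, key=lambda e: e[2])
# -> $old_local, $new_local, $late_federated (the late event shows up as an update)

# Initial /sync and /messages: order determined by the event graph.
by_graph = sorted(events, key=lambda e: (e[1], e[2]))
# -> $old_local, $late_federated, $new_local (the late event sits in its historical place)
```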
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 40f64be856..567bbf88d2 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -4685,7 +4685,9 @@ This setting has the following sub-options:
* `only_for_direct_messages`: Whether invites should be automatically accepted for all room types, or only
for direct messages. Defaults to false.
* `only_from_local_users`: Whether to only automatically accept invites from users on this homeserver. Defaults to false.
-* `worker_to_run_on`: Which worker to run this module on. This must match the "worker_name".
+* `worker_to_run_on`: Which worker to run this module on. This must match
+ the "worker_name". If not set or `null`, invites will be accepted on the
+ main process.
NOTE: Care should be taken not to enable this setting if the `synapse_auto_accept_invite` module is enabled and installed.
The two modules will compete to perform the same task and may result in undesired behaviour. For example, multiple join
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 99fc7eab54..e6efa7a424 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -128,7 +128,7 @@ class Codes(str, Enum):
# MSC2677
DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION"
- # MSC3575 we are telling the client they need to reset their sliding sync
+ # MSC3575: we are telling the client they need to expire their sliding sync
# connection.
UNKNOWN_POS = "M_UNKNOWN_POS"
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ec35784c5f..b44e862493 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -197,8 +197,14 @@ class AdminHandler:
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
- events, _ = await self._store.paginate_room_events(
- room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
+ events, _ = (
+ await self._store.paginate_room_events_by_topological_ordering(
+ room_id=room_id,
+ from_key=from_key,
+ to_key=to_key,
+ limit=100,
+ direction=Direction.FORWARDS,
+ )
)
if not events:
break
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 872c85fbad..6fd7afa280 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -507,13 +507,15 @@ class PaginationHandler:
# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
+ events, next_key = (
+ await self.store.paginate_room_events_by_topological_ordering(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
)
if pagin_config.direction == Direction.BACKWARDS:
@@ -582,13 +584,15 @@ class PaginationHandler:
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
+ events, next_key = (
+ await self.store.paginate_room_events_by_topological_ordering(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
)
else:
# Otherwise, we can backfill in the background for eventual
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 262d9f4044..2c6e672ede 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1750,7 +1750,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
from_key=from_key,
to_key=to_key,
limit=limit or 10,
- order="ASC",
+ direction=Direction.FORWARDS,
)
events = list(room_events)
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 1936471345..18a96843be 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -51,13 +51,23 @@ from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.events import EventBase, StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event, strip_event
from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
+from synapse.logging.opentracing import (
+ SynapseTags,
+ log_kv,
+ set_tag,
+ start_active_span,
+ tag_args,
+ trace,
+)
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
)
-from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.databases.main.stream import (
+ CurrentStateDeltaMembership,
+ PaginateFunction,
+)
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
@@ -533,126 +543,153 @@ class SlidingSyncHandler:
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we can display and need to fetch more info about
relevant_room_map: Dict[str, RoomSyncConfig] = {}
+ # The set of room IDs of all rooms that could appear in any list. These
+ # include rooms that are outside the list ranges.
+ all_rooms: Set[str] = set()
if has_lists and sync_config.lists is not None:
- sync_room_map = await self.filter_rooms_relevant_for_sync(
- user=sync_config.user,
- room_membership_for_user_map=room_membership_for_user_map,
- )
+ with start_active_span("assemble_sliding_window_lists"):
+ sync_room_map = await self.filter_rooms_relevant_for_sync(
+ user=sync_config.user,
+ room_membership_for_user_map=room_membership_for_user_map,
+ )
+
+ for list_key, list_config in sync_config.lists.items():
+ # Apply filters
+ filtered_sync_room_map = sync_room_map
+ if list_config.filters is not None:
+ filtered_sync_room_map = await self.filter_rooms(
+ sync_config.user,
+ sync_room_map,
+ list_config.filters,
+ to_token,
+ )
- for list_key, list_config in sync_config.lists.items():
- # Apply filters
- filtered_sync_room_map = sync_room_map
- if list_config.filters is not None:
- filtered_sync_room_map = await self.filter_rooms(
- sync_config.user, sync_room_map, list_config.filters, to_token
+ # Find which rooms are partially stated and may need to be filtered out
+ # depending on the `required_state` requested (see below).
+ partial_state_room_map = (
+ await self.store.is_partial_state_room_batched(
+ filtered_sync_room_map.keys()
+ )
)
- # Sort the list
- sorted_room_info = await self.sort_rooms(
- filtered_sync_room_map, to_token
- )
+ # Since creating the `RoomSyncConfig` takes some work, let's just do it
+ # once and make a copy whenever we need it.
+ room_sync_config = RoomSyncConfig.from_room_config(list_config)
+ membership_state_keys = room_sync_config.required_state_map.get(
+ EventTypes.Member
+ )
+ # Also see `StateFilter.must_await_full_state(...)` for comparison
+ lazy_loading = (
+ membership_state_keys is not None
+ and StateValues.LAZY in membership_state_keys
+ )
- # Find which rooms are partially stated and may need to be filtered out
- # depending on the `required_state` requested (see below).
- partial_state_room_map = await self.store.is_partial_state_room_batched(
- filtered_sync_room_map.keys()
- )
+ if not lazy_loading:
+ # Exclude partially-stated rooms unless the `required_state`
+ # only has `["m.room.member", "$LAZY"]` for membership
+ # (lazy-loading room members).
+ filtered_sync_room_map = {
+ room_id: room
+ for room_id, room in filtered_sync_room_map.items()
+ if not partial_state_room_map.get(room_id)
+ }
- # Since creating the `RoomSyncConfig` takes some work, let's just do it
- # once and make a copy whenever we need it.
- room_sync_config = RoomSyncConfig.from_room_config(list_config)
- membership_state_keys = room_sync_config.required_state_map.get(
- EventTypes.Member
- )
- # Also see `StateFilter.must_await_full_state(...)` for comparison
- lazy_loading = (
- membership_state_keys is not None
- and StateValues.LAZY in membership_state_keys
- )
+ all_rooms.update(filtered_sync_room_map)
- ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
- if list_config.ranges:
- for range in list_config.ranges:
- room_ids_in_list: List[str] = []
-
- # We're going to loop through the sorted list of rooms starting
- # at the range start index and keep adding rooms until we fill
- # up the range or run out of rooms.
- #
- # Both sides of range are inclusive so we `+ 1`
- max_num_rooms = range[1] - range[0] + 1
- for room_membership in sorted_room_info[range[0] :]:
- room_id = room_membership.room_id
-
- if len(room_ids_in_list) >= max_num_rooms:
- break
-
- # Exclude partially-stated rooms unless the `required_state`
- # only has `["m.room.member", "$LAZY"]` for membership
- # (lazy-loading room members).
- if partial_state_room_map.get(room_id) and not lazy_loading:
- continue
-
- # Take the superset of the `RoomSyncConfig` for each room.
+ # Sort the list
+ sorted_room_info = await self.sort_rooms(
+ filtered_sync_room_map, to_token
+ )
+
+ ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
+ if list_config.ranges:
+ for range in list_config.ranges:
+ room_ids_in_list: List[str] = []
+
+ # We're going to loop through the sorted list of rooms starting
+ # at the range start index and keep adding rooms until we fill
+ # up the range or run out of rooms.
#
- # Update our `relevant_room_map` with the room we're going
- # to display and need to fetch more info about.
- existing_room_sync_config = relevant_room_map.get(room_id)
- if existing_room_sync_config is not None:
- existing_room_sync_config.combine_room_sync_config(
- room_sync_config
+ # Both sides of range are inclusive so we `+ 1`
+ max_num_rooms = range[1] - range[0] + 1
+ for room_membership in sorted_room_info[range[0] :]:
+ room_id = room_membership.room_id
+
+ if len(room_ids_in_list) >= max_num_rooms:
+ break
+
+ # Take the superset of the `RoomSyncConfig` for each room.
+ #
+ # Update our `relevant_room_map` with the room we're going
+ # to display and need to fetch more info about.
+ existing_room_sync_config = relevant_room_map.get(
+ room_id
)
- else:
- # Make a copy so if we modify it later, it doesn't
- # affect all references.
- relevant_room_map[room_id] = (
- room_sync_config.deep_copy()
+ if existing_room_sync_config is not None:
+ existing_room_sync_config.combine_room_sync_config(
+ room_sync_config
+ )
+ else:
+ # Make a copy so if we modify it later, it doesn't
+ # affect all references.
+ relevant_room_map[room_id] = (
+ room_sync_config.deep_copy()
+ )
+
+ room_ids_in_list.append(room_id)
+
+ ops.append(
+ SlidingSyncResult.SlidingWindowList.Operation(
+ op=OperationType.SYNC,
+ range=range,
+ room_ids=room_ids_in_list,
)
-
- room_ids_in_list.append(room_id)
-
- ops.append(
- SlidingSyncResult.SlidingWindowList.Operation(
- op=OperationType.SYNC,
- range=range,
- room_ids=room_ids_in_list,
)
- )
- lists[list_key] = SlidingSyncResult.SlidingWindowList(
- count=len(sorted_room_info),
- ops=ops,
- )
+ lists[list_key] = SlidingSyncResult.SlidingWindowList(
+ count=len(sorted_room_info),
+ ops=ops,
+ )
# Handle room subscriptions
if has_room_subscriptions and sync_config.room_subscriptions is not None:
- for room_id, room_subscription in sync_config.room_subscriptions.items():
- room_membership_for_user_at_to_token = (
- await self.check_room_subscription_allowed_for_user(
- room_id=room_id,
- room_membership_for_user_map=room_membership_for_user_map,
- to_token=to_token,
+ with start_active_span("assemble_room_subscriptions"):
+ for (
+ room_id,
+ room_subscription,
+ ) in sync_config.room_subscriptions.items():
+ room_membership_for_user_at_to_token = (
+ await self.check_room_subscription_allowed_for_user(
+ room_id=room_id,
+ room_membership_for_user_map=room_membership_for_user_map,
+ to_token=to_token,
+ )
)
- )
- # Skip this room if the user isn't allowed to see it
- if not room_membership_for_user_at_to_token:
- continue
+ # Skip this room if the user isn't allowed to see it
+ if not room_membership_for_user_at_to_token:
+ continue
- room_membership_for_user_map[room_id] = (
- room_membership_for_user_at_to_token
- )
+ all_rooms.add(room_id)
- # Take the superset of the `RoomSyncConfig` for each room.
- #
- # Update our `relevant_room_map` with the room we're going to display
- # and need to fetch more info about.
- room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
- existing_room_sync_config = relevant_room_map.get(room_id)
- if existing_room_sync_config is not None:
- existing_room_sync_config.combine_room_sync_config(room_sync_config)
- else:
- relevant_room_map[room_id] = room_sync_config
+ room_membership_for_user_map[room_id] = (
+ room_membership_for_user_at_to_token
+ )
+
+ # Take the superset of the `RoomSyncConfig` for each room.
+ #
+ # Update our `relevant_room_map` with the room we're going to display
+ # and need to fetch more info about.
+ room_sync_config = RoomSyncConfig.from_room_config(
+ room_subscription
+ )
+ existing_room_sync_config = relevant_room_map.get(room_id)
+ if existing_room_sync_config is not None:
+ existing_room_sync_config.combine_room_sync_config(
+ room_sync_config
+ )
+ else:
+ relevant_room_map[room_id] = room_sync_config
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -661,48 +698,49 @@ class SlidingSyncHandler:
# previously.
# Keep track of the rooms that we're going to display and need to fetch more info about
relevant_rooms_to_send_map = relevant_room_map
- if from_token:
- rooms_should_send = set()
-
- # First we check if there are rooms that match a list/room
- # subscription and have updates we need to send (i.e. either because
- # we haven't sent the room down, or we have but there are missing
- # updates).
- for room_id in relevant_room_map:
- status = await self.connection_store.have_sent_room(
- sync_config,
- from_token.connection_position,
- room_id,
- )
- if (
- # The room was never sent down before so the client needs to know
- # about it regardless of any updates.
- status.status == HaveSentRoomFlag.NEVER
- # `PREVIOUSLY` literally means the "room was sent down before *AND*
- # there are updates we haven't sent down" so we already know this
- # room has updates.
- or status.status == HaveSentRoomFlag.PREVIOUSLY
- ):
- rooms_should_send.add(room_id)
- elif status.status == HaveSentRoomFlag.LIVE:
- # We know that we've sent all updates up until `from_token`,
- # so we just need to check if there have been updates since
- # then.
- pass
- else:
- assert_never(status.status)
+ with start_active_span("filter_relevant_rooms_to_send"):
+ if from_token:
+ rooms_should_send = set()
+
+ # First we check if there are rooms that match a list/room
+ # subscription and have updates we need to send (i.e. either because
+ # we haven't sent the room down, or we have but there are missing
+ # updates).
+ for room_id in relevant_room_map:
+ status = await self.connection_store.have_sent_room(
+ sync_config,
+ from_token.connection_position,
+ room_id,
+ )
+ if (
+ # The room was never sent down before so the client needs to know
+ # about it regardless of any updates.
+ status.status == HaveSentRoomFlag.NEVER
+ # `PREVIOUSLY` literally means the "room was sent down before *AND*
+ # there are updates we haven't sent down" so we already know this
+ # room has updates.
+ or status.status == HaveSentRoomFlag.PREVIOUSLY
+ ):
+ rooms_should_send.add(room_id)
+ elif status.status == HaveSentRoomFlag.LIVE:
+ # We know that we've sent all updates up until `from_token`,
+ # so we just need to check if there have been updates since
+ # then.
+ pass
+ else:
+ assert_never(status.status)
- # We only need to check for new events since any state changes
- # will also come down as new events.
- rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
- relevant_room_map.keys(), from_token.stream_token.room_key
- )
- rooms_should_send.update(rooms_that_have_updates)
- relevant_rooms_to_send_map = {
- room_id: room_sync_config
- for room_id, room_sync_config in relevant_room_map.items()
- if room_id in rooms_should_send
- }
+ # We only need to check for new events since any state changes
+ # will also come down as new events.
+ rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+ relevant_room_map.keys(), from_token.stream_token.room_key
+ )
+ rooms_should_send.update(rooms_that_have_updates)
+ relevant_rooms_to_send_map = {
+ room_id: room_sync_config
+ for room_id, room_sync_config in relevant_room_map.items()
+ if room_id in rooms_should_send
+ }
@trace
@tag_args
@@ -741,12 +779,40 @@ class SlidingSyncHandler:
)
if has_lists or has_room_subscriptions:
+ # We now calculate if any rooms outside the range have had updates,
+ # which we are not sending down.
+ #
+ # We *must* record rooms that have had updates, but it is also fine
+ # to record rooms as having updates even if there might not actually
+ # be anything new for the user (e.g. due to event filters, events
+ # having happened after the user left, etc).
+ unsent_room_ids = []
+ if from_token:
+ # The set of rooms that the client (may) care about, but aren't
+ # in any list range (or subscribed to).
+ missing_rooms = all_rooms - relevant_room_map.keys()
+
+ # We now just go and try fetching any events in the above rooms
+ # to see if anything has happened since the `from_token`.
+ #
+ # TODO: Replace this with something faster. Once we land the
+ # sliding sync tables that record the most recent event
+ # positions, we can use those.
+ missing_event_map_by_room = (
+ await self.store.get_room_events_stream_for_rooms(
+ room_ids=missing_rooms,
+ from_key=to_token.room_key,
+ to_key=from_token.stream_token.room_key,
+ limit=1,
+ )
+ )
+ unsent_room_ids = list(missing_event_map_by_room)
+
connection_position = await self.connection_store.record_rooms(
sync_config=sync_config,
from_token=from_token,
sent_room_ids=relevant_rooms_to_send_map.keys(),
- # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
- unsent_room_ids=[],
+ unsent_room_ids=unsent_room_ids,
)
elif from_token:
connection_position = from_token.connection_position
@@ -754,13 +820,20 @@ class SlidingSyncHandler:
# Initial sync without a `from_token` starts at `0`
connection_position = 0
- return SlidingSyncResult(
+ sliding_sync_result = SlidingSyncResult(
next_pos=SlidingSyncStreamToken(to_token, connection_position),
lists=lists,
rooms=rooms,
extensions=extensions,
)
+ # Make it easy to find traces for syncs that aren't empty
+ set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result))
+ set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id)
+
+ return sliding_sync_result
+
+ @trace
async def get_room_membership_for_user_at_to_token(
self,
user: UserID,
@@ -1099,6 +1172,7 @@ class SlidingSyncHandler:
return sync_room_id_set
+ @trace
async def filter_rooms_relevant_for_sync(
self,
user: UserID,
@@ -1209,6 +1283,7 @@ class SlidingSyncHandler:
# return None
+ @trace
async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
self,
room_ids: StrCollection,
@@ -1299,6 +1374,7 @@ class SlidingSyncHandler:
return room_id_to_stripped_state_map
+ @trace
async def _bulk_get_partial_current_state_content_for_rooms(
self,
content_type: Literal[
@@ -1498,125 +1574,132 @@ class SlidingSyncHandler:
# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
- if filters.is_dm:
- # Only DM rooms please
- filtered_room_id_set = {
- room_id
- for room_id in filtered_room_id_set
- if sync_room_map[room_id].is_dm
- }
- else:
- # Only non-DM rooms please
- filtered_room_id_set = {
- room_id
- for room_id in filtered_room_id_set
- if not sync_room_map[room_id].is_dm
- }
+ with start_active_span("filters.is_dm"):
+ if filters.is_dm:
+ # Only DM rooms please
+ filtered_room_id_set = {
+ room_id
+ for room_id in filtered_room_id_set
+ if sync_room_map[room_id].is_dm
+ }
+ else:
+ # Only non-DM rooms please
+ filtered_room_id_set = {
+ room_id
+ for room_id in filtered_room_id_set
+ if not sync_room_map[room_id].is_dm
+ }
if filters.spaces is not None:
- raise NotImplementedError()
+ with start_active_span("filters.spaces"):
+ raise NotImplementedError()
# Filter for encrypted rooms
if filters.is_encrypted is not None:
- room_id_to_encryption = (
- await self._bulk_get_partial_current_state_content_for_rooms(
- content_type="room_encryption",
- room_ids=filtered_room_id_set,
- to_token=to_token,
- sync_room_map=sync_room_map,
- room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+ with start_active_span("filters.is_encrypted"):
+ room_id_to_encryption = (
+ await self._bulk_get_partial_current_state_content_for_rooms(
+ content_type="room_encryption",
+ room_ids=filtered_room_id_set,
+ to_token=to_token,
+ sync_room_map=sync_room_map,
+ room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+ )
)
- )
- # Make a copy so we don't run into an error: `Set changed size during
- # iteration`, when we filter out and remove items
- for room_id in filtered_room_id_set.copy():
- encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL)
+ # Make a copy so we don't run into an error: `Set changed size during
+ # iteration`, when we filter out and remove items
+ for room_id in filtered_room_id_set.copy():
+ encryption = room_id_to_encryption.get(
+ room_id, ROOM_UNKNOWN_SENTINEL
+ )
- # Just remove rooms if we can't determine their encryption status
- if encryption is ROOM_UNKNOWN_SENTINEL:
- filtered_room_id_set.remove(room_id)
- continue
+ # Just remove rooms if we can't determine their encryption status
+ if encryption is ROOM_UNKNOWN_SENTINEL:
+ filtered_room_id_set.remove(room_id)
+ continue
- # If we're looking for encrypted rooms, filter out rooms that are not
- # encrypted and vice versa
- is_encrypted = encryption is not None
- if (filters.is_encrypted and not is_encrypted) or (
- not filters.is_encrypted and is_encrypted
- ):
- filtered_room_id_set.remove(room_id)
+ # If we're looking for encrypted rooms, filter out rooms that are not
+ # encrypted and vice versa
+ is_encrypted = encryption is not None
+ if (filters.is_encrypted and not is_encrypted) or (
+ not filters.is_encrypted and is_encrypted
+ ):
+ filtered_room_id_set.remove(room_id)
# Filter for rooms that the user has been invited to
if filters.is_invite is not None:
- # Make a copy so we don't run into an error: `Set changed size during
- # iteration`, when we filter out and remove items
- for room_id in filtered_room_id_set.copy():
- room_for_user = sync_room_map[room_id]
- # If we're looking for invite rooms, filter out rooms that the user is
- # not invited to and vice versa
- if (
- filters.is_invite and room_for_user.membership != Membership.INVITE
- ) or (
- not filters.is_invite
- and room_for_user.membership == Membership.INVITE
- ):
- filtered_room_id_set.remove(room_id)
+ with start_active_span("filters.is_invite"):
+ # Make a copy so we don't run into an error: `Set changed size during
+ # iteration`, when we filter out and remove items
+ for room_id in filtered_room_id_set.copy():
+ room_for_user = sync_room_map[room_id]
+ # If we're looking for invite rooms, filter out rooms that the user is
+ # not invited to and vice versa
+ if (
+ filters.is_invite
+ and room_for_user.membership != Membership.INVITE
+ ) or (
+ not filters.is_invite
+ and room_for_user.membership == Membership.INVITE
+ ):
+ filtered_room_id_set.remove(room_id)
# Filter by room type (space vs room, etc). A room must match one of the types
# provided in the list. `None` is a valid type for rooms which do not have a
# room type.
if filters.room_types is not None or filters.not_room_types is not None:
- room_id_to_type = (
- await self._bulk_get_partial_current_state_content_for_rooms(
- content_type="room_type",
- room_ids=filtered_room_id_set,
- to_token=to_token,
- sync_room_map=sync_room_map,
- room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+ with start_active_span("filters.room_types"):
+ room_id_to_type = (
+ await self._bulk_get_partial_current_state_content_for_rooms(
+ content_type="room_type",
+ room_ids=filtered_room_id_set,
+ to_token=to_token,
+ sync_room_map=sync_room_map,
+ room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+ )
)
- )
- # Make a copy so we don't run into an error: `Set changed size during
- # iteration`, when we filter out and remove items
- for room_id in filtered_room_id_set.copy():
- room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
+ # Make a copy so we don't run into an error: `Set changed size during
+ # iteration`, when we filter out and remove items
+ for room_id in filtered_room_id_set.copy():
+ room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
- # Just remove rooms if we can't determine their type
- if room_type is ROOM_UNKNOWN_SENTINEL:
- filtered_room_id_set.remove(room_id)
- continue
+ # Just remove rooms if we can't determine their type
+ if room_type is ROOM_UNKNOWN_SENTINEL:
+ filtered_room_id_set.remove(room_id)
+ continue
- if (
- filters.room_types is not None
- and room_type not in filters.room_types
- ):
- filtered_room_id_set.remove(room_id)
+ if (
+ filters.room_types is not None
+ and room_type not in filters.room_types
+ ):
+ filtered_room_id_set.remove(room_id)
- if (
- filters.not_room_types is not None
- and room_type in filters.not_room_types
- ):
- filtered_room_id_set.remove(room_id)
+ if (
+ filters.not_room_types is not None
+ and room_type in filters.not_room_types
+ ):
+ filtered_room_id_set.remove(room_id)
if filters.room_name_like is not None:
- # TODO: The room name is a bit more sensitive to leak than the
- # create/encryption event. Maybe we should consider a better way to fetch
- # historical state before implementing this.
- #
- # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
- # content_type="room_name",
- # room_ids=filtered_room_id_set,
- # to_token=to_token,
- # sync_room_map=sync_room_map,
- # room_id_to_stripped_state_map=room_id_to_stripped_state_map,
- # )
- raise NotImplementedError()
-
- if filters.tags is not None:
- raise NotImplementedError()
-
- if filters.not_tags is not None:
- raise NotImplementedError()
+ with start_active_span("filters.room_name_like"):
+ # TODO: The room name is a bit more sensitive to leak than the
+ # create/encryption event. Maybe we should consider a better way to fetch
+ # historical state before implementing this.
+ #
+ # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
+ # content_type="room_name",
+ # room_ids=filtered_room_id_set,
+ # to_token=to_token,
+ # sync_room_map=sync_room_map,
+ # room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+ # )
+ raise NotImplementedError()
+
+ if filters.tags is not None or filters.not_tags is not None:
+ with start_active_span("filters.tags"):
+ raise NotImplementedError()
# Assemble a new sync room map but only with the `filtered_room_id_set`
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
@@ -1678,6 +1761,7 @@ class SlidingSyncHandler:
reverse=True,
)
+ @trace
async def get_current_state_ids_at(
self,
room_id: str,
@@ -1742,6 +1826,7 @@ class SlidingSyncHandler:
return state_ids
+ @trace
async def get_current_state_at(
self,
room_id: str,
@@ -1803,15 +1888,27 @@ class SlidingSyncHandler:
"""
user = sync_config.user
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "membership",
+ room_membership_for_user_at_to_token.membership,
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "timeline_limit",
+ room_sync_config.timeline_limit,
+ )
+
# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
# following cases because we want clients to be able to show a basic
# screen of information:
+ #
# - Initial sync (because no `from_token` to limit us anyway)
# - When users `newly_joined`
# - For an incremental sync where we haven't sent it down this
# connection before
+ #
+ # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
from_bound = None
initial = True
if from_token and not room_membership_for_user_at_to_token.newly_joined:
@@ -1872,7 +1969,36 @@ class SlidingSyncHandler:
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
- timeline_events, new_room_key = await self.store.paginate_room_events(
+ # For initial `/sync` (and other historical scenarios mentioned above), we
+ # want to view a historical section of the timeline, so we fetch events by
+ # `topological_ordering` (the best representation of the room DAG as others
+ # were seeing it at the time). This also aligns with the order that
+ # `/messages` returns events in.
+ #
+ # For incremental `/sync`, we want to get all updates for rooms since the
+ # last `/sync` (regardless of whether those updates arrived late or happened
+ # a while ago in the past), so we fetch events by `stream_ordering` (in the
+ # order they were received by the server).
+ #
+ # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+ #
+ # FIXME: Using workaround for mypy,
+ # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+ # https://github.com/python/mypy/issues/17479
+ paginate_room_events_by_topological_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_topological_ordering
+ )
+ paginate_room_events_by_stream_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_stream_ordering
+ )
+ pagination_method: PaginateFunction = (
+ # Use `topological_ordering` for historical events
+ paginate_room_events_by_topological_ordering
+ if from_bound is None
+ # Use `stream_ordering` for updates
+ else paginate_room_events_by_stream_ordering
+ )
+ timeline_events, new_room_key = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@@ -1883,7 +2009,6 @@ class SlidingSyncHandler:
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
- event_filter=None,
)
# We want to return the events in ascending order (the last event is the
@@ -2070,6 +2195,10 @@ class SlidingSyncHandler:
if StateValues.WILDCARD in room_sync_config.required_state_map.get(
StateValues.WILDCARD, set()
):
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard",
+ True,
+ )
required_state_filter = StateFilter.all()
# TODO: `StateFilter` currently doesn't support wildcard event types. We're
# currently working around this by returning all state to the client but it
@@ -2079,6 +2208,10 @@ class SlidingSyncHandler:
room_sync_config.required_state_map.get(StateValues.WILDCARD)
is not None
):
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard_event_type",
+ True,
+ )
required_state_filter = StateFilter.all()
else:
required_state_types: List[Tuple[str, Optional[str]]] = []
@@ -2086,8 +2219,12 @@ class SlidingSyncHandler:
state_type,
state_key_set,
) in room_sync_config.required_state_map.items():
+ num_wild_state_keys = 0
+ lazy_load_room_members = False
+ num_others = 0
for state_key in state_key_set:
if state_key == StateValues.WILDCARD:
+ num_wild_state_keys += 1
# `None` is a wildcard in the `StateFilter`
required_state_types.append((state_type, None))
# We need to fetch all relevant people when we're lazy-loading membership
@@ -2095,6 +2232,7 @@ class SlidingSyncHandler:
state_type == EventTypes.Member
and state_key == StateValues.LAZY
):
+ lazy_load_room_members = True
# Everyone in the timeline is relevant
timeline_membership: Set[str] = set()
if timeline_events is not None:
@@ -2109,10 +2247,26 @@ class SlidingSyncHandler:
# FIXME: We probably also care about invite, ban, kick, targets, etc
# but the spec only mentions "senders".
elif state_key == StateValues.ME:
+ num_others += 1
required_state_types.append((state_type, user.to_string()))
else:
+ num_others += 1
required_state_types.append((state_type, state_key))
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX
+ + "required_state_wildcard_state_key_count",
+ num_wild_state_keys,
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy",
+ lazy_load_room_members,
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count",
+ num_others,
+ )
+
required_state_filter = StateFilter.from_types(required_state_types)
# We need this base set of info for the response so let's just fetch it along
@@ -2208,6 +2362,8 @@ class SlidingSyncHandler:
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
+ set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
+
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
@@ -2863,6 +3019,7 @@ class SlidingSyncConnectionStore:
return room_status
+ @trace
async def record_rooms(
self,
sync_config: SlidingSyncConfig,
@@ -2938,6 +3095,7 @@ class SlidingSyncConnectionStore:
return new_store_token
+ @trace
async def mark_token_seen(
self,
sync_config: SlidingSyncConfig,
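
The `have_sent_room` bookkeeping above is easier to follow in isolation. A minimal sketch of the NEVER/PREVIOUSLY/LIVE decision, using simplified stand-in types rather than the real `SlidingSyncConnectionStore`:

```python
from enum import Enum, auto


class HaveSentRoomFlag(Enum):
    NEVER = auto()       # never sent down this connection
    PREVIOUSLY = auto()  # sent before, but there are updates we haven't sent
    LIVE = auto()        # fully up to date as of the last token


def rooms_to_send(
    relevant_rooms: dict[str, HaveSentRoomFlag],
    rooms_with_updates: set[str],
) -> set[str]:
    """Pick which relevant rooms must go down this incremental sync."""
    should_send = set()
    for room_id, flag in relevant_rooms.items():
        if flag in (HaveSentRoomFlag.NEVER, HaveSentRoomFlag.PREVIOUSLY):
            # Brand new to the client, or we owe it missed updates.
            should_send.add(room_id)
        # LIVE rooms are only sent if something changed since the token.
    should_send |= rooms_with_updates & relevant_rooms.keys()
    return should_send
```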
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ede014180c..6af2eeb75f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -43,6 +43,7 @@ from prometheus_client import Counter
from synapse.api.constants import (
AccountDataTypes,
+ Direction,
EventContentFields,
EventTypes,
JoinRules,
@@ -64,6 +65,7 @@ from synapse.logging.opentracing import (
)
from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
@@ -879,22 +881,49 @@ class SyncHandler:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
- # If we have a since_key then we are trying to get any events
- # that have happened since `since_key` up to `end_key`, so we
- # can just use `get_room_events_stream_for_room`.
- # Otherwise, we want to return the last N events in the room
- # in topological ordering.
- if since_key:
- events, end_key = await self.store.get_room_events_stream_for_room(
- room_id,
- limit=load_limit + 1,
- from_key=since_key,
- to_key=end_key,
- )
- else:
- events, end_key = await self.store.get_recent_events_for_room(
- room_id, limit=load_limit + 1, end_token=end_key
- )
+ # For initial `/sync`, we want to view a historical section of the
+ # timeline, so we fetch events by `topological_ordering` (the best
+ # representation of the room DAG as others were seeing it at the time).
+ # This also aligns with the order that `/messages` returns events in.
+ #
+ # For incremental `/sync`, we want to get all updates for rooms since
+ # the last `/sync` (regardless of whether those updates arrived late or
+ # happened a while ago in the past), so we fetch events by
+ # `stream_ordering` (in the order they were received by the server).
+ #
+ # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+ #
+ # FIXME: Using workaround for mypy,
+ # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+ # https://github.com/python/mypy/issues/17479
+ paginate_room_events_by_topological_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_topological_ordering
+ )
+ paginate_room_events_by_stream_ordering: PaginateFunction = (
+ self.store.paginate_room_events_by_stream_ordering
+ )
+ pagination_method: PaginateFunction = (
+ # Use `topological_ordering` for historical events
+ paginate_room_events_by_topological_ordering
+ if since_key is None
+ # Use `stream_ordering` for updates
+ else paginate_room_events_by_stream_ordering
+ )
+ events, end_key = await pagination_method(
+ room_id=room_id,
+ # The bounds are reversed so we can paginate backwards
+ # (from newer to older events) starting at `end_key`. This ensures we
+ # fill the `limit` with the newest events first.
+ from_key=end_key,
+ to_key=since_key,
+ direction=Direction.BACKWARDS,
+ # We add one so we can determine if there are enough events to saturate
+ # the limit or not (see `limited`)
+ limit=load_limit + 1,
+ )
+ # We want to return the events in ascending order (the last event is the
+ # most recent).
+ events.reverse()
log_kv({"loaded_recents": len(events)})
@@ -2641,9 +2670,10 @@ class SyncHandler:
# a "gap" in the timeline, as described by the spec for /sync.
room_to_events = await self.store.get_room_events_stream_for_rooms(
room_ids=sync_result_builder.joined_room_ids,
- from_key=since_token.room_key,
- to_key=now_token.room_key,
+ from_key=now_token.room_key,
+ to_key=since_token.room_key,
limit=timeline_limit + 1,
+ direction=Direction.BACKWARDS,
)
# We loop through all room ids, even if there are no new events, in case
@@ -2654,6 +2684,9 @@ class SyncHandler:
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
+ # We want to return the events in ascending order (the last event is the
+ # most recent).
+ events.reverse()
prev_batch_token = now_token.copy_and_replace(
StreamKeyType.ROOM, start_key
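
The reversed-bounds call above is worth spelling out. A toy sketch (plain integers standing in for `RoomStreamToken`s) of how paginating BACKWARDS from the newer token fills the `limit` with the newest events, which are then reversed into ascending order:

```python
stream_orderings = [1, 2, 3, 4, 5, 6]
since_key, end_key, limit = 2, 6, 3

# BACKWARDS bounds: end_key >= x > since_key, walked newest-first.
window = [s for s in reversed(stream_orderings) if since_key < s <= end_key]
events = window[:limit]  # newest-first: [6, 5, 4]

# The response wants ascending order (the last event is the most recent).
events.reverse()  # [4, 5, 6]
```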
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 4f2c552af2..8c5db2a513 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -899,6 +899,9 @@ class SlidingSyncRestServlet(RestServlet):
body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
# Tag and log useful data to differentiate requests.
+ set_tag(
+ "sliding_sync.sync_type", "initial" if from_token is None else "incremental"
+ )
set_tag("sliding_sync.conn_id", body.conn_id or "")
log_kv(
{
@@ -912,6 +915,12 @@ class SlidingSyncRestServlet(RestServlet):
"sliding_sync.room_subscriptions": list(
(body.room_subscriptions or {}).keys()
),
+ # We also include the number of room subscriptions because logs are
+ # limited to 1024 characters and the large room ID list above can be cut
+ # off.
+ "sliding_sync.num_room_subscriptions": len(
+ (body.room_subscriptions or {}).keys()
+ ),
}
)
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 640ab123f0..1d9f0f52e1 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -39,6 +39,7 @@ from typing import (
import attr
from synapse.api.constants import EventTypes, Membership
+from synapse.logging.opentracing import trace
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -422,6 +423,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
return invite
return None
+ @trace
async def get_rooms_for_local_user_where_membership_is(
self,
user_id: str,
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index da3ebe66b8..9ed39e688a 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -24,6 +24,7 @@ from typing import List, Optional, Tuple
import attr
+from synapse.logging.opentracing import trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.stream import _filter_results_by_stream
@@ -159,6 +160,7 @@ class StateDeltasStore(SQLBaseStore):
self._get_max_stream_id_in_current_state_deltas_txn,
)
+ @trace
async def get_current_state_deltas_for_room(
self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
) -> List[StateDelta]:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 4207e73c7f..4989c960a6 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -51,6 +51,7 @@ from typing import (
Iterable,
List,
Optional,
+ Protocol,
Set,
Tuple,
cast,
@@ -59,7 +60,7 @@ from typing import (
import attr
from immutabledict import immutabledict
-from typing_extensions import Literal
+from typing_extensions import Literal, assert_never
from twisted.internet import defer
@@ -67,7 +68,7 @@ from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -97,6 +98,18 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
+class PaginateFunction(Protocol):
+ async def __call__(
+ self,
+ *,
+ room_id: str,
+ from_key: RoomStreamToken,
+ to_key: Optional[RoomStreamToken] = None,
+ direction: Direction = Direction.BACKWARDS,
+ limit: int = 0,
+ ) -> Tuple[List[EventBase], RoomStreamToken]: ...
+
+
# Used as return values for pagination APIs
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _EventDictReturn:
@@ -280,7 +293,7 @@ def generate_pagination_bounds(
def generate_next_token(
- direction: Direction, last_topo_ordering: int, last_stream_ordering: int
+ direction: Direction, last_topo_ordering: Optional[int], last_stream_ordering: int
) -> RoomStreamToken:
"""
Generate the next room stream token based on the currently returned data.
@@ -447,7 +460,6 @@ def _filter_results_by_stream(
The `instance_name` arg is optional to handle historic rows, and is
interpreted as if it was "master".
"""
-
if instance_name is None:
instance_name = "master"
@@ -660,33 +672,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
async def get_room_events_stream_for_rooms(
self,
+ *,
room_ids: Collection[str],
from_key: RoomStreamToken,
- to_key: RoomStreamToken,
+ to_key: Optional[RoomStreamToken] = None,
+ direction: Direction = Direction.BACKWARDS,
limit: int = 0,
- order: str = "DESC",
) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
"""Get new room events in stream ordering since `from_key`.
Args:
room_ids
- from_key: Token from which no events are returned before
- to_key: Token from which no events are returned after. (This
- is typically the current stream token)
+ from_key: The token to stream from (starting point and heading in the given
+ direction)
+ to_key: The token representing the end stream position (end point)
limit: Maximum number of events to return
- order: Either "DESC" or "ASC". Determines which events are
- returned when the result is limited. If "DESC" then the most
- recent `limit` events are returned, otherwise returns the
- oldest `limit` events.
+ direction: Indicates whether we are paginating forwards or backwards
+ from `from_key`.
Returns:
A map from room id to a tuple containing:
- list of recent events in the room
- stream ordering key for the start of the chunk of events returned.
+
+ When Direction.FORWARDS: from_key < x <= to_key (ascending order)
+ When Direction.BACKWARDS: from_key >= x > to_key (descending order)
"""
- room_ids = self._events_stream_cache.get_entities_changed(
- room_ids, from_key.stream
- )
+ if direction == Direction.FORWARDS:
+ room_ids = self._events_stream_cache.get_entities_changed(
+ room_ids, from_key.stream
+ )
+ elif direction == Direction.BACKWARDS:
+ if to_key is not None:
+ room_ids = self._events_stream_cache.get_entities_changed(
+ room_ids, to_key.stream
+ )
+ else:
+ assert_never(direction)
if not room_ids:
return {}
@@ -698,12 +720,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.gatherResults(
[
run_in_background(
- self.get_room_events_stream_for_room,
- room_id,
- from_key,
- to_key,
- limit,
- order=order,
+ self.paginate_room_events_by_stream_ordering,
+ room_id=room_id,
+ from_key=from_key,
+ to_key=to_key,
+ direction=direction,
+ limit=limit,
)
for room_id in rm_ids
],
@@ -727,69 +749,122 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
if self._events_stream_cache.has_entity_changed(room_id, from_id)
}
- async def get_room_events_stream_for_room(
+ async def paginate_room_events_by_stream_ordering(
self,
+ *,
room_id: str,
from_key: RoomStreamToken,
- to_key: RoomStreamToken,
+ to_key: Optional[RoomStreamToken] = None,
+ direction: Direction = Direction.BACKWARDS,
limit: int = 0,
- order: str = "DESC",
) -> Tuple[List[EventBase], RoomStreamToken]:
- """Get new room events in stream ordering since `from_key`.
+ """
+ Paginate events by `stream_ordering` in the room from the `from_key` in the
+ given `direction` to the `to_key` or `limit`.
Args:
room_id
- from_key: Token from which no events are returned before
- to_key: Token from which no events are returned after. (This
- is typically the current stream token)
+ from_key: The token to stream from (starting point and heading in the given
+ direction)
+ to_key: The token representing the end stream position (end point)
+ direction: Indicates whether we are paginating forwards or backwards
+ from `from_key`.
limit: Maximum number of events to return
- order: Either "DESC" or "ASC". Determines which events are
- returned when the result is limited. If "DESC" then the most
- recent `limit` events are returned, otherwise returns the
- oldest `limit` events.
Returns:
- The list of events (in ascending stream order) and the token from the start
- of the chunk of events returned.
+ The results as a list of events and a token that points to the end
+ of the result set. If no events are returned then the end of the
+ stream has been reached (i.e. there are no events between `from_key`
+ and `to_key`).
+
+ When Direction.FORWARDS: from_key < x <= to_key (ascending order)
+ When Direction.BACKWARDS: from_key >= x > to_key (descending order)
"""
- if from_key == to_key:
- return [], from_key
- has_changed = self._events_stream_cache.has_entity_changed(
- room_id, from_key.stream
- )
+ # FIXME: When going forwards, we should enforce that the `to_key` is not `None`
+ # because we always need an upper bound when querying the events stream (as
+ # otherwise we'll potentially pick up events that are not fully persisted).
+
+ # We should only be working with `stream_ordering` tokens here
+ assert from_key is None or from_key.topological is None
+ assert to_key is None or to_key.topological is None
+
+ # We can bail early if we're looking forwards, and our `to_key` is already
+ # before our `from_key`.
+ if (
+ direction == Direction.FORWARDS
+ and to_key is not None
+ and to_key.is_before_or_eq(from_key)
+ ):
+ # Token selection matches what we do below if there are no rows
+ return [], to_key if to_key else from_key
+ # Or vice-versa, if we're looking backwards and our `from_key` is already before
+ # our `to_key`.
+ elif (
+ direction == Direction.BACKWARDS
+ and to_key is not None
+ and from_key.is_before_or_eq(to_key)
+ ):
+ # Token selection matches what we do below if there are no rows
+ return [], to_key if to_key else from_key
+
+ # We can do a quick sanity check to see if any events have been sent in the room
+ # since the earlier token.
+ has_changed = True
+ if direction == Direction.FORWARDS:
+ has_changed = self._events_stream_cache.has_entity_changed(
+ room_id, from_key.stream
+ )
+ elif direction == Direction.BACKWARDS:
+ if to_key is not None:
+ has_changed = self._events_stream_cache.has_entity_changed(
+ room_id, to_key.stream
+ )
+ else:
+ assert_never(direction)
if not has_changed:
- return [], from_key
+ # Token selection matches what we do below if there are no rows
+ return [], to_key if to_key else from_key
- def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
- # To handle tokens with a non-empty instance_map we fetch more
- # results than necessary and then filter down
- min_from_id = from_key.stream
- max_to_id = to_key.get_max_stream_pos()
+ order, from_bound, to_bound = generate_pagination_bounds(
+ direction, from_key, to_key
+ )
- sql = """
- SELECT event_id, instance_name, topological_ordering, stream_ordering
+ bounds = generate_pagination_where_clause(
+ direction=direction,
+ # The empty string will shortcut downstream code to only use the
+ # `stream_ordering` column
+ column_names=("", "stream_ordering"),
+ from_token=from_bound,
+ to_token=to_bound,
+ engine=self.database_engine,
+ )
+
+ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
+ sql = f"""
+ SELECT event_id, instance_name, stream_ordering
FROM events
WHERE
room_id = ?
AND not outlier
- AND stream_ordering > ? AND stream_ordering <= ?
- ORDER BY stream_ordering %s LIMIT ?
- """ % (
- order,
- )
- txn.execute(sql, (room_id, min_from_id, max_to_id, 2 * limit))
+ AND {bounds}
+ ORDER BY stream_ordering {order} LIMIT ?
+ """
+ txn.execute(sql, (room_id, 2 * limit))
rows = [
_EventDictReturn(event_id, None, stream_ordering)
- for event_id, instance_name, topological_ordering, stream_ordering in txn
- if _filter_results(
- from_key,
- to_key,
- instance_name,
- topological_ordering,
- stream_ordering,
+ for event_id, instance_name, stream_ordering in txn
+ if _filter_results_by_stream(
+ lower_token=(
+ to_key if direction == Direction.BACKWARDS else from_key
+ ),
+ upper_token=(
+ from_key if direction == Direction.BACKWARDS else to_key
+ ),
+ instance_name=instance_name,
+ stream_ordering=stream_ordering,
)
][:limit]
return rows
@@ -800,18 +875,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
[r.event_id for r in rows], get_prev_content=True
)
- if order.lower() == "desc":
- ret.reverse()
-
if rows:
- key = RoomStreamToken(stream=min(r.stream_ordering for r in rows))
+ next_key = generate_next_token(
+ direction=direction,
+ last_topo_ordering=None,
+ last_stream_ordering=rows[-1].stream_ordering,
+ )
else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = from_key
+ # TODO (erikj): We should work out what to do here instead. (same as
+ # `_paginate_room_events_by_topological_ordering_txn(...)`)
+ next_key = to_key if to_key else from_key
- return ret, key
+ return ret, next_key
+ @trace
async def get_current_state_delta_membership_changes_for_user(
self,
user_id: str,
@@ -1117,7 +1194,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows, token = await self.db_pool.runInteraction(
"get_recent_event_ids_for_room",
- self._paginate_room_events_txn,
+ self._paginate_room_events_by_topological_ordering_txn,
room_id,
from_token=end_token,
limit=limit,
@@ -1186,6 +1263,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
+ @trace
async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
@@ -1622,7 +1700,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
topological=topological_ordering, stream=stream_ordering
)
- rows, start_token = self._paginate_room_events_txn(
+ rows, start_token = self._paginate_room_events_by_topological_ordering_txn(
txn,
room_id,
before_token,
@@ -1632,7 +1710,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
events_before = [r.event_id for r in rows]
- rows, end_token = self._paginate_room_events_txn(
+ rows, end_token = self._paginate_room_events_by_topological_ordering_txn(
txn,
room_id,
after_token,
@@ -1795,14 +1873,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def has_room_changed_since(self, room_id: str, stream_id: int) -> bool:
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
- def _paginate_room_events_txn(
+ def _paginate_room_events_by_topological_ordering_txn(
self,
txn: LoggingTransaction,
room_id: str,
from_token: RoomStreamToken,
to_token: Optional[RoomStreamToken] = None,
direction: Direction = Direction.BACKWARDS,
- limit: int = -1,
+ limit: int = 0,
event_filter: Optional[Filter] = None,
) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
"""Returns list of events before or after a given token.
@@ -1824,6 +1902,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
been reached (i.e. there are no events between `from_token` and
`to_token`), or `limit` is zero.
"""
+ # We can bail early if we're looking forwards, and our `to_token` is
+ # already before our `from_token`.
+ if (
+ direction == Direction.FORWARDS
+ and to_token is not None
+ and to_token.is_before_or_eq(from_token)
+ ):
+ # Token selection matches what we do below if there are no rows
+ return [], to_token if to_token else from_token
+ # Or vice-versa, if we're looking backwards and our `from_token` is already before
+ # our `to_token`.
+ elif (
+ direction == Direction.BACKWARDS
+ and to_token is not None
+ and from_token.is_before_or_eq(to_token)
+ ):
+ # Token selection matches what we do below if there are no rows
+ return [], to_token if to_token else from_token
args: List[Any] = [room_id]
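
The two bail-early branches above skip the SQL entirely when the requested window is empty: going forwards, nothing can match once `to_token` is at or before `from_token`; going backwards, nothing can match once `from_token` is at or before `to_token`. A self-contained sketch under the same single-writer simplification, where `is_before_or_eq` collapses to `<=` (the real multi-writer comparison is more involved):

    from dataclasses import dataclass
    from enum import Enum, auto
    from typing import Optional


    class Direction(Enum):
        FORWARDS = auto()
        BACKWARDS = auto()


    @dataclass(frozen=True)
    class StreamToken:
        stream: int

        def is_before_or_eq(self, other: "StreamToken") -> bool:
            # Single-writer collapse of RoomStreamToken.is_before_or_eq.
            return self.stream <= other.stream


    def window_is_empty(
        direction: Direction, from_token: StreamToken, to_token: Optional[StreamToken]
    ) -> bool:
        """True when no event can lie in the requested window, so the query
        (and the DB round trip) can be skipped entirely."""
        if to_token is None:
            return False
        if direction == Direction.FORWARDS:
            # Forwards window is (from_token, to_token]; empty if to <= from.
            return to_token.is_before_or_eq(from_token)
        # Backwards window is (to_token, from_token]; empty if from <= to.
        return from_token.is_before_or_eq(to_token)
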
@@ -1908,7 +2004,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"bounds": bounds,
"order": order,
}
-
txn.execute(sql, args)
# Filter the result set.
@@ -1940,27 +2035,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return rows, next_token
@trace
- async def paginate_room_events(
+ @tag_args
+ async def paginate_room_events_by_topological_ordering(
self,
+ *,
room_id: str,
from_key: RoomStreamToken,
to_key: Optional[RoomStreamToken] = None,
direction: Direction = Direction.BACKWARDS,
- limit: int = -1,
+ limit: int = 0,
event_filter: Optional[Filter] = None,
) -> Tuple[List[EventBase], RoomStreamToken]:
- """Returns list of events before or after a given token.
-
- When Direction.FORWARDS: from_key < x <= to_key
- When Direction.BACKWARDS: from_key >= x > to_key
+ """
+ Paginate events in the room by `topological_ordering` (tie-breaking with
+ `stream_ordering`), starting at `from_key` and moving in the given
+ `direction` until `to_key` is reached or `limit` events have been returned.
Args:
room_id
- from_key: The token used to stream from
- to_key: A token which if given limits the results to only those before
+ from_key: The token to start from (pagination heads in the given
+ direction from this point)
+ to_key: The token marking the end position of the window (end point)
direction: Indicates whether we are paginating forwards or backwards
from `from_key`.
- limit: The maximum number of events to return.
+ limit: Maximum number of events to return
event_filter: If provided filters the events to those that match the filter.
Returns:
@@ -1968,8 +2066,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
of the result set. If no events are returned then the end of the
stream has been reached (i.e. there are no events between `from_key`
and `to_key`).
+
+ When Direction.FORWARDS: from_key < x <= to_key (ascending order)
+ When Direction.BACKWARDS: from_key >= x > to_key (descending order)
"""
+ # FIXME: When going forwards, we should enforce that the `to_key` is not `None`
+ # because we always need an upper bound when querying the events stream (as
+ # otherwise we'll potentially pick up events that are not fully persisted).
+
+ # We have these checks outside of the transaction function (txn) to save getting
+ # a DB connection and switching threads if we don't need to.
+ #
# We can bail early if we're looking forwards, and our `to_key` is already
# before our `from_key`.
if (
@@ -1992,8 +2100,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return [], to_key if to_key else from_key
rows, token = await self.db_pool.runInteraction(
- "paginate_room_events",
- self._paginate_room_events_txn,
+ "paginate_room_events_by_topological_ordering",
+ self._paginate_room_events_by_topological_ordering_txn,
room_id,
from_key,
to_key,
@@ -2105,6 +2213,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
+ @trace
def get_rooms_that_might_have_updates(
self, room_ids: StrCollection, from_token: RoomStreamToken
) -> StrCollection:
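
Pulling the pieces together: the public entry point is now keyword-only and, per the comments above, does its empty-window checks before `runInteraction`, so no database connection is taken and no thread switch happens when there is nothing to fetch. A hedged usage sketch of the renamed method; `fetch_forward_page` is a hypothetical call site, and per the FIXME an explicit `to_key` is always supplied when going forwards so events that are still being persisted cannot be skipped:

    from typing import Any, List, Tuple

    from synapse.api.constants import Direction
    from synapse.events import EventBase
    from synapse.types import RoomStreamToken


    async def fetch_forward_page(
        store: Any,  # the main datastore (exposes StreamWorkerStore methods)
        room_id: str,
        from_key: RoomStreamToken,
        to_key: RoomStreamToken,
    ) -> Tuple[List[EventBase], RoomStreamToken]:
        # Keyword-only call: events satisfy from_key < x <= to_key and come
        # back in ascending (topological_ordering, stream_ordering) order.
        return await store.paginate_room_events_by_topological_ordering(
            room_id=room_id,
            from_key=from_key,  # exclusive lower bound
            to_key=to_key,      # inclusive upper bound; always set going forwards
            direction=Direction.FORWARDS,
            limit=20,
        )

If no events come back, the returned token matches the "no rows" selection in the diff above: `to_key` if one was given, otherwise `from_key`.
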
diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py
index 4d8866b30a..6863c32f7c 100644
--- a/tests/rest/client/sliding_sync/test_connection_tracking.py
+++ b/tests/rest/client/sliding_sync/test_connection_tracking.py
@@ -21,8 +21,6 @@ import synapse.rest.admin
from synapse.api.constants import EventTypes
from synapse.rest.client import login, room, sync
from synapse.server import HomeServer
-from synapse.types import SlidingSyncStreamToken
-from synapse.types.handlers import SlidingSyncConfig
from synapse.util import Clock
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -130,7 +128,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
self.helper.send(room_id1, "msg", tok=user1_tok)
timeline_limit = 5
- conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
@@ -170,40 +167,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)
- # FIXME: This is a hack to record that the first room wasn't sent down
- # sync, as we don't implement that currently.
- sliding_sync_handler = self.hs.get_sliding_sync_handler()
- requester = self.get_success(
- self.hs.get_auth().get_user_by_access_token(user1_tok)
- )
- sync_config = SlidingSyncConfig(
- user=requester.user,
- requester=requester,
- conn_id=conn_id,
- )
-
- parsed_initial_from_token = self.get_success(
- SlidingSyncStreamToken.from_string(self.store, initial_from_token)
- )
- connection_position = self.get_success(
- sliding_sync_handler.connection_store.record_rooms(
- sync_config,
- parsed_initial_from_token,
- sent_room_ids=[],
- unsent_room_ids=[room_id1],
- )
- )
-
- # FIXME: Now fix up `from_token` with new connect position above.
- parsed_from_token = self.get_success(
- SlidingSyncStreamToken.from_string(self.store, from_token)
- )
- parsed_from_token = SlidingSyncStreamToken(
- stream_token=parsed_from_token.stream_token,
- connection_position=connection_position,
- )
- from_token = self.get_success(parsed_from_token.to_string(self.store))
-
# We now send another event to room1, so we should sync all the missing events.
resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
expected_events.append(resp["event_id"])
@@ -238,7 +201,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
self.helper.send(room_id1, "msg", tok=user1_tok)
- conn_id = "conn_id"
sync_body = {
"lists": {
"foo-list": {
@@ -279,40 +241,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
)
- # FIXME: This is a hack to record that the first room wasn't sent down
- # sync, as we don't implement that currently.
- sliding_sync_handler = self.hs.get_sliding_sync_handler()
- requester = self.get_success(
- self.hs.get_auth().get_user_by_access_token(user1_tok)
- )
- sync_config = SlidingSyncConfig(
- user=requester.user,
- requester=requester,
- conn_id=conn_id,
- )
-
- parsed_initial_from_token = self.get_success(
- SlidingSyncStreamToken.from_string(self.store, initial_from_token)
- )
- connection_position = self.get_success(
- sliding_sync_handler.connection_store.record_rooms(
- sync_config,
- parsed_initial_from_token,
- sent_room_ids=[],
- unsent_room_ids=[room_id1],
- )
- )
-
- # FIXME: Now fix up `from_token` with new connect position above.
- parsed_from_token = self.get_success(
- SlidingSyncStreamToken.from_string(self.store, from_token)
- )
- parsed_from_token = SlidingSyncStreamToken(
- stream_token=parsed_from_token.stream_token,
- connection_position=connection_position,
- )
- from_token = self.get_success(parsed_from_token.to_string(self.store))
-
# We now send another event to room1, so we should sync all the missing state.
self.helper.send(room_id1, "msg", tok=user1_tok)
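
With the FIXME hack deleted from both tests (the implementation now records rooms that were not sent down sync by itself), each test reduces to plain send/sync/assert steps. A sketch of the simplified flow, using the helpers visible in the surrounding test code (`self.helper.send`, `self.do_sync`); the exact assertion is illustrative:

    # Send the event that should pull room1 back down on the next sync.
    resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
    expected_events.append(resp["event_id"])

    # Incremental sync: no manual connection-store bookkeeping is needed any
    # more; the updates missed while room1 was outside the sliding window
    # come down automatically.
    response_body, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok)
    self.assertIn(room_id1, response_body["rooms"])
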
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index 9dea1af8ea..7b7590da76 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -148,7 +148,7 @@ class PaginationTestCase(HomeserverTestCase):
"""Make a request to /messages with a filter, returns the chunk of events."""
events, next_key = self.get_success(
- self.hs.get_datastores().main.paginate_room_events(
+ self.hs.get_datastores().main.paginate_room_events_by_topological_ordering(
room_id=self.room_id,
from_key=self.from_token.room_key,
to_key=None,