From 8bbc98e66d5b60676e24b0ed7126938396040ab9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 Jul 2024 11:47:25 +0100 Subject: Use a new token format for sliding sync (#17452) This is in preparation for adding per-connection state. --------- Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync.py | 30 +++++++++++++++----------- synapse/rest/client/sync.py | 6 ++++-- synapse/types/__init__.py | 43 ++++++++++++++++++++++++++++++++++++++ synapse/types/handlers/__init__.py | 13 +++++++++--- 4 files changed, 75 insertions(+), 17 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 554ab59bf3..36665db8e1 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -49,6 +49,7 @@ from synapse.types import ( PersistedEventPosition, Requester, RoomStreamToken, + SlidingSyncStreamToken, StateMap, StreamKeyType, StreamToken, @@ -362,7 +363,7 @@ class SlidingSyncHandler: self, requester: Requester, sync_config: SlidingSyncConfig, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, timeout_ms: int = 0, ) -> SlidingSyncResult: """ @@ -393,7 +394,7 @@ class SlidingSyncHandler: # this returns false, it means we timed out waiting, and we should # just return an empty response. before_wait_ts = self.clock.time_msec() - if not await self.notifier.wait_for_stream_token(from_token): + if not await self.notifier.wait_for_stream_token(from_token.stream_token): logger.warning( "Timed out waiting for worker to catch up. Returning empty response" ) @@ -431,7 +432,7 @@ class SlidingSyncHandler: sync_config.user.to_string(), timeout_ms, current_sync_callback, - from_token=from_token, + from_token=from_token.stream_token, ) return result @@ -440,7 +441,7 @@ class SlidingSyncHandler: self, sync_config: SlidingSyncConfig, to_token: StreamToken, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, ) -> SlidingSyncResult: """ Generates the response body of a Sliding Sync result, represented as a @@ -473,7 +474,7 @@ class SlidingSyncHandler: await self.get_room_membership_for_user_at_to_token( user=sync_config.user, to_token=to_token, - from_token=from_token, + from_token=from_token.stream_token if from_token else None, ) ) @@ -631,8 +632,11 @@ class SlidingSyncHandler: to_token=to_token, ) + # TODO: Update this when we implement per-connection state + connection_token = 0 + return SlidingSyncResult( - next_pos=to_token, + next_pos=SlidingSyncStreamToken(to_token, connection_token), lists=lists, rooms=rooms, extensions=extensions, @@ -1367,7 +1371,7 @@ class SlidingSyncHandler: room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], to_token: StreamToken, ) -> SlidingSyncResult.RoomResult: """ @@ -1431,7 +1435,7 @@ class SlidingSyncHandler: # - TODO: For an incremental sync where we haven't sent it down this # connection before to_bound = ( - from_token.room_key + from_token.stream_token.room_key if from_token is not None and not room_membership_for_user_at_to_token.newly_joined else None @@ -1498,7 +1502,9 @@ class SlidingSyncHandler: instance_name=timeline_event.internal_metadata.instance_name, stream=timeline_event.internal_metadata.stream_ordering, ) - if persisted_position.persisted_after(from_token.room_key): + if persisted_position.persisted_after( + from_token.stream_token.room_key + ): num_live += 1 else: # Since we're iterating over the timeline events in @@ -1786,7 +1792,7 @@ class SlidingSyncHandler: self, sync_config: SlidingSyncConfig, to_token: StreamToken, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], ) -> SlidingSyncResult.Extensions: """Handle extension requests. @@ -1900,7 +1906,7 @@ class SlidingSyncHandler: sync_config: SlidingSyncConfig, e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension, to_token: StreamToken, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]: """Handle E2EE device extension (MSC3884) @@ -1922,7 +1928,7 @@ class SlidingSyncHandler: # TODO: This should take into account the `from_token` and `to_token` device_list_updates = await self.device_handler.get_user_ids_changed( user_id=user_id, - from_token=from_token, + from_token=from_token.stream_token, ) device_one_time_keys_count: Mapping[str, int] = {} diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 93fe1d439e..d72dfa2b10 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -54,7 +54,7 @@ from synapse.http.servlet import ( from synapse.http.site import SynapseRequest from synapse.logging.opentracing import trace_with_opname from synapse.rest.admin.experimental_features import ExperimentalFeature -from synapse.types import JsonDict, Requester, StreamToken +from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken from synapse.types.rest.client import SlidingSyncBody from synapse.util import json_decoder from synapse.util.caches.lrucache import LruCache @@ -889,7 +889,9 @@ class SlidingSyncRestServlet(RestServlet): from_token = None if from_token_string is not None: - from_token = await StreamToken.from_string(self.store, from_token_string) + from_token = await SlidingSyncStreamToken.from_string( + self.store, from_token_string + ) # TODO: We currently don't know whether we're going to use sticky params or # maybe some filters like sync v2 where they are built up once and referenced diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index c0d30ac2a3..5259550f1c 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1160,6 +1160,49 @@ StreamToken.START = StreamToken( ) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SlidingSyncStreamToken: + """The same as a `StreamToken`, but includes an extra field at the start for + the sliding sync connection token (separated by a '/'). This is used to + store per-connection state. + + This then looks something like: + 5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379 + + Attributes: + stream_token: Token representing the position of all the standard + streams. + connection_position: Token used by sliding sync to track updates to any + per-connection state stored by Synapse. + """ + + stream_token: StreamToken + connection_position: int + + @staticmethod + @cancellable + async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken": + """Creates a SlidingSyncStreamToken from its textual representation.""" + try: + connection_position_str, stream_token_str = string.split("/", 1) + connection_position = int(connection_position_str) + stream_token = await StreamToken.from_string(store, stream_token_str) + + return SlidingSyncStreamToken( + stream_token=stream_token, + connection_position=connection_position, + ) + except CancelledError: + raise + except Exception: + raise SynapseError(400, "Invalid stream token") + + async def to_string(self, store: "DataStore") -> str: + """Serializes the token to a string""" + stream_token_str = await self.stream_token.to_string(store) + return f"{self.connection_position}/{stream_token_str}" + + @attr.s(slots=True, frozen=True, auto_attribs=True) class PersistedPosition: """Position of a newly persisted row with instance that persisted it.""" diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 4c6c42db04..59eb0963ee 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -31,7 +31,14 @@ else: from pydantic import Extra from synapse.events import EventBase -from synapse.types import DeviceListUpdates, JsonDict, JsonMapping, StreamToken, UserID +from synapse.types import ( + DeviceListUpdates, + JsonDict, + JsonMapping, + SlidingSyncStreamToken, + StreamToken, + UserID, +) from synapse.types.rest.client import SlidingSyncBody if TYPE_CHECKING: @@ -329,7 +336,7 @@ class SlidingSyncResult: def __bool__(self) -> bool: return bool(self.to_device or self.e2ee) - next_pos: StreamToken + next_pos: SlidingSyncStreamToken lists: Dict[str, SlidingWindowList] rooms: Dict[str, RoomResult] extensions: Extensions @@ -342,7 +349,7 @@ class SlidingSyncResult: return bool(self.lists or self.rooms or self.extensions) @staticmethod - def empty(next_pos: StreamToken) -> "SlidingSyncResult": + def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": "Return a new empty result" return SlidingSyncResult( next_pos=next_pos, -- cgit 1.5.1 From bdf37ad4c4d66c7a2ca69a29542b01e0856cff48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 Jul 2024 15:21:56 +0100 Subject: Sliding Sync: ensure bump stamp ignores backfilled events (#17478) Backfill events have a negative stream ordering, and so its not useful to use to compare with other (positive) stream orderings. Plus, the Rust SDK currently assumes `bump_stamp` is positive. --- changelog.d/17478.misc | 1 + synapse/handlers/sliding_sync.py | 10 +++- tests/rest/client/test_sync.py | 122 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 changelog.d/17478.misc (limited to 'synapse') diff --git a/changelog.d/17478.misc b/changelog.d/17478.misc new file mode 100644 index 0000000000..5406c82742 --- /dev/null +++ b/changelog.d/17478.misc @@ -0,0 +1 @@ +Ensure we don't send down negative `bump_stamp` in experimental sliding sync endpoint. diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 36665db8e1..f1f6f30b95 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1758,8 +1758,14 @@ class SlidingSyncHandler: bump_stamp = room_membership_for_user_at_to_token.event_pos.stream # But if we found a bump event, use that instead if last_bump_event_result is not None: - _, bump_event_pos = last_bump_event_result - bump_stamp = bump_event_pos.stream + _, new_bump_event_pos = last_bump_event_result + + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_event_pos.stream > 0: + bump_stamp = new_bump_event_pos.stream return SlidingSyncResult.RoomResult( name=room_name, diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 65c5f8ccae..6c73f4ec33 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -37,6 +37,7 @@ from synapse.api.constants import ( ReceiptTypes, RelationTypes, ) +from synapse.api.room_versions import RoomVersions from synapse.events import EventBase from synapse.handlers.sliding_sync import StateValues from synapse.rest.client import ( @@ -65,7 +66,7 @@ from tests.federation.transport.test_knocking import ( KnockingStrippedStateEventHelperMixin, ) from tests.server import FakeChannel, TimedOutException -from tests.test_utils.event_injection import mark_event_as_partial_state +from tests.test_utils.event_injection import create_event, mark_event_as_partial_state from tests.unittest import skip_unless logger = logging.getLogger(__name__) @@ -2793,6 +2794,125 @@ class SlidingSyncTestCase(SlidingSyncBase): channel.json_body["rooms"][room_id2], ) + def test_rooms_bump_stamp_backfill(self) -> None: + """ + Test that `bump_stamp` ignores backfilled events, i.e. events with a + negative stream ordering. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a remote room + creator = "@user:other" + room_id = "!foo:other" + shared_kwargs = { + "room_id": room_id, + "room_version": "10", + } + + create_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[], + type=EventTypes.Create, + state_key="", + sender=creator, + **shared_kwargs, + ) + ) + creator_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[create_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id], + type=EventTypes.Member, + state_key=creator, + content={"membership": Membership.JOIN}, + sender=creator, + **shared_kwargs, + ) + ) + # We add a message event as a valid "bump type" + msg_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[creator_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id], + type=EventTypes.Message, + content={"body": "foo", "msgtype": "m.text"}, + sender=creator, + **shared_kwargs, + ) + ) + invite_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[msg_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id], + type=EventTypes.Member, + state_key=user1_id, + content={"membership": Membership.INVITE}, + sender=creator, + **shared_kwargs, + ) + ) + + remote_events_and_contexts = [ + create_tuple, + creator_tuple, + msg_tuple, + invite_tuple, + ] + + # Ensure the local HS knows the room version + self.get_success( + self.store.store_room(room_id, creator, False, RoomVersions.V10) + ) + + # Persist these events as backfilled events. + persistence = self.hs.get_storage_controllers().persistence + assert persistence is not None + + for event, context in remote_events_and_contexts: + self.get_success(persistence.persist_event(event, context, backfilled=True)) + + # Now we join the local user to the room + join_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[invite_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id], + type=EventTypes.Member, + state_key=user1_id, + content={"membership": Membership.JOIN}, + sender=user1_id, + **shared_kwargs, + ) + ) + self.get_success(persistence.persist_event(*join_tuple)) + + # Doing an SS request should return a positive `bump_stamp`, even though + # the only event that matches the bump types has as negative stream + # ordering. + channel = self.make_request( + "POST", + self.sync_endpoint, + content={ + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 5, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + self.assertGreater(channel.json_body["rooms"][room_id]["bump_stamp"], 0) + def test_rooms_newly_joined_incremental_sync(self) -> None: """ Test that when we make an incremental sync with a `newly_joined` `rooms`, we are -- cgit 1.5.1