Diffstat (limited to 'synapse/rest/client/sync.py')
-rw-r--r--    synapse/rest/client/sync.py    419
1 file changed, 416 insertions, 3 deletions
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py

index 2b103ca6a8..b5ab0d8534 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -33,6 +33,7 @@ from synapse.events.utils import (
     format_event_raw,
 )
 from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.sliding_sync import SlidingSyncConfig, SlidingSyncResult
 from synapse.handlers.sync import (
     ArchivedSyncResult,
     InvitedSyncResult,
@@ -40,13 +41,22 @@ from synapse.handlers.sync import (
     KnockedSyncResult,
     SyncConfig,
     SyncResult,
+    SyncVersion,
 )
 from synapse.http.server import HttpServer
-from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
+from synapse.http.servlet import (
+    RestServlet,
+    parse_and_validate_json_object_from_request,
+    parse_boolean,
+    parse_integer,
+    parse_string,
+)
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import trace_with_opname
 from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types.rest.client import SlidingSyncBody
 from synapse.util import json_decoder
+from synapse.util.caches.lrucache import LruCache
 
 from ._base import client_patterns, set_timeline_upper_limit
 
@@ -110,6 +120,11 @@ class SyncRestServlet(RestServlet):
         self._msc2654_enabled = hs.config.experimental.msc2654_enabled
         self._msc3773_enabled = hs.config.experimental.msc3773_enabled
 
+        self._json_filter_cache: LruCache[str, bool] = LruCache(
+            max_size=1000,
+            cache_name="sync_valid_filter",
+        )
+
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         # This will always be set by the time Twisted calls us.
         assert request.args is not None
@@ -177,7 +192,13 @@ class SyncRestServlet(RestServlet):
                 filter_object = json_decoder.decode(filter_id)
             except Exception:
                 raise SynapseError(400, "Invalid filter JSON", errcode=Codes.NOT_JSON)
-            self.filtering.check_valid_filter(filter_object)
+
+            # We cache the validation, as this can get quite expensive if people use
+            # a literal json blob as a query param.
+            if not self._json_filter_cache.get(filter_id):
+                self.filtering.check_valid_filter(filter_object)
+                self._json_filter_cache[filter_id] = True
+
             set_timeline_upper_limit(
                 filter_object, self.hs.config.server.filter_timeline_limit
             )
@@ -197,7 +218,6 @@ class SyncRestServlet(RestServlet):
             user=user,
             filter_collection=filter_collection,
             is_guest=requester.is_guest,
-            request_key=request_key,
             device_id=device_id,
         )
 
@@ -220,6 +240,8 @@ class SyncRestServlet(RestServlet):
             sync_result = await self.sync_handler.wait_for_sync_for_user(
                 requester,
                 sync_config,
+                SyncVersion.SYNC_V2,
+                request_key,
                 since_token=since_token,
                 timeout=timeout,
                 full_state=full_state,
@@ -553,5 +575,396 @@ class SyncRestServlet(RestServlet):
         return result
 
 
+class SlidingSyncE2eeRestServlet(RestServlet):
+    """
+    API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as
+    part of Sliding Sync but doesn't have any sliding window component. It's just a
+    way to get E2EE events without having to sit through a big initial sync (`/sync`
+    v2). And we can avoid encryption events being backed up by the main sync response.
+
+    Having To-Device messages split out to this sync endpoint also helps when clients
+    need to have 2 or more sync streams open at a time, e.g. a push notification
+    process and a main process. This can cause the two processes to race to fetch the
+    To-Device events, resulting in the need for complex synchronisation rules to
+    ensure the token is correctly and atomically exchanged between processes.
+
+    GET parameters::
+        timeout(int): How long to wait for new events in milliseconds.
+        since(batch_token): Batch token when asking for incremental deltas.
+
+    Response JSON::
+        {
+            "next_batch": // batch token for the next /sync
+            "to_device": {
+                // list of to-device events
+                "events": [
+                    {
+                        "content": { "algorithm": "m.olm.v1.curve25519-aes-sha2", "ciphertext": { ... }, "org.matrix.msgid": "abcd", "session_id": "abcd" },
+                        "type": "m.room.encrypted",
+                        "sender": "@alice:example.com",
+                    }
+                    // ...
+                ]
+            },
+            "device_lists": {
+                "changed": ["@alice:example.com"],
+                "left": ["@bob:example.com"]
+            },
+            "device_one_time_keys_count": {
+                "signed_curve25519": 50
+            },
+            "device_unused_fallback_key_types": [
+                "signed_curve25519"
+            ]
+        }
+    """
+
+    PATTERNS = client_patterns(
+        "/org.matrix.msc3575/sync/e2ee$", releases=[], v1=False, unstable=True
+    )
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastores().main
+        self.sync_handler = hs.get_sync_handler()
+
+        # Filtering only matters for the `device_lists` because it requires a bunch of
+        # derived information from rooms (see how `_generate_sync_entry_for_rooms()`
+        # prepares a bunch of data for `_generate_sync_entry_for_device_list()`).
+        self.only_member_events_filter_collection = FilterCollection(
+            self.hs,
+            {
+                "room": {
+                    # We only care about membership events for the `device_lists`.
+                    # Membership will tell us whether a user has joined/left a room
+                    # and if there are new devices to encrypt for.
+                    "timeline": {
+                        "types": ["m.room.member"],
+                    },
+                    "state": {
+                        "types": ["m.room.member"],
+                    },
+                    # We don't want any extra account_data generated because it's not
+                    # returned by this endpoint. This helps us avoid work in
+                    # `_generate_sync_entry_for_rooms()`
+                    "account_data": {
+                        "not_types": ["*"],
+                    },
+                    # We don't want any extra ephemeral data generated because it's
+                    # not returned by this endpoint. This helps us avoid work in
+                    # `_generate_sync_entry_for_rooms()`
+                    "ephemeral": {
+                        "not_types": ["*"],
+                    },
+                },
+                # We don't want any extra account_data generated because it's not
+                # returned by this endpoint. (This is just here for good measure)
+                "account_data": {
+                    "not_types": ["*"],
+                },
+                # We don't want any extra presence data generated because it's not
+                # returned by this endpoint. (This is just here for good measure)
+                "presence": {
+                    "not_types": ["*"],
+                },
+            },
+        )
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request, allow_guest=True)
+        user = requester.user
+        device_id = requester.device_id
+
+        timeout = parse_integer(request, "timeout", default=0)
+        since = parse_string(request, "since")
+
+        sync_config = SyncConfig(
+            user=user,
+            filter_collection=self.only_member_events_filter_collection,
+            is_guest=requester.is_guest,
+            device_id=device_id,
+        )
+
+        since_token = None
+        if since is not None:
+            since_token = await StreamToken.from_string(self.store, since)
+
+        # Request cache key
+        request_key = (
+            SyncVersion.E2EE_SYNC,
+            user,
+            timeout,
+            since,
+        )
+
+        # Gather data for the response
+        sync_result = await self.sync_handler.wait_for_sync_for_user(
+            requester,
+            sync_config,
+            SyncVersion.E2EE_SYNC,
+            request_key,
+            since_token=since_token,
+            timeout=timeout,
+            full_state=False,
+        )
+
+        # The client may have disconnected by now; don't bother to serialize the
+        # response if so.
+        if request._disconnected:
+            logger.info("Client has disconnected; not serializing response.")
+            return 200, {}
+
+        response: JsonDict = defaultdict(dict)
+        response["next_batch"] = await sync_result.next_batch.to_string(self.store)
+
+        if sync_result.to_device:
+            response["to_device"] = {"events": sync_result.to_device}
+
+        if sync_result.device_lists.changed:
+            response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
+        if sync_result.device_lists.left:
+            response["device_lists"]["left"] = list(sync_result.device_lists.left)
+
+        # We always include this because https://github.com/vector-im/element-android/issues/3725
+        # The spec isn't terribly clear on when this can be omitted and how a client would tell
+        # the difference between "no keys present" and "nothing changed" in terms of whole field
+        # absent / individual key type entry absent
+        # Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
+        response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
+
+        # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
+        # states that this field should always be included, as long as the server
+        # supports the feature.
+        response["device_unused_fallback_key_types"] = (
+            sync_result.device_unused_fallback_key_types
+        )
+
+        return 200, response
+
+
+class SlidingSyncRestServlet(RestServlet):
+    """
+    API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a
+    subset (sliding window) of rooms, state, and timeline events (just what they need)
+    in order to bootstrap quickly and subscribe to only what the client cares about.
+    Because the client can specify what it cares about, we can respond quickly and
+    skip all of the work we would normally have to do with a sync v2 response.
+
+    Request query parameters:
+        timeout: How long to wait for new events in milliseconds.
+        pos: Stream position token when asking for incremental deltas.
+
+    Request body::
+        {
+            // Sliding Window API
+            "lists": {
+                "foo-list": {
+                    "ranges": [ [0, 99] ],
+                    "sort": [ "by_notification_level", "by_recency", "by_name" ],
+                    "required_state": [
+                        ["m.room.join_rules", ""],
+                        ["m.room.history_visibility", ""],
+                        ["m.space.child", "*"]
+                    ],
+                    "timeline_limit": 10,
+                    "filters": {
+                        "is_dm": true
+                    },
+                    "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
+                }
+            },
+            // Room Subscriptions API
+            "room_subscriptions": {
+                "!sub1:bar": {
+                    "required_state": [ ["*","*"] ],
+                    "timeline_limit": 10,
+                    "include_old_rooms": {
+                        "timeline_limit": 1,
+                        "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
+                    }
+                }
+            },
+            // Extensions API
+            "extensions": {}
+        }
+
+    Response JSON::
+        {
+            "next_pos": "s58_224_0_13_10_1_1_16_0_1",
+            "lists": {
+                "foo-list": {
+                    "count": 1337,
+                    "ops": [{
+                        "op": "SYNC",
+                        "range": [0, 99],
+                        "room_ids": [
+                            "!foo:bar",
+                            // ... 99 more room IDs
+                        ]
+                    }]
+                }
+            },
+            // Aggregated rooms from lists and room subscriptions
+            "rooms": {
+                // Room from room subscription
+                "!sub1:bar": {
+                    "name": "Alice and Bob",
+                    "avatar": "mxc://...",
+                    "initial": true,
+                    "required_state": [
+                        {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+                        {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}}
+                    ],
+                    "timeline": [
+                        {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+                        {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+                    ],
+                    "prev_batch": "t111_222_333",
+                    "joined_count": 41,
+                    "invited_count": 1,
+                    "notification_count": 1,
+                    "highlight_count": 0
+                },
+                // rooms from list
+                "!foo:bar": {
+                    "name": "The calculated room name",
+                    "avatar": "mxc://...",
+                    "initial": true,
+                    "required_state": [
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+                        {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!foo:example.com", "content":{"via":["example.com"]}},
+                        {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!bar:example.com", "content":{"via":["example.com"]}},
+                        {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!baz:example.com", "content":{"via":["example.com"]}}
+                    ],
+                    "timeline": [
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"C"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"D"}},
+                    ],
+                    "prev_batch": "t111_222_333",
+                    "joined_count": 4,
+                    "invited_count": 0,
+                    "notification_count": 54,
+                    "highlight_count": 3
+                },
+                // ... 99 more items
+            },
+            "extensions": {}
+        }
+    """
+
+    PATTERNS = client_patterns(
+        "/org.matrix.simplified_msc3575/sync$", releases=[], v1=False, unstable=True
+    )
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastores().main
+        self.filtering = hs.get_filtering()
+        self.sliding_sync_handler = hs.get_sliding_sync_handler()
+
+    # TODO: Update this to `on_GET` once we figure out how we want to handle params
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request, allow_guest=True)
+        user = requester.user
+        device_id = requester.device_id
+
+        timeout = parse_integer(request, "timeout", default=0)
+        # Position in the stream
+        from_token_string = parse_string(request, "pos")
+
+        from_token = None
+        if from_token_string is not None:
+            from_token = await StreamToken.from_string(self.store, from_token_string)
+
+        # TODO: We currently don't know whether we're going to use sticky params or
+        # maybe some filters like sync v2 where they are built up once and referenced
+        # by filter ID. For now, we will just prototype with always passing everything
+        # in.
+        body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
+        logger.info("Sliding sync request: %r", body)
+
+        sync_config = SlidingSyncConfig(
+            user=user,
+            device_id=device_id,
+            # FIXME: Currently, we're just manually copying the fields from the
+            # `SlidingSyncBody` into the config. How can we guarantee into the future
+            # that we don't forget any? I would like something more structured like
+            # `copy_attributes(from=body, to=config)`
+            lists=body.lists,
+            room_subscriptions=body.room_subscriptions,
+            extensions=body.extensions,
+        )
+
+        sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user(
+            requester,
+            sync_config,
+            from_token,
+            timeout,
+        )
+
+        # The client may have disconnected by now; don't bother to serialize the
+        # response if so.
+        if request._disconnected:
+            logger.info("Client has disconnected; not serializing response.")
+            return 200, {}
+
+        response_content = await self.encode_response(sliding_sync_results)
+
+        return 200, response_content
+
+    # TODO: Is there a better way to encode things?
+    async def encode_response(
+        self,
+        sliding_sync_result: SlidingSyncResult,
+    ) -> JsonDict:
+        response: JsonDict = defaultdict(dict)
+
+        response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
+        serialized_lists = self.encode_lists(sliding_sync_result.lists)
+        if serialized_lists:
+            response["lists"] = serialized_lists
+        response["rooms"] = {}  # TODO: sliding_sync_result.rooms
+        response["extensions"] = {}  # TODO: sliding_sync_result.extensions
+
+        return response
+
+    def encode_lists(
+        self, lists: Dict[str, SlidingSyncResult.SlidingWindowList]
+    ) -> JsonDict:
+        def encode_operation(
+            operation: SlidingSyncResult.SlidingWindowList.Operation,
+        ) -> JsonDict:
+            return {
+                "op": operation.op.value,
+                "range": operation.range,
+                "room_ids": operation.room_ids,
+            }
+
+        serialized_lists = {}
+        for list_key, list_result in lists.items():
+            serialized_lists[list_key] = {
+                "count": list_result.count,
+                "ops": [encode_operation(op) for op in list_result.ops],
+            }
+
+        return serialized_lists
+
+
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     SyncRestServlet(hs).register(http_server)
+
+    if hs.config.experimental.msc3575_enabled:
+        SlidingSyncRestServlet(hs).register(http_server)
+        SlidingSyncE2eeRestServlet(hs).register(http_server)
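
The `_json_filter_cache` hunk above boils down to memoising an expensive validation step, keyed on the raw filter string that arrived as a query parameter. As a rough stand-alone sketch of that idea (using the standard library's `lru_cache` instead of Synapse's `LruCache`, and a placeholder validator, so this is an illustration of the pattern rather than the patch's actual code):

# Stand-alone sketch of the caching idea behind `_json_filter_cache`.
# `check_valid_filter` here is a placeholder, not Synapse's real validator.
import json
from functools import lru_cache


def check_valid_filter(filter_object: dict) -> None:
    # Placeholder for the (expensive) schema validation the real servlet performs.
    if not isinstance(filter_object, dict):
        raise ValueError("Invalid filter definition")


@lru_cache(maxsize=1000)
def validate_filter_string(filter_id: str) -> bool:
    # Keyed on the raw string, mirroring how the patch keys its cache on
    # `filter_id` before decoding, so a repeated literal JSON blob is only
    # validated once.
    check_valid_filter(json.loads(filter_id))
    return True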
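
For the new `/sync/e2ee` servlet, a client-side polling loop might look roughly like the sketch below. The homeserver URL, access token, and the `/_matrix/client/unstable/` prefix (implied by `client_patterns(..., unstable=True)`) are assumptions for illustration only; the query parameters and response fields are the ones documented in the servlet docstring above.

# Rough sketch of a client polling the MSC3575 `/sync/e2ee` endpoint.
# HOMESERVER and ACCESS_TOKEN are placeholder values.
from typing import Optional

import requests

HOMESERVER = "https://example.com"  # assumed base URL
ACCESS_TOKEN = "syt_..."  # assumed access token
E2EE_SYNC_URL = f"{HOMESERVER}/_matrix/client/unstable/org.matrix.msc3575/sync/e2ee"


def poll_e2ee_sync(since: Optional[str] = None, timeout_ms: int = 30000) -> str:
    """Fetch one batch of E2EE-related events and return the next batch token."""
    params: dict = {"timeout": timeout_ms}
    if since is not None:
        params["since"] = since

    resp = requests.get(
        E2EE_SYNC_URL,
        params=params,
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
        timeout=timeout_ms / 1000 + 10,
    )
    resp.raise_for_status()
    body = resp.json()

    for event in body.get("to_device", {}).get("events", []):
        print("to-device event:", event.get("type"), "from", event.get("sender"))

    # `device_lists`, `device_one_time_keys_count` and
    # `device_unused_fallback_key_types` are also present, per the docstring above.
    return body["next_batch"]

Feeding the returned token back in as `since` gives the incremental behaviour described above.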
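
Similarly, a single request to the simplified sliding sync endpoint can be sketched as follows; the list name, ranges, and `required_state` entries follow the `Request body::` example in the docstring, while the connection details are again placeholder assumptions rather than anything defined by the patch.

# Rough sketch of one request to the `/org.matrix.simplified_msc3575/sync`
# endpoint. The body mirrors the docstring's `Request body::` example;
# connection details are placeholders.
import requests

HOMESERVER = "https://example.com"
ACCESS_TOKEN = "syt_..."
SLIDING_SYNC_URL = (
    f"{HOMESERVER}/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
)

request_body = {
    # Sliding Window API: first 20 rooms, one timeline event each.
    "lists": {
        "foo-list": {
            "ranges": [[0, 19]],
            "required_state": [
                ["m.room.join_rules", ""],
                ["m.room.history_visibility", ""],
            ],
            "timeline_limit": 1,
        }
    },
    # Room Subscriptions API and extensions left empty in this sketch.
    "room_subscriptions": {},
    "extensions": {},
}

resp = requests.post(
    SLIDING_SYNC_URL,
    params={"timeout": 0},  # add "pos" here on subsequent, incremental requests
    json=request_body,
    headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
)
resp.raise_for_status()
result = resp.json()

# `next_pos` is fed back in as `pos` next time; `lists` carries the SYNC and
# other operations shown in the `Response JSON::` example above.
print(result["next_pos"], result.get("lists", {}))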