diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 2b103ca6a8..b5ab0d8534 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -33,6 +33,7 @@ from synapse.events.utils import (
format_event_raw,
)
from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.sliding_sync import SlidingSyncConfig, SlidingSyncResult
from synapse.handlers.sync import (
ArchivedSyncResult,
InvitedSyncResult,
@@ -40,13 +41,22 @@ from synapse.handlers.sync import (
KnockedSyncResult,
SyncConfig,
SyncResult,
+ SyncVersion,
)
from synapse.http.server import HttpServer
-from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
+from synapse.http.servlet import (
+ RestServlet,
+ parse_and_validate_json_object_from_request,
+ parse_boolean,
+ parse_integer,
+ parse_string,
+)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace_with_opname
from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types.rest.client import SlidingSyncBody
from synapse.util import json_decoder
+from synapse.util.caches.lrucache import LruCache
from ._base import client_patterns, set_timeline_upper_limit
@@ -110,6 +120,11 @@ class SyncRestServlet(RestServlet):
self._msc2654_enabled = hs.config.experimental.msc2654_enabled
self._msc3773_enabled = hs.config.experimental.msc3773_enabled
+ self._json_filter_cache: LruCache[str, bool] = LruCache(
+ max_size=1000,
+ cache_name="sync_valid_filter",
+ )
+
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
# This will always be set by the time Twisted calls us.
assert request.args is not None
@@ -177,7 +192,13 @@ class SyncRestServlet(RestServlet):
filter_object = json_decoder.decode(filter_id)
except Exception:
raise SynapseError(400, "Invalid filter JSON", errcode=Codes.NOT_JSON)
- self.filtering.check_valid_filter(filter_object)
+
+ # We cache the validation, as this can get quite expensive if people use
+ # a literal json blob as a query param.
+ if not self._json_filter_cache.get(filter_id):
+ self.filtering.check_valid_filter(filter_object)
+ self._json_filter_cache[filter_id] = True
+
set_timeline_upper_limit(
filter_object, self.hs.config.server.filter_timeline_limit
)
@@ -197,7 +218,6 @@ class SyncRestServlet(RestServlet):
user=user,
filter_collection=filter_collection,
is_guest=requester.is_guest,
- request_key=request_key,
device_id=device_id,
)
@@ -220,6 +240,8 @@ class SyncRestServlet(RestServlet):
sync_result = await self.sync_handler.wait_for_sync_for_user(
requester,
sync_config,
+ SyncVersion.SYNC_V2,
+ request_key,
since_token=since_token,
timeout=timeout,
full_state=full_state,
@@ -553,5 +575,396 @@ class SyncRestServlet(RestServlet):
return result
+class SlidingSyncE2eeRestServlet(RestServlet):
+ """
+ API endpoint for MSC3575 Sliding Sync `/sync/e2ee`. This is being introduced as part
+ of Sliding Sync but doesn't have any sliding window component. It's just a way to
+ get E2EE events without having to sit through a big initial sync (`/sync` v2). And
+ we can avoid encryption events being backed up by the main sync response.
+
+ Having To-Device messages split out to this sync endpoint also helps when clients
+ need to have 2 or more sync streams open at a time, e.g a push notification process
+ and a main process. This can cause the two processes to race to fetch the To-Device
+ events, resulting in the need for complex synchronisation rules to ensure the token
+ is correctly and atomically exchanged between processes.
+
+ GET parameters::
+ timeout(int): How long to wait for new events in milliseconds.
+ since(batch_token): Batch token when asking for incremental deltas.
+
+ Response JSON::
+ {
+ "next_batch": // batch token for the next /sync
+ "to_device": {
+ // list of to-device events
+ "events": [
+ {
+ "content: { "algorithm": "m.olm.v1.curve25519-aes-sha2", "ciphertext": { ... }, "org.matrix.msgid": "abcd", "session_id": "abcd" },
+ "type": "m.room.encrypted",
+ "sender": "@alice:example.com",
+ }
+ // ...
+ ]
+ },
+ "device_lists": {
+ "changed": ["@alice:example.com"],
+ "left": ["@bob:example.com"]
+ },
+ "device_one_time_keys_count": {
+ "signed_curve25519": 50
+ },
+ "device_unused_fallback_key_types": [
+ "signed_curve25519"
+ ]
+ }
+ """
+
+ PATTERNS = client_patterns(
+ "/org.matrix.msc3575/sync/e2ee$", releases=[], v1=False, unstable=True
+ )
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastores().main
+ self.sync_handler = hs.get_sync_handler()
+
+ # Filtering only matters for the `device_lists` because it requires a bunch of
+ # derived information from rooms (see how `_generate_sync_entry_for_rooms()`
+ # prepares a bunch of data for `_generate_sync_entry_for_device_list()`).
+ self.only_member_events_filter_collection = FilterCollection(
+ self.hs,
+ {
+ "room": {
+ # We only care about membership events for the `device_lists`.
+ # Membership will tell us whether a user has joined/left a room and
+ # if there are new devices to encrypt for.
+ "timeline": {
+ "types": ["m.room.member"],
+ },
+ "state": {
+ "types": ["m.room.member"],
+ },
+ # We don't want any extra account_data generated because it's not
+ # returned by this endpoint. This helps us avoid work in
+ # `_generate_sync_entry_for_rooms()`
+ "account_data": {
+ "not_types": ["*"],
+ },
+ # We don't want any extra ephemeral data generated because it's not
+ # returned by this endpoint. This helps us avoid work in
+ # `_generate_sync_entry_for_rooms()`
+ "ephemeral": {
+ "not_types": ["*"],
+ },
+ },
+ # We don't want any extra account_data generated because it's not
+ # returned by this endpoint. (This is just here for good measure)
+ "account_data": {
+ "not_types": ["*"],
+ },
+ # We don't want any extra presence data generated because it's not
+ # returned by this endpoint. (This is just here for good measure)
+ "presence": {
+ "not_types": ["*"],
+ },
+ },
+ )
+
+ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request, allow_guest=True)
+ user = requester.user
+ device_id = requester.device_id
+
+ timeout = parse_integer(request, "timeout", default=0)
+ since = parse_string(request, "since")
+
+ sync_config = SyncConfig(
+ user=user,
+ filter_collection=self.only_member_events_filter_collection,
+ is_guest=requester.is_guest,
+ device_id=device_id,
+ )
+
+ since_token = None
+ if since is not None:
+ since_token = await StreamToken.from_string(self.store, since)
+
+ # Request cache key
+ request_key = (
+ SyncVersion.E2EE_SYNC,
+ user,
+ timeout,
+ since,
+ )
+
+ # Gather data for the response
+ sync_result = await self.sync_handler.wait_for_sync_for_user(
+ requester,
+ sync_config,
+ SyncVersion.E2EE_SYNC,
+ request_key,
+ since_token=since_token,
+ timeout=timeout,
+ full_state=False,
+ )
+
+ # The client may have disconnected by now; don't bother to serialize the
+ # response if so.
+ if request._disconnected:
+ logger.info("Client has disconnected; not serializing response.")
+ return 200, {}
+
+ response: JsonDict = defaultdict(dict)
+ response["next_batch"] = await sync_result.next_batch.to_string(self.store)
+
+ if sync_result.to_device:
+ response["to_device"] = {"events": sync_result.to_device}
+
+ if sync_result.device_lists.changed:
+ response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
+ if sync_result.device_lists.left:
+ response["device_lists"]["left"] = list(sync_result.device_lists.left)
+
+ # We always include this because https://github.com/vector-im/element-android/issues/3725
+ # The spec isn't terribly clear on when this can be omitted and how a client would tell
+ # the difference between "no keys present" and "nothing changed" in terms of whole field
+ # absent / individual key type entry absent
+ # Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
+ response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
+
+ # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
+ # states that this field should always be included, as long as the server supports the feature.
+ response["device_unused_fallback_key_types"] = (
+ sync_result.device_unused_fallback_key_types
+ )
+
+ return 200, response
+
+
+class SlidingSyncRestServlet(RestServlet):
+ """
+ API endpoint for MSC3575 Sliding Sync `/sync`. Allows for clients to request a
+ subset (sliding window) of rooms, state, and timeline events (just what they need)
+ in order to bootstrap quickly and subscribe to only what the client cares about.
+ Because the client can specify what it cares about, we can respond quickly and skip
+ all of the work we would normally have to do with a sync v2 response.
+
+ Request query parameters:
+ timeout: How long to wait for new events in milliseconds.
+ pos: Stream position token when asking for incremental deltas.
+
+ Request body::
+ {
+ // Sliding Window API
+ "lists": {
+ "foo-list": {
+ "ranges": [ [0, 99] ],
+ "sort": [ "by_notification_level", "by_recency", "by_name" ],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"]
+ ],
+ "timeline_limit": 10,
+ "filters": {
+ "is_dm": true
+ },
+ "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
+ }
+ },
+ // Room Subscriptions API
+ "room_subscriptions": {
+ "!sub1:bar": {
+ "required_state": [ ["*","*"] ],
+ "timeline_limit": 10,
+ "include_old_rooms": {
+ "timeline_limit": 1,
+ "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
+ }
+ }
+ },
+ // Extensions API
+ "extensions": {}
+ }
+
+ Response JSON::
+ {
+ "next_pos": "s58_224_0_13_10_1_1_16_0_1",
+ "lists": {
+ "foo-list": {
+ "count": 1337,
+ "ops": [{
+ "op": "SYNC",
+ "range": [0, 99],
+ "room_ids": [
+ "!foo:bar",
+ // ... 99 more room IDs
+ ]
+ }]
+ }
+ },
+ // Aggregated rooms from lists and room subscriptions
+ "rooms": {
+ // Room from room subscription
+ "!sub1:bar": {
+ "name": "Alice and Bob",
+ "avatar": "mxc://...",
+ "initial": true,
+ "required_state": [
+ {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+ {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}}
+ ],
+ "timeline": [
+ {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+ {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+ ],
+ "prev_batch": "t111_222_333",
+ "joined_count": 41,
+ "invited_count": 1,
+ "notification_count": 1,
+ "highlight_count": 0
+ },
+ // rooms from list
+ "!foo:bar": {
+ "name": "The calculated room name",
+ "avatar": "mxc://...",
+ "initial": true,
+ "required_state": [
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+ {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!foo:example.com", "content":{"via":["example.com"]}},
+ {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!bar:example.com", "content":{"via":["example.com"]}},
+ {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!baz:example.com", "content":{"via":["example.com"]}}
+ ],
+ "timeline": [
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"C"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"D"}},
+ ],
+ "prev_batch": "t111_222_333",
+ "joined_count": 4,
+ "invited_count": 0,
+ "notification_count": 54,
+ "highlight_count": 3
+ },
+ // ... 99 more items
+ },
+ "extensions": {}
+ }
+ """
+
+ PATTERNS = client_patterns(
+ "/org.matrix.simplified_msc3575/sync$", releases=[], v1=False, unstable=True
+ )
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastores().main
+ self.filtering = hs.get_filtering()
+ self.sliding_sync_handler = hs.get_sliding_sync_handler()
+
+ # TODO: Update this to `on_GET` once we figure out how we want to handle params
+ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request, allow_guest=True)
+ user = requester.user
+ device_id = requester.device_id
+
+ timeout = parse_integer(request, "timeout", default=0)
+ # Position in the stream
+ from_token_string = parse_string(request, "pos")
+
+ from_token = None
+ if from_token_string is not None:
+ from_token = await StreamToken.from_string(self.store, from_token_string)
+
+ # TODO: We currently don't know whether we're going to use sticky params or
+ # maybe some filters like sync v2 where they are built up once and referenced
+ # by filter ID. For now, we will just prototype with always passing everything
+ # in.
+ body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
+ logger.info("Sliding sync request: %r", body)
+
+ sync_config = SlidingSyncConfig(
+ user=user,
+ device_id=device_id,
+ # FIXME: Currently, we're just manually copying the fields from the
+ # `SlidingSyncBody` into the config. How can we gurantee into the future
+ # that we don't forget any? I would like something more structured like
+ # `copy_attributes(from=body, to=config)`
+ lists=body.lists,
+ room_subscriptions=body.room_subscriptions,
+ extensions=body.extensions,
+ )
+
+ sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user(
+ requester,
+ sync_config,
+ from_token,
+ timeout,
+ )
+
+ # The client may have disconnected by now; don't bother to serialize the
+ # response if so.
+ if request._disconnected:
+ logger.info("Client has disconnected; not serializing response.")
+ return 200, {}
+
+ response_content = await self.encode_response(sliding_sync_results)
+
+ return 200, response_content
+
+ # TODO: Is there a better way to encode things?
+ async def encode_response(
+ self,
+ sliding_sync_result: SlidingSyncResult,
+ ) -> JsonDict:
+ response: JsonDict = defaultdict(dict)
+
+ response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
+ serialized_lists = self.encode_lists(sliding_sync_result.lists)
+ if serialized_lists:
+ response["lists"] = serialized_lists
+ response["rooms"] = {} # TODO: sliding_sync_result.rooms
+ response["extensions"] = {} # TODO: sliding_sync_result.extensions
+
+ return response
+
+ def encode_lists(
+ self, lists: Dict[str, SlidingSyncResult.SlidingWindowList]
+ ) -> JsonDict:
+ def encode_operation(
+ operation: SlidingSyncResult.SlidingWindowList.Operation,
+ ) -> JsonDict:
+ return {
+ "op": operation.op.value,
+ "range": operation.range,
+ "room_ids": operation.room_ids,
+ }
+
+ serialized_lists = {}
+ for list_key, list_result in lists.items():
+ serialized_lists[list_key] = {
+ "count": list_result.count,
+ "ops": [encode_operation(op) for op in list_result.ops],
+ }
+
+ return serialized_lists
+
+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)
+
+ if hs.config.experimental.msc3575_enabled:
+ SlidingSyncRestServlet(hs).register(http_server)
+ SlidingSyncE2eeRestServlet(hs).register(http_server)
|