summary refs log tree commit diff
path: root/synapse/rest/client/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/rest/client/sync.py')
-rw-r--r--synapse/rest/client/sync.py532
1 files changed, 532 insertions, 0 deletions
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
new file mode 100644
index 0000000000..e18f4d01b3
--- /dev/null
+++ b/synapse/rest/client/sync.py
@@ -0,0 +1,532 @@
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import itertools
+import logging
+from collections import defaultdict
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple
+
+from synapse.api.constants import Membership, PresenceState
+from synapse.api.errors import Codes, StoreError, SynapseError
+from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
+from synapse.events.utils import (
+    format_event_for_client_v2_without_room_id,
+    format_event_raw,
+)
+from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.sync import KnockedSyncResult, SyncConfig
+from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
+from synapse.http.site import SynapseRequest
+from synapse.types import JsonDict, StreamToken
+from synapse.util import json_decoder
+
+from ._base import client_patterns, set_timeline_upper_limit
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class SyncRestServlet(RestServlet):
+    """
+
+    GET parameters::
+        timeout(int): How long to wait for new events in milliseconds.
+        since(batch_token): Batch token when asking for incremental deltas.
+        set_presence(str): What state the device presence should be set to.
+            default is "online".
+        filter(filter_id): A filter to apply to the events returned.
+
+    Response JSON::
+        {
+          "next_batch": // batch token for the next /sync
+          "presence": // presence data for the user.
+          "rooms": {
+            "join": { // Joined rooms being updated.
+              "${room_id}": { // Id of the room being updated
+                "event_map": // Map of EventID -> event JSON.
+                "timeline": { // The recent events in the room if gap is "true"
+                  "limited": // Was the per-room event limit exceeded?
+                             // otherwise the next events in the room.
+                  "events": [] // list of EventIDs in the "event_map".
+                  "prev_batch": // back token for getting previous events.
+                }
+                "state": {"events": []} // list of EventIDs updating the
+                                        // current state to be what it should
+                                        // be at the end of the batch.
+                "ephemeral": {"events": []} // list of event objects
+              }
+            },
+            "invite": {}, // Invited rooms being updated.
+            "leave": {} // Archived rooms being updated.
+          }
+        }
+    """
+
+    PATTERNS = client_patterns("/sync$")
+    ALLOWED_PRESENCE = {"online", "offline", "unavailable"}
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.sync_handler = hs.get_sync_handler()
+        self.clock = hs.get_clock()
+        self.filtering = hs.get_filtering()
+        self.presence_handler = hs.get_presence_handler()
+        self._server_notices_sender = hs.get_server_notices_sender()
+        self._event_serializer = hs.get_event_client_serializer()
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        # This will always be set by the time Twisted calls us.
+        assert request.args is not None
+
+        if b"from" in request.args:
+            # /events used to use 'from', but /sync uses 'since'.
+            # Lets be helpful and whine if we see a 'from'.
+            raise SynapseError(
+                400, "'from' is not a valid query parameter. Did you mean 'since'?"
+            )
+
+        requester = await self.auth.get_user_by_req(request, allow_guest=True)
+        user = requester.user
+        device_id = requester.device_id
+
+        timeout = parse_integer(request, "timeout", default=0)
+        since = parse_string(request, "since")
+        set_presence = parse_string(
+            request,
+            "set_presence",
+            default="online",
+            allowed_values=self.ALLOWED_PRESENCE,
+        )
+        filter_id = parse_string(request, "filter")
+        full_state = parse_boolean(request, "full_state", default=False)
+
+        logger.debug(
+            "/sync: user=%r, timeout=%r, since=%r, "
+            "set_presence=%r, filter_id=%r, device_id=%r",
+            user,
+            timeout,
+            since,
+            set_presence,
+            filter_id,
+            device_id,
+        )
+
+        request_key = (user, timeout, since, filter_id, full_state, device_id)
+
+        if filter_id is None:
+            filter_collection = DEFAULT_FILTER_COLLECTION
+        elif filter_id.startswith("{"):
+            try:
+                filter_object = json_decoder.decode(filter_id)
+                set_timeline_upper_limit(
+                    filter_object, self.hs.config.filter_timeline_limit
+                )
+            except Exception:
+                raise SynapseError(400, "Invalid filter JSON")
+            self.filtering.check_valid_filter(filter_object)
+            filter_collection = FilterCollection(filter_object)
+        else:
+            try:
+                filter_collection = await self.filtering.get_user_filter(
+                    user.localpart, filter_id
+                )
+            except StoreError as err:
+                if err.code != 404:
+                    raise
+                # fix up the description and errcode to be more useful
+                raise SynapseError(400, "No such filter", errcode=Codes.INVALID_PARAM)
+
+        sync_config = SyncConfig(
+            user=user,
+            filter_collection=filter_collection,
+            is_guest=requester.is_guest,
+            request_key=request_key,
+            device_id=device_id,
+        )
+
+        since_token = None
+        if since is not None:
+            since_token = await StreamToken.from_string(self.store, since)
+
+        # send any outstanding server notices to the user.
+        await self._server_notices_sender.on_user_syncing(user.to_string())
+
+        affect_presence = set_presence != PresenceState.OFFLINE
+
+        if affect_presence:
+            await self.presence_handler.set_state(
+                user, {"presence": set_presence}, True
+            )
+
+        context = await self.presence_handler.user_syncing(
+            user.to_string(), affect_presence=affect_presence
+        )
+        with context:
+            sync_result = await self.sync_handler.wait_for_sync_for_user(
+                requester,
+                sync_config,
+                since_token=since_token,
+                timeout=timeout,
+                full_state=full_state,
+            )
+
+        # the client may have disconnected by now; don't bother to serialize the
+        # response if so.
+        if request._disconnected:
+            logger.info("Client has disconnected; not serializing response.")
+            return 200, {}
+
+        time_now = self.clock.time_msec()
+        response_content = await self.encode_response(
+            time_now, sync_result, requester.access_token_id, filter_collection
+        )
+
+        logger.debug("Event formatting complete")
+        return 200, response_content
+
+    async def encode_response(self, time_now, sync_result, access_token_id, filter):
+        logger.debug("Formatting events in sync response")
+        if filter.event_format == "client":
+            event_formatter = format_event_for_client_v2_without_room_id
+        elif filter.event_format == "federation":
+            event_formatter = format_event_raw
+        else:
+            raise Exception("Unknown event format %s" % (filter.event_format,))
+
+        joined = await self.encode_joined(
+            sync_result.joined,
+            time_now,
+            access_token_id,
+            filter.event_fields,
+            event_formatter,
+        )
+
+        invited = await self.encode_invited(
+            sync_result.invited, time_now, access_token_id, event_formatter
+        )
+
+        knocked = await self.encode_knocked(
+            sync_result.knocked, time_now, access_token_id, event_formatter
+        )
+
+        archived = await self.encode_archived(
+            sync_result.archived,
+            time_now,
+            access_token_id,
+            filter.event_fields,
+            event_formatter,
+        )
+
+        logger.debug("building sync response dict")
+
+        response: dict = defaultdict(dict)
+        response["next_batch"] = await sync_result.next_batch.to_string(self.store)
+
+        if sync_result.account_data:
+            response["account_data"] = {"events": sync_result.account_data}
+        if sync_result.presence:
+            response["presence"] = SyncRestServlet.encode_presence(
+                sync_result.presence, time_now
+            )
+
+        if sync_result.to_device:
+            response["to_device"] = {"events": sync_result.to_device}
+
+        if sync_result.device_lists.changed:
+            response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
+        if sync_result.device_lists.left:
+            response["device_lists"]["left"] = list(sync_result.device_lists.left)
+
+        # We always include this because https://github.com/vector-im/element-android/issues/3725
+        # The spec isn't terribly clear on when this can be omitted and how a client would tell
+        # the difference between "no keys present" and "nothing changed" in terms of whole field
+        # absent / individual key type entry absent
+        # Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
+        response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
+
+        # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
+        # states that this field should always be included, as long as the server supports the feature.
+        response[
+            "org.matrix.msc2732.device_unused_fallback_key_types"
+        ] = sync_result.device_unused_fallback_key_types
+
+        if joined:
+            response["rooms"][Membership.JOIN] = joined
+        if invited:
+            response["rooms"][Membership.INVITE] = invited
+        if knocked:
+            response["rooms"][Membership.KNOCK] = knocked
+        if archived:
+            response["rooms"][Membership.LEAVE] = archived
+
+        if sync_result.groups.join:
+            response["groups"][Membership.JOIN] = sync_result.groups.join
+        if sync_result.groups.invite:
+            response["groups"][Membership.INVITE] = sync_result.groups.invite
+        if sync_result.groups.leave:
+            response["groups"][Membership.LEAVE] = sync_result.groups.leave
+
+        return response
+
+    @staticmethod
+    def encode_presence(events, time_now):
+        return {
+            "events": [
+                {
+                    "type": "m.presence",
+                    "sender": event.user_id,
+                    "content": format_user_presence_state(
+                        event, time_now, include_user_id=False
+                    ),
+                }
+                for event in events
+            ]
+        }
+
+    async def encode_joined(
+        self, rooms, time_now, token_id, event_fields, event_formatter
+    ):
+        """
+        Encode the joined rooms in a sync result
+
+        Args:
+            rooms(list[synapse.handlers.sync.JoinedSyncResult]): list of sync
+                results for rooms this user is joined to
+            time_now(int): current time - used as a baseline for age
+                calculations
+            token_id(int): ID of the user's auth token - used for namespacing
+                of transaction IDs
+            event_fields(list<str>): List of event fields to include. If empty,
+                all fields will be returned.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
+        Returns:
+            dict[str, dict[str, object]]: the joined rooms list, in our
+                response format
+        """
+        joined = {}
+        for room in rooms:
+            joined[room.room_id] = await self.encode_room(
+                room,
+                time_now,
+                token_id,
+                joined=True,
+                only_fields=event_fields,
+                event_formatter=event_formatter,
+            )
+
+        return joined
+
+    async def encode_invited(self, rooms, time_now, token_id, event_formatter):
+        """
+        Encode the invited rooms in a sync result
+
+        Args:
+            rooms(list[synapse.handlers.sync.InvitedSyncResult]): list of
+                sync results for rooms this user is invited to
+            time_now(int): current time - used as a baseline for age
+                calculations
+            token_id(int): ID of the user's auth token - used for namespacing
+                of transaction IDs
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
+
+        Returns:
+            dict[str, dict[str, object]]: the invited rooms list, in our
+                response format
+        """
+        invited = {}
+        for room in rooms:
+            invite = await self._event_serializer.serialize_event(
+                room.invite,
+                time_now,
+                token_id=token_id,
+                event_format=event_formatter,
+                include_stripped_room_state=True,
+            )
+            unsigned = dict(invite.get("unsigned", {}))
+            invite["unsigned"] = unsigned
+            invited_state = list(unsigned.pop("invite_room_state", []))
+            invited_state.append(invite)
+            invited[room.room_id] = {"invite_state": {"events": invited_state}}
+
+        return invited
+
+    async def encode_knocked(
+        self,
+        rooms: List[KnockedSyncResult],
+        time_now: int,
+        token_id: int,
+        event_formatter: Callable[[Dict], Dict],
+    ) -> Dict[str, Dict[str, Any]]:
+        """
+        Encode the rooms we've knocked on in a sync result.
+
+        Args:
+            rooms: list of sync results for rooms this user is knocking on
+            time_now: current time - used as a baseline for age calculations
+            token_id: ID of the user's auth token - used for namespacing of transaction IDs
+            event_formatter: function to convert from federation format to client format
+
+        Returns:
+            The list of rooms the user has knocked on, in our response format.
+        """
+        knocked = {}
+        for room in rooms:
+            knock = await self._event_serializer.serialize_event(
+                room.knock,
+                time_now,
+                token_id=token_id,
+                event_format=event_formatter,
+                include_stripped_room_state=True,
+            )
+
+            # Extract the `unsigned` key from the knock event.
+            # This is where we (cheekily) store the knock state events
+            unsigned = knock.setdefault("unsigned", {})
+
+            # Duplicate the dictionary in order to avoid modifying the original
+            unsigned = dict(unsigned)
+
+            # Extract the stripped room state from the unsigned dict
+            # This is for clients to get a little bit of information about
+            # the room they've knocked on, without revealing any sensitive information
+            knocked_state = list(unsigned.pop("knock_room_state", []))
+
+            # Append the actual knock membership event itself as well. This provides
+            # the client with:
+            #
+            # * A knock state event that they can use for easier internal tracking
+            # * The rough timestamp of when the knock occurred contained within the event
+            knocked_state.append(knock)
+
+            # Build the `knock_state` dictionary, which will contain the state of the
+            # room that the client has knocked on
+            knocked[room.room_id] = {"knock_state": {"events": knocked_state}}
+
+        return knocked
+
+    async def encode_archived(
+        self, rooms, time_now, token_id, event_fields, event_formatter
+    ):
+        """
+        Encode the archived rooms in a sync result
+
+        Args:
+            rooms (list[synapse.handlers.sync.ArchivedSyncResult]): list of
+                sync results for rooms this user is joined to
+            time_now(int): current time - used as a baseline for age
+                calculations
+            token_id(int): ID of the user's auth token - used for namespacing
+                of transaction IDs
+            event_fields(list<str>): List of event fields to include. If empty,
+                all fields will be returned.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
+        Returns:
+            dict[str, dict[str, object]]: The invited rooms list, in our
+                response format
+        """
+        joined = {}
+        for room in rooms:
+            joined[room.room_id] = await self.encode_room(
+                room,
+                time_now,
+                token_id,
+                joined=False,
+                only_fields=event_fields,
+                event_formatter=event_formatter,
+            )
+
+        return joined
+
+    async def encode_room(
+        self, room, time_now, token_id, joined, only_fields, event_formatter
+    ):
+        """
+        Args:
+            room (JoinedSyncResult|ArchivedSyncResult): sync result for a
+                single room
+            time_now (int): current time - used as a baseline for age
+                calculations
+            token_id (int): ID of the user's auth token - used for namespacing
+                of transaction IDs
+            joined (bool): True if the user is joined to this room - will mean
+                we handle ephemeral events
+            only_fields(list<str>): Optional. The list of event fields to include.
+            event_formatter (func[dict]): function to convert from federation format
+                to client format
+        Returns:
+            dict[str, object]: the room, encoded in our response format
+        """
+
+        def serialize(events):
+            return self._event_serializer.serialize_events(
+                events,
+                time_now=time_now,
+                # We don't bundle "live" events, as otherwise clients
+                # will end up double counting annotations.
+                bundle_aggregations=False,
+                token_id=token_id,
+                event_format=event_formatter,
+                only_event_fields=only_fields,
+            )
+
+        state_dict = room.state
+        timeline_events = room.timeline.events
+
+        state_events = state_dict.values()
+
+        for event in itertools.chain(state_events, timeline_events):
+            # We've had bug reports that events were coming down under the
+            # wrong room.
+            if event.room_id != room.room_id:
+                logger.warning(
+                    "Event %r is under room %r instead of %r",
+                    event.event_id,
+                    room.room_id,
+                    event.room_id,
+                )
+
+        serialized_state = await serialize(state_events)
+        serialized_timeline = await serialize(timeline_events)
+
+        account_data = room.account_data
+
+        result = {
+            "timeline": {
+                "events": serialized_timeline,
+                "prev_batch": await room.timeline.prev_batch.to_string(self.store),
+                "limited": room.timeline.limited,
+            },
+            "state": {"events": serialized_state},
+            "account_data": {"events": account_data},
+        }
+
+        if joined:
+            ephemeral_events = room.ephemeral
+            result["ephemeral"] = {"events": ephemeral_events}
+            result["unread_notifications"] = room.unread_notifications
+            result["summary"] = room.summary
+            result["org.matrix.msc2654.unread_count"] = room.unread_count
+
+        return result
+
+
+def register_servlets(hs, http_server):
+    SyncRestServlet(hs).register(http_server)