diff options
Diffstat (limited to 'synapse/rest/client/v2_alpha/sync.py')
-rw-r--r-- | synapse/rest/client/v2_alpha/sync.py | 170 |
1 files changed, 92 insertions, 78 deletions
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index cac28b47b6..fffecb24f5 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.http.servlet import ( - RestServlet, parse_string, parse_integer, parse_boolean + RestServlet, parse_string, parse_integer ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken @@ -26,6 +26,7 @@ from synapse.events.utils import ( from synapse.api.filtering import Filter from ._base import client_v2_pattern +import copy import logging logger = logging.getLogger(__name__) @@ -36,51 +37,44 @@ class SyncRestServlet(RestServlet): GET parameters:: timeout(int): How long to wait for new events in milliseconds. - limit(int): Maxiumum number of events per room to return. - gap(bool): Create gaps the message history if limit is exceeded to - ensure that the client has the most recent messages. Defaults to - "true". - sort(str,str): tuple of sort key (e.g. "timeline") and direction - (e.g. "asc", "desc"). Defaults to "timeline,asc". since(batch_token): Batch token when asking for incremental deltas. set_presence(str): What state the device presence should be set to. default is "online". - backfill(bool): Should the HS request message history from other - servers. This may take a long time making it unsuitable for clients - expecting a prompt response. Defaults to "true". filter(filter_id): A filter to apply to the events returned. - filter_*: Filter override parameters. Response JSON:: { - "next_batch": // batch token for the next /sync - "private_user_data": // private events for this user. - "public_user_data": // public events for all users including the - // public events for this user. - "rooms": [{ // List of rooms with updates. - "room_id": // Id of the room being updated - "limited": // Was the per-room event limit exceeded? - "published": // Is the room published by our HS? + "next_batch": // batch token for the next /sync + "presence": // presence data for the user. + "rooms": { + "joined": { // Joined rooms being updated. + "${room_id}": { // Id of the room being updated "event_map": // Map of EventID -> event JSON. - "events": { // The recent events in the room if gap is "true" - // otherwise the next events in the room. - "batch": [] // list of EventIDs in the "event_map". - "prev_batch": // back token for getting previous events. + "timeline": { // The recent events in the room if gap is "true" + "limited": // Was the per-room event limit exceeded? + // otherwise the next events in the room. + "events": [] // list of EventIDs in the "event_map". + "prev_batch": // back token for getting previous events. } - "state": [] // list of EventIDs updating the current state to - // be what it should be at the end of the batch. - "ephemeral": [] - }] + "state": {"events": []} // list of EventIDs updating the + // current state to be what it should + // be at the end of the batch. + "ephemeral": {"events": []} // list of event objects + } + }, + "invited": {}, // Invited rooms being updated. + "archived": {} // Archived rooms being updated. + } } """ PATTERN = client_v2_pattern("/sync$") - ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) - ALLOWED_PRESENCE = set(["online", "offline", "idle"]) + ALLOWED_PRESENCE = set(["online", "offline"]) def __init__(self, hs): super(SyncRestServlet, self).__init__() self.auth = hs.get_auth() + self.event_stream_handler = hs.get_handlers().event_stream_handler self.sync_handler = hs.get_handlers().sync_handler self.clock = hs.get_clock() self.filtering = hs.get_filtering() @@ -90,45 +84,29 @@ class SyncRestServlet(RestServlet): user, token_id = yield self.auth.get_user_by_req(request) timeout = parse_integer(request, "timeout", default=0) - limit = parse_integer(request, "limit", required=True) - gap = parse_boolean(request, "gap", default=True) - sort = parse_string( - request, "sort", default="timeline,asc", - allowed_values=self.ALLOWED_SORT - ) since = parse_string(request, "since") set_presence = parse_string( request, "set_presence", default="online", allowed_values=self.ALLOWED_PRESENCE ) - backfill = parse_boolean(request, "backfill", default=False) filter_id = parse_string(request, "filter", default=None) logger.info( - "/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r," - " set_presence=%r, backfill=%r, filter_id=%r" % ( - user, timeout, limit, gap, sort, since, set_presence, - backfill, filter_id + "/sync: user=%r, timeout=%r, since=%r," + " set_presence=%r, filter_id=%r" % ( + user, timeout, since, set_presence, filter_id ) ) - # TODO(mjark): Load filter and apply overrides. try: filter = yield self.filtering.get_user_filter( user.localpart, filter_id ) except: filter = Filter({}) - # filter = filter.apply_overrides(http_request) - # if filter.matches(event): - # # stuff sync_config = SyncConfig( user=user, - gap=gap, - limit=limit, - sort=sort, - backfill=backfill, filter=filter, ) @@ -137,43 +115,81 @@ class SyncRestServlet(RestServlet): else: since_token = None - sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout - ) + if set_presence == "online": + yield self.event_stream_handler.started_stream(user) + + try: + sync_result = yield self.sync_handler.wait_for_sync_for_user( + sync_config, since_token=since_token, timeout=timeout + ) + finally: + if set_presence == "online": + self.event_stream_handler.stopped_stream(user) time_now = self.clock.time_msec() + joined = self.encode_joined( + sync_result.joined, filter, time_now, token_id + ) + + invited = self.encode_invited( + sync_result.invited, filter, time_now, token_id + ) + response_content = { - "public_user_data": self.encode_user_data( - sync_result.public_user_data, filter, time_now - ), - "private_user_data": self.encode_user_data( - sync_result.private_user_data, filter, time_now - ), - "rooms": self.encode_rooms( - sync_result.rooms, filter, time_now, token_id + "presence": self.encode_presence( + sync_result.presence, filter, time_now ), + "rooms": { + "joined": joined, + "invited": invited, + "archived": {}, + }, "next_batch": sync_result.next_batch.to_string(), } defer.returnValue((200, response_content)) - def encode_user_data(self, events, filter, time_now): - return events + def encode_presence(self, events, filter, time_now): + formatted = [] + for event in events: + event = copy.deepcopy(event) + event['sender'] = event['content'].pop('user_id') + formatted.append(event) + return {"events": filter.filter_presence(formatted)} + + def encode_joined(self, rooms, filter, time_now, token_id): + joined = {} + for room in rooms: + joined[room.room_id] = self.encode_room( + room, filter, time_now, token_id + ) + + return joined + + def encode_invited(self, rooms, filter, time_now, token_id): + invited = {} + for room in rooms: + invite = serialize_event( + room.invite, time_now, token_id=token_id, + event_format=format_event_for_client_v2_without_event_id, + ) + invited_state = invite.get("unsigned", {}).pop("invite_room_state", []) + invited_state.append(invite) + invited[room.room_id] = { + "invite_state": {"events": invited_state} + } - def encode_rooms(self, rooms, filter, time_now, token_id): - return [ - self.encode_room(room, filter, time_now, token_id) - for room in rooms - ] + return invited @staticmethod def encode_room(room, filter, time_now, token_id): event_map = {} state_events = filter.filter_room_state(room.state) - recent_events = filter.filter_room_events(room.events) + timeline_events = filter.filter_room_timeline(room.timeline.events) + ephemeral_events = filter.filter_room_ephemeral(room.ephemeral) state_event_ids = [] - recent_event_ids = [] + timeline_event_ids = [] for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( @@ -182,24 +198,22 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) - for event in recent_events: + for event in timeline_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( event, time_now, token_id=token_id, event_format=format_event_for_client_v2_without_event_id, ) - recent_event_ids.append(event.event_id) + timeline_event_ids.append(event.event_id) result = { - "room_id": room.room_id, "event_map": event_map, - "events": { - "batch": recent_event_ids, - "prev_batch": room.prev_batch.to_string(), + "timeline": { + "events": timeline_event_ids, + "prev_batch": room.timeline.prev_batch.to_string(), + "limited": room.timeline.limited, }, - "state": state_event_ids, - "limited": room.limited, - "published": room.published, - "ephemeral": room.ephemeral, + "state": {"events": state_event_ids}, + "ephemeral": {"events": ephemeral_events}, } return result |