From e26340cee7049b6c36f4c3451ec7524fa6b80d1c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 23 Jan 2015 18:31:29 +0000 Subject: Start implementing the v2_alpha sync API --- synapse/rest/client/v2_alpha/__init__.py | 33 +++++++ synapse/rest/client/v2_alpha/_base.py | 38 ++++++++ synapse/rest/client/v2_alpha/sync.py | 143 +++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+) create mode 100644 synapse/rest/client/v2_alpha/__init__.py create mode 100644 synapse/rest/client/v2_alpha/_base.py create mode 100644 synapse/rest/client/v2_alpha/sync.py (limited to 'synapse/rest/client') diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py new file mode 100644 index 0000000000..e75f9d250f --- /dev/null +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from . import ( + sync, +) + +from synapse.http.server import JsonResource + + +class ClientV2AlphaRestResource(JsonResource): + """A resource for version 2 alpha of the matrix client API.""" + + def __init__(self, hs): + JsonResource.__init__(self) + self.register_servlets(self, hs) + + @staticmethod + def register_servlets(client_resource, hs): + sync.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py new file mode 100644 index 0000000000..22dc5cb862 --- /dev/null +++ b/synapse/rest/client/v2_alpha/_base.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module contains base REST classes for constructing client v1 servlets. +""" + +from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX +import re + +import logging + + +logger = logging.getLogger(__name__) + + +def client_v2_pattern(path_regex): + """Creates a regex compiled client path with the correct client path + prefix. + + Args: + path_regex (str): The regex string to match. This should NOT have a ^ + as this will be prefixed. + Returns: + SRE_Pattern + """ + return re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py new file mode 100644 index 0000000000..39bb5ec8e9 --- /dev/null +++ b/synapse/rest/client/v2_alpha/sync.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.http.servlet import RestServlet +from ._base import client_v2_pattern + +import logging + +logger = logging.getLogger(__name__) + + +class SyncRestServlet(RestServlet): + """ + + GET parameters:: + timeout(int): How long to wait for new events in milliseconds. + limit(int): Maxiumum number of events per room to return. + gap(bool): Create gaps the message history if limit is exceeded to + ensure that the client has the most recent messages. Defaults to + "true". + sort(str,str): tuple of sort key (e.g. "timeline") and direction + (e.g. "asc", "desc"). Defaults to "timeline,asc". + since(batch_token): Batch token when asking for incremental deltas. + set_presence(str): What state the device presence should be set to. + default is "online". + backfill(bool): Should the HS request message history from other + servers. This may take a long time making it unsuitable for clients + expecting a prompt response. Defaults to "true". + filter(filter_id): A filter to apply to the events returned. + filter_*: Filter override parameters. + + Response JSON:: + { + "next_batch": // batch token for the next /sync + "private_user_data": // private events for this user. + "public_user_data": // public events for all users including the + // public events for this user. + "rooms": [{ // List of rooms with updates. + "room_id": // Id of the room being updated + "limited": // Was the per-room event limit exceeded? + "published": // Is the room published by our HS? + "event_map": // Map of EventID -> event JSON. + "events": { // The recent events in the room if gap is "true" + // otherwise the next events in the room. + "batch": [] // list of EventIDs in the "event_map". + "prev_batch": // back token for getting previous events. + } + "state": [] // list of EventIDs updating the current state to + // be what it should be at the end of the batch. + }] + } + """ + + + PATTERN = client_v2_pattern("/sync$") + ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) + ALLOWED_PRESENCE = set(["online", "offline", "idle"]) + + def __init__(self, hs): + super(SyncRestServlet, self).__init__() + self.auth = hs.get_auth() + #self.sync_handler = hs.get_handlers().sync_hanlder + + @defer.inlineCallbacks + def on_GET(self, request): + user = yield self.auth.get_user_by_req(request) + + timeout = self.parse_integer(request, "timeout", default=0) + limit = self.parse_integer(request, "limit", default=None) + gap = self.parse_boolean(request, "gap", default=True) + sort = self.parse_string( + request, "sort", default="timeline,asc", + allowed_values=self.ALLOWED_SORT + ) + since = self.parse_string(request, "since") + set_presence = self.parse_string( + request, "set_presence", default="online", + allowed_values=self.ALLOWED_PRESENCE + ) + backfill = self.parse_boolean(request, "backfill", default=True) + filter_id = self.parse_string(request, "filter", default=None) + + logger.info( + "/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r," + " set_presence=%r, backfill=%r, filter_id=%r" % ( + user, timeout, limit, gap, sort, since, set_presence, + backfill, filter_id + ) + ) + + # TODO(mjark): Load filter and apply overrides. + # filter = self.filters.load_fitler(filter_id_str) + # filter = filter.apply_overrides(http_request) + # if filter.matches(event): + # # stuff + + # if timeout != 0: + # register for updates from the event stream + + #rooms = [] + + if gap: + pass + # now_stream_token = get_current_stream_token + # for room_id in get_rooms_for_user(user, filter=filter): + # state, events, start, end, limited, published = updates_for_room( + # from=since, to=now_stream_token, limit=limit, + # anchor_to_start=False + # ) + # rooms[room_id] = (state, events, start, limited, published) + # next_stream_token = now. + else: + pass + # now_stream_token = get_current_stream_token + # for room_id in get_rooms_for_user(user, filter=filter) + # state, events, start, end, limited, published = updates_for_room( + # from=since, to=now_stream_token, limit=limit, + # anchor_to_start=False + # ) + # next_stream_token = min(next_stream_token, end) + + + response_content = {} + + defer.returnValue((200, response_content)) + + +def register_servlets(hs, http_server): + SyncRestServlet(hs).register(http_server) -- cgit 1.4.1 From 436513068de73ab47d9ba9a32046420be3d86588 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 26 Jan 2015 18:53:31 +0000 Subject: Start implementing the non-incremental sync portion of the v2 /sync API --- synapse/events/utils.py | 6 +- synapse/handlers/__init__.py | 2 + synapse/handlers/sync.py | 87 ++++++++++++++++++++--------- synapse/rest/client/v2_alpha/sync.py | 105 +++++++++++++++++++++++++---------- 4 files changed, 146 insertions(+), 54 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index e391aca4cc..b7f1ad4b40 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -89,7 +89,7 @@ def prune_event(event): return type(event)(allowed_fields) -def serialize_event(e, time_now_ms, client_event=True): +def serialize_event(e, time_now_ms, client_event=True, strip_ids=False): # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e @@ -138,4 +138,8 @@ def serialize_event(e, time_now_ms, client_event=True): d.pop("unsigned", None) d.pop("origin", None) + if strip_ids: + d.pop("room_id", None) + d.pop("event_id", None) + return d diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index fe071a4bc2..a32eab9316 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -26,6 +26,7 @@ from .presence import PresenceHandler from .directory import DirectoryHandler from .typing import TypingNotificationHandler from .admin import AdminHandler +from .sync import SyncHandler class Handlers(object): @@ -51,3 +52,4 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + self.sync_handler = SyncHandler(hs) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ec20ea4890..bbabaf3df1 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1,26 +1,49 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from synapse.streams.config import PaginationConfig +from synapse.api.constants import Membership + +from twisted.internet import defer + import collections +import logging + +logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", "device", - "since", "limit", "gap", - "sort" - "backfill" + "sort", + "backfill", "filter", -) +]) RoomSyncResult = collections.namedtuple("RoomSyncResult", [ "room_id", "limited", "published", - "prev_batch", - "events", + "events", # dict of event "state", - "event_map", + "prev_batch", ]) @@ -41,10 +64,11 @@ class SyncHandler(BaseHandler): def __init__(self, hs): super(SyncHandler, self).__init__(hs) self.event_sources = hs.get_event_sources() + self.clock = hs.get_clock() def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): if timeout == 0: - return self.current_sync_for_user(sync_config, since) + return self.current_sync_for_user(sync_config, since_token) else: def current_sync_callback(since_token): return self.current_sync_for_user( @@ -53,58 +77,71 @@ class SyncHandler(BaseHandler): return self.notifier.wait_for_events( sync_config.filter, since_token, current_sync_callback ) - defer.returnValue(result) def current_sync_for_user(self, sync_config, since_token=None): if since_token is None: - return self.inital_sync(sync_config) + return self.initial_sync(sync_config) else: return self.incremental_sync(sync_config) @defer.inlineCallbacks def initial_sync(self, sync_config): + if sync_config.sort == "timeline,desc": + # TODO(mjark): Handle going through events in reverse order?. + # What does "most recent events" mean when applying the limits mean + # in this case? + raise NotImplementedError() + now_token = yield self.event_sources.get_current_token() presence_stream = self.event_sources.sources["presence"] - # TODO (markjh): This looks wrong, shouldn't we be getting the presence + # TODO (mjark): This looks wrong, shouldn't we be getting the presence # UP to the present rather than after the present? pagination_config = PaginationConfig(from_token=now_token) presence, _ = yield presence_stream.get_pagination_rows( - user, pagination_config.get_source_config("presence"), None + user=sync_config.user, + pagination_config=pagination_config.get_source_config("presence"), + key=None ) room_list = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user_id, + user_id=sync_config.user.to_string(), membership_list=[Membership.INVITE, Membership.JOIN] ) - # TODO (markjh): Does public mean "published"? + # TODO (mjark): Does public mean "published"? published_rooms = yield self.store.get_rooms(is_public=True) - published_room_ids = set(r["room_id"] for r in public_rooms) + published_room_ids = set(r["room_id"] for r in published_rooms) + rooms = [] for event in room_list: - - messages, token = yield self.store.get_recent_events_for_room( + #TODO (mjark): Apply the event filter in sync_config. + recent_events, token = yield self.store.get_recent_events_for_room( event.room_id, limit=sync_config.limit, end_token=now_token.room_key, ) prev_batch_token = now_token.copy_and_replace("room_key", token[0]) - current_state = yield self.state_handler.get_current_state( + current_state_events = yield self.state_handler.get_current_state( event.room_id ) rooms.append(RoomSyncResult( room_id=event.room_id, published=event.room_id in published_room_ids, + events=recent_events, + prev_batch=prev_batch_token, + state=current_state_events, + limited=True, + )) - - + defer.returnValue(SyncResult( + public_user_data=presence, + private_user_data=[], + rooms=rooms, + next_batch=now_token, + )) @defer.inlineCallbacks def incremental_sync(self, sync_config): - - - - - + pass diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 39bb5ec8e9..cc667ebafc 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd +# Copyright 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +16,9 @@ from twisted.internet import defer from synapse.http.servlet import RestServlet +from synapse.handlers.sync import SyncConfig +from synapse.types import StreamToken +from synapse.events.utils import serialize_event from ._base import client_v2_pattern import logging @@ -73,14 +76,15 @@ class SyncRestServlet(RestServlet): def __init__(self, hs): super(SyncRestServlet, self).__init__() self.auth = hs.get_auth() - #self.sync_handler = hs.get_handlers().sync_hanlder + self.sync_handler = hs.get_handlers().sync_handler + self.clock = hs.get_clock() @defer.inlineCallbacks def on_GET(self, request): user = yield self.auth.get_user_by_req(request) timeout = self.parse_integer(request, "timeout", default=0) - limit = self.parse_integer(request, "limit", default=None) + limit = self.parse_integer(request, "limit", required=True) gap = self.parse_boolean(request, "gap", default=True) sort = self.parse_string( request, "sort", default="timeline,asc", @@ -91,7 +95,7 @@ class SyncRestServlet(RestServlet): request, "set_presence", default="online", allowed_values=self.ALLOWED_PRESENCE ) - backfill = self.parse_boolean(request, "backfill", default=True) + backfill = self.parse_boolean(request, "backfill", default=False) filter_id = self.parse_string(request, "filter", default=None) logger.info( @@ -108,36 +112,81 @@ class SyncRestServlet(RestServlet): # if filter.matches(event): # # stuff - # if timeout != 0: - # register for updates from the event stream - - #rooms = [] - - if gap: - pass - # now_stream_token = get_current_stream_token - # for room_id in get_rooms_for_user(user, filter=filter): - # state, events, start, end, limited, published = updates_for_room( - # from=since, to=now_stream_token, limit=limit, - # anchor_to_start=False - # ) - # rooms[room_id] = (state, events, start, limited, published) - # next_stream_token = now. + sync_config = SyncConfig( + user=user, + device="TODO", # TODO(mjark) Get the device_id from access_token + gap=gap, + limit=limit, + sort=sort, + backfill=backfill, + filter="TODO", # TODO(mjark) Add the filter to the config. + ) + + if since is not None: + since_token = StreamToken.from_string(since) else: - pass - # now_stream_token = get_current_stream_token - # for room_id in get_rooms_for_user(user, filter=filter) - # state, events, start, end, limited, published = updates_for_room( - # from=since, to=now_stream_token, limit=limit, - # anchor_to_start=False - # ) - # next_stream_token = min(next_stream_token, end) + since_token = None + sync_result = yield self.sync_handler.wait_for_sync_for_user( + sync_config, since_token=since_token, timeout=timeout + ) - response_content = {} + time_now = self.clock.time_msec() + + response_content = { + "public_user_data": self.encode_events( + sync_result.public_user_data, filter, time_now + ), + "private_user_data": self.encode_events( + sync_result.private_user_data, filter, time_now + ), + "rooms": self.encode_rooms(sync_result.rooms, filter, time_now), + "next_batch": sync_result.next_batch.to_string(), + } defer.returnValue((200, response_content)) + def encode_events(self, events, filter, time_now): + return [self.encode_event(event, filter, time_now) for event in events] + + @staticmethod + def encode_event(event, filter, time_now): + # TODO(mjark): Respect formatting requirements in the filter. + return serialize_event(event, time_now) + + def encode_rooms(self, rooms, filter, time_now): + return [self.encode_room(room, filter, time_now) for room in rooms] + + @staticmethod + def encode_room(room, filter, time_now): + event_map = {} + state_event_ids = [] + recent_event_ids = [] + for event in room.state: + # TODO(mjark): Respect formatting requirements in the filter. + event_map[event.event_id] = serialize_event( + event, time_now, strip_ids=True + ) + state_event_ids.append(event.event_id) + + for event in room.events: + # TODO(mjark): Respect formatting requirements in the filter. + event_map[event.event_id] = serialize_event( + event, time_now, strip_ids=True + ) + recent_event_ids.append(event.event_id) + return { + "room_id": room.room_id, + "event_map": event_map, + "events": { + "batch": recent_event_ids, + "prev_batch": room.prev_batch.to_string(), + }, + "state": state_event_ids, + "limited": room.limited, + "published": room.published, + } + def register_servlets(hs, http_server): SyncRestServlet(hs).register(http_server) -- cgit 1.4.1 From e020574d65a994858ac53c45070ae5016090d2f3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 27 Jan 2015 20:19:36 +0000 Subject: Fix Formatting --- synapse/handlers/sync.py | 13 +++++-------- synapse/notifier.py | 4 ++-- synapse/rest/client/v2_alpha/sync.py | 5 ++--- synapse/storage/stream.py | 1 - 4 files changed, 9 insertions(+), 14 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 9f5f73eab6..82a2c6986a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -52,10 +52,10 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ class SyncResult(collections.namedtuple("SyncResult", [ - "next_batch", # Token for the next sync - "private_user_data", # List of private events for the user. - "public_user_data", # List of public events for all users. - "rooms", # RoomSyncResult for each room. + "next_batch", # Token for the next sync + "private_user_data", # List of private events for the user. + "public_user_data", # List of public events for all users. + "rooms", # RoomSyncResult for each room. ])): __slots__ = [] @@ -181,7 +181,6 @@ class SyncHandler(BaseHandler): limited=True, )) - @defer.inlineCallbacks def incremental_sync_with_gap(self, sync_config, since_token): """ Get the incremental delta needed to bring the client up to @@ -231,7 +230,6 @@ class SyncHandler(BaseHandler): next_batch=now_token, )) - @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, @@ -316,7 +314,6 @@ class SyncHandler(BaseHandler): state = () defer.returnValue(state) - def compute_state_delta(self, since_token, previous_state, current_state): """ Works out the differnce in state between the current state and the state the client got when it last performed a sync. @@ -327,7 +324,7 @@ class SyncHandler(BaseHandler): # after the previous sync, since we need to include those state # updates even if they occured logically before the previous event. # TODO(mjark) Check for new redactions in the state events. - previous_dict = {event.event_id:event for event in previous_state} + previous_dict = {event.event_id: event for event in previous_state} state_delta = [] for event in current_state: if event.event_id not in previous_dict: diff --git a/synapse/notifier.py b/synapse/notifier.py index 922bf064d0..e3b6ead620 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -214,7 +214,7 @@ class Notifier(object): deferred = defer.Deferred() - from_token=StreamToken("s0","0","0") + from_token = StreamToken("s0", "0", "0") listener = [_NotificationListener( user=user, @@ -231,6 +231,7 @@ class Notifier(object): result = yield callback() if timeout: timed_out = [False] + def _timeout_listener(): timed_out[0] = True listener[0].notify(self, [], from_token, from_token) @@ -252,7 +253,6 @@ class Notifier(object): defer.returnValue(result) - def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index cc667ebafc..0c17208cd3 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -68,7 +68,6 @@ class SyncRestServlet(RestServlet): } """ - PATTERN = client_v2_pattern("/sync$") ALLOWED_SORT = set(["timeline,asc", "timeline,desc"]) ALLOWED_PRESENCE = set(["online", "offline", "idle"]) @@ -114,12 +113,12 @@ class SyncRestServlet(RestServlet): sync_config = SyncConfig( user=user, - device="TODO", # TODO(mjark) Get the device_id from access_token + device="TODO", # TODO(mjark) Get the device_id from access_token gap=gap, limit=limit, sort=sort, backfill=backfill, - filter="TODO", # TODO(mjark) Add the filter to the config. + filter="TODO", # TODO(mjark) Add the filter to the config. ) if since is not None: diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 06aca1a4e5..db1816ea84 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -289,7 +289,6 @@ class StreamStore(SQLBaseStore): " LIMIT ?" ) - def get_recent_events_for_room_txn(txn): if from_token is None: txn.execute(sql, (room_id, end_token.stream, limit,)) -- cgit 1.4.1 From c81a19552f12a16591ae695fa1ef660f9e38730e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 28 Jan 2015 17:32:41 +0000 Subject: Add ports back to demo/start.sh --- demo/start.sh | 2 +- synapse/rest/client/v2_alpha/sync.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/rest/client') diff --git a/demo/start.sh b/demo/start.sh index a7502e7a8d..bb2248770d 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -16,7 +16,7 @@ if [ $# -eq 1 ]; then fi fi -for port in 8080; do +for port in 8080 8081 8082; do echo "Starting server on port $port... " https_port=$((port + 400)) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 0c17208cd3..a0ab9839a6 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -80,7 +80,7 @@ class SyncRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): - user = yield self.auth.get_user_by_req(request) + user, client = yield self.auth.get_user_by_req(request) timeout = self.parse_integer(request, "timeout", default=0) limit = self.parse_integer(request, "limit", required=True) -- cgit 1.4.1 From b9c442c85c9f8c2aa7fcb57d6132dec9b85b4e60 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 02:45:33 +0000 Subject: Include transaction ids in unsigned section of events in the sync results for the clients that made those requests --- synapse/events/utils.py | 11 ++++++----- synapse/rest/client/v2_alpha/sync.py | 23 ++++++++++++++++------- 2 files changed, 22 insertions(+), 12 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index dd7f3d6f42..21316cc125 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -103,7 +103,7 @@ def format_event_for_client_v1(d): drop_keys = ( "auth_events", "prev_events", "hashes", "signatures", "depth", - "unsigned", "origin" + "unsigned", "origin", "prev_state" ) for key in drop_keys: d.pop(key, None) @@ -112,7 +112,8 @@ def format_event_for_client_v1(d): def format_event_for_client_v2(d): drop_keys = ( - "auth_events", "prev_events", "hashes", "signatures", "depth", "origin" + "auth_events", "prev_events", "hashes", "signatures", "depth", + "origin", "prev_state", ) for key in drop_keys: d.pop(key, None) @@ -140,7 +141,7 @@ def serialize_event(e, time_now_ms, as_client_event=True, if "age_ts" in d["unsigned"]: d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"] - d["unsigned"]["age_ts"] + del d["unsigned"]["age_ts"] if "redacted_because" in e.unsigned: d["unsigned"]["redacted_because"] = serialize_event( @@ -148,8 +149,8 @@ def serialize_event(e, time_now_ms, as_client_event=True, ) if token_id is not None: - if token_id == e.internal_metadata["token_id"]: - txn_id = e.internal_metadata.get("txn_id", None) + if token_id == getattr(e.internal_metadata, "token_id", None): + txn_id = getattr(e.internal_metadata, "txn_id", None) if txn_id is not None: d["unsigned"]["transaction_id"] = txn_id diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a0ab9839a6..4d950f9956 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.http.servlet import RestServlet from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken -from synapse.events.utils import serialize_event +from synapse.events.utils import ( + serialize_event, format_event_for_client_v2_without_event_id, +) from ._base import client_v2_pattern import logging @@ -139,7 +141,9 @@ class SyncRestServlet(RestServlet): "private_user_data": self.encode_events( sync_result.private_user_data, filter, time_now ), - "rooms": self.encode_rooms(sync_result.rooms, filter, time_now), + "rooms": self.encode_rooms( + sync_result.rooms, filter, time_now, client.token_id + ), "next_batch": sync_result.next_batch.to_string(), } @@ -153,25 +157,30 @@ class SyncRestServlet(RestServlet): # TODO(mjark): Respect formatting requirements in the filter. return serialize_event(event, time_now) - def encode_rooms(self, rooms, filter, time_now): - return [self.encode_room(room, filter, time_now) for room in rooms] + def encode_rooms(self, rooms, filter, time_now, token_id): + return [ + self.encode_room(room, filter, time_now, token_id) + for room in rooms + ] @staticmethod - def encode_room(room, filter, time_now): + def encode_room(room, filter, time_now, token_id): event_map = {} state_event_ids = [] recent_event_ids = [] for event in room.state: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( - event, time_now, strip_ids=True + event, time_now, token_id=token_id, + event_format=format_event_for_client_v2_without_event_id, ) state_event_ids.append(event.event_id) for event in room.events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( - event, time_now, strip_ids=True + event, time_now, token_id=token_id, + event_format=format_event_for_client_v2_without_event_id, ) recent_event_ids.append(event.event_id) return { -- cgit 1.4.1 From 3dbce6f4a59fde2a67e563ce338f510feda2dd1a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 03:33:51 +0000 Subject: Add typing notifications to sync --- synapse/handlers/sync.py | 30 +++++++++++++++++++++--------- synapse/rest/client/v2_alpha/sync.py | 18 ++++++++---------- 2 files changed, 29 insertions(+), 19 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 82a2c6986a..b86e783e14 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -44,11 +44,12 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "events", "state", "prev_batch", + "typing", ])): __slots__ = [] def __nonzero__(self): - return bool(self.events or self.state) + return bool(self.events or self.state or self.typing) class SyncResult(collections.namedtuple("SyncResult", [ @@ -196,15 +197,25 @@ class SyncHandler(BaseHandler): now_token = yield self.event_sources.get_current_token() - presence_stream = self.event_sources.sources["presence"] - pagination_config = PaginationConfig( - from_token=since_token, to_token=now_token + presence_source = self.event_sources.sources["presence"] + presence, presence_key = yield presence_source.get_new_events_for_user( + user=sync_config.user, + from_key=since_token.presence_key, + limit=sync_config.limit, ) - presence, _ = yield presence_stream.get_pagination_rows( + now_token = now_token.copy_and_replace("presence_key", presence_key) + + typing_source = self.event_sources.sources["typing"] + typing, typing_key = yield typing_source.get_new_events_for_user( user=sync_config.user, - pagination_config=pagination_config.get_source_config("presence"), - key=None + from_key=since_token.typing_key, + limit=sync_config.limit, ) + now_token = now_token.copy_and_replace("typing_key", typing_key) + + typing_by_room = {event["room_id"]: event for event in typing} + logger.debug("Typing %r", typing_by_room) + room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=sync_config.user.to_string(), membership_list=[Membership.INVITE, Membership.JOIN] @@ -218,7 +229,7 @@ class SyncHandler(BaseHandler): for event in room_list: room_sync = yield self.incremental_sync_with_gap_for_room( event.room_id, sync_config, since_token, now_token, - published_room_ids + published_room_ids, typing_by_room ) if room_sync: rooms.append(room_sync) @@ -233,7 +244,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, - published_room_ids): + published_room_ids, typing_by_room): """ Get the incremental delta needed to bring the client up to date for the room. Gives the client the most recent events and the changes to state. @@ -285,6 +296,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch_token, state=state_events_delta, limited=limited, + typing=typing_by_room.get(room_id, None) ) logging.debug("Room sync: %r", room_sync) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 4d950f9956..76489e27c8 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -135,10 +135,10 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() response_content = { - "public_user_data": self.encode_events( + "public_user_data": self.encode_user_data( sync_result.public_user_data, filter, time_now ), - "private_user_data": self.encode_events( + "private_user_data": self.encode_user_data( sync_result.private_user_data, filter, time_now ), "rooms": self.encode_rooms( @@ -149,13 +149,8 @@ class SyncRestServlet(RestServlet): defer.returnValue((200, response_content)) - def encode_events(self, events, filter, time_now): - return [self.encode_event(event, filter, time_now) for event in events] - - @staticmethod - def encode_event(event, filter, time_now): - # TODO(mjark): Respect formatting requirements in the filter. - return serialize_event(event, time_now) + def encode_user_data(self, events, filter, time_now): + return events def encode_rooms(self, rooms, filter, time_now, token_id): return [ @@ -183,7 +178,7 @@ class SyncRestServlet(RestServlet): event_format=format_event_for_client_v2_without_event_id, ) recent_event_ids.append(event.event_id) - return { + result = { "room_id": room.room_id, "event_map": event_map, "events": { @@ -194,6 +189,9 @@ class SyncRestServlet(RestServlet): "limited": room.limited, "published": room.published, } + if room.typing is not None: + result["typing"] = room.typing + return result def register_servlets(hs, http_server): -- cgit 1.4.1 From 722b65f46131349c5afdcc7eb48297cdd9d9cbd6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 16:41:21 +0000 Subject: Move typing notifs to an "emphermal" event list on the room object --- synapse/handlers/sync.py | 12 +++++++----- synapse/rest/client/v2_alpha/sync.py | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ec8beb4c6a..3860c3c95b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -44,12 +44,12 @@ class RoomSyncResult(collections.namedtuple("RoomSyncResult", [ "events", "state", "prev_batch", - "typing", + "ephemeral", ])): __slots__ = [] def __nonzero__(self): - return bool(self.events or self.state or self.typing) + return bool(self.events or self.state or self.ephemeral) class SyncResult(collections.namedtuple("SyncResult", [ @@ -180,7 +180,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch_token, state=current_state_events, limited=True, - typing=None, + ephemeral=[], )) @defer.inlineCallbacks @@ -214,7 +214,9 @@ class SyncHandler(BaseHandler): ) now_token = now_token.copy_and_replace("typing_key", typing_key) - typing_by_room = {event["room_id"]: event for event in typing} + typing_by_room = {event["room_id"]: [event] for event in typing} + for event in typing: + event.pop("room_id") logger.debug("Typing %r", typing_by_room) rm_handler = self.hs.get_handlers().room_member_handler @@ -256,7 +258,7 @@ class SyncHandler(BaseHandler): prev_batch=prev_batch, state=state, limited=False, - typing=typing_by_room.get(room_id, None) + ephemeral=typing_by_room.get(room_id, []) ) if room_sync: rooms.append(room_sync) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 76489e27c8..2ae2eec55d 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -66,6 +66,7 @@ class SyncRestServlet(RestServlet): } "state": [] // list of EventIDs updating the current state to // be what it should be at the end of the batch. + "ephemeral": [] }] } """ @@ -188,9 +189,8 @@ class SyncRestServlet(RestServlet): "state": state_event_ids, "limited": room.limited, "published": room.published, + "ephemeral": room.ephemeral, } - if room.typing is not None: - result["typing"] = room.typing return result -- cgit 1.4.1 From 365a1867293b8f1d34a1f100d1bc8d2d39bb3d94 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Jan 2015 18:11:28 +0000 Subject: Add basic filtering support --- synapse/rest/client/v2_alpha/sync.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 2ae2eec55d..c1277d2675 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -21,6 +21,7 @@ from synapse.types import StreamToken from synapse.events.utils import ( serialize_event, format_event_for_client_v2_without_event_id, ) +from synapse.api.filtering import Filter from ._base import client_v2_pattern import logging @@ -80,6 +81,7 @@ class SyncRestServlet(RestServlet): self.auth = hs.get_auth() self.sync_handler = hs.get_handlers().sync_handler self.clock = hs.get_clock() + self.filtering = hs.get_filtering() @defer.inlineCallbacks def on_GET(self, request): @@ -109,9 +111,14 @@ class SyncRestServlet(RestServlet): ) # TODO(mjark): Load filter and apply overrides. - # filter = self.filters.load_fitler(filter_id_str) + try: + filter = yield self.filtering.get_user_filter( + user.localpart, filter_id + ) + except: + filter = Filter({}) # filter = filter.apply_overrides(http_request) - # if filter.matches(event): + #if filter.matches(event): # # stuff sync_config = SyncConfig( @@ -121,7 +128,7 @@ class SyncRestServlet(RestServlet): limit=limit, sort=sort, backfill=backfill, - filter="TODO", # TODO(mjark) Add the filter to the config. + filter=filter, ) if since is not None: @@ -162,9 +169,11 @@ class SyncRestServlet(RestServlet): @staticmethod def encode_room(room, filter, time_now, token_id): event_map = {} + state_events = filter.filter_room_state(room.state) + recent_events = filter.filter_room_events(room.events) state_event_ids = [] recent_event_ids = [] - for event in room.state: + for event in state_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( event, time_now, token_id=token_id, @@ -172,7 +181,7 @@ class SyncRestServlet(RestServlet): ) state_event_ids.append(event.event_id) - for event in room.events: + for event in recent_events: # TODO(mjark): Respect formatting requirements in the filter. event_map[event.event_id] = serialize_event( event, time_now, token_id=token_id, -- cgit 1.4.1 From 22dd1cde2d83a2448074816108b85d1957315236 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 11:32:35 +0000 Subject: Filter the recent events before applying the limit when doing an incremental sync with a gap --- synapse/api/filtering.py | 2 -- synapse/handlers/sync.py | 53 ++++++++++++++++++++++++++---------- synapse/rest/client/v2_alpha/sync.py | 2 +- synapse/storage/stream.py | 21 ++++++++++---- 4 files changed, 54 insertions(+), 24 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index b7e5d3222f..fa4de2614d 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -12,8 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - from synapse.api.errors import SynapseError from synapse.types import UserID, RoomID diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5768702192..0df1851b0e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -278,6 +278,40 @@ class SyncHandler(BaseHandler): next_batch=now_token, )) + @defer.inlineCallbacks + def load_filtered_recents(self, room_id, sync_config, since_token, + now_token): + limited = True + recents = [] + filtering_factor = 2 + load_limit = max(sync_config.limit * filtering_factor, 100) + max_repeat = 3 # Only try a few times per room, otherwise + room_key = now_token.room_key + + while limited and len(recents) < sync_config.limit and max_repeat: + events, room_key = yield self.store.get_recent_events_for_room( + room_id, + limit=load_limit + 1, + from_token=since_token.room_key, + end_token=room_key, + ) + loaded_recents = sync_config.filter.filter_room_events(events) + loaded_recents.extend(recents) + recents = loaded_recents + if len(events) <= load_limit: + limited = False + max_repeat -= 1 + + if len(recents) > sync_config.limit: + recents = recents[-sync_config.limit:] + room_key = recents[0].internal_metadata.before + + prev_batch_token = now_token.copy_and_replace( + "room_key", room_key + ) + + defer.returnValue((recents, prev_batch_token, limited)) + @defer.inlineCallbacks def incremental_sync_with_gap_for_room(self, room_id, sync_config, since_token, now_token, @@ -288,28 +322,17 @@ class SyncHandler(BaseHandler): Returns: A Deferred RoomSyncResult """ + # TODO(mjark): Check if they have joined the room between # the previous sync and this one. - # TODO(mjark): Apply the event filter in sync_config taking care to get - # enough events to reach the limit # TODO(mjark): Check for redactions we might have missed. - recents, token = yield self.store.get_recent_events_for_room( - room_id, - limit=sync_config.limit + 1, - from_token=since_token.room_key, - end_token=now_token.room_key, + + recents, prev_batch_token, limited = self.load_filtered_recents( + room_id, sync_config, since_token, ) logging.debug("Recents %r", recents) - if len(recents) > sync_config.limit: - limited = True - recents = recents[1:] - else: - limited = False - - prev_batch_token = now_token.copy_and_replace("room_key", token[0]) - # TODO(mjark): This seems racy since this isn't being passed a # token to indicate what point in the stream this is current_state_events = yield self.state_handler.get_current_state( diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index c1277d2675..46ea50d118 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -116,7 +116,7 @@ class SyncRestServlet(RestServlet): user.localpart, filter_id ) except: - filter = Filter({}) + filter = Filter({}) # filter = filter.apply_overrides(http_request) #if filter.matches(event): # # stuff diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 2ea5e1a021..73504c8b52 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -181,15 +181,11 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) - for event, row in zip(ret, rows): - stream = row["stream_ordering"] - topo = event.depth - internal = event.internal_metadata - internal.before = str(_StreamToken(topo, stream - 1)) - internal.after = str(_StreamToken(topo, stream)) + self._set_before_and_after(ret, rows) if rows: key = "s%d" % max([r["stream_ordering"] for r in rows]) + else: # Assume we didn't get anything because there was nothing to # get. @@ -267,6 +263,8 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, next_token, return self.runInteraction("paginate_room_events", f) @@ -328,6 +326,8 @@ class StreamStore(SQLBaseStore): get_prev_content=True ) + self._set_before_and_after(events, rows) + return events, token return self.runInteraction( @@ -354,3 +354,12 @@ class StreamStore(SQLBaseStore): key = res[0]["m"] return "s%d" % (key,) + + @staticmethod + def _set_before_and_after(events, rows): + for event, row in zip(events, rows): + stream = row["stream_ordering"] + topo = event.depth + internal = event.internal_metadata + internal.before = str(_StreamToken(topo, stream - 1)) + internal.after = str(_StreamToken(topo, stream)) -- cgit 1.4.1 From 4a67834bc84f604605c618049599f4638434c7cf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 30 Jan 2015 11:50:15 +0000 Subject: Pass client info to the sync_config --- synapse/handlers/sync.py | 5 +++-- synapse/rest/client/v2_alpha/sync.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/rest/client') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3c68e2a9ec..1a74a4c97c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) SyncConfig = collections.namedtuple("SyncConfig", [ "user", - "device", + "client_info", "limit", "gap", "sort", @@ -288,12 +288,13 @@ class SyncHandler(BaseHandler): room_key = now_token.room_key while limited and len(recents) < sync_config.limit and max_repeat: - events, (room_key,_) = yield self.store.get_recent_events_for_room( + events, keys = yield self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, from_token=since_token.room_key if since_token else None, end_token=room_key, ) + (room_key, _) = keys loaded_recents = sync_config.filter.filter_room_events(events) loaded_recents.extend(recents) recents = loaded_recents diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 46ea50d118..81d5cf8ead 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -123,7 +123,7 @@ class SyncRestServlet(RestServlet): sync_config = SyncConfig( user=user, - device="TODO", # TODO(mjark) Get the device_id from access_token + client_info=client, gap=gap, limit=limit, sort=sort, -- cgit 1.4.1