summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/events/utils.py85
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/sync.py380
-rw-r--r--synapse/notifier.py48
-rw-r--r--synapse/rest/client/v2_alpha/__init__.py4
-rw-r--r--synapse/rest/client/v2_alpha/sync.py198
-rw-r--r--synapse/storage/stream.py47
7 files changed, 720 insertions, 44 deletions
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index e391aca4cc..21316cc125 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -89,7 +89,47 @@ def prune_event(event):
     return type(event)(allowed_fields)
 
 
-def serialize_event(e, time_now_ms, client_event=True):
+def format_event_raw(d):
+    return d
+
+
+def format_event_for_client_v1(d):
+    d["user_id"] = d.pop("sender", None)
+
+    move_keys = ("age", "redacted_because", "replaces_state", "prev_content")
+    for key in move_keys:
+        if key in d["unsigned"]:
+            d[key] = d["unsigned"][key]
+
+    drop_keys = (
+        "auth_events", "prev_events", "hashes", "signatures", "depth",
+        "unsigned", "origin", "prev_state"
+    )
+    for key in drop_keys:
+        d.pop(key, None)
+    return d
+
+
+def format_event_for_client_v2(d):
+    drop_keys = (
+        "auth_events", "prev_events", "hashes", "signatures", "depth",
+        "origin", "prev_state",
+    )
+    for key in drop_keys:
+        d.pop(key, None)
+    return d
+
+
+def format_event_for_client_v2_without_event_id(d):
+    d = format_event_for_client_v2(d)
+    d.pop("room_id", None)
+    d.pop("event_id", None)
+    return d
+
+
+def serialize_event(e, time_now_ms, as_client_event=True,
+                    event_format=format_event_for_client_v1,
+                    token_id=None):
     # FIXME(erikj): To handle the case of presence events and the like
     if not isinstance(e, EventBase):
         return e
@@ -99,43 +139,22 @@ def serialize_event(e, time_now_ms, client_event=True):
     # Should this strip out None's?
     d = {k: v for k, v in e.get_dict().items()}
 
-    if not client_event:
-        # set the age and keep all other keys
-        if "age_ts" in d["unsigned"]:
-            d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
-        return d
-
     if "age_ts" in d["unsigned"]:
-        d["age"] = time_now_ms - d["unsigned"]["age_ts"]
+        d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
         del d["unsigned"]["age_ts"]
 
-    d["user_id"] = d.pop("sender", None)
-
     if "redacted_because" in e.unsigned:
-        d["redacted_because"] = serialize_event(
+        d["unsigned"]["redacted_because"] = serialize_event(
             e.unsigned["redacted_because"], time_now_ms
         )
 
-        del d["unsigned"]["redacted_because"]
+    if token_id is not None:
+        if token_id == getattr(e.internal_metadata, "token_id", None):
+            txn_id = getattr(e.internal_metadata, "txn_id", None)
+            if txn_id is not None:
+                d["unsigned"]["transaction_id"] = txn_id
 
-    if "redacted_by" in e.unsigned:
-        d["redacted_by"] = e.unsigned["redacted_by"]
-        del d["unsigned"]["redacted_by"]
-
-    if "replaces_state" in e.unsigned:
-        d["replaces_state"] = e.unsigned["replaces_state"]
-        del d["unsigned"]["replaces_state"]
-
-    if "prev_content" in e.unsigned:
-        d["prev_content"] = e.unsigned["prev_content"]
-        del d["unsigned"]["prev_content"]
-
-    del d["auth_events"]
-    del d["prev_events"]
-    del d["hashes"]
-    del d["signatures"]
-    d.pop("depth", None)
-    d.pop("unsigned", None)
-    d.pop("origin", None)
-
-    return d
+    if as_client_event:
+        return event_format(d)
+    else:
+        return d
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index fe071a4bc2..a32eab9316 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -26,6 +26,7 @@ from .presence import PresenceHandler
 from .directory import DirectoryHandler
 from .typing import TypingNotificationHandler
 from .admin import AdminHandler
+from .sync import SyncHandler
 
 
 class Handlers(object):
@@ -51,3 +52,4 @@ class Handlers(object):
         self.directory_handler = DirectoryHandler(hs)
         self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
+        self.sync_handler = SyncHandler(hs)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
new file mode 100644
index 0000000000..8c7681a48d
--- /dev/null
+++ b/synapse/handlers/sync.py
@@ -0,0 +1,380 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseHandler
+
+from synapse.streams.config import PaginationConfig
+from synapse.api.constants import Membership
+
+from twisted.internet import defer
+
+import collections
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+SyncConfig = collections.namedtuple("SyncConfig", [
+    "user",
+    "device",
+    "limit",
+    "gap",
+    "sort",
+    "backfill",
+    "filter",
+])
+
+
+class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
+    "room_id",
+    "limited",
+    "published",
+    "events",
+    "state",
+    "prev_batch",
+    "ephemeral",
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        return bool(self.events or self.state or self.ephemeral)
+
+
+class SyncResult(collections.namedtuple("SyncResult", [
+    "next_batch",  # Token for the next sync
+    "private_user_data",  # List of private events for the user.
+    "public_user_data",  # List of public events for all users.
+    "rooms",  # RoomSyncResult for each room.
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        return bool(
+            self.private_user_data or self.public_user_data or self.rooms
+        )
+
+
+class SyncHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(SyncHandler, self).__init__(hs)
+        self.event_sources = hs.get_event_sources()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
+        """Get the sync for a client if we have new data for it now. Otherwise
+        wait for new data to arrive on the server. If the timeout expires, then
+        return an empty sync result.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if timeout == 0 or since_token is None:
+            result = yield self.current_sync_for_user(sync_config, since_token)
+            defer.returnValue(result)
+        else:
+            def current_sync_callback():
+                return self.current_sync_for_user(sync_config, since_token)
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
+            result = yield self.notifier.wait_for_events(
+                sync_config.user, room_ids,
+                sync_config.filter, timeout, current_sync_callback
+            )
+            defer.returnValue(result)
+
+    def current_sync_for_user(self, sync_config, since_token=None):
+        """Get the sync for client needed to match what the server has now.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if since_token is None:
+            return self.initial_sync(sync_config)
+        else:
+            if sync_config.gap:
+                return self.incremental_sync_with_gap(sync_config, since_token)
+            else:
+                #TODO(mjark): Handle gapless sync
+                pass
+
+    @defer.inlineCallbacks
+    def initial_sync(self, sync_config):
+        """Get a sync for a client which is starting without any state
+        Returns:
+            A Deferred SyncResult.
+        """
+        if sync_config.sort == "timeline,desc":
+            # TODO(mjark): Handle going through events in reverse order?.
+            # What does "most recent events" mean when applying the limits mean
+            # in this case?
+            raise NotImplementedError()
+
+        now_token = yield self.event_sources.get_current_token()
+
+        presence_stream = self.event_sources.sources["presence"]
+        # TODO (mjark): This looks wrong, shouldn't we be getting the presence
+        # UP to the present rather than after the present?
+        pagination_config = PaginationConfig(from_token=now_token)
+        presence, _ = yield presence_stream.get_pagination_rows(
+            user=sync_config.user,
+            pagination_config=pagination_config.get_source_config("presence"),
+            key=None
+        )
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=sync_config.user.to_string(),
+            membership_list=[Membership.INVITE, Membership.JOIN]
+        )
+
+        # TODO (mjark): Does public mean "published"?
+        published_rooms = yield self.store.get_rooms(is_public=True)
+        published_room_ids = set(r["room_id"] for r in published_rooms)
+
+        rooms = []
+        for event in room_list:
+            room_sync = yield self.initial_sync_for_room(
+                event.room_id, sync_config, now_token, published_room_ids
+            )
+            rooms.append(room_sync)
+
+        defer.returnValue(SyncResult(
+            public_user_data=presence,
+            private_user_data=[],
+            rooms=rooms,
+            next_batch=now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def initial_sync_for_room(self, room_id, sync_config, now_token,
+                              published_room_ids):
+        """Sync a room for a client which is starting without any state
+        Returns:
+            A Deferred RoomSyncResult.
+        """
+        recent_events, token = yield self.store.get_recent_events_for_room(
+            room_id,
+            limit=sync_config.limit,
+            end_token=now_token.room_key,
+        )
+        prev_batch_token = now_token.copy_and_replace("room_key", token[0])
+        current_state_events = yield self.state_handler.get_current_state(
+            room_id
+        )
+
+        defer.returnValue(RoomSyncResult(
+            room_id=room_id,
+            published=room_id in published_room_ids,
+            events=recent_events,
+            prev_batch=prev_batch_token,
+            state=current_state_events,
+            limited=True,
+            ephemeral=[],
+        ))
+
+    @defer.inlineCallbacks
+    def incremental_sync_with_gap(self, sync_config, since_token):
+        """ Get the incremental delta needed to bring the client up to
+        date with the server.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if sync_config.sort == "timeline,desc":
+            # TODO(mjark): Handle going through events in reverse order?.
+            # What does "most recent events" mean when applying the limits mean
+            # in this case?
+            raise NotImplementedError()
+
+        now_token = yield self.event_sources.get_current_token()
+
+        presence_source = self.event_sources.sources["presence"]
+        presence, presence_key = yield presence_source.get_new_events_for_user(
+            user=sync_config.user,
+            from_key=since_token.presence_key,
+            limit=sync_config.limit,
+        )
+        now_token = now_token.copy_and_replace("presence_key", presence_key)
+
+        typing_source = self.event_sources.sources["typing"]
+        typing, typing_key = yield typing_source.get_new_events_for_user(
+            user=sync_config.user,
+            from_key=since_token.typing_key,
+            limit=sync_config.limit,
+        )
+        now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+        typing_by_room = {event["room_id"]: [event] for event in typing}
+        for event in typing:
+            event.pop("room_id")
+        logger.debug("Typing %r", typing_by_room)
+
+        rm_handler = self.hs.get_handlers().room_member_handler
+        room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
+
+        # TODO (mjark): Does public mean "published"?
+        published_rooms = yield self.store.get_rooms(is_public=True)
+        published_room_ids = set(r["room_id"] for r in published_rooms)
+
+        room_events, _ = yield self.store.get_room_events_stream(
+            sync_config.user.to_string(),
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            room_id=None,
+            limit=sync_config.limit + 1,
+        )
+
+        rooms = []
+        if len(room_events) <= sync_config.limit:
+            # There is no gap in any of the rooms. Therefore we can just
+            # partition the new events by room and return them.
+            events_by_room_id = {}
+            for event in room_events:
+                events_by_room_id.setdefault(event.room_id, []).append(event)
+
+            for room_id in room_ids:
+                recents = events_by_room_id.get(room_id, [])
+                state = [event for event in recents if event.is_state()]
+                if recents:
+                    prev_batch = now_token.copy_and_replace(
+                        "room_key", recents[0].internal_metadata.before
+                    )
+                else:
+                    prev_batch = now_token
+                room_sync = RoomSyncResult(
+                    room_id=room_id,
+                    published=room_id in published_room_ids,
+                    events=recents,
+                    prev_batch=prev_batch,
+                    state=state,
+                    limited=False,
+                    ephemeral=typing_by_room.get(room_id, [])
+                )
+                if room_sync:
+                    rooms.append(room_sync)
+        else:
+            for room_id in room_ids:
+                room_sync = yield self.incremental_sync_with_gap_for_room(
+                    room_id, sync_config, since_token, now_token,
+                    published_room_ids, typing_by_room
+                )
+                if room_sync:
+                    rooms.append(room_sync)
+
+        defer.returnValue(SyncResult(
+            public_user_data=presence,
+            private_user_data=[],
+            rooms=rooms,
+            next_batch=now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def incremental_sync_with_gap_for_room(self, room_id, sync_config,
+                                           since_token, now_token,
+                                           published_room_ids, typing_by_room):
+        """ Get the incremental delta needed to bring the client up to date for
+        the room. Gives the client the most recent events and the changes to
+        state.
+        Returns:
+            A Deferred RoomSyncResult
+        """
+        # TODO(mjark): Check if they have joined the room between
+        # the previous sync and this one.
+        # TODO(mjark): Apply the event filter in sync_config
+        # TODO(mjark): Check for redactions we might have missed.
+        recents, token = yield self.store.get_recent_events_for_room(
+            room_id,
+            limit=sync_config.limit + 1,
+            from_token=since_token.room_key,
+            end_token=now_token.room_key,
+        )
+
+        logging.debug("Recents %r", recents)
+
+        if len(recents) > sync_config.limit:
+            limited = True
+            recents = recents[1:]
+        else:
+            limited = False
+
+        prev_batch_token = now_token.copy_and_replace("room_key", token[0])
+
+        # TODO(mjark): This seems racy since this isn't being passed a
+        # token to indicate what point in the stream this is
+        current_state_events = yield self.state_handler.get_current_state(
+            room_id
+        )
+
+        state_at_previous_sync = yield self.get_state_at_previous_sync(
+            room_id, since_token=since_token
+        )
+
+        state_events_delta = yield self.compute_state_delta(
+            since_token=since_token,
+            previous_state=state_at_previous_sync,
+            current_state=current_state_events,
+        )
+
+        room_sync = RoomSyncResult(
+            room_id=room_id,
+            published=room_id in published_room_ids,
+            events=recents,
+            prev_batch=prev_batch_token,
+            state=state_events_delta,
+            limited=limited,
+            typing=typing_by_room.get(room_id, None)
+        )
+
+        logging.debug("Room sync: %r", room_sync)
+
+        defer.returnValue(room_sync)
+
+    @defer.inlineCallbacks
+    def get_state_at_previous_sync(self, room_id, since_token):
+        """ Get the room state at the previous sync the client made.
+        Returns:
+            A Deferred list of Events.
+        """
+        last_events, token = yield self.store.get_recent_events_for_room(
+            room_id, end_token=since_token.room_key, limit=1,
+        )
+
+        if last_events:
+            last_event = last_events[0]
+            last_context = yield self.state_handler.compute_event_context(
+                last_event
+            )
+            if last_event.is_state():
+                state = [last_event] + last_context.current_state.values()
+            else:
+                state = last_context.current_state.values()
+        else:
+            state = ()
+        defer.returnValue(state)
+
+    def compute_state_delta(self, since_token, previous_state, current_state):
+        """ Works out the differnce in state between the current state and the
+        state the client got when it last performed a sync.
+        Returns:
+            A list of events.
+        """
+        # TODO(mjark) Check if the state events were received by the server
+        # after the previous sync, since we need to include those state
+        # updates even if they occured logically before the previous event.
+        # TODO(mjark) Check for new redactions in the state events.
+        previous_dict = {event.event_id: event for event in previous_state}
+        state_delta = []
+        for event in current_state:
+            if event.event_id not in previous_dict:
+                state_delta.append(event)
+        return state_delta
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3aec1d4af2..e3b6ead620 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.async import run_on_reactor
+from synapse.types import StreamToken
 
 import logging
 
@@ -205,6 +206,53 @@ class Notifier(object):
                 [notify(l).addErrback(eb) for l in listeners]
             )
 
+    @defer.inlineCallbacks
+    def wait_for_events(self, user, rooms, filter, timeout, callback):
+        """Wait until the callback returns a non empty response or the
+        timeout fires.
+        """
+
+        deferred = defer.Deferred()
+
+        from_token = StreamToken("s0", "0", "0")
+
+        listener = [_NotificationListener(
+            user=user,
+            rooms=rooms,
+            from_token=from_token,
+            limit=1,
+            timeout=timeout,
+            deferred=deferred,
+        )]
+
+        if timeout:
+            self._register_with_keys(listener[0])
+
+        result = yield callback()
+        if timeout:
+            timed_out = [False]
+
+            def _timeout_listener():
+                timed_out[0] = True
+                listener[0].notify(self, [], from_token, from_token)
+
+            self.clock.call_later(timeout/1000., _timeout_listener)
+            while not result and not timed_out[0]:
+                yield deferred
+                deferred = defer.Deferred()
+                listener[0] = _NotificationListener(
+                    user=user,
+                    rooms=rooms,
+                    from_token=from_token,
+                    limit=1,
+                    timeout=timeout,
+                    deferred=deferred,
+                )
+                self._register_with_keys(listener[0])
+                result = yield callback()
+
+        defer.returnValue(result)
+
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py
index 4349579ab7..8f611de3a8 100644
--- a/synapse/rest/client/v2_alpha/__init__.py
+++ b/synapse/rest/client/v2_alpha/__init__.py
@@ -13,12 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 from . import (
+    sync,
     filter
 )
 
-
 from synapse.http.server import JsonResource
 
 
@@ -31,4 +30,5 @@ class ClientV2AlphaRestResource(JsonResource):
 
     @staticmethod
     def register_servlets(client_resource, hs):
+        sync.register_servlets(hs, client_resource)
         filter.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
new file mode 100644
index 0000000000..2ae2eec55d
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -0,0 +1,198 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.http.servlet import RestServlet
+from synapse.handlers.sync import SyncConfig
+from synapse.types import StreamToken
+from synapse.events.utils import (
+    serialize_event, format_event_for_client_v2_without_event_id,
+)
+from ._base import client_v2_pattern
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class SyncRestServlet(RestServlet):
+    """
+
+    GET parameters::
+        timeout(int): How long to wait for new events in milliseconds.
+        limit(int): Maxiumum number of events per room to return.
+        gap(bool): Create gaps the message history if limit is exceeded to
+            ensure that the client has the most recent messages. Defaults to
+            "true".
+        sort(str,str): tuple of sort key (e.g. "timeline") and direction
+            (e.g. "asc", "desc"). Defaults to "timeline,asc".
+        since(batch_token): Batch token when asking for incremental deltas.
+        set_presence(str): What state the device presence should be set to.
+            default is "online".
+        backfill(bool): Should the HS request message history from other
+            servers. This may take a long time making it unsuitable for clients
+            expecting a prompt response. Defaults to "true".
+        filter(filter_id): A filter to apply to the events returned.
+        filter_*: Filter override parameters.
+
+    Response JSON::
+        {
+            "next_batch": // batch token for the next /sync
+            "private_user_data": // private events for this user.
+            "public_user_data": // public events for all users including the
+                                // public events for this user.
+            "rooms": [{ // List of rooms with updates.
+                "room_id": // Id of the room being updated
+                "limited": // Was the per-room event limit exceeded?
+                "published": // Is the room published by our HS?
+                "event_map": // Map of EventID -> event JSON.
+                "events": { // The recent events in the room if gap is "true"
+                            // otherwise the next events in the room.
+                    "batch": [] // list of EventIDs in the "event_map".
+                    "prev_batch": // back token for getting previous events.
+                }
+                "state": [] // list of EventIDs updating the current state to
+                            // be what it should be at the end of the batch.
+                "ephemeral": []
+            }]
+        }
+    """
+
+    PATTERN = client_v2_pattern("/sync$")
+    ALLOWED_SORT = set(["timeline,asc", "timeline,desc"])
+    ALLOWED_PRESENCE = set(["online", "offline", "idle"])
+
+    def __init__(self, hs):
+        super(SyncRestServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.sync_handler = hs.get_handlers().sync_handler
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        user, client = yield self.auth.get_user_by_req(request)
+
+        timeout = self.parse_integer(request, "timeout", default=0)
+        limit = self.parse_integer(request, "limit", required=True)
+        gap = self.parse_boolean(request, "gap", default=True)
+        sort = self.parse_string(
+            request, "sort", default="timeline,asc",
+            allowed_values=self.ALLOWED_SORT
+        )
+        since = self.parse_string(request, "since")
+        set_presence = self.parse_string(
+            request, "set_presence", default="online",
+            allowed_values=self.ALLOWED_PRESENCE
+        )
+        backfill = self.parse_boolean(request, "backfill", default=False)
+        filter_id = self.parse_string(request, "filter", default=None)
+
+        logger.info(
+            "/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r,"
+            " set_presence=%r, backfill=%r, filter_id=%r" % (
+                user, timeout, limit, gap, sort, since, set_presence,
+                backfill, filter_id
+            )
+        )
+
+        # TODO(mjark): Load filter and apply overrides.
+        # filter = self.filters.load_fitler(filter_id_str)
+        # filter = filter.apply_overrides(http_request)
+        # if filter.matches(event):
+        #   # stuff
+
+        sync_config = SyncConfig(
+            user=user,
+            device="TODO",  # TODO(mjark) Get the device_id from access_token
+            gap=gap,
+            limit=limit,
+            sort=sort,
+            backfill=backfill,
+            filter="TODO",  # TODO(mjark) Add the filter to the config.
+        )
+
+        if since is not None:
+            since_token = StreamToken.from_string(since)
+        else:
+            since_token = None
+
+        sync_result = yield self.sync_handler.wait_for_sync_for_user(
+            sync_config, since_token=since_token, timeout=timeout
+        )
+
+        time_now = self.clock.time_msec()
+
+        response_content = {
+            "public_user_data": self.encode_user_data(
+                sync_result.public_user_data, filter, time_now
+            ),
+            "private_user_data": self.encode_user_data(
+                sync_result.private_user_data, filter, time_now
+            ),
+            "rooms": self.encode_rooms(
+                sync_result.rooms, filter, time_now, client.token_id
+            ),
+            "next_batch": sync_result.next_batch.to_string(),
+        }
+
+        defer.returnValue((200, response_content))
+
+    def encode_user_data(self, events, filter, time_now):
+        return events
+
+    def encode_rooms(self, rooms, filter, time_now, token_id):
+        return [
+            self.encode_room(room, filter, time_now, token_id)
+            for room in rooms
+        ]
+
+    @staticmethod
+    def encode_room(room, filter, time_now, token_id):
+        event_map = {}
+        state_event_ids = []
+        recent_event_ids = []
+        for event in room.state:
+            # TODO(mjark): Respect formatting requirements in the filter.
+            event_map[event.event_id] = serialize_event(
+                event, time_now, token_id=token_id,
+                event_format=format_event_for_client_v2_without_event_id,
+            )
+            state_event_ids.append(event.event_id)
+
+        for event in room.events:
+            # TODO(mjark): Respect formatting requirements in the filter.
+            event_map[event.event_id] = serialize_event(
+                event, time_now, token_id=token_id,
+                event_format=format_event_for_client_v2_without_event_id,
+            )
+            recent_event_ids.append(event.event_id)
+        result = {
+            "room_id": room.room_id,
+            "event_map": event_map,
+            "events": {
+                "batch": recent_event_ids,
+                "prev_batch": room.prev_batch.to_string(),
+            },
+            "state": state_event_ids,
+            "limited": room.limited,
+            "published": room.published,
+            "ephemeral": room.ephemeral,
+        }
+        return result
+
+
+def register_servlets(hs, http_server):
+    SyncRestServlet(hs).register(http_server)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 062ca06fb3..2ea5e1a021 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -181,6 +181,13 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            for event, row in zip(ret, rows):
+                stream = row["stream_ordering"]
+                topo = event.depth
+                internal = event.internal_metadata
+                internal.before = str(_StreamToken(topo, stream - 1))
+                internal.after = str(_StreamToken(topo, stream))
+
             if rows:
                 key = "s%d" % max([r["stream_ordering"] for r in rows])
             else:
@@ -265,17 +272,37 @@ class StreamStore(SQLBaseStore):
         return self.runInteraction("paginate_room_events", f)
 
     def get_recent_events_for_room(self, room_id, limit, end_token,
-                                   with_feedback=False):
+                                   with_feedback=False, from_token=None):
         # TODO (erikj): Handle compressed feedback
 
-        sql = (
-            "SELECT stream_ordering, topological_ordering, event_id FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
-            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        end_token = _StreamToken.parse_stream_token(end_token)
 
-        def f(txn):
-            txn.execute(sql, (room_id, end_token, limit,))
+        if from_token is None:
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+        else:
+            from_token = _StreamToken.parse_stream_token(from_token)
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering > ?"
+                " AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+        def get_recent_events_for_room_txn(txn):
+            if from_token is None:
+                txn.execute(sql, (room_id, end_token.stream, limit,))
+            else:
+                txn.execute(sql, (
+                    room_id, from_token.stream, end_token.stream, limit
+                ))
 
             rows = self.cursor_to_dict(txn)
 
@@ -303,7 +330,9 @@ class StreamStore(SQLBaseStore):
 
             return events, token
 
-        return self.runInteraction("get_recent_events_for_room", f)
+        return self.runInteraction(
+            "get_recent_events_for_room", get_recent_events_for_room_txn
+        )
 
     def get_room_events_max_id(self):
         return self.runInteraction(