summary refs log tree commit diff
diff options
context:
space:
mode:
-rwxr-xr-xdemo/start.sh2
-rw-r--r--synapse/events/utils.py7
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/sync.py332
-rw-r--r--synapse/notifier.py48
-rw-r--r--synapse/rest/client/v2_alpha/__init__.py6
-rw-r--r--synapse/rest/client/v2_alpha/sync.py191
-rw-r--r--synapse/storage/stream.py40
8 files changed, 616 insertions, 12 deletions
diff --git a/demo/start.sh b/demo/start.sh
index bb2248770d..a7502e7a8d 100755
--- a/demo/start.sh
+++ b/demo/start.sh
@@ -16,7 +16,7 @@ if [ $# -eq 1 ]; then
     fi
 fi
 
-for port in 8080 8081 8082; do
+for port in 8080; do
     echo "Starting server on port $port... "
 
     https_port=$((port + 400))
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index e391aca4cc..42fb0371e5 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -89,7 +89,7 @@ def prune_event(event):
     return type(event)(allowed_fields)
 
 
-def serialize_event(e, time_now_ms, client_event=True):
+def serialize_event(e, time_now_ms, client_event=True, strip_ids=False):
     # FIXME(erikj): To handle the case of presence events and the like
     if not isinstance(e, EventBase):
         return e
@@ -137,5 +137,10 @@ def serialize_event(e, time_now_ms, client_event=True):
     d.pop("depth", None)
     d.pop("unsigned", None)
     d.pop("origin", None)
+    d.pop("prev_state", None)
+
+    if strip_ids:
+        d.pop("room_id", None)
+        d.pop("event_id", None)
 
     return d
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index fe071a4bc2..a32eab9316 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -26,6 +26,7 @@ from .presence import PresenceHandler
 from .directory import DirectoryHandler
 from .typing import TypingNotificationHandler
 from .admin import AdminHandler
+from .sync import SyncHandler
 
 
 class Handlers(object):
@@ -51,3 +52,4 @@ class Handlers(object):
         self.directory_handler = DirectoryHandler(hs)
         self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
+        self.sync_handler = SyncHandler(hs)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
new file mode 100644
index 0000000000..82a2c6986a
--- /dev/null
+++ b/synapse/handlers/sync.py
@@ -0,0 +1,332 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseHandler
+
+from synapse.streams.config import PaginationConfig
+from synapse.api.constants import Membership
+
+from twisted.internet import defer
+
+import collections
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+SyncConfig = collections.namedtuple("SyncConfig", [
+    "user",
+    "device",
+    "limit",
+    "gap",
+    "sort",
+    "backfill",
+    "filter",
+])
+
+
+class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
+    "room_id",
+    "limited",
+    "published",
+    "events",
+    "state",
+    "prev_batch",
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        return bool(self.events or self.state)
+
+
+class SyncResult(collections.namedtuple("SyncResult", [
+    "next_batch",  # Token for the next sync
+    "private_user_data",  # List of private events for the user.
+    "public_user_data",  # List of public events for all users.
+    "rooms",  # RoomSyncResult for each room.
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        return bool(
+            self.private_user_data or self.public_user_data or self.rooms
+        )
+
+
+class SyncHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(SyncHandler, self).__init__(hs)
+        self.event_sources = hs.get_event_sources()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
+        """Get the sync for a client if we have new data for it now. Otherwise
+        wait for new data to arrive on the server. If the timeout expires, then
+        return an empty sync result.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if timeout == 0 or since_token is None:
+            result = yield self.current_sync_for_user(sync_config, since_token)
+            defer.returnValue(result)
+        else:
+            def current_sync_callback():
+                return self.current_sync_for_user(sync_config, since_token)
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
+            result = yield self.notifier.wait_for_events(
+                sync_config.user, room_ids,
+                sync_config.filter, timeout, current_sync_callback
+            )
+            defer.returnValue(result)
+
+    def current_sync_for_user(self, sync_config, since_token=None):
+        """Get the sync for client needed to match what the server has now.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if since_token is None:
+            return self.initial_sync(sync_config)
+        else:
+            if sync_config.gap:
+                return self.incremental_sync_with_gap(sync_config, since_token)
+            else:
+                #TODO(mjark): Handle gapless sync
+                pass
+
+    @defer.inlineCallbacks
+    def initial_sync(self, sync_config):
+        """Get a sync for a client which is starting without any state
+        Returns:
+            A Deferred SyncResult.
+        """
+        if sync_config.sort == "timeline,desc":
+            # TODO(mjark): Handle going through events in reverse order?.
+            # What does "most recent events" mean when applying the limits mean
+            # in this case?
+            raise NotImplementedError()
+
+        now_token = yield self.event_sources.get_current_token()
+
+        presence_stream = self.event_sources.sources["presence"]
+        # TODO (mjark): This looks wrong, shouldn't we be getting the presence
+        # UP to the present rather than after the present?
+        pagination_config = PaginationConfig(from_token=now_token)
+        presence, _ = yield presence_stream.get_pagination_rows(
+            user=sync_config.user,
+            pagination_config=pagination_config.get_source_config("presence"),
+            key=None
+        )
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=sync_config.user.to_string(),
+            membership_list=[Membership.INVITE, Membership.JOIN]
+        )
+
+        # TODO (mjark): Does public mean "published"?
+        published_rooms = yield self.store.get_rooms(is_public=True)
+        published_room_ids = set(r["room_id"] for r in published_rooms)
+
+        rooms = []
+        for event in room_list:
+            room_sync = yield self.initial_sync_for_room(
+                event.room_id, sync_config, now_token, published_room_ids
+            )
+            rooms.append(room_sync)
+
+        defer.returnValue(SyncResult(
+            public_user_data=presence,
+            private_user_data=[],
+            rooms=rooms,
+            next_batch=now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def intial_sync_for_room(self, room_id, sync_config, now_token,
+                             published_room_ids):
+        """Sync a room for a client which is starting without any state
+        Returns:
+            A Deferred RoomSyncResult.
+        """
+        recent_events, token = yield self.store.get_recent_events_for_room(
+            room_id,
+            limit=sync_config.limit,
+            end_token=now_token.room_key,
+        )
+        prev_batch_token = now_token.copy_and_replace("room_key", token[0])
+        current_state_events = yield self.state_handler.get_current_state(
+            room_id
+        )
+
+        defer.returnValue(RoomSyncResult(
+            room_id=room_id,
+            published=room_id in published_room_ids,
+            events=recent_events,
+            prev_batch=prev_batch_token,
+            state=current_state_events,
+            limited=True,
+        ))
+
+    @defer.inlineCallbacks
+    def incremental_sync_with_gap(self, sync_config, since_token):
+        """ Get the incremental delta needed to bring the client up to
+        date with the server.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if sync_config.sort == "timeline,desc":
+            # TODO(mjark): Handle going through events in reverse order?.
+            # What does "most recent events" mean when applying the limits mean
+            # in this case?
+            raise NotImplementedError()
+
+        now_token = yield self.event_sources.get_current_token()
+
+        presence_stream = self.event_sources.sources["presence"]
+        pagination_config = PaginationConfig(
+            from_token=since_token, to_token=now_token
+        )
+        presence, _ = yield presence_stream.get_pagination_rows(
+            user=sync_config.user,
+            pagination_config=pagination_config.get_source_config("presence"),
+            key=None
+        )
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=sync_config.user.to_string(),
+            membership_list=[Membership.INVITE, Membership.JOIN]
+        )
+
+        # TODO (mjark): Does public mean "published"?
+        published_rooms = yield self.store.get_rooms(is_public=True)
+        published_room_ids = set(r["room_id"] for r in published_rooms)
+
+        rooms = []
+        for event in room_list:
+            room_sync = yield self.incremental_sync_with_gap_for_room(
+                event.room_id, sync_config, since_token, now_token,
+                published_room_ids
+            )
+            if room_sync:
+                rooms.append(room_sync)
+
+        defer.returnValue(SyncResult(
+            public_user_data=presence,
+            private_user_data=[],
+            rooms=rooms,
+            next_batch=now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def incremental_sync_with_gap_for_room(self, room_id, sync_config,
+                                           since_token, now_token,
+                                           published_room_ids):
+        """ Get the incremental delta needed to bring the client up to date for
+        the room. Gives the client the most recent events and the changes to
+        state.
+        Returns:
+            A Deferred RoomSyncResult
+        """
+        # TODO(mjark): Check if they have joined the room between
+        # the previous sync and this one.
+        # TODO(mjark): Apply the event filter in sync_config
+        # TODO(mjark): Check for redactions we might have missed.
+        # TODO(mjark): Typing notifications.
+        recents, token = yield self.store.get_recent_events_for_room(
+            room_id,
+            limit=sync_config.limit + 1,
+            from_token=since_token.room_key,
+            end_token=now_token.room_key,
+        )
+
+        logging.debug("Recents %r", recents)
+
+        if len(recents) > sync_config.limit:
+            limited = True
+            recents = recents[1:]
+        else:
+            limited = False
+
+        prev_batch_token = now_token.copy_and_replace("room_key", token[0])
+
+        # TODO(mjark): This seems racy since this isn't being passed a
+        # token to indicate what point in the stream this is
+        current_state_events = yield self.state_handler.get_current_state(
+            room_id
+        )
+
+        state_at_previous_sync = yield self.get_state_at_previous_sync(
+            room_id, since_token=since_token
+        )
+
+        state_events_delta = yield self.compute_state_delta(
+            since_token=since_token,
+            previous_state=state_at_previous_sync,
+            current_state=current_state_events,
+        )
+
+        room_sync = RoomSyncResult(
+            room_id=room_id,
+            published=room_id in published_room_ids,
+            events=recents,
+            prev_batch=prev_batch_token,
+            state=state_events_delta,
+            limited=limited,
+        )
+
+        logging.debug("Room sync: %r", room_sync)
+
+        defer.returnValue(room_sync)
+
+    @defer.inlineCallbacks
+    def get_state_at_previous_sync(self, room_id, since_token):
+        """ Get the room state at the previous sync the client made.
+        Returns:
+            A Deferred list of Events.
+        """
+        last_events, token = yield self.store.get_recent_events_for_room(
+            room_id, end_token=since_token.room_key, limit=1,
+        )
+
+        if last_events:
+            last_event = last_events[0]
+            last_context = yield self.state_handler.compute_event_context(
+                last_event
+            )
+            if last_event.is_state():
+                state = [last_event] + last_context.current_state.values()
+            else:
+                state = last_context.current_state.values()
+        else:
+            state = ()
+        defer.returnValue(state)
+
+    def compute_state_delta(self, since_token, previous_state, current_state):
+        """ Works out the differnce in state between the current state and the
+        state the client got when it last performed a sync.
+        Returns:
+            A list of events.
+        """
+        # TODO(mjark) Check if the state events were received by the server
+        # after the previous sync, since we need to include those state
+        # updates even if they occured logically before the previous event.
+        # TODO(mjark) Check for new redactions in the state events.
+        previous_dict = {event.event_id: event for event in previous_state}
+        state_delta = []
+        for event in current_state:
+            if event.event_id not in previous_dict:
+                state_delta.append(event)
+        return state_delta
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3aec1d4af2..e3b6ead620 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.async import run_on_reactor
+from synapse.types import StreamToken
 
 import logging
 
@@ -205,6 +206,53 @@ class Notifier(object):
                 [notify(l).addErrback(eb) for l in listeners]
             )
 
+    @defer.inlineCallbacks
+    def wait_for_events(self, user, rooms, filter, timeout, callback):
+        """Wait until the callback returns a non empty response or the
+        timeout fires.
+        """
+
+        deferred = defer.Deferred()
+
+        from_token = StreamToken("s0", "0", "0")
+
+        listener = [_NotificationListener(
+            user=user,
+            rooms=rooms,
+            from_token=from_token,
+            limit=1,
+            timeout=timeout,
+            deferred=deferred,
+        )]
+
+        if timeout:
+            self._register_with_keys(listener[0])
+
+        result = yield callback()
+        if timeout:
+            timed_out = [False]
+
+            def _timeout_listener():
+                timed_out[0] = True
+                listener[0].notify(self, [], from_token, from_token)
+
+            self.clock.call_later(timeout/1000., _timeout_listener)
+            while not result and not timed_out[0]:
+                yield deferred
+                deferred = defer.Deferred()
+                listener[0] = _NotificationListener(
+                    user=user,
+                    rooms=rooms,
+                    from_token=from_token,
+                    limit=1,
+                    timeout=timeout,
+                    deferred=deferred,
+                )
+                self._register_with_keys(listener[0])
+                result = yield callback()
+
+        defer.returnValue(result)
+
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py
index bb740e2803..e75f9d250f 100644
--- a/synapse/rest/client/v2_alpha/__init__.py
+++ b/synapse/rest/client/v2_alpha/__init__.py
@@ -14,6 +14,10 @@
 # limitations under the License.
 
 
+from . import (
+    sync,
+)
+
 from synapse.http.server import JsonResource
 
 
@@ -26,4 +30,4 @@ class ClientV2AlphaRestResource(JsonResource):
 
     @staticmethod
     def register_servlets(client_resource, hs):
-        pass
+        sync.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
new file mode 100644
index 0000000000..0c17208cd3
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.http.servlet import RestServlet
+from synapse.handlers.sync import SyncConfig
+from synapse.types import StreamToken
+from synapse.events.utils import serialize_event
+from ._base import client_v2_pattern
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class SyncRestServlet(RestServlet):
+    """
+
+    GET parameters::
+        timeout(int): How long to wait for new events in milliseconds.
+        limit(int): Maxiumum number of events per room to return.
+        gap(bool): Create gaps the message history if limit is exceeded to
+            ensure that the client has the most recent messages. Defaults to
+            "true".
+        sort(str,str): tuple of sort key (e.g. "timeline") and direction
+            (e.g. "asc", "desc"). Defaults to "timeline,asc".
+        since(batch_token): Batch token when asking for incremental deltas.
+        set_presence(str): What state the device presence should be set to.
+            default is "online".
+        backfill(bool): Should the HS request message history from other
+            servers. This may take a long time making it unsuitable for clients
+            expecting a prompt response. Defaults to "true".
+        filter(filter_id): A filter to apply to the events returned.
+        filter_*: Filter override parameters.
+
+    Response JSON::
+        {
+            "next_batch": // batch token for the next /sync
+            "private_user_data": // private events for this user.
+            "public_user_data": // public events for all users including the
+                                // public events for this user.
+            "rooms": [{ // List of rooms with updates.
+                "room_id": // Id of the room being updated
+                "limited": // Was the per-room event limit exceeded?
+                "published": // Is the room published by our HS?
+                "event_map": // Map of EventID -> event JSON.
+                "events": { // The recent events in the room if gap is "true"
+                            // otherwise the next events in the room.
+                    "batch": [] // list of EventIDs in the "event_map".
+                    "prev_batch": // back token for getting previous events.
+                }
+                "state": [] // list of EventIDs updating the current state to
+                            // be what it should be at the end of the batch.
+            }]
+        }
+    """
+
+    PATTERN = client_v2_pattern("/sync$")
+    ALLOWED_SORT = set(["timeline,asc", "timeline,desc"])
+    ALLOWED_PRESENCE = set(["online", "offline", "idle"])
+
+    def __init__(self, hs):
+        super(SyncRestServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.sync_handler = hs.get_handlers().sync_handler
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        user = yield self.auth.get_user_by_req(request)
+
+        timeout = self.parse_integer(request, "timeout", default=0)
+        limit = self.parse_integer(request, "limit", required=True)
+        gap = self.parse_boolean(request, "gap", default=True)
+        sort = self.parse_string(
+            request, "sort", default="timeline,asc",
+            allowed_values=self.ALLOWED_SORT
+        )
+        since = self.parse_string(request, "since")
+        set_presence = self.parse_string(
+            request, "set_presence", default="online",
+            allowed_values=self.ALLOWED_PRESENCE
+        )
+        backfill = self.parse_boolean(request, "backfill", default=False)
+        filter_id = self.parse_string(request, "filter", default=None)
+
+        logger.info(
+            "/sync: user=%r, timeout=%r, limit=%r, gap=%r, sort=%r, since=%r,"
+            " set_presence=%r, backfill=%r, filter_id=%r" % (
+                user, timeout, limit, gap, sort, since, set_presence,
+                backfill, filter_id
+            )
+        )
+
+        # TODO(mjark): Load filter and apply overrides.
+        # filter = self.filters.load_fitler(filter_id_str)
+        # filter = filter.apply_overrides(http_request)
+        # if filter.matches(event):
+        #   # stuff
+
+        sync_config = SyncConfig(
+            user=user,
+            device="TODO",  # TODO(mjark) Get the device_id from access_token
+            gap=gap,
+            limit=limit,
+            sort=sort,
+            backfill=backfill,
+            filter="TODO",  # TODO(mjark) Add the filter to the config.
+        )
+
+        if since is not None:
+            since_token = StreamToken.from_string(since)
+        else:
+            since_token = None
+
+        sync_result = yield self.sync_handler.wait_for_sync_for_user(
+            sync_config, since_token=since_token, timeout=timeout
+        )
+
+        time_now = self.clock.time_msec()
+
+        response_content = {
+            "public_user_data": self.encode_events(
+                sync_result.public_user_data, filter, time_now
+            ),
+            "private_user_data": self.encode_events(
+                sync_result.private_user_data, filter, time_now
+            ),
+            "rooms": self.encode_rooms(sync_result.rooms, filter, time_now),
+            "next_batch": sync_result.next_batch.to_string(),
+        }
+
+        defer.returnValue((200, response_content))
+
+    def encode_events(self, events, filter, time_now):
+        return [self.encode_event(event, filter, time_now) for event in events]
+
+    @staticmethod
+    def encode_event(event, filter, time_now):
+        # TODO(mjark): Respect formatting requirements in the filter.
+        return serialize_event(event, time_now)
+
+    def encode_rooms(self, rooms, filter, time_now):
+        return [self.encode_room(room, filter, time_now) for room in rooms]
+
+    @staticmethod
+    def encode_room(room, filter, time_now):
+        event_map = {}
+        state_event_ids = []
+        recent_event_ids = []
+        for event in room.state:
+            # TODO(mjark): Respect formatting requirements in the filter.
+            event_map[event.event_id] = serialize_event(
+                event, time_now, strip_ids=True
+            )
+            state_event_ids.append(event.event_id)
+
+        for event in room.events:
+            # TODO(mjark): Respect formatting requirements in the filter.
+            event_map[event.event_id] = serialize_event(
+                event, time_now, strip_ids=True
+            )
+            recent_event_ids.append(event.event_id)
+        return {
+            "room_id": room.room_id,
+            "event_map": event_map,
+            "events": {
+                "batch": recent_event_ids,
+                "prev_batch": room.prev_batch.to_string(),
+            },
+            "state": state_event_ids,
+            "limited": room.limited,
+            "published": room.published,
+        }
+
+
+def register_servlets(hs, http_server):
+    SyncRestServlet(hs).register(http_server)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 8ac2adab05..db1816ea84 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -265,17 +265,37 @@ class StreamStore(SQLBaseStore):
         return self.runInteraction("paginate_room_events", f)
 
     def get_recent_events_for_room(self, room_id, limit, end_token,
-                                   with_feedback=False):
+                                   with_feedback=False, from_token=None):
         # TODO (erikj): Handle compressed feedback
 
-        sql = (
-            "SELECT stream_ordering, topological_ordering, event_id FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
-            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        end_token = _StreamToken.parse_stream_token(end_token)
 
-        def f(txn):
-            txn.execute(sql, (room_id, end_token, limit,))
+        if from_token is None:
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+        else:
+            from_token = _StreamToken.parse_stream_token(from_token)
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering > ?"
+                " AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+        def get_recent_events_for_room_txn(txn):
+            if from_token is None:
+                txn.execute(sql, (room_id, end_token.stream, limit,))
+            else:
+                txn.execute(sql, (
+                    room_id, from_token.stream, end_token.stream, limit
+                ))
 
             rows = self.cursor_to_dict(txn)
 
@@ -303,7 +323,9 @@ class StreamStore(SQLBaseStore):
 
             return events, token
 
-        return self.runInteraction("get_recent_events_for_room", f)
+        return self.runInteraction(
+            "get_recent_events_for_room", get_recent_events_for_room_txn
+        )
 
     def get_room_events_max_id(self):
         return self.runInteraction(