Diffstat (limited to 'synapse/storage')
 synapse/storage/event_push_actions.py                   |  24
 synapse/storage/events.py                               | 104
 synapse/storage/events_worker.py                        |   5
 synapse/storage/registration.py                         |   4
 synapse/storage/room.py                                 |   6
 synapse/storage/schema/delta/30/as_users.py             |   4
 synapse/storage/schema/delta/48/group_unique_indexes.py |  57
 synapse/storage/search.py                               |   2
 synapse/storage/stream.py                               | 460
 synapse/storage/tags.py                                 |   4
 10 files changed, 380 insertions(+), 290 deletions(-)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index e78f8d0114..c22762eb5c 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
+    @defer.inlineCallbacks
     def remove_push_actions_from_staging(self, event_id):
         """Called if we failed to persist the event to ensure that stale push
         actions don't build up in the DB
@@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             event_id (str)
         """
 
-        return self._simple_delete(
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event_id,
-            },
-            desc="remove_push_actions_from_staging",
-        )
+        try:
+            res = yield self._simple_delete(
+                table="event_push_actions_staging",
+                keyvalues={
+                    "event_id": event_id,
+                },
+                desc="remove_push_actions_from_staging",
+            )
+            defer.returnValue(res)
+        except Exception:
+            # this method is called from an exception handler, so propagating
+            # another exception here really isn't helpful - there's nothing
+            # the caller can do about it. Just log the exception and move on.
+            logger.exception(
+                "Error removing push actions after event persistence failure",
+            )
 
     @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
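The try/except added above follows a common cleanup pattern: because the method is invoked from an exception handler, a second exception would only mask the original persistence error. A minimal standalone sketch of the pattern (`store.delete_staging_row` is a hypothetical stand-in for `_simple_delete`):

    import logging

    logger = logging.getLogger(__name__)

    def remove_staged_actions(store, event_id):
        # Best-effort cleanup: swallow and log failures, because the
        # caller is itself unwinding from a persistence error and a
        # second exception would only obscure the first one.
        try:
            store.delete_staging_row("event_push_actions_staging", event_id)
        except Exception:
            logger.exception(
                "Error removing push actions after event persistence failure",
            )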
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5fe4a0e56c..05cde96afc 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -22,7 +22,6 @@ import logging
 import simplejson as json
 from twisted.internet import defer
 
-
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.util.async import ObservableDeferred
 from synapse.util.frozenutils import frozendict_json_encoder
@@ -425,7 +424,9 @@ class EventsStore(EventsWorkerStore):
                             )
                             current_state = yield self._get_new_state_after_events(
                                 room_id,
-                                ev_ctx_rm, new_latest_event_ids,
+                                ev_ctx_rm,
+                                latest_event_ids,
+                                new_latest_event_ids,
                             )
                             if current_state is not None:
                                 current_state_for_room[room_id] = current_state
@@ -513,7 +514,8 @@ class EventsStore(EventsWorkerStore):
         defer.returnValue(new_latest_event_ids)
 
     @defer.inlineCallbacks
-    def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
+    def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
+                                    new_latest_event_ids):
         """Calculate the current state dict after adding some new events to
         a room
 
@@ -524,6 +526,9 @@ class EventsStore(EventsWorkerStore):
             events_context (list[(EventBase, EventContext)]):
                 events and contexts which are being added to the room
 
+            old_latest_event_ids (iterable[str]):
+                the old forward extremities for the room.
+
             new_latest_event_ids (iterable[str]):
                 the new forward extremities for the room.
 
@@ -534,64 +539,89 @@ class EventsStore(EventsWorkerStore):
         """
 
         if not new_latest_event_ids:
-            defer.returnValue({})
+            return
 
         # map from state_group to ((type, key) -> event_id) state map
-        state_groups = {}
-        missing_event_ids = []
-        was_updated = False
+        state_groups_map = {}
+        for ev, ctx in events_context:
+            if ctx.state_group is None:
+                # I don't think this can happen, but let's double-check
+                raise Exception(
+                    "Context for new extremity event %s has no state "
+                    "group" % (ev.event_id, ),
+                )
+
+            if ctx.state_group in state_groups_map:
+                continue
+
+            state_groups_map[ctx.state_group] = ctx.current_state_ids
+
+        # We need to map the event_ids to their state groups. First, let's
+        # check if the event is one we're persisting, in which case we can
+        # pull the state group from its context.
+        # Otherwise we need to pull the state group from the database.
+
+        # Set of events we need to fetch groups for. (We know none of the old
+        # extremities are going to be in events_context).
+        missing_event_ids = set(old_latest_event_ids)
+
+        event_id_to_state_group = {}
         for event_id in new_latest_event_ids:
-            # First search in the list of new events we're adding,
-            # and then use the current state from that
+            # First search in the list of new events we're adding.
             for ev, ctx in events_context:
                 if event_id == ev.event_id:
-                    if ctx.current_state_ids is None:
-                        raise Exception("Unknown current state")
-
-                    if ctx.state_group is None:
-                        # I don't think this can happen, but let's double-check
-                        raise Exception(
-                            "Context for new extremity event %s has no state "
-                            "group" % (event_id, ),
-                        )
-
-                    # If we've already seen the state group don't bother adding
-                    # it to the state sets again
-                    if ctx.state_group not in state_groups:
-                        state_groups[ctx.state_group] = ctx.current_state_ids
-                        if ctx.delta_ids or hasattr(ev, "state_key"):
-                            was_updated = True
+                    event_id_to_state_group[event_id] = ctx.state_group
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
                 # the state from the database
-                was_updated = True
-                missing_event_ids.append(event_id)
-
-        if not was_updated:
-            return
+                missing_event_ids.add(event_id)
 
         if missing_event_ids:
-            # Now pull out the state for any missing events from DB
+            # Now pull out the state groups for any missing events from DB
             event_to_groups = yield self._get_state_group_for_events(
                 missing_event_ids,
             )
+            event_id_to_state_group.update(event_to_groups)
+
+        # State groups of old_latest_event_ids
+        old_state_groups = set(
+            event_id_to_state_group[evid] for evid in old_latest_event_ids
+        )
+
+        # State groups of new_latest_event_ids
+        new_state_groups = set(
+            event_id_to_state_group[evid] for evid in new_latest_event_ids
+        )
 
-            groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
+        # If the old and new groups are the same then we don't need to do
+        # anything.
+        if old_state_groups == new_state_groups:
+            return
 
-            if groups:
-                group_to_state = yield self._get_state_for_groups(groups)
-                state_groups.update(group_to_state)
+        # Now that we have calculated new_state_groups, we need to get
+        # their state IDs so we can resolve them to a single state set.
+        missing_state = new_state_groups - set(state_groups_map)
+        if missing_state:
+            group_to_state = yield self._get_state_for_groups(missing_state)
+            state_groups_map.update(group_to_state)
 
-        if len(state_groups) == 1:
+        if len(new_state_groups) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            defer.returnValue(state_groups.values()[0])
+            defer.returnValue(state_groups_map[new_state_groups.pop()])
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
 
         def get_events(ev_ids):
             return self.get_events(
                 ev_ids, get_prev_content=False, check_redacted=False,
             )
+
+        state_groups = {
+            sg: state_groups_map[sg] for sg in new_state_groups
+        }
+
         events_map = {ev.event_id: ev for ev, _ in events_context}
         logger.debug("calling resolve_state_groups from preserve_events")
         res = yield self._state_resolution_handler.resolve_state_groups(
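The core of the new logic is cheap: map each old and new forward extremity to its state group and compare the two sets before doing any state resolution. A condensed, standalone sketch of that check (plain dicts and sets, no storage layer):

    def current_state_needs_recalculating(event_id_to_state_group,
                                          old_latest_event_ids,
                                          new_latest_event_ids):
        # If the old and new sets of state groups are identical, the
        # room's current state cannot have changed, so the expensive
        # resolve_state_groups call can be skipped entirely.
        old_groups = set(event_id_to_state_group[e] for e in old_latest_event_ids)
        new_groups = set(event_id_to_state_group[e] for e in new_latest_event_ids)
        return old_groups != new_groups

    mapping = {"$a": 1, "$b": 2, "$c": 2}
    assert not current_state_needs_recalculating(mapping, {"$a", "$b"}, {"$a", "$c"})
    assert current_state_needs_recalculating(mapping, {"$a"}, {"$b"})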
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a937b9bceb..ba834854e1 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 
 from synapse.util.logcontext import (
-    preserve_fn, PreserveLoggingContext, make_deferred_yieldable
+    PreserveLoggingContext, make_deferred_yieldable, run_in_background,
 )
 from synapse.util.metrics import Measure
 from synapse.api.errors import SynapseError
@@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         res = yield make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self._get_event_from_row)(
+                run_in_background(
+                    self._get_event_from_row,
                     row["internal_metadata"], row["json"], row["redacts"],
                     rejected_reason=row["rejects"],
                 )
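`run_in_background` replaces `preserve_fn` as the way to kick off concurrent work without losing the logcontext. A minimal sketch of the pattern used above, assuming only synapse's logcontext helpers (`fetch_one` is a hypothetical per-row fetch):

    from twisted.internet import defer

    from synapse.util.logcontext import make_deferred_yieldable, run_in_background

    @defer.inlineCallbacks
    def fetch_all_rows(rows, fetch_one):
        # Start every fetch in the background so they run concurrently,
        # then wait for all of them with a deferred that is safe to
        # yield on from the calling logcontext.
        results = yield make_deferred_yieldable(defer.gatherResults(
            [run_in_background(fetch_one, row) for row in rows],
            consumeErrors=True,
        ))
        defer.returnValue(results)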
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 6b557ca0cf..a50717db2d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -22,6 +22,8 @@ from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+from six.moves import range
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     @cached()
@@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore,
                 match = regex.search(user_id)
                 if match:
                     found.add(int(match.group(1)))
-            for i in xrange(len(found) + 1):
+            for i in range(len(found) + 1):
                 if i not in found:
                     return i
 
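The `six.moves` import keeps the generated-user-id search working unchanged on Python 2 and 3. The loop itself is a "first free non-negative integer" search; iterating up to len(found) + 1 suffices because among the first len(found) + 1 candidates at least one must be absent. In isolation:

    def first_free_id(found):
        # found is a set of non-negative integers already in use.
        for i in range(len(found) + 1):
            if i not in found:
                return i

    assert first_free_id(set()) == 0
    assert first_free_id({0, 1, 3}) == 2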
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c036975..ea6a189185 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
 
             # Convert the IDs to MXC URIs
             for media_id in local_mxcs:
-                local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
             for hostname, media_id in remote_mxcs:
                 remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
 
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
         while next_token:
             sql = """
                 SELECT stream_ordering, json FROM events
-                JOIN event_json USING (event_id)
+                JOIN event_json USING (room_id, event_id)
                 WHERE room_id = ?
                     AND stream_ordering < ?
                     AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
                     if matches:
                         hostname = matches.group(1)
                         media_id = matches.group(2)
-                        if hostname == self.hostname:
+                        if hostname == self.hs.hostname:
                             local_media_mxcs.append(media_id)
                         else:
                             remote_media_mxcs.append((hostname, media_id))
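Both fixes hinge on comparing the hostname embedded in an `mxc://` URI against the homeserver's own name, which lives on the HomeServer object (`self.hs.hostname`) rather than on the store. A self-contained sketch of the build/classify round trip (the regex here is an illustration, not the one used above):

    import re

    MXC_RE = re.compile(r"^mxc://([^/]+)/([^/]+)$")

    def classify_mxc(uri, local_hostname):
        # mxc URIs have the form mxc://<server_name>/<media_id>
        match = MXC_RE.match(uri)
        if not match:
            return None
        hostname, media_id = match.group(1), match.group(2)
        if hostname == local_hostname:
            return ("local", media_id)
        return ("remote", hostname, media_id)

    assert classify_mxc("mxc://example.com/abc", "example.com") == ("local", "abc")
    assert classify_mxc("mxc://other.org/xyz", "example.com") == ("remote", "other.org", "xyz")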
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index c53e53c94f..85bd1a2006 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -14,6 +14,8 @@
 import logging
 from synapse.config.appservice import load_appservices
 
+from six.moves import range
+
 
 logger = logging.getLogger(__name__)
 
@@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
 
     for as_id, user_ids in owned.items():
         n = 100
-        user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
+        user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
         for chunk in user_chunks:
             cur.execute(
                 database_engine.convert_param_style(
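The generator expression above splits `user_ids` into fixed-size batches (note the slice still hard-codes 100 where it could use n). The same chunking pattern in isolation:

    def chunks(seq, n):
        # Yield successive n-item slices of seq; the final chunk may be short.
        return (seq[i:i + n] for i in range(0, len(seq), n))

    assert list(chunks([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]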
diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py
new file mode 100644
index 0000000000..2233af87d7
--- /dev/null
+++ b/synapse/storage/schema/delta/48/group_unique_indexes.py
@@ -0,0 +1,57 @@
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
+FIX_INDEXES = """
+-- rebuild indexes as uniques
+DROP INDEX groups_invites_g_idx;
+CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id);
+DROP INDEX groups_users_g_idx;
+CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id);
+
+-- rename other indexes to actually match their table names..
+DROP INDEX groups_users_u_idx;
+CREATE INDEX group_users_u_idx ON group_users(user_id);
+DROP INDEX groups_invites_u_idx;
+CREATE INDEX group_invites_u_idx ON group_invites(user_id);
+DROP INDEX groups_rooms_g_idx;
+CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
+DROP INDEX groups_rooms_r_idx;
+CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
+
+    # remove duplicates from group_users & group_invites tables
+    cur.execute("""
+        DELETE FROM group_users WHERE %s NOT IN (
+           SELECT min(%s) FROM group_users GROUP BY group_id, user_id
+        );
+    """ % (rowid, rowid))
+    cur.execute("""
+        DELETE FROM group_invites WHERE %s NOT IN (
+           SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
+        );
+    """ % (rowid, rowid))
+
+    for statement in get_statements(FIX_INDEXES.splitlines()):
+        cur.execute(statement)
+
+
+def run_upgrade(*args, **kwargs):
+    pass
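Creating the unique indexes can only succeed once duplicate rows are gone, hence the DELETE keeping only the lowest physical row id per (group_id, user_id) pair; `ctid` is Postgres's row identifier and `rowid` is SQLite's. A toy sqlite3 reproduction of the dedupe step:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE group_users (group_id TEXT, user_id TEXT)")
    cur.executemany(
        "INSERT INTO group_users VALUES (?, ?)",
        [("g1", "u1"), ("g1", "u1"), ("g1", "u2")],  # one duplicated pair
    )

    # Keep only the lowest rowid for each (group_id, user_id) pair.
    cur.execute("""
        DELETE FROM group_users WHERE rowid NOT IN (
            SELECT min(rowid) FROM group_users GROUP BY group_id, user_id
        )
    """)

    # The unique index can now be created without a constraint violation.
    cur.execute("CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id)")
    assert cur.execute("SELECT count(*) FROM group_users").fetchone()[0] == 2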
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e1a..6ba3e59889 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
             sql = (
                 "SELECT stream_ordering, event_id, room_id, type, json, "
                 " origin_server_ts FROM events"
-                " JOIN event_json USING (event_id)"
+                " JOIN event_json USING (room_id, event_id)"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2956c3b3e0..ea24710ad8 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -38,15 +38,17 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.events import EventsWorkerStore
 
-from synapse.util.caches.descriptors import cached
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.engines import PostgresEngine
 
 import abc
 import logging
 
+from six.moves import range
+from collections import namedtuple
+
 
 logger = logging.getLogger(__name__)
 
@@ -58,6 +60,12 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
+# Used as return values for pagination APIs
+_EventDictReturn = namedtuple("_EventDictReturn", (
+    "event_id", "topological_ordering", "stream_ordering",
+))
+
+
 def lower_bound(token, engine, inclusive=False):
     inclusive = "=" if inclusive else ""
     if token.topological is None:
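`_EventDictReturn` replaces the dicts previously built with `cursor_to_dict`, so rows can be constructed positionally straight from the cursor and read by attribute. For stream-ordered queries, which do not select `topological_ordering`, that field is simply filled with None:

    from collections import namedtuple

    _EventDictReturn = namedtuple("_EventDictReturn", (
        "event_id", "topological_ordering", "stream_ordering",
    ))

    # A row from a stream-ordered query: no topological ordering available.
    row = _EventDictReturn("$ev1:example.com", None, 42)
    assert row.event_id == "$ev1:example.com"
    assert row.stream_ordering == 42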
@@ -196,13 +204,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         results = {}
         room_ids = list(room_ids)
-        for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
+        for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
             res = yield make_deferred_yieldable(defer.gatherResults([
-                preserve_fn(self.get_room_events_stream_for_room)(
+                run_in_background(
+                    self.get_room_events_stream_for_room,
                     room_id, from_key, to_key, limit, order=order,
                 )
                 for room_id in rm_ids
-            ]))
+            ], consumeErrors=True))
             results.update(dict(zip(rm_ids, res)))
 
         defer.returnValue(results)
@@ -224,54 +233,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
                                         order='DESC'):
-        # Note: If from_key is None then we return in topological order. This
-        # is because in that case we're using this as a "get the last few messages
-        # in a room" function, rather than "get new messages since last sync"
-        if from_key is not None:
-            from_id = RoomStreamToken.parse_stream_token(from_key).stream
-        else:
-            from_id = None
-        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
+        """Get new room events in stream ordering since `from_key`.
+
+        Args:
+            room_id (str)
+            from_key (str): Token after which events are returned; no events
+                at or before it are included
+            to_key (str): Token up to and including which events are returned.
+                (This is typically the current stream token)
+            limit (int): Maximum number of events to return
+            order (str): Either "DESC" or "ASC". Determines which events are
+                returned when the result is limited. If "DESC" then the most
+                recent `limit` events are returned, otherwise returns the
+                oldest `limit` events.
+
+        Returns:
+            Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
+            events (in ascending order) and the token from the start of
+            the chunk of events returned.
+        """
         if from_key == to_key:
             defer.returnValue(([], from_key))
 
-        if from_id:
-            has_changed = yield self._events_stream_cache.has_entity_changed(
-                room_id, from_id
-            )
-
-            if not has_changed:
-                defer.returnValue(([], from_key))
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
+        to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
-        def f(txn):
-            if from_id is not None:
-                sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering > ? AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering %s LIMIT ?"
-                ) % (order,)
-                txn.execute(sql, (room_id, from_id, to_id, limit))
-            else:
-                sql = (
-                    "SELECT event_id, stream_ordering FROM events WHERE"
-                    " room_id = ?"
-                    " AND not outlier"
-                    " AND stream_ordering <= ?"
-                    " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
-                ) % (order, order,)
-                txn.execute(sql, (room_id, to_id, limit))
+        has_changed = yield self._events_stream_cache.has_entity_changed(
+            room_id, from_id
+        )
 
-            rows = self.cursor_to_dict(txn)
+        if not has_changed:
+            defer.returnValue(([], from_key))
 
+        def f(txn):
+            sql = (
+                "SELECT event_id, stream_ordering FROM events WHERE"
+                " room_id = ?"
+                " AND not outlier"
+                " AND stream_ordering > ? AND stream_ordering <= ?"
+                " ORDER BY stream_ordering %s LIMIT ?"
+            ) % (order,)
+            txn.execute(sql, (room_id, from_id, to_id, limit))
+
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -281,7 +291,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ret.reverse()
 
         if rows:
-            key = "s%d" % min(r["stream_ordering"] for r in rows)
+            key = "s%d" % min(r.stream_ordering for r in rows)
         else:
             # Assume we didn't get anything because there was nothing to
             # get.
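The bounds in the query above are exclusive of from_id and inclusive of to_id (`stream_ordering > ? AND stream_ordering <= ?`), so an event sitting exactly at the from token is never returned twice across successive syncs. A sketch with a minimal stand-in for RoomStreamToken.parse_stream_token (stream tokens look like "s<stream_ordering>"):

    def parse_stream_token(key):
        # Minimal stand-in: real tokens are parsed by RoomStreamToken.
        assert key.startswith("s")
        return int(key[1:])

    from_id = parse_stream_token("s100")
    to_id = parse_stream_token("s200")

    def in_window(stream_ordering):
        return from_id < stream_ordering <= to_id

    assert not in_window(100)  # event at the from token: already seen
    assert in_window(200)      # event at the to token: included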
@@ -291,10 +301,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_membership_changes_for_user(self, user_id, from_key, to_key):
-        if from_key is not None:
-            from_id = RoomStreamToken.parse_stream_token(from_key).stream
-        else:
-            from_id = None
+        from_id = RoomStreamToken.parse_stream_token(from_key).stream
         to_id = RoomStreamToken.parse_stream_token(to_key).stream
 
         if from_key == to_key:
@@ -308,34 +315,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 defer.returnValue([])
 
         def f(txn):
-            if from_id is not None:
-                sql = (
-                    "SELECT m.event_id, stream_ordering FROM events AS e,"
-                    " room_memberships AS m"
-                    " WHERE e.event_id = m.event_id"
-                    " AND m.user_id = ?"
-                    " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
-                    " ORDER BY e.stream_ordering ASC"
-                )
-                txn.execute(sql, (user_id, from_id, to_id,))
-            else:
-                sql = (
-                    "SELECT m.event_id, stream_ordering FROM events AS e,"
-                    " room_memberships AS m"
-                    " WHERE e.event_id = m.event_id"
-                    " AND m.user_id = ?"
-                    " AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering ASC"
-                )
-                txn.execute(sql, (user_id, to_id,))
-            rows = self.cursor_to_dict(txn)
+            sql = (
+                "SELECT m.event_id, stream_ordering FROM events AS e,"
+                " room_memberships AS m"
+                " WHERE e.event_id = m.event_id"
+                " AND m.user_id = ?"
+                " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+            )
+            txn.execute(sql, (user_id, from_id, to_id,))
+
+            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
 
             return rows
 
         rows = yield self.runInteraction("get_membership_changes_for_user", f)
 
         ret = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
@@ -344,14 +341,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+    def get_recent_events_for_room(self, room_id, limit, end_token):
+        """Get the most recent events in the room in topological ordering.
+
+        Args:
+            room_id (str)
+            limit (int)
+            end_token (str): The stream token representing now.
+
+        Returns:
+            Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
+            events and a token pointing to the start of the returned
+            events.
+            The events returned are in ascending order.
+        """
+
         rows, token = yield self.get_recent_event_ids_for_room(
-            room_id, limit, end_token, from_token
+            room_id, limit, end_token,
         )
 
         logger.debug("stream before")
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
         logger.debug("stream after")
@@ -360,60 +371,36 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         defer.returnValue((events, token))
 
-    @cached(num_args=4)
-    def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
-        end_token = RoomStreamToken.parse_stream_token(end_token)
-
-        if from_token is None:
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-        else:
-            from_token = RoomStreamToken.parse_stream_token(from_token)
-            sql = (
-                "SELECT stream_ordering, topological_ordering, event_id"
-                " FROM events"
-                " WHERE room_id = ? AND stream_ordering > ?"
-                " AND stream_ordering <= ? AND outlier = ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-        def get_recent_events_for_room_txn(txn):
-            if from_token is None:
-                txn.execute(sql, (room_id, end_token.stream, False, limit,))
-            else:
-                txn.execute(sql, (
-                    room_id, from_token.stream, end_token.stream, False, limit
-                ))
+    @defer.inlineCallbacks
+    def get_recent_event_ids_for_room(self, room_id, limit, end_token):
+        """Get the most recent events in the room in topological ordering.
 
-            rows = self.cursor_to_dict(txn)
+        Args:
+            room_id (str)
+            limit (int)
+            end_token (str): The stream token representing now.
 
-            rows.reverse()  # As we selected with reverse ordering
+        Returns:
+            Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
+            _EventDictReturn and a token pointing to the start of the returned
+            events.
+            The events returned are in ascending order.
+        """
+        # Allow a zero limit here, and no-op.
+        if limit == 0:
+            defer.returnValue(([], end_token))
 
-            if rows:
-                # Tokens are positions between events.
-                # This token points *after* the last event in the chunk.
-                # We need it to point to the event before it in the chunk
-                # since we are going backwards so we subtract one from the
-                # stream part.
-                topo = rows[0]["topological_ordering"]
-                toke = rows[0]["stream_ordering"] - 1
-                start_token = str(RoomStreamToken(topo, toke))
+        end_token = RoomStreamToken.parse(end_token)
 
-                token = (start_token, str(end_token))
-            else:
-                token = (str(end_token), str(end_token))
+        rows, token = yield self.runInteraction(
+            "get_recent_event_ids_for_room", self._paginate_room_events_txn,
+            room_id, from_token=end_token, limit=limit,
+        )
 
-            return rows, token
+        # We want to return the results in ascending order.
+        rows.reverse()
 
-        return self.runInteraction(
-            "get_recent_events_for_room", get_recent_events_for_room_txn
-        )
+        defer.returnValue((rows, token))
 
     def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
         """Gets details of the first event in a room at or after a stream ordering
@@ -517,10 +504,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @staticmethod
     def _set_before_and_after(events, rows, topo_order=True):
+        """Inserts ordering information to events' internal metadata from
+        the DB rows.
+
+        Args:
+            events (list[FrozenEvent])
+            rows (list[_EventDictReturn])
+            topo_order (bool): Whether the events were ordered topologically
+                or by stream ordering. If true then all rows should have a
+                non-null topological_ordering.
+        """
         for event, row in zip(events, rows):
-            stream = row["stream_ordering"]
-            if topo_order:
-                topo = event.depth
+            stream = row.stream_ordering
+            if topo_order and row.topological_ordering:
+                topo = row.topological_ordering
             else:
                 topo = None
             internal = event.internal_metadata
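The docstring formalises what `_set_before_and_after` stamps onto each event: the positions just before and just after it in the stream, with the topological part only set when the rows were topologically ordered. A sketch of the idea only (the attribute names here are illustrative; Synapse stores these on event.internal_metadata):

    from collections import namedtuple

    Row = namedtuple("Row", ("event_id", "topological_ordering", "stream_ordering"))

    class FakeEvent(object):
        pass

    def set_before_and_after(events, rows, topo_order=True):
        for event, row in zip(events, rows):
            stream = row.stream_ordering
            topo = row.topological_ordering if topo_order and row.topological_ordering else None
            # Tokens are positions *between* events: the position before
            # an event at stream ordering s is s - 1, the position after is s.
            event.before = (topo, stream - 1)
            event.after = (topo, stream)

    ev = FakeEvent()
    set_before_and_after([ev], [Row("$ev1", 5, 42)])
    assert ev.before == (5, 41) and ev.after == (5, 42)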
@@ -592,87 +589,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcols=["stream_ordering", "topological_ordering"],
         )
 
-        token = RoomStreamToken(
-            results["topological_ordering"],
+        # Paginating backwards includes the event at the token, but paginating
+        # forward doesn't.
+        before_token = RoomStreamToken(
+            results["topological_ordering"] - 1,
             results["stream_ordering"],
         )
 
-        if isinstance(self.database_engine, Sqlite3Engine):
-            # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
-            # So we give pass it to SQLite3 as the UNION ALL of the two queries.
-
-            query_before = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering < ?"
-                " UNION ALL"
-                " SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-            )
-            before_args = (
-                room_id, token.topological,
-                room_id, token.topological, token.stream,
-                before_limit,
-            )
-
-            query_after = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering > ?"
-                " UNION ALL"
-                " SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
-                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-            )
-            after_args = (
-                room_id, token.topological,
-                room_id, token.topological, token.stream,
-                after_limit,
-            )
-        else:
-            query_before = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND %s"
-                " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
-            ) % (upper_bound(token, self.database_engine, inclusive=False),)
-
-            before_args = (room_id, before_limit)
-
-            query_after = (
-                "SELECT topological_ordering, stream_ordering, event_id FROM events"
-                " WHERE room_id = ? AND %s"
-                " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
-            after_args = (room_id, after_limit)
-
-        txn.execute(query_before, before_args)
-
-        rows = self.cursor_to_dict(txn)
-        events_before = [r["event_id"] for r in rows]
-
-        if rows:
-            start_token = str(RoomStreamToken(
-                rows[0]["topological_ordering"],
-                rows[0]["stream_ordering"] - 1,
-            ))
-        else:
-            start_token = str(RoomStreamToken(
-                token.topological,
-                token.stream - 1,
-            ))
-
-        txn.execute(query_after, after_args)
+        after_token = RoomStreamToken(
+            results["topological_ordering"],
+            results["stream_ordering"],
+        )
 
-        rows = self.cursor_to_dict(txn)
-        events_after = [r["event_id"] for r in rows]
+        rows, start_token = self._paginate_room_events_txn(
+            txn, room_id, before_token, direction='b', limit=before_limit,
+        )
+        events_before = [r.event_id for r in rows]
 
-        if rows:
-            end_token = str(RoomStreamToken(
-                rows[-1]["topological_ordering"],
-                rows[-1]["stream_ordering"],
-            ))
-        else:
-            end_token = str(token)
+        rows, end_token = self._paginate_room_events_txn(
+            txn, room_id, after_token, direction='f', limit=after_limit,
+        )
+        events_after = [r.event_id for r in rows]
 
         return {
             "before": {
@@ -735,17 +672,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def has_room_changed_since(self, room_id, stream_id):
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
 
+    def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
+                                  direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-class StreamStore(StreamWorkerStore):
-    def get_room_max_stream_ordering(self):
-        return self._stream_id_gen.get_current_token()
-
-    def get_room_min_stream_ordering(self):
-        return self._backfill_id_gen.get_current_token()
+        Args:
+            txn
+            room_id (str)
+            from_token (RoomStreamToken): The token used to stream from
+            to_token (RoomStreamToken|None): A token which, if given, limits
+                the results to only those before it
+            direction (char): Either 'b' or 'f' to indicate whether we are
+                paginating backwards or forwards from `from_token`.
+            limit (int): The maximum number of events to return. Zero or less
+                means no limit.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
 
-    @defer.inlineCallbacks
-    def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1, event_filter=None):
+        Returns:
+            tuple[list[_EventDictReturn], str]: Returns the results as a list
+            of _EventDictReturn and a token that points to the end of the
+            result set.
+        """
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
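The asymmetry noted in that comment is exactly why get_events_around now builds two different anchor tokens: backwards pagination includes the event at the token, so before_token steps the topological part back by one, while forwards pagination is already exclusive, so after_token uses the event's own position. In miniature (tuples standing in for RoomStreamToken):

    def tokens_around(topological_ordering, stream_ordering):
        # Backwards pagination is inclusive of the token's own event, so
        # step the topological part back by one to exclude the anchor.
        before_token = (topological_ordering - 1, stream_ordering)
        # Forwards pagination is already exclusive of the token's event.
        after_token = (topological_ordering, stream_ordering)
        return before_token, after_token

    assert tokens_around(10, 100) == ((9, 100), (10, 100))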
@@ -753,20 +701,20 @@ class StreamStore(StreamWorkerStore):
         if direction == 'b':
             order = "DESC"
             bounds = upper_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, lower_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
         else:
             order = "ASC"
             bounds = lower_bound(
-                RoomStreamToken.parse(from_key), self.database_engine
+                from_token, self.database_engine
             )
-            if to_key:
+            if to_token:
                 bounds = "%s AND %s" % (bounds, upper_bound(
-                    RoomStreamToken.parse(to_key), self.database_engine
+                    to_token, self.database_engine
                 ))
 
         filter_clause, filter_args = filter_to_clause(event_filter)
@@ -782,7 +730,8 @@ class StreamStore(StreamWorkerStore):
             limit_str = ""
 
         sql = (
-            "SELECT * FROM events"
+            "SELECT event_id, topological_ordering, stream_ordering"
+            " FROM events"
             " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
             " stream_ordering %(order)s %(limit)s"
@@ -792,35 +741,72 @@ class StreamStore(StreamWorkerStore):
             "limit": limit_str
         }
 
-        def f(txn):
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            if rows:
-                topo = rows[-1]["topological_ordering"]
-                toke = rows[-1]["stream_ordering"]
-                if direction == 'b':
-                    # Tokens are positions between events.
-                    # This token points *after* the last event in the chunk.
-                    # We need it to point to the event before it in the chunk
-                    # when we are going backwards so we subtract one from the
-                    # stream part.
-                    toke -= 1
-                next_token = str(RoomStreamToken(topo, toke))
-            else:
-                # TODO (erikj): We should work out what to do here instead.
-                next_token = to_key if to_key else from_key
+        txn.execute(sql, args)
+
+        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+
+        if rows:
+            topo = rows[-1].topological_ordering
+            toke = rows[-1].stream_ordering
+            if direction == 'b':
+                # Tokens are positions between events.
+                # This token points *after* the last event in the chunk.
+                # We need it to point to the event before it in the chunk
+                # when we are going backwards so we subtract one from the
+                # stream part.
+                toke -= 1
+            next_token = RoomStreamToken(topo, toke)
+        else:
+            # TODO (erikj): We should work out what to do here instead.
+            next_token = to_token if to_token else from_token
+
+        return rows, str(next_token),
+
+    @defer.inlineCallbacks
+    def paginate_room_events(self, room_id, from_key, to_key=None,
+                             direction='b', limit=-1, event_filter=None):
+        """Returns list of events before or after a given token.
 
-            return rows, next_token,
+        Args:
+            room_id (str)
+            from_key (str): The token used to stream from
+            to_key (str|None): A token which, if given, limits the results to
+                only those before it
+            direction (char): Either 'b' or 'f' to indicate whether we are
+                paginating backwards or forwards from `from_key`.
+            limit (int): The maximum number of events to return. Zero or less
+                means no limit.
+            event_filter (Filter|None): If provided filters the events to
+                those that match the filter.
 
-        rows, token = yield self.runInteraction("paginate_room_events", f)
+        Returns:
+            Deferred[tuple[list[FrozenEvent], str]]: Returns the results as
+            a list of events and a token that points to the end of the
+            result set.
+        """
+
+        from_key = RoomStreamToken.parse(from_key)
+        if to_key:
+            to_key = RoomStreamToken.parse(to_key)
+
+        rows, token = yield self.runInteraction(
+            "paginate_room_events", self._paginate_room_events_txn,
+            room_id, from_key, to_key, direction, limit, event_filter,
+        )
 
         events = yield self._get_events(
-            [r["event_id"] for r in rows],
+            [r.event_id for r in rows],
             get_prev_content=True
         )
 
         self._set_before_and_after(events, rows)
 
         defer.returnValue((events, token))
+
+
+class StreamStore(StreamWorkerStore):
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_room_min_stream_ordering(self):
+        return self._backfill_id_gen.get_current_token()
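After the refactor, paginate_room_events is a thin wrapper: it parses the string tokens, runs _paginate_room_events_txn in a single interaction, and hydrates the returned _EventDictReturn rows into events. A hedged usage sketch (the store handle and page size are assumptions):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def fetch_page_backwards(store, room_id, from_key, page_size=20):
        # Paginate backwards ('b') from from_key. The returned token
        # points at the far end of the chunk, so it can be fed straight
        # back in as the next call's from_key to continue the walk.
        events, next_key = yield store.paginate_room_events(
            room_id, from_key, direction='b', limit=page_size,
        )
        defer.returnValue((events, next_key))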
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 13bff9f055..6671d3cfca 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -22,6 +22,8 @@ from twisted.internet import defer
 import simplejson as json
 import logging
 
+from six.moves import range
+
 logger = logging.getLogger(__name__)
 
 
@@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
 
         batch_size = 50
         results = []
-        for i in xrange(0, len(tag_ids), batch_size):
+        for i in range(0, len(tag_ids), batch_size):
             tags = yield self.runInteraction(
                 "get_all_updated_tag_content",
                 get_tag_content,