summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py42
-rw-r--r--synapse/storage/_base.py48
-rw-r--r--synapse/storage/push_rule.py209
-rw-r--r--synapse/storage/pusher.py173
-rw-r--r--synapse/storage/room.py7
-rw-r--r--synapse/storage/roommember.py5
-rw-r--r--synapse/storage/schema/delta/v12.sql46
-rw-r--r--synapse/storage/schema/pusher.sql46
-rw-r--r--synapse/storage/state.py6
-rw-r--r--synapse/storage/stream.py197
10 files changed, 664 insertions, 115 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4beb951b9f..277581b4e2 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -29,6 +29,8 @@ from .stream import StreamStore
 from .transactions import TransactionStore
 from .keys import KeyStore
 from .event_federation import EventFederationStore
+from .pusher import PusherStore
+from .push_rule import PushRuleStore
 from .media_repository import MediaRepositoryStore
 
 from .state import StateStore
@@ -60,13 +62,14 @@ SCHEMAS = [
     "state",
     "event_edges",
     "event_signatures",
+    "pusher",
     "media_repository",
 ]
 
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 11
+SCHEMA_VERSION = 12
 
 
 class _RollbackButIsFineException(Exception):
@@ -82,6 +85,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 EventFederationStore,
                 MediaRepositoryStore,
+                PusherStore,
+                PushRuleStore
                 ):
 
     def __init__(self, hs):
@@ -382,6 +387,41 @@ class DataStore(RoomMemberStore, RoomStore,
         defer.returnValue(events)
 
     @defer.inlineCallbacks
+    def get_room_name_and_aliases(self, room_id):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+        sql += " OR s.type = 'm.room.aliases')"
+        args = (room_id,)
+
+        results = yield self._execute_and_decode(sql, *args)
+
+        events = yield self._parse_events(results)
+
+        name = None
+        aliases = []
+
+        for e in events:
+            if e.type == 'm.room.name':
+                name = e.content['name']
+            elif e.type == 'm.room.aliases':
+                aliases.extend(e.content['aliases'])
+
+        defer.returnValue((name, aliases))
+
+    @defer.inlineCallbacks
     def _get_min_token(self):
         row = yield self._execute(
             None,
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index f660fc6eaf..4e8bd3faa9 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -193,6 +193,50 @@ class SQLBaseStore(object):
         txn.execute(sql, values.values())
         return txn.lastrowid
 
+    def _simple_upsert(self, table, keyvalues, values):
+        """
+        Args:
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+        Returns: A deferred
+        """
+        return self.runInteraction(
+            "_simple_upsert",
+            self._simple_upsert_txn, table, keyvalues, values
+        )
+
+    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+        # Try to update
+        sql = "UPDATE %s SET %s WHERE %s" % (
+            table,
+            ", ".join("%s = ?" % (k,) for k in values),
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+        )
+        sqlargs = values.values() + keyvalues.values()
+        logger.debug(
+            "[SQL] %s Args=%s",
+            sql, sqlargs,
+        )
+
+        txn.execute(sql, sqlargs)
+        if txn.rowcount == 0:
+            # We didn't update and rows so insert a new one
+            allvalues = {}
+            allvalues.update(keyvalues)
+            allvalues.update(values)
+
+            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+                table,
+                ", ".join(k for k in allvalues),
+                ", ".join("?" for _ in allvalues)
+            )
+            logger.debug(
+                "[SQL] %s Args=%s",
+                sql, keyvalues.values(),
+            )
+            txn.execute(sql, allvalues.values())
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False):
         """Executes a SELECT query on the named table, which is expected to
@@ -344,8 +388,8 @@ class SQLBaseStore(object):
         if updatevalues:
             update_sql = "UPDATE %s SET %s WHERE %s" % (
                 table,
-                ", ".join("%s = ?" % (k) for k in updatevalues),
-                " AND ".join("%s = ?" % (k) for k in keyvalues)
+                ", ".join("%s = ?" % (k,) for k in updatevalues),
+                " AND ".join("%s = ?" % (k,) for k in keyvalues)
             )
 
         def func(txn):
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
new file mode 100644
index 0000000000..0342996ed1
--- /dev/null
+++ b/synapse/storage/push_rule.py
@@ -0,0 +1,209 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+import logging
+import copy
+import json
+
+logger = logging.getLogger(__name__)
+
+
+class PushRuleStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def get_push_rules_for_user_name(self, user_name):
+        sql = (
+            "SELECT "+",".join(PushRuleTable.fields)+" "
+            "FROM "+PushRuleTable.table_name+" "
+            "WHERE user_name = ? "
+            "ORDER BY priority_class DESC, priority DESC"
+        )
+        rows = yield self._execute(None, sql, user_name)
+
+        dicts = []
+        for r in rows:
+            d = {}
+            for i, f in enumerate(PushRuleTable.fields):
+                d[f] = r[i]
+            dicts.append(d)
+
+        defer.returnValue(dicts)
+
+    @defer.inlineCallbacks
+    def add_push_rule(self, before, after, **kwargs):
+        vals = copy.copy(kwargs)
+        if 'conditions' in vals:
+            vals['conditions'] = json.dumps(vals['conditions'])
+        if 'actions' in vals:
+            vals['actions'] = json.dumps(vals['actions'])
+        # we could check the rest of the keys are valid column names
+        # but sqlite will do that anyway so I think it's just pointless.
+        if 'id' in vals:
+            del vals['id']
+
+        if before or after:
+            ret = yield self.runInteraction(
+                "_add_push_rule_relative_txn",
+                self._add_push_rule_relative_txn,
+                before=before,
+                after=after,
+                **vals
+            )
+            defer.returnValue(ret)
+        else:
+            ret = yield self.runInteraction(
+                "_add_push_rule_highest_priority_txn",
+                self._add_push_rule_highest_priority_txn,
+                **vals
+            )
+            defer.returnValue(ret)
+
+    def _add_push_rule_relative_txn(self, txn, user_name, **kwargs):
+        after = None
+        relative_to_rule = None
+        if 'after' in kwargs and kwargs['after']:
+            after = kwargs['after']
+            relative_to_rule = after
+        if 'before' in kwargs and kwargs['before']:
+            relative_to_rule = kwargs['before']
+
+        # get the priority of the rule we're inserting after/before
+        sql = (
+            "SELECT priority_class, priority FROM ? "
+            "WHERE user_name = ? and rule_id = ?" % (PushRuleTable.table_name,)
+        )
+        txn.execute(sql, (user_name, relative_to_rule))
+        res = txn.fetchall()
+        if not res:
+            raise RuleNotFoundException("before/after rule not found: %s" % (relative_to_rule))
+        priority_class, base_rule_priority = res[0]
+
+        if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
+            raise InconsistentRuleException(
+                "Given priority class does not match class of relative rule"
+            )
+
+        new_rule = copy.copy(kwargs)
+        if 'before' in new_rule:
+            del new_rule['before']
+        if 'after' in new_rule:
+            del new_rule['after']
+        new_rule['priority_class'] = priority_class
+        new_rule['user_name'] = user_name
+
+        # check if the priority before/after is free
+        new_rule_priority = base_rule_priority
+        if after:
+            new_rule_priority -= 1
+        else:
+            new_rule_priority += 1
+
+        new_rule['priority'] = new_rule_priority
+
+        sql = (
+            "SELECT COUNT(*) FROM "+PushRuleTable.table_name+
+            " WHERE user_name = ? AND priority_class = ? AND priority = ?"
+        )
+        txn.execute(sql, (user_name, priority_class, new_rule_priority))
+        res = txn.fetchall()
+        num_conflicting = res[0][0]
+
+        # if there are conflicting rules, bump everything
+        if num_conflicting:
+            sql = "UPDATE "+PushRuleTable.table_name+" SET priority = priority "
+            if after:
+                sql += "-1"
+            else:
+                sql += "+1"
+            sql += " WHERE user_name = ? AND priority_class = ? AND priority "
+            if after:
+                sql += "<= ?"
+            else:
+                sql += ">= ?"
+
+            txn.execute(sql, (user_name, priority_class, new_rule_priority))
+
+        # now insert the new rule
+        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql += ",".join(new_rule.keys())+") VALUES ("
+        sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+        txn.execute(sql, new_rule.values())
+
+    def _add_push_rule_highest_priority_txn(self, txn, user_name, priority_class, **kwargs):
+        # find the highest priority rule in that class
+        sql = (
+            "SELECT COUNT(*), MAX(priority) FROM "+PushRuleTable.table_name+
+            " WHERE user_name = ? and priority_class = ?"
+        )
+        txn.execute(sql, (user_name, priority_class))
+        res = txn.fetchall()
+        (how_many, highest_prio) = res[0]
+
+        new_prio = 0
+        if how_many > 0:
+            new_prio = highest_prio + 1
+
+        # and insert the new rule
+        new_rule = copy.copy(kwargs)
+        if 'id' in new_rule:
+            del new_rule['id']
+        new_rule['user_name'] = user_name
+        new_rule['priority_class'] = priority_class
+        new_rule['priority'] = new_prio
+
+        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql += ",".join(new_rule.keys())+") VALUES ("
+        sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+        txn.execute(sql, new_rule.values())
+
+    @defer.inlineCallbacks
+    def delete_push_rule(self, user_name, rule_id):
+        yield self._simple_delete_one(
+            PushRuleTable.table_name,
+            {
+                'user_name': user_name,
+                'rule_id': rule_id
+            }
+        )
+
+
+class RuleNotFoundException(Exception):
+    pass
+
+
+class InconsistentRuleException(Exception):
+    pass
+
+
+class PushRuleTable(Table):
+    table_name = "push_rules"
+
+    fields = [
+        "id",
+        "user_name",
+        "rule_id",
+        "priority_class",
+        "priority",
+        "conditions",
+        "actions",
+    ]
+
+    EntryType = collections.namedtuple("PushRuleEntry", fields)
\ No newline at end of file
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
new file mode 100644
index 0000000000..113cdc8a8e
--- /dev/null
+++ b/synapse/storage/pusher.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class PusherStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+        sql = (
+            "SELECT id, user_name, kind, instance_handle, app_id,"
+            "app_display_name, device_display_name, pushkey, ts, data, "
+            "last_token, last_success, failing_since "
+            "FROM pushers "
+            "WHERE app_id = ? AND pushkey = ?"
+        )
+
+        rows = yield self._execute(
+            None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+        )
+
+        ret = [
+            {
+                "id": r[0],
+                "user_name": r[1],
+                "kind": r[2],
+                "instance_handle": r[3],
+                "app_id": r[4],
+                "app_display_name": r[5],
+                "device_display_name": r[6],
+                "pushkey": r[7],
+                "pushkey_ts": r[8],
+                "data": r[9],
+                "last_token": r[10],
+                "last_success": r[11],
+                "failing_since": r[12]
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret[0])
+
+    @defer.inlineCallbacks
+    def get_all_pushers(self):
+        sql = (
+            "SELECT id, user_name, kind, instance_handle, app_id,"
+            "app_display_name, device_display_name, pushkey, ts, data, "
+            "last_token, last_success, failing_since "
+            "FROM pushers"
+        )
+
+        rows = yield self._execute(None, sql)
+
+        ret = [
+            {
+                "id": r[0],
+                "user_name": r[1],
+                "kind": r[2],
+                "instance_handle": r[3],
+                "app_id": r[4],
+                "app_display_name": r[5],
+                "device_display_name": r[6],
+                "pushkey": r[7],
+                "pushkey_ts": r[8],
+                "data": r[9],
+                "last_token": r[10],
+                "last_success": r[11],
+                "failing_since": r[12]
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def add_pusher(self, user_name, instance_handle, kind, app_id,
+                   app_display_name, device_display_name,
+                   pushkey, pushkey_ts, lang, data):
+        try:
+            yield self._simple_upsert(
+                PushersTable.table_name,
+                dict(
+                    app_id=app_id,
+                    pushkey=pushkey,
+                ),
+                dict(
+                    user_name=user_name,
+                    kind=kind,
+                    instance_handle=instance_handle,
+                    app_display_name=app_display_name,
+                    device_display_name=device_display_name,
+                    ts=pushkey_ts,
+                    lang=lang,
+                    data=data
+                ))
+        except Exception as e:
+            logger.error("create_pusher with failed: %s", e)
+            raise StoreError(500, "Problem creating pusher.")
+
+    @defer.inlineCallbacks
+    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+        yield self._simple_delete_one(
+            PushersTable.table_name,
+            dict(app_id=app_id, pushkey=pushkey)
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_last_token(self, user_name, pushkey, last_token):
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'last_token': last_token}
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_last_token_and_success(self, user_name, pushkey,
+                                             last_token, last_success):
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'last_token': last_token, 'last_success': last_success}
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_failing_since(self, user_name, pushkey, failing_since):
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'failing_since': failing_since}
+        )
+
+
+class PushersTable(Table):
+    table_name = "pushers"
+
+    fields = [
+        "id",
+        "user_name",
+        "kind",
+        "instance_handle",
+        "app_id",
+        "app_display_name",
+        "device_display_name",
+        "pushkey",
+        "pushkey_ts",
+        "data",
+        "last_token",
+        "last_success",
+        "failing_since"
+    ]
+
+    EntryType = collections.namedtuple("PusherEntry", fields)
\ No newline at end of file
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 978b2c4a48..6542f8e4f8 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -58,13 +58,6 @@ class RoomStore(SQLBaseStore):
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
 
-    def store_room_config(self, room_id, visibility):
-        return self._simple_update_one(
-            table=RoomsTable.table_name,
-            keyvalues={"room_id": room_id},
-            updatevalues={"is_public": visibility}
-        )
-
     def get_room(self, room_id):
         """Retrieve a room.
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index e59e65529b..c69dd995ce 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -20,6 +20,7 @@ from collections import namedtuple
 from ._base import SQLBaseStore
 
 from synapse.api.constants import Membership
+from synapse.types import UserID
 
 import logging
 
@@ -39,7 +40,7 @@ class RoomMemberStore(SQLBaseStore):
         """
         try:
             target_user_id = event.state_key
-            domain = self.hs.parse_userid(target_user_id).domain
+            domain = UserID.from_string(target_user_id).domain
         except:
             logger.exception(
                 "Failed to parse target_user_id=%s", target_user_id
@@ -84,7 +85,7 @@ class RoomMemberStore(SQLBaseStore):
             for e in member_events:
                 try:
                     joined_domains.add(
-                        self.hs.parse_userid(e.state_key).domain
+                        UserID.from_string(e.state_key).domain
                     )
                 except:
                     # FIXME: How do we deal with invalid user ids in the db?
diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql
new file mode 100644
index 0000000000..8c4dfd5c1b
--- /dev/null
+++ b/synapse/storage/schema/delta/v12.sql
@@ -0,0 +1,46 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  instance_handle varchar(32) NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
+  lang varchar(8),
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class TINYINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
new file mode 100644
index 0000000000..8c4dfd5c1b
--- /dev/null
+++ b/synapse/storage/schema/pusher.sql
@@ -0,0 +1,46 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  instance_handle varchar(32) NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
+  lang varchar(8),
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class TINYINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 5327517704..71db16d0e5 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -78,12 +78,6 @@ class StateStore(SQLBaseStore):
             f,
         )
 
-    def store_state_groups(self, event):
-        return self.runInteraction(
-            "store_state_groups",
-            self._store_state_groups_txn, event
-        )
-
     def _store_state_groups_txn(self, txn, event, context):
         if context.current_state is None:
             return
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index bedc3c6c52..8ac2adab05 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,6 +39,8 @@ from ._base import SQLBaseStore
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
 
+from collections import namedtuple
+
 import logging
 
 
@@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
-def _parse_stream_token(string):
-    try:
-        if string[0] != 's':
-            raise
-        return int(string[1:])
-    except:
-        raise SynapseError(400, "Invalid token")
-
-
-def _parse_topological_token(string):
-    try:
-        if string[0] != 't':
-            raise
-        parts = string[1:].split('-', 1)
-        return (int(parts[0]), int(parts[1]))
-    except:
-        raise SynapseError(400, "Invalid token")
-
-
-def is_stream_token(string):
-    try:
-        _parse_stream_token(string)
-        return True
-    except:
-        return False
-
-
-def is_topological_token(string):
-    try:
-        _parse_topological_token(string)
-        return True
-    except:
-        return False
-
-
-def _get_token_bound(token, comparison):
-    try:
-        s = _parse_stream_token(token)
-        return "%s %s %d" % ("stream_ordering", comparison, s)
-    except:
-        pass
-
-    try:
-        top, stream = _parse_topological_token(token)
-        return "%s %s %d AND %s %s %d" % (
-            "topological_ordering", comparison, top,
-            "stream_ordering", comparison, stream,
-        )
-    except:
-        pass
-
-    raise SynapseError(400, "Invalid token")
-
-
-class StreamStore(SQLBaseStore):
-    @log_function
-    def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
-                        direction='f', with_feedback=False):
-        # We deal with events request in two different ways depending on if
-        # this looks like an /events request or a pagination request.
-        is_events = (
-            direction == 'f'
-            and user_id
-            and is_stream_token(from_key)
-            and to_key and is_stream_token(to_key)
-        )
+class _StreamToken(namedtuple("_StreamToken", "topological stream")):
+    """Tokens are positions between events. The token "s1" comes after event 1.
+
+            s0    s1
+            |     |
+        [0] V [1] V [2]
+
+    Tokens can either be a point in the live event stream or a cursor going
+    through historic events.
+
+    When traversing the live event stream events are ordered by when they
+    arrived at the homeserver.
+
+    When traversing historic events the events are ordered by their depth in
+    the event graph "topological_ordering" and then by when they arrived at the
+    homeserver "stream_ordering".
+
+    Live tokens start with an "s" followed by the "stream_ordering" id of the
+    event it comes after. Historic tokens start with a "t" followed by the
+    "topological_ordering" id of the event it comes after, follewed by "-",
+    followed by the "stream_ordering" id of the event it comes after.
+    """
+    __slots__ = []
+
+    @classmethod
+    def parse(cls, string):
+        try:
+            if string[0] == 's':
+                return cls(None, int(string[1:]))
+            if string[0] == 't':
+                parts = string[1:].split('-', 1)
+                return cls(int(parts[1]), int(parts[0]))
+        except:
+            pass
+        raise SynapseError(400, "Invalid token %r" % (string,))
+
+    @classmethod
+    def parse_stream_token(cls, string):
+        try:
+            if string[0] == 's':
+                return cls(None, int(string[1:]))
+        except:
+            pass
+        raise SynapseError(400, "Invalid token %r" % (string,))
+
+    def __str__(self):
+        if self.topological is not None:
+            return "t%d-%d" % (self.topological, self.stream)
+        else:
+            return "s%d" % (self.stream,)
 
-        if is_events:
-            return self.get_room_events_stream(
-                user_id=user_id,
-                from_key=from_key,
-                to_key=to_key,
-                room_id=room_id,
-                limit=limit,
-                with_feedback=with_feedback,
+    def lower_bound(self):
+        if self.topological is None:
+            return "(%d < %s)" % (self.stream, "stream_ordering")
+        else:
+            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+                self.topological, "topological_ordering",
+                self.topological, "topological_ordering",
+                self.stream, "stream_ordering",
             )
+
+    def upper_bound(self):
+        if self.topological is None:
+            return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return self.paginate_room_events(
-                from_key=from_key,
-                to_key=to_key,
-                room_id=room_id,
-                limit=limit,
-                with_feedback=with_feedback,
+            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+                self.topological, "topological_ordering",
+                self.topological, "topological_ordering",
+                self.stream, "stream_ordering",
             )
 
+
+class StreamStore(SQLBaseStore):
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
                                limit=0, with_feedback=False):
@@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore):
             limit = MAX_STREAM_SIZE
 
         # From and to keys should be integers from ordering.
-        from_id = _parse_stream_token(from_key)
-        to_id = _parse_stream_token(to_key)
+        from_id = _StreamToken.parse_stream_token(from_key)
+        to_id = _StreamToken.parse_stream_token(to_key)
 
         if from_key == to_key:
             return defer.succeed(([], to_key))
@@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore):
         }
 
         def f(txn):
-            txn.execute(sql, (user_id, user_id, from_id, to_id,))
+            txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
 
             rows = self.cursor_to_dict(txn)
 
@@ -211,17 +201,21 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        from_comp = '<=' if direction == 'b' else '>'
-        to_comp = '>' if direction == 'b' else '<='
-        order = "DESC" if direction == 'b' else "ASC"
-
         args = [room_id]
-
-        bounds = _get_token_bound(from_key, from_comp)
-        if to_key:
-            bounds = "%s AND %s" % (
-                bounds, _get_token_bound(to_key, to_comp)
-            )
+        if direction == 'b':
+            order = "DESC"
+            bounds = _StreamToken.parse(from_key).upper_bound()
+            if to_key:
+                bounds = "%s AND %s" % (
+                    bounds, _StreamToken.parse(to_key).lower_bound()
+                )
+        else:
+            order = "ASC"
+            bounds = _StreamToken.parse(from_key).lower_bound()
+            if to_key:
+                bounds = "%s AND %s" % (
+                    bounds, _StreamToken.parse(to_key).upper_bound()
+                )
 
         if int(limit) > 0:
             args.append(int(limit))
@@ -249,9 +243,13 @@ class StreamStore(SQLBaseStore):
                 topo = rows[-1]["topological_ordering"]
                 toke = rows[-1]["stream_ordering"]
                 if direction == 'b':
-                    topo -= 1
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
                     toke -= 1
-                next_token = "t%s-%s" % (topo, toke)
+                next_token = str(_StreamToken(topo, toke))
             else:
                 # TODO (erikj): We should work out what to do here instead.
                 next_token = to_key if to_key else from_key
@@ -284,9 +282,14 @@ class StreamStore(SQLBaseStore):
             rows.reverse()  # As we selected with reverse ordering
 
             if rows:
+                # Tokens are positions between events.
+                # This token points *after* the last event in the chunk.
+                # We need it to point to the event before it in the chunk
+                # since we are going backwards so we subtract one from the
+                # stream part.
                 topo = rows[0]["topological_ordering"]
-                toke = rows[0]["stream_ordering"]
-                start_token = "t%s-%s" % (topo, toke)
+                toke = rows[0]["stream_ordering"] - 1
+                start_token = str(_StreamToken(topo, toke))
 
                 token = (start_token, end_token)
             else: