summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-02-11 10:33:19 +0000
committerErik Johnston <erik@matrix.org>2015-02-11 10:33:19 +0000
commitfb233dc40b3111e905e394e627b8ecc3b2a91e80 (patch)
tree931ab474f042b5eb5df9663306a4936406dcf17c /synapse/storage
parentPEP8 (diff)
parentMerge branch 'master' of github.com:matrix-org/synapse into develop (diff)
downloadsynapse-fb233dc40b3111e905e394e627b8ecc3b2a91e80.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into timeout-federation-requests
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py296
-rw-r--r--synapse/storage/_base.py161
-rw-r--r--synapse/storage/filtering.py63
-rw-r--r--synapse/storage/push_rule.py218
-rw-r--r--synapse/storage/pusher.py173
-rw-r--r--synapse/storage/registration.py3
-rw-r--r--synapse/storage/rejections.py43
-rw-r--r--synapse/storage/room.py7
-rw-r--r--synapse/storage/roommember.py5
-rw-r--r--synapse/storage/schema/delta/v12.sql65
-rw-r--r--synapse/storage/schema/filtering.sql24
-rw-r--r--synapse/storage/schema/pusher.sql46
-rw-r--r--synapse/storage/schema/rejections.sql21
-rw-r--r--synapse/storage/state.py6
-rw-r--r--synapse/storage/stream.py257
15 files changed, 1176 insertions, 212 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 4beb951b9f..a63c59a8a2 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -29,10 +29,14 @@ from .stream import StreamStore
 from .transactions import TransactionStore
 from .keys import KeyStore
 from .event_federation import EventFederationStore
+from .pusher import PusherStore
+from .push_rule import PushRuleStore
 from .media_repository import MediaRepositoryStore
+from .rejections import RejectionsStore
 
 from .state import StateStore
 from .signatures import SignatureStore
+from .filtering import FilteringStore
 
 from syutil.base64util import decode_base64
 from syutil.jsonutil import encode_canonical_json
@@ -60,13 +64,16 @@ SCHEMAS = [
     "state",
     "event_edges",
     "event_signatures",
+    "pusher",
     "media_repository",
+    "filtering",
+    "rejections",
 ]
 
 
 # Remember to update this number every time an incompatible change is made to
 # database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 11
+SCHEMA_VERSION = 12
 
 
 class _RollbackButIsFineException(Exception):
@@ -82,6 +89,10 @@ class DataStore(RoomMemberStore, RoomStore,
                 DirectoryStore, KeyStore, StateStore, SignatureStore,
                 EventFederationStore,
                 MediaRepositoryStore,
+                RejectionsStore,
+                FilteringStore,
+                PusherStore,
+                PushRuleStore
                 ):
 
     def __init__(self, hs):
@@ -117,21 +128,144 @@ class DataStore(RoomMemberStore, RoomStore,
             pass
 
     @defer.inlineCallbacks
-    def get_event(self, event_id, allow_none=False):
-        events = yield self._get_events([event_id])
+    def get_event(self, event_id, check_redacted=True,
+                  get_prev_content=False, allow_rejected=False,
+                  allow_none=False):
+        """Get an event from the database by event_id.
+
+        Args:
+            event_id (str): The event_id of the event to fetch
+            check_redacted (bool): If True, check if event has been redacted
+                and redact it.
+            get_prev_content (bool): If True and event is a state event,
+                include the previous states content in the unsigned field.
+            allow_rejected (bool): If True return rejected events.
+            allow_none (bool): If True, return None if no event found, if
+                False throw an exception.
+
+        Returns:
+            Deferred : A FrozenEvent.
+        """
+        event = yield self.runInteraction(
+            "get_event", self._get_event_txn,
+            event_id,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            allow_rejected=allow_rejected,
+        )
 
-        if not events:
-            if allow_none:
-                defer.returnValue(None)
-            else:
-                raise RuntimeError("Could not find event %s" % (event_id,))
+        if not event and not allow_none:
+            raise RuntimeError("Could not find event %s" % (event_id,))
 
-        defer.returnValue(events[0])
+        defer.returnValue(event)
 
     @log_function
     def _persist_event_txn(self, txn, event, context, backfilled,
                            stream_ordering=None, is_new_state=True,
                            current_state=None):
+
+        # We purposefully do this first since if we include a `current_state`
+        # key, we *want* to update the `current_state_events` table
+        if current_state:
+            txn.execute(
+                "DELETE FROM current_state_events WHERE room_id = ?",
+                (event.room_id,)
+            )
+
+            for s in current_state:
+                self._simple_insert_txn(
+                    txn,
+                    "current_state_events",
+                    {
+                        "event_id": s.event_id,
+                        "room_id": s.room_id,
+                        "type": s.type,
+                        "state_key": s.state_key,
+                    },
+                    or_replace=True,
+                )
+
+        if event.is_state() and is_new_state:
+            if not backfilled and not context.rejected:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_forward_extremities",
+                    values={
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    or_replace=True,
+                )
+
+                for prev_state_id, _ in event.prev_state:
+                    self._simple_delete_txn(
+                        txn,
+                        table="state_forward_extremities",
+                        keyvalues={
+                            "event_id": prev_state_id,
+                        }
+                    )
+
+        outlier = event.internal_metadata.is_outlier()
+
+        if not outlier:
+            self._store_state_groups_txn(txn, event, context)
+
+            self._update_min_depth_for_room_txn(
+                txn,
+                event.room_id,
+                event.depth
+            )
+
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
+        have_persisted = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_json",
+            keyvalues={"event_id": event.event_id},
+            retcol="event_id",
+            allow_none=True,
+        )
+
+        metadata_json = encode_canonical_json(
+            event.internal_metadata.get_dict()
+        )
+
+        # If we have already persisted this event, we don't need to do any
+        # more processing.
+        # The processing above must be done on every call to persist event,
+        # since they might not have happened on previous calls. For example,
+        # if we are persisting an event that we had persisted as an outlier,
+        # but is no longer one.
+        if have_persisted:
+            if not outlier:
+                sql = (
+                    "UPDATE event_json SET internal_metadata = ?"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (metadata_json.decode("UTF-8"), event.event_id,)
+                )
+
+                sql = (
+                    "UPDATE events SET outlier = 0"
+                    " WHERE event_id = ?"
+                )
+                txn.execute(
+                    sql,
+                    (event.event_id,)
+                )
+            return
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
         elif event.type == EventTypes.Feedback:
@@ -143,8 +277,6 @@ class DataStore(RoomMemberStore, RoomStore,
         elif event.type == EventTypes.Redaction:
             self._store_redaction(txn, event)
 
-        outlier = event.internal_metadata.is_outlier()
-
         event_dict = {
             k: v
             for k, v in event.get_dict().items()
@@ -154,10 +286,6 @@ class DataStore(RoomMemberStore, RoomStore,
             ]
         }
 
-        metadata_json = encode_canonical_json(
-            event.internal_metadata.get_dict()
-        )
-
         self._simple_insert_txn(
             txn,
             table="event_json",
@@ -213,38 +341,10 @@ class DataStore(RoomMemberStore, RoomStore,
             )
             raise _RollbackButIsFineException("_persist_event")
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
-        if not outlier:
-            self._store_state_groups_txn(txn, event, context)
-
-        if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
-            )
-
-            for s in current_state:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": s.event_id,
-                        "room_id": s.room_id,
-                        "type": s.type,
-                        "state_key": s.state_key,
-                    },
-                    or_replace=True,
-                )
+        if context.rejected:
+            self._store_rejections_txn(txn, event.event_id, context.rejected)
 
-        is_state = hasattr(event, "state_key") and event.state_key is not None
-        if is_state:
+        if event.is_state():
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -252,6 +352,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 "state_key": event.state_key,
             }
 
+            # TODO: How does this work with backfilling?
             if hasattr(event, "replaces_state"):
                 vals["prev_state"] = event.replaces_state
 
@@ -262,7 +363,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 or_replace=True,
             )
 
-            if is_new_state:
+            if is_new_state and not context.rejected:
                 self._simple_insert_txn(
                     txn,
                     "current_state_events",
@@ -288,28 +389,6 @@ class DataStore(RoomMemberStore, RoomStore,
                     or_ignore=True,
                 )
 
-            if not backfilled:
-                self._simple_insert_txn(
-                    txn,
-                    table="state_forward_extremities",
-                    values={
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                    or_replace=True,
-                )
-
-                for prev_state_id, _ in event.prev_state:
-                    self._simple_delete_txn(
-                        txn,
-                        table="state_forward_extremities",
-                        keyvalues={
-                            "event_id": prev_state_id,
-                        }
-                    )
-
         for hash_alg, hash_base64 in event.hashes.items():
             hash_bytes = decode_base64(hash_base64)
             self._store_event_content_hash_txn(
@@ -340,13 +419,6 @@ class DataStore(RoomMemberStore, RoomStore,
             txn, event.event_id, ref_alg, ref_hash_bytes
         )
 
-        if not outlier:
-            self._update_min_depth_for_room_txn(
-                txn,
-                event.room_id,
-                event.depth
-            )
-
     def _store_redaction(self, txn, event):
         txn.execute(
             "INSERT OR IGNORE INTO redactions "
@@ -370,9 +442,12 @@ class DataStore(RoomMemberStore, RoomStore,
             "redacted": del_sql,
         }
 
-        if event_type:
+        if event_type and state_key is not None:
             sql += " AND s.type = ? AND s.state_key = ? "
             args = (room_id, event_type, state_key)
+        elif event_type:
+            sql += " AND s.type = ?"
+            args = (room_id, event_type)
         else:
             args = (room_id, )
 
@@ -382,6 +457,41 @@ class DataStore(RoomMemberStore, RoomStore,
         defer.returnValue(events)
 
     @defer.inlineCallbacks
+    def get_room_name_and_aliases(self, room_id):
+        del_sql = (
+            "SELECT event_id FROM redactions WHERE redacts = e.event_id "
+            "LIMIT 1"
+        )
+
+        sql = (
+            "SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
+            "INNER JOIN current_state_events as c ON e.event_id = c.event_id "
+            "INNER JOIN state_events as s ON e.event_id = s.event_id "
+            "WHERE c.room_id = ? "
+        ) % {
+            "redacted": del_sql,
+        }
+
+        sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
+        sql += " OR s.type = 'm.room.aliases')"
+        args = (room_id,)
+
+        results = yield self._execute_and_decode(sql, *args)
+
+        events = yield self._parse_events(results)
+
+        name = None
+        aliases = []
+
+        for e in events:
+            if e.type == 'm.room.name':
+                name = e.content['name']
+            elif e.type == 'm.room.aliases':
+                aliases.extend(e.content['aliases'])
+
+        defer.returnValue((name, aliases))
+
+    @defer.inlineCallbacks
     def _get_min_token(self):
         row = yield self._execute(
             None,
@@ -417,6 +527,38 @@ class DataStore(RoomMemberStore, RoomStore,
             ],
         )
 
+    def have_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Returns:
+            dict: Has an entry for each event id we already have seen. Maps to
+            the rejected reason string if we rejected the event, else maps to
+            None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction(
+            "have_events", f,
+        )
+
 
 def schema_path(schema):
     """ Get a filesystem path for the named database schema
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index f660fc6eaf..3e1ab0a159 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -77,6 +77,43 @@ class LoggingTransaction(object):
             sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
 
 
+class PerformanceCounters(object):
+    def __init__(self):
+        self.current_counters = {}
+        self.previous_counters = {}
+
+    def update(self, key, start_time, end_time=None):
+        if end_time is None:
+            end_time = time.time() * 1000
+        duration = end_time - start_time
+        count, cum_time = self.current_counters.get(key, (0, 0))
+        count += 1
+        cum_time += duration
+        self.current_counters[key] = (count, cum_time)
+        return end_time
+
+    def interval(self, interval_duration, limit=3):
+        counters = []
+        for name, (count, cum_time) in self.current_counters.items():
+            prev_count, prev_time = self.previous_counters.get(name, (0, 0))
+            counters.append((
+                (cum_time - prev_time) / interval_duration,
+                count - prev_count,
+                name
+            ))
+
+        self.previous_counters = dict(self.current_counters)
+
+        counters.sort(reverse=True)
+
+        top_n_counters = ", ".join(
+            "%s(%d): %.3f%%" % (name, count, 100 * ratio)
+            for ratio, count, name in counters[:limit]
+        )
+
+        return top_n_counters
+
+
 class SQLBaseStore(object):
     _TXN_ID = 0
 
@@ -85,6 +122,41 @@ class SQLBaseStore(object):
         self._db_pool = hs.get_db_pool()
         self._clock = hs.get_clock()
 
+        self._previous_txn_total_time = 0
+        self._current_txn_total_time = 0
+        self._previous_loop_ts = 0
+        self._txn_perf_counters = PerformanceCounters()
+        self._get_event_counters = PerformanceCounters()
+
+    def start_profiling(self):
+        self._previous_loop_ts = self._clock.time_msec()
+
+        def loop():
+            curr = self._current_txn_total_time
+            prev = self._previous_txn_total_time
+            self._previous_txn_total_time = curr
+
+            time_now = self._clock.time_msec()
+            time_then = self._previous_loop_ts
+            self._previous_loop_ts = time_now
+
+            ratio = (curr - prev)/(time_now - time_then)
+
+            top_three_counters = self._txn_perf_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            top_3_event_counters = self._get_event_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            logger.info(
+                "Total database time: %.3f%% {%s} {%s}",
+                ratio * 100, top_three_counters, top_3_event_counters
+            )
+
+        self._clock.looping_call(loop, 10000)
+
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
         """Wraps the .runInteraction() method on the underlying db_pool."""
@@ -94,7 +166,7 @@ class SQLBaseStore(object):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
                 start = time.time() * 1000
-                txn_id = SQLBaseStore._TXN_ID
+                txn_id = self._TXN_ID
 
                 # We don't really need these to be unique, so lets stop it from
                 # growing really large.
@@ -114,6 +186,10 @@ class SQLBaseStore(object):
                         "[TXN END] {%s} %f",
                         name, end - start
                     )
+
+                    self._current_txn_total_time += end - start
+                    self._txn_perf_counters.update(desc, start, end)
+
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
                 inner_func, *args, **kwargs
@@ -193,6 +269,50 @@ class SQLBaseStore(object):
         txn.execute(sql, values.values())
         return txn.lastrowid
 
+    def _simple_upsert(self, table, keyvalues, values):
+        """
+        Args:
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+        Returns: A deferred
+        """
+        return self.runInteraction(
+            "_simple_upsert",
+            self._simple_upsert_txn, table, keyvalues, values
+        )
+
+    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+        # Try to update
+        sql = "UPDATE %s SET %s WHERE %s" % (
+            table,
+            ", ".join("%s = ?" % (k,) for k in values),
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+        )
+        sqlargs = values.values() + keyvalues.values()
+        logger.debug(
+            "[SQL] %s Args=%s",
+            sql, sqlargs,
+        )
+
+        txn.execute(sql, sqlargs)
+        if txn.rowcount == 0:
+            # We didn't update and rows so insert a new one
+            allvalues = {}
+            allvalues.update(keyvalues)
+            allvalues.update(values)
+
+            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+                table,
+                ", ".join(k for k in allvalues),
+                ", ".join("?" for _ in allvalues)
+            )
+            logger.debug(
+                "[SQL] %s Args=%s",
+                sql, keyvalues.values(),
+            )
+            txn.execute(sql, allvalues.values())
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False):
         """Executes a SELECT query on the named table, which is expected to
@@ -344,8 +464,8 @@ class SQLBaseStore(object):
         if updatevalues:
             update_sql = "UPDATE %s SET %s WHERE %s" % (
                 table,
-                ", ".join("%s = ?" % (k) for k in updatevalues),
-                " AND ".join("%s = ?" % (k) for k in keyvalues)
+                ", ".join("%s = ?" % (k,) for k in updatevalues),
+                " AND ".join("%s = ?" % (k,) for k in keyvalues)
             )
 
         def func(txn):
@@ -458,14 +578,18 @@ class SQLBaseStore(object):
         return [e for e in events if e]
 
     def _get_event_txn(self, txn, event_id, check_redacted=True,
-                       get_prev_content=False):
+                       get_prev_content=False, allow_rejected=False):
         sql = (
-            "SELECT internal_metadata, json, r.event_id FROM event_json as e "
+            "SELECT e.internal_metadata, e.json, r.event_id, rej.reason "
+            "FROM event_json as e "
             "LEFT JOIN redactions as r ON e.event_id = r.redacts "
+            "LEFT JOIN rejections as rej on rej.event_id = e.event_id  "
             "WHERE e.event_id = ? "
             "LIMIT 1 "
         )
 
+        start_time = time.time() * 1000
+
         txn.execute(sql, (event_id,))
 
         res = txn.fetchone()
@@ -473,20 +597,33 @@ class SQLBaseStore(object):
         if not res:
             return None
 
-        internal_metadata, js, redacted = res
+        internal_metadata, js, redacted, rejected_reason = res
 
-        return self._get_event_from_row_txn(
-            txn, internal_metadata, js, redacted,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-        )
+        self._get_event_counters.update("select_event", start_time)
+
+        if allow_rejected or not rejected_reason:
+            return self._get_event_from_row_txn(
+                txn, internal_metadata, js, redacted,
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content,
+            )
+        else:
+            return None
 
     def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
                                 check_redacted=True, get_prev_content=False):
+
+        start_time = time.time() * 1000
+        update_counter = self._get_event_counters.update
+
         d = json.loads(js)
+        start_time = update_counter("decode_json", start_time)
+
         internal_metadata = json.loads(internal_metadata)
+        start_time = update_counter("decode_internal", start_time)
 
         ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+        start_time = update_counter("build_frozen_event", start_time)
 
         if check_redacted and redacted:
             ev = prune_event(ev)
@@ -502,6 +639,7 @@ class SQLBaseStore(object):
 
             if because:
                 ev.unsigned["redacted_because"] = because
+            start_time = update_counter("redact_event", start_time)
 
         if get_prev_content and "replaces_state" in ev.unsigned:
             prev = self._get_event_txn(
@@ -511,6 +649,7 @@ class SQLBaseStore(object):
             )
             if prev:
                 ev.unsigned["prev_content"] = prev.get_dict()["content"]
+            start_time = update_counter("get_prev_content", start_time)
 
         return ev
 
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
new file mode 100644
index 0000000000..e86eeced45
--- /dev/null
+++ b/synapse/storage/filtering.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import SQLBaseStore
+
+import json
+
+
+class FilteringStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def get_user_filter(self, user_localpart, filter_id):
+        def_json = yield self._simple_select_one_onecol(
+            table="user_filters",
+            keyvalues={
+                "user_id": user_localpart,
+                "filter_id": filter_id,
+            },
+            retcol="filter_json",
+            allow_none=False,
+        )
+
+        defer.returnValue(json.loads(def_json))
+
+    def add_user_filter(self, user_localpart, user_filter):
+        def_json = json.dumps(user_filter)
+
+        # Need an atomic transaction to SELECT the maximal ID so far then
+        # INSERT a new one
+        def _do_txn(txn):
+            sql = (
+                "SELECT MAX(filter_id) FROM user_filters "
+                "WHERE user_id = ?"
+            )
+            txn.execute(sql, (user_localpart,))
+            max_id = txn.fetchone()[0]
+            if max_id is None:
+                filter_id = 0
+            else:
+                filter_id = max_id + 1
+
+            sql = (
+                "INSERT INTO user_filters (user_id, filter_id, filter_json)"
+                "VALUES(?, ?, ?)"
+            )
+            txn.execute(sql, (user_localpart, filter_id, def_json))
+
+            return filter_id
+
+        return self.runInteraction("add_user_filter", _do_txn)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
new file mode 100644
index 0000000000..620de71398
--- /dev/null
+++ b/synapse/storage/push_rule.py
@@ -0,0 +1,218 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+import logging
+import copy
+import json
+
+logger = logging.getLogger(__name__)
+
+
+class PushRuleStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def get_push_rules_for_user_name(self, user_name):
+        sql = (
+            "SELECT "+",".join(PushRuleTable.fields)+" "
+            "FROM "+PushRuleTable.table_name+" "
+            "WHERE user_name = ? "
+            "ORDER BY priority_class DESC, priority DESC"
+        )
+        rows = yield self._execute(None, sql, user_name)
+
+        dicts = []
+        for r in rows:
+            d = {}
+            for i, f in enumerate(PushRuleTable.fields):
+                d[f] = r[i]
+            dicts.append(d)
+
+        defer.returnValue(dicts)
+
+    @defer.inlineCallbacks
+    def add_push_rule(self, before, after, **kwargs):
+        vals = copy.copy(kwargs)
+        if 'conditions' in vals:
+            vals['conditions'] = json.dumps(vals['conditions'])
+        if 'actions' in vals:
+            vals['actions'] = json.dumps(vals['actions'])
+        # we could check the rest of the keys are valid column names
+        # but sqlite will do that anyway so I think it's just pointless.
+        if 'id' in vals:
+            del vals['id']
+
+        if before or after:
+            ret = yield self.runInteraction(
+                "_add_push_rule_relative_txn",
+                self._add_push_rule_relative_txn,
+                before=before,
+                after=after,
+                **vals
+            )
+            defer.returnValue(ret)
+        else:
+            ret = yield self.runInteraction(
+                "_add_push_rule_highest_priority_txn",
+                self._add_push_rule_highest_priority_txn,
+                **vals
+            )
+            defer.returnValue(ret)
+
+    def _add_push_rule_relative_txn(self, txn, user_name, **kwargs):
+        after = None
+        relative_to_rule = None
+        if 'after' in kwargs and kwargs['after']:
+            after = kwargs['after']
+            relative_to_rule = after
+        if 'before' in kwargs and kwargs['before']:
+            relative_to_rule = kwargs['before']
+
+        # get the priority of the rule we're inserting after/before
+        sql = (
+            "SELECT priority_class, priority FROM ? "
+            "WHERE user_name = ? and rule_id = ?" % (PushRuleTable.table_name,)
+        )
+        txn.execute(sql, (user_name, relative_to_rule))
+        res = txn.fetchall()
+        if not res:
+            raise RuleNotFoundException(
+                "before/after rule not found: %s" % (relative_to_rule,)
+            )
+        priority_class, base_rule_priority = res[0]
+
+        if 'priority_class' in kwargs and kwargs['priority_class'] != priority_class:
+            raise InconsistentRuleException(
+                "Given priority class does not match class of relative rule"
+            )
+
+        new_rule = copy.copy(kwargs)
+        if 'before' in new_rule:
+            del new_rule['before']
+        if 'after' in new_rule:
+            del new_rule['after']
+        new_rule['priority_class'] = priority_class
+        new_rule['user_name'] = user_name
+
+        # check if the priority before/after is free
+        new_rule_priority = base_rule_priority
+        if after:
+            new_rule_priority -= 1
+        else:
+            new_rule_priority += 1
+
+        new_rule['priority'] = new_rule_priority
+
+        sql = (
+            "SELECT COUNT(*) FROM " + PushRuleTable.table_name +
+            " WHERE user_name = ? AND priority_class = ? AND priority = ?"
+        )
+        txn.execute(sql, (user_name, priority_class, new_rule_priority))
+        res = txn.fetchall()
+        num_conflicting = res[0][0]
+
+        # if there are conflicting rules, bump everything
+        if num_conflicting:
+            sql = "UPDATE "+PushRuleTable.table_name+" SET priority = priority "
+            if after:
+                sql += "-1"
+            else:
+                sql += "+1"
+            sql += " WHERE user_name = ? AND priority_class = ? AND priority "
+            if after:
+                sql += "<= ?"
+            else:
+                sql += ">= ?"
+
+            txn.execute(sql, (user_name, priority_class, new_rule_priority))
+
+        # now insert the new rule
+        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql += ",".join(new_rule.keys())+") VALUES ("
+        sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+        txn.execute(sql, new_rule.values())
+
+    def _add_push_rule_highest_priority_txn(self, txn, user_name,
+                                            priority_class, **kwargs):
+        # find the highest priority rule in that class
+        sql = (
+            "SELECT COUNT(*), MAX(priority) FROM " + PushRuleTable.table_name +
+            " WHERE user_name = ? and priority_class = ?"
+        )
+        txn.execute(sql, (user_name, priority_class))
+        res = txn.fetchall()
+        (how_many, highest_prio) = res[0]
+
+        new_prio = 0
+        if how_many > 0:
+            new_prio = highest_prio + 1
+
+        # and insert the new rule
+        new_rule = copy.copy(kwargs)
+        if 'id' in new_rule:
+            del new_rule['id']
+        new_rule['user_name'] = user_name
+        new_rule['priority_class'] = priority_class
+        new_rule['priority'] = new_prio
+
+        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql += ",".join(new_rule.keys())+") VALUES ("
+        sql += ", ".join(["?" for _ in new_rule.keys()])+")"
+
+        txn.execute(sql, new_rule.values())
+
+    @defer.inlineCallbacks
+    def delete_push_rule(self, user_name, rule_id):
+        """
+        Delete a push rule. Args specify the row to be deleted and can be
+        any of the columns in the push_rule table, but below are the
+        standard ones
+
+        Args:
+            user_name (str): The matrix ID of the push rule owner
+            rule_id (str): The rule_id of the rule to be deleted
+        """
+        yield self._simple_delete_one(
+            PushRuleTable.table_name,
+            {'user_name': user_name, 'rule_id': rule_id}
+        )
+
+
+class RuleNotFoundException(Exception):
+    pass
+
+
+class InconsistentRuleException(Exception):
+    pass
+
+
+class PushRuleTable(Table):
+    table_name = "push_rules"
+
+    fields = [
+        "id",
+        "user_name",
+        "rule_id",
+        "priority_class",
+        "priority",
+        "conditions",
+        "actions",
+    ]
+
+    EntryType = collections.namedtuple("PushRuleEntry", fields)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
new file mode 100644
index 0000000000..e2a662a6c7
--- /dev/null
+++ b/synapse/storage/pusher.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+
+from ._base import SQLBaseStore, Table
+from twisted.internet import defer
+
+from synapse.api.errors import StoreError
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class PusherStore(SQLBaseStore):
+    @defer.inlineCallbacks
+    def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+        sql = (
+            "SELECT id, user_name, kind, profile_tag, app_id,"
+            "app_display_name, device_display_name, pushkey, ts, data, "
+            "last_token, last_success, failing_since "
+            "FROM pushers "
+            "WHERE app_id = ? AND pushkey = ?"
+        )
+
+        rows = yield self._execute(
+            None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+        )
+
+        ret = [
+            {
+                "id": r[0],
+                "user_name": r[1],
+                "kind": r[2],
+                "profile_tag": r[3],
+                "app_id": r[4],
+                "app_display_name": r[5],
+                "device_display_name": r[6],
+                "pushkey": r[7],
+                "pushkey_ts": r[8],
+                "data": r[9],
+                "last_token": r[10],
+                "last_success": r[11],
+                "failing_since": r[12]
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret[0])
+
+    @defer.inlineCallbacks
+    def get_all_pushers(self):
+        sql = (
+            "SELECT id, user_name, kind, profile_tag, app_id,"
+            "app_display_name, device_display_name, pushkey, ts, data, "
+            "last_token, last_success, failing_since "
+            "FROM pushers"
+        )
+
+        rows = yield self._execute(None, sql)
+
+        ret = [
+            {
+                "id": r[0],
+                "user_name": r[1],
+                "kind": r[2],
+                "profile_tag": r[3],
+                "app_id": r[4],
+                "app_display_name": r[5],
+                "device_display_name": r[6],
+                "pushkey": r[7],
+                "pushkey_ts": r[8],
+                "data": r[9],
+                "last_token": r[10],
+                "last_success": r[11],
+                "failing_since": r[12]
+            }
+            for r in rows
+        ]
+
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def add_pusher(self, user_name, profile_tag, kind, app_id,
+                   app_display_name, device_display_name,
+                   pushkey, pushkey_ts, lang, data):
+        try:
+            yield self._simple_upsert(
+                PushersTable.table_name,
+                dict(
+                    app_id=app_id,
+                    pushkey=pushkey,
+                ),
+                dict(
+                    user_name=user_name,
+                    kind=kind,
+                    profile_tag=profile_tag,
+                    app_display_name=app_display_name,
+                    device_display_name=device_display_name,
+                    ts=pushkey_ts,
+                    lang=lang,
+                    data=data
+                ))
+        except Exception as e:
+            logger.error("create_pusher with failed: %s", e)
+            raise StoreError(500, "Problem creating pusher.")
+
+    @defer.inlineCallbacks
+    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+        yield self._simple_delete_one(
+            PushersTable.table_name,
+            dict(app_id=app_id, pushkey=pushkey)
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_last_token(self, user_name, pushkey, last_token):
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'last_token': last_token}
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_last_token_and_success(self, user_name, pushkey,
+                                             last_token, last_success):
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'last_token': last_token, 'last_success': last_success}
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_failing_since(self, user_name, pushkey, failing_since):
+        yield self._simple_update_one(
+            PushersTable.table_name,
+            {'user_name': user_name, 'pushkey': pushkey},
+            {'failing_since': failing_since}
+        )
+
+
+class PushersTable(Table):
+    table_name = "pushers"
+
+    fields = [
+        "id",
+        "user_name",
+        "kind",
+        "profile_tag",
+        "app_id",
+        "app_display_name",
+        "device_display_name",
+        "pushkey",
+        "pushkey_ts",
+        "data",
+        "last_token",
+        "last_success",
+        "failing_since"
+    ]
+
+    EntryType = collections.namedtuple("PusherEntry", fields)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 75dffa4db2..029b07cc66 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -122,7 +122,8 @@ class RegistrationStore(SQLBaseStore):
 
     def _query_for_auth(self, txn, token):
         sql = (
-            "SELECT users.name, users.admin, access_tokens.device_id"
+            "SELECT users.name, users.admin,"
+            " access_tokens.device_id, access_tokens.id as token_id"
             " FROM users"
             " INNER JOIN access_tokens on users.id = access_tokens.user_id"
             " WHERE token = ?"
diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py
new file mode 100644
index 0000000000..4e1a9a2783
--- /dev/null
+++ b/synapse/storage/rejections.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class RejectionsStore(SQLBaseStore):
+    def _store_rejections_txn(self, txn, event_id, reason):
+        self._simple_insert_txn(
+            txn,
+            table="rejections",
+            values={
+                "event_id": event_id,
+                "reason": reason,
+                "last_check": self._clock.time_msec(),
+            }
+        )
+
+    def get_rejection_reason(self, event_id):
+        return self._simple_select_one_onecol(
+            table="rejections",
+            retcol="reason",
+            keyvalues={
+                "event_id": event_id,
+            },
+            allow_none=True,
+        )
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 978b2c4a48..6542f8e4f8 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -58,13 +58,6 @@ class RoomStore(SQLBaseStore):
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
 
-    def store_room_config(self, room_id, visibility):
-        return self._simple_update_one(
-            table=RoomsTable.table_name,
-            keyvalues={"room_id": room_id},
-            updatevalues={"is_public": visibility}
-        )
-
     def get_room(self, room_id):
         """Retrieve a room.
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index e59e65529b..c69dd995ce 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -20,6 +20,7 @@ from collections import namedtuple
 from ._base import SQLBaseStore
 
 from synapse.api.constants import Membership
+from synapse.types import UserID
 
 import logging
 
@@ -39,7 +40,7 @@ class RoomMemberStore(SQLBaseStore):
         """
         try:
             target_user_id = event.state_key
-            domain = self.hs.parse_userid(target_user_id).domain
+            domain = UserID.from_string(target_user_id).domain
         except:
             logger.exception(
                 "Failed to parse target_user_id=%s", target_user_id
@@ -84,7 +85,7 @@ class RoomMemberStore(SQLBaseStore):
             for e in member_events:
                 try:
                     joined_domains.add(
-                        self.hs.parse_userid(e.state_key).domain
+                        UserID.from_string(e.state_key).domain
                     )
                 except:
                     # FIXME: How do we deal with invalid user ids in the db?
diff --git a/synapse/storage/schema/delta/v12.sql b/synapse/storage/schema/delta/v12.sql
new file mode 100644
index 0000000000..302d958dbf
--- /dev/null
+++ b/synapse/storage/schema/delta/v12.sql
@@ -0,0 +1,65 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+    event_id TEXT NOT NULL,
+    reason TEXT NOT NULL,
+    last_check TEXT NOT NULL,
+    CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+);
+
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  profile_tag varchar(32) NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
+  lang varchar(8),
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class TINYINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+  user_id TEXT,
+  filter_id INTEGER,
+  filter_json TEXT,
+  FOREIGN KEY(user_id) REFERENCES users(id)
+);
+
+CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
+  user_id, filter_id
+);
diff --git a/synapse/storage/schema/filtering.sql b/synapse/storage/schema/filtering.sql
new file mode 100644
index 0000000000..beb39ca201
--- /dev/null
+++ b/synapse/storage/schema/filtering.sql
@@ -0,0 +1,24 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS user_filters(
+  user_id TEXT,
+  filter_id INTEGER,
+  filter_json TEXT,
+  FOREIGN KEY(user_id) REFERENCES users(id)
+);
+
+CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
+  user_id, filter_id
+);
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
new file mode 100644
index 0000000000..3735b11547
--- /dev/null
+++ b/synapse/storage/schema/pusher.sql
@@ -0,0 +1,46 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  profile_tag varchar(32) NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
+  lang varchar(8),
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class TINYINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
diff --git a/synapse/storage/schema/rejections.sql b/synapse/storage/schema/rejections.sql
new file mode 100644
index 0000000000..bd2a8b1bb5
--- /dev/null
+++ b/synapse/storage/schema/rejections.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+    event_id TEXT NOT NULL,
+    reason TEXT NOT NULL,
+    last_check TEXT NOT NULL,
+    CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 5327517704..71db16d0e5 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -78,12 +78,6 @@ class StateStore(SQLBaseStore):
             f,
         )
 
-    def store_state_groups(self, event):
-        return self.runInteraction(
-            "store_state_groups",
-            self._store_state_groups_txn, event
-        )
-
     def _store_state_groups_txn(self, txn, event, context):
         if context.current_state is None:
             return
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index bedc3c6c52..3ccb6f8a61 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,6 +39,8 @@ from ._base import SQLBaseStore
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
 
+from collections import namedtuple
+
 import logging
 
 
@@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
-def _parse_stream_token(string):
-    try:
-        if string[0] != 's':
-            raise
-        return int(string[1:])
-    except:
-        raise SynapseError(400, "Invalid token")
-
-
-def _parse_topological_token(string):
-    try:
-        if string[0] != 't':
-            raise
-        parts = string[1:].split('-', 1)
-        return (int(parts[0]), int(parts[1]))
-    except:
-        raise SynapseError(400, "Invalid token")
-
-
-def is_stream_token(string):
-    try:
-        _parse_stream_token(string)
-        return True
-    except:
-        return False
-
-
-def is_topological_token(string):
-    try:
-        _parse_topological_token(string)
-        return True
-    except:
-        return False
-
-
-def _get_token_bound(token, comparison):
-    try:
-        s = _parse_stream_token(token)
-        return "%s %s %d" % ("stream_ordering", comparison, s)
-    except:
-        pass
-
-    try:
-        top, stream = _parse_topological_token(token)
-        return "%s %s %d AND %s %s %d" % (
-            "topological_ordering", comparison, top,
-            "stream_ordering", comparison, stream,
-        )
-    except:
-        pass
-
-    raise SynapseError(400, "Invalid token")
-
-
-class StreamStore(SQLBaseStore):
-    @log_function
-    def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
-                        direction='f', with_feedback=False):
-        # We deal with events request in two different ways depending on if
-        # this looks like an /events request or a pagination request.
-        is_events = (
-            direction == 'f'
-            and user_id
-            and is_stream_token(from_key)
-            and to_key and is_stream_token(to_key)
-        )
+class _StreamToken(namedtuple("_StreamToken", "topological stream")):
+    """Tokens are positions between events. The token "s1" comes after event 1.
+
+            s0    s1
+            |     |
+        [0] V [1] V [2]
+
+    Tokens can either be a point in the live event stream or a cursor going
+    through historic events.
+
+    When traversing the live event stream events are ordered by when they
+    arrived at the homeserver.
+
+    When traversing historic events the events are ordered by their depth in
+    the event graph "topological_ordering" and then by when they arrived at the
+    homeserver "stream_ordering".
+
+    Live tokens start with an "s" followed by the "stream_ordering" id of the
+    event it comes after. Historic tokens start with a "t" followed by the
+    "topological_ordering" id of the event it comes after, follewed by "-",
+    followed by the "stream_ordering" id of the event it comes after.
+    """
+    __slots__ = []
+
+    @classmethod
+    def parse(cls, string):
+        try:
+            if string[0] == 's':
+                return cls(topological=None, stream=int(string[1:]))
+            if string[0] == 't':
+                parts = string[1:].split('-', 1)
+                return cls(topological=int(parts[0]), stream=int(parts[1]))
+        except:
+            pass
+        raise SynapseError(400, "Invalid token %r" % (string,))
+
+    @classmethod
+    def parse_stream_token(cls, string):
+        try:
+            if string[0] == 's':
+                return cls(topological=None, stream=int(string[1:]))
+        except:
+            pass
+        raise SynapseError(400, "Invalid token %r" % (string,))
+
+    def __str__(self):
+        if self.topological is not None:
+            return "t%d-%d" % (self.topological, self.stream)
+        else:
+            return "s%d" % (self.stream,)
 
-        if is_events:
-            return self.get_room_events_stream(
-                user_id=user_id,
-                from_key=from_key,
-                to_key=to_key,
-                room_id=room_id,
-                limit=limit,
-                with_feedback=with_feedback,
+    def lower_bound(self):
+        if self.topological is None:
+            return "(%d < %s)" % (self.stream, "stream_ordering")
+        else:
+            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+                self.topological, "topological_ordering",
+                self.topological, "topological_ordering",
+                self.stream, "stream_ordering",
             )
+
+    def upper_bound(self):
+        if self.topological is None:
+            return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return self.paginate_room_events(
-                from_key=from_key,
-                to_key=to_key,
-                room_id=room_id,
-                limit=limit,
-                with_feedback=with_feedback,
+            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+                self.topological, "topological_ordering",
+                self.topological, "topological_ordering",
+                self.stream, "stream_ordering",
             )
 
+
+class StreamStore(SQLBaseStore):
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
                                limit=0, with_feedback=False):
@@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore):
             limit = MAX_STREAM_SIZE
 
         # From and to keys should be integers from ordering.
-        from_id = _parse_stream_token(from_key)
-        to_id = _parse_stream_token(to_key)
+        from_id = _StreamToken.parse_stream_token(from_key)
+        to_id = _StreamToken.parse_stream_token(to_key)
 
         if from_key == to_key:
             return defer.succeed(([], to_key))
@@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore):
         }
 
         def f(txn):
-            txn.execute(sql, (user_id, user_id, from_id, to_id,))
+            txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
 
             rows = self.cursor_to_dict(txn)
 
@@ -191,8 +181,11 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(ret, rows)
+
             if rows:
                 key = "s%d" % max([r["stream_ordering"] for r in rows])
+
             else:
                 # Assume we didn't get anything because there was nothing to
                 # get.
@@ -211,17 +204,21 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        from_comp = '<=' if direction == 'b' else '>'
-        to_comp = '>' if direction == 'b' else '<='
-        order = "DESC" if direction == 'b' else "ASC"
-
         args = [room_id]
-
-        bounds = _get_token_bound(from_key, from_comp)
-        if to_key:
-            bounds = "%s AND %s" % (
-                bounds, _get_token_bound(to_key, to_comp)
-            )
+        if direction == 'b':
+            order = "DESC"
+            bounds = _StreamToken.parse(from_key).upper_bound()
+            if to_key:
+                bounds = "%s AND %s" % (
+                    bounds, _StreamToken.parse(to_key).lower_bound()
+                )
+        else:
+            order = "ASC"
+            bounds = _StreamToken.parse(from_key).lower_bound()
+            if to_key:
+                bounds = "%s AND %s" % (
+                    bounds, _StreamToken.parse(to_key).upper_bound()
+                )
 
         if int(limit) > 0:
             args.append(int(limit))
@@ -249,9 +246,13 @@ class StreamStore(SQLBaseStore):
                 topo = rows[-1]["topological_ordering"]
                 toke = rows[-1]["stream_ordering"]
                 if direction == 'b':
-                    topo -= 1
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
                     toke -= 1
-                next_token = "t%s-%s" % (topo, toke)
+                next_token = str(_StreamToken(topo, toke))
             else:
                 # TODO (erikj): We should work out what to do here instead.
                 next_token = to_key if to_key else from_key
@@ -262,35 +263,62 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, next_token,
 
         return self.runInteraction("paginate_room_events", f)
 
     def get_recent_events_for_room(self, room_id, limit, end_token,
-                                   with_feedback=False):
+                                   with_feedback=False, from_token=None):
         # TODO (erikj): Handle compressed feedback
 
-        sql = (
-            "SELECT stream_ordering, topological_ordering, event_id FROM events "
-            "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 "
-            "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? "
-        )
+        end_token = _StreamToken.parse_stream_token(end_token)
 
-        def f(txn):
-            txn.execute(sql, (room_id, end_token, limit,))
+        if from_token is None:
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+        else:
+            from_token = _StreamToken.parse_stream_token(from_token)
+            sql = (
+                "SELECT stream_ordering, topological_ordering, event_id"
+                " FROM events"
+                " WHERE room_id = ? AND stream_ordering > ?"
+                " AND stream_ordering <= ? AND outlier = 0"
+                " ORDER BY topological_ordering DESC, stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+        def get_recent_events_for_room_txn(txn):
+            if from_token is None:
+                txn.execute(sql, (room_id, end_token.stream, limit,))
+            else:
+                txn.execute(sql, (
+                    room_id, from_token.stream, end_token.stream, limit
+                ))
 
             rows = self.cursor_to_dict(txn)
 
             rows.reverse()  # As we selected with reverse ordering
 
             if rows:
+                # Tokens are positions between events.
+                # This token points *after* the last event in the chunk.
+                # We need it to point to the event before it in the chunk
+                # since we are going backwards so we subtract one from the
+                # stream part.
                 topo = rows[0]["topological_ordering"]
-                toke = rows[0]["stream_ordering"]
-                start_token = "t%s-%s" % (topo, toke)
+                toke = rows[0]["stream_ordering"] - 1
+                start_token = str(_StreamToken(topo, toke))
 
-                token = (start_token, end_token)
+                token = (start_token, str(end_token))
             else:
-                token = (end_token, end_token)
+                token = (str(end_token), str(end_token))
 
             events = self._get_events_txn(
                 txn,
@@ -298,9 +326,13 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, token
 
-        return self.runInteraction("get_recent_events_for_room", f)
+        return self.runInteraction(
+            "get_recent_events_for_room", get_recent_events_for_room_txn
+        )
 
     def get_room_events_max_id(self):
         return self.runInteraction(
@@ -322,3 +354,12 @@ class StreamStore(SQLBaseStore):
 
         key = res[0]["m"]
         return "s%d" % (key,)
+
+    @staticmethod
+    def _set_before_and_after(events, rows):
+        for event, row in zip(events, rows):
+            stream = row["stream_ordering"]
+            topo = event.depth
+            internal = event.internal_metadata
+            internal.before = str(_StreamToken(topo, stream - 1))
+            internal.after = str(_StreamToken(topo, stream))