summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py56
-rw-r--r--synapse/storage/_base.py34
-rw-r--r--synapse/storage/account_data.py4
-rw-r--r--synapse/storage/engines/__init__.py6
-rw-r--r--synapse/storage/engines/postgres.py8
-rw-r--r--synapse/storage/engines/sqlite3.py13
-rw-r--r--synapse/storage/event_federation.py16
-rw-r--r--synapse/storage/event_push_actions.py5
-rw-r--r--synapse/storage/events.py186
-rw-r--r--synapse/storage/prepare_database.py66
-rw-r--r--synapse/storage/presence.py6
-rw-r--r--synapse/storage/push_rule.py2
-rw-r--r--synapse/storage/pusher.py2
-rw-r--r--synapse/storage/receipts.py10
-rw-r--r--synapse/storage/registration.py17
-rw-r--r--synapse/storage/roommember.py144
-rw-r--r--synapse/storage/schema/delta/14/upgrade_appservice_db.py6
-rw-r--r--synapse/storage/schema/delta/20/pushers.py6
-rw-r--r--synapse/storage/schema/delta/25/fts.py6
-rw-r--r--synapse/storage/schema/delta/27/ts.py6
-rw-r--r--synapse/storage/schema/delta/30/as_users.py4
-rw-r--r--synapse/storage/schema/delta/30/state_stream.sql38
-rw-r--r--synapse/storage/schema/delta/31/invites.sql42
-rw-r--r--synapse/storage/state.py19
-rw-r--r--synapse/storage/stream.py2
-rw-r--r--synapse/storage/tags.py6
-rw-r--r--synapse/storage/util/id_generators.py63
27 files changed, 526 insertions, 247 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index aaad38039e..045ae6c03f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -88,22 +88,17 @@ class DataStore(RoomMemberStore, RoomStore,
         self.hs = hs
         self.database_engine = hs.database_engine
 
-        cur = db_conn.cursor()
-        try:
-            cur.execute("SELECT MIN(stream_ordering) FROM events",)
-            rows = cur.fetchall()
-            self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
-            self.min_stream_token = min(self.min_stream_token, -1)
-        finally:
-            cur.close()
-
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
         )
 
         self._stream_id_gen = StreamIdGenerator(
-            db_conn, "events", "stream_ordering"
+            db_conn, "events", "stream_ordering",
+            extra_tables=[("local_invites", "stream_id")]
+        )
+        self._backfill_id_gen = StreamIdGenerator(
+            db_conn, "events", "stream_ordering", step=-1
         )
         self._receipts_id_gen = StreamIdGenerator(
             db_conn, "receipts_linearized", "stream_id"
@@ -129,7 +124,7 @@ class DataStore(RoomMemberStore, RoomStore,
             extra_tables=[("deleted_pushers", "stream_id")],
         )
 
-        events_max = self._stream_id_gen.get_max_token()
+        events_max = self._stream_id_gen.get_current_token()
         event_cache_prefill, min_event_val = self._get_cache_dict(
             db_conn, "events",
             entity_column="room_id",
@@ -145,7 +140,7 @@ class DataStore(RoomMemberStore, RoomStore,
             "MembershipStreamChangeCache", events_max,
         )
 
-        account_max = self._account_data_id_gen.get_max_token()
+        account_max = self._account_data_id_gen.get_current_token()
         self._account_data_stream_cache = StreamChangeCache(
             "AccountDataAndTagsChangeCache", account_max,
         )
@@ -156,7 +151,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "presence_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._presence_id_gen.get_max_token(),
+            max_value=self._presence_id_gen.get_current_token(),
         )
         self.presence_stream_cache = StreamChangeCache(
             "PresenceStreamChangeCache", min_presence_val,
@@ -167,7 +162,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "push_rules_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._push_rules_stream_id_gen.get_max_token()[0],
+            max_value=self._push_rules_stream_id_gen.get_current_token()[0],
         )
 
         self.push_rules_stream_cache = StreamChangeCache(
@@ -182,39 +177,6 @@ class DataStore(RoomMemberStore, RoomStore,
         self.__presence_on_startup = None
         return active_on_startup
 
-    def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
-        # Fetch a mapping of room_id -> max stream position for "recent" rooms.
-        # It doesn't really matter how many we get, the StreamChangeCache will
-        # do the right thing to ensure it respects the max size of cache.
-        sql = (
-            "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
-            " WHERE %(stream)s > ? - 100000"
-            " GROUP BY %(entity)s"
-        ) % {
-            "table": table,
-            "entity": entity_column,
-            "stream": stream_column,
-        }
-
-        sql = self.database_engine.convert_param_style(sql)
-
-        txn = db_conn.cursor()
-        txn.execute(sql, (int(max_value),))
-        rows = txn.fetchall()
-        txn.close()
-
-        cache = {
-            row[0]: int(row[1])
-            for row in rows
-        }
-
-        if cache:
-            min_val = min(cache.values())
-        else:
-            min_val = max_value
-
-        return cache, min_val
-
     def _get_active_presence(self, db_conn):
         """Fetch non-offline presence from the database so that we can register
         the appropriate time outs.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b75b79df36..04d7fcf6d6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -816,6 +816,40 @@ class SQLBaseStore(object):
             self._next_stream_id += 1
             return i
 
+    def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
+                        max_value):
+        # Fetch a mapping of room_id -> max stream position for "recent" rooms.
+        # It doesn't really matter how many we get, the StreamChangeCache will
+        # do the right thing to ensure it respects the max size of cache.
+        sql = (
+            "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
+            " WHERE %(stream)s > ? - 100000"
+            " GROUP BY %(entity)s"
+        ) % {
+            "table": table,
+            "entity": entity_column,
+            "stream": stream_column,
+        }
+
+        sql = self.database_engine.convert_param_style(sql)
+
+        txn = db_conn.cursor()
+        txn.execute(sql, (int(max_value),))
+        rows = txn.fetchall()
+        txn.close()
+
+        cache = {
+            row[0]: int(row[1])
+            for row in rows
+        }
+
+        if cache:
+            min_val = min(cache.values())
+        else:
+            min_val = max_value
+
+        return cache, min_val
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index faddefe219..7a7fbf1e52 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -200,7 +200,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_room_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -239,7 +239,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_user_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_max_stream_id(self, txn, next_id):
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index a48230b93f..7bb5de1fe7 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -26,13 +26,13 @@ SUPPORTED_MODULE = {
 }
 
 
-def create_engine(config):
-    name = config.database_config["name"]
+def create_engine(database_config):
+    name = database_config["name"]
     engine_class = SUPPORTED_MODULE.get(name, None)
 
     if engine_class:
         module = importlib.import_module(name)
-        return engine_class(module, config=config)
+        return engine_class(module)
 
     raise RuntimeError(
         "Unsupported database engine '%s'" % (name,)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index a09685b4df..c2290943b4 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,18 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import prepare_database
-
 from ._base import IncorrectDatabaseSetup
 
 
 class PostgresEngine(object):
     single_threaded = False
 
-    def __init__(self, database_module, config):
+    def __init__(self, database_module):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
-        self.config = config
 
     def check_database(self, txn):
         txn.execute("SHOW SERVER_ENCODING")
@@ -44,9 +41,6 @@ class PostgresEngine(object):
             self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
         )
 
-    def prepare_database(self, db_conn):
-        prepare_database(db_conn, self, config=self.config)
-
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             return error.pgcode in ["40001", "40P01"]
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 522b905949..14203aa500 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,9 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import (
-    prepare_database, prepare_sqlite3_database
-)
+from synapse.storage.prepare_database import prepare_database
 
 import struct
 
@@ -23,9 +21,8 @@ import struct
 class Sqlite3Engine(object):
     single_threaded = True
 
-    def __init__(self, database_module, config):
+    def __init__(self, database_module):
         self.module = database_module
-        self.config = config
 
     def check_database(self, txn):
         pass
@@ -34,13 +31,9 @@ class Sqlite3Engine(object):
         return sql
 
     def on_new_connection(self, db_conn):
-        self.prepare_database(db_conn)
+        prepare_database(db_conn, self, config=None)
         db_conn.create_function("rank", 1, _rank)
 
-    def prepare_database(self, db_conn):
-        prepare_sqlite3_database(db_conn)
-        prepare_database(db_conn, self, config=self.config)
-
     def is_deadlock(self, error):
         return False
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3489315e0d..0827946207 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -163,6 +163,22 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )
 
+    @defer.inlineCallbacks
+    def get_max_depth_of_events(self, event_ids):
+        sql = (
+            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
+        ) % (",".join(["?"] * len(event_ids)),)
+
+        rows = yield self._execute(
+            "get_max_depth_of_events", None,
+            sql, *event_ids
+        )
+
+        if rows:
+            defer.returnValue(rows[0][0])
+        else:
+            defer.returnValue(1)
+
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index dc5830450a..3933b6e2c5 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -26,8 +26,9 @@ logger = logging.getLogger(__name__)
 class EventPushActionsStore(SQLBaseStore):
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
-        :param event: the event set actions for
-        :param tuples: list of tuples of (user_id, actions)
+        Args:
+            event: the event set actions for
+            tuples: list of tuples of (user_id, actions)
         """
         values = []
         for uid, actions in tuples:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e0ef7f46b2..ee87a71719 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -24,8 +24,7 @@ from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
 from canonicaljson import encode_canonical_json
-from contextlib import contextmanager
-
+from collections import namedtuple
 
 import logging
 import math
@@ -61,20 +60,14 @@ class EventsStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def persist_events(self, events_and_contexts, backfilled=False,
-                       is_new_state=True):
+    def persist_events(self, events_and_contexts, backfilled=False):
         if not events_and_contexts:
             return
 
         if backfilled:
-            start = self.min_stream_token - 1
-            self.min_stream_token -= len(events_and_contexts) + 1
-            stream_orderings = range(start, self.min_stream_token, -1)
-
-            @contextmanager
-            def stream_ordering_manager():
-                yield stream_orderings
-            stream_ordering_manager = stream_ordering_manager()
+            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+                len(events_and_contexts)
+            )
         else:
             stream_ordering_manager = self._stream_id_gen.get_next_mult(
                 len(events_and_contexts)
@@ -110,13 +103,11 @@ class EventsStore(SQLBaseStore):
                         self._persist_events_txn,
                         events_and_contexts=chunk,
                         backfilled=backfilled,
-                        is_new_state=is_new_state,
                     )
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context,
-                      is_new_state=True, current_state=None):
+    def persist_event(self, event, context, current_state=None):
 
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
@@ -128,13 +119,12 @@ class EventsStore(SQLBaseStore):
                         self._persist_event_txn,
                         event=event,
                         context=context,
-                        is_new_state=is_new_state,
                         current_state=current_state,
                     )
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_max_token()
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
         defer.returnValue((stream_ordering, max_persisted_id))
 
     @defer.inlineCallbacks
@@ -194,8 +184,7 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context,
-                           is_new_state, current_state):
+    def _persist_event_txn(self, txn, event, context, current_state):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -203,7 +192,16 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases, event.room_id)
+            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
+
+            # Add an entry to the current_state_resets table to record the point
+            # where we clobbered the current state
+            stream_order = event.internal_metadata.stream_ordering
+            self._simple_insert_txn(
+                txn,
+                table="current_state_resets",
+                values={"event_stream_ordering": stream_order}
+            )
 
             self._simple_delete_txn(
                 txn,
@@ -227,12 +225,10 @@ class EventsStore(SQLBaseStore):
             txn,
             [(event, context)],
             backfilled=False,
-            is_new_state=is_new_state,
         )
 
     @log_function
-    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            is_new_state):
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled):
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -314,6 +310,18 @@ class EventsStore(SQLBaseStore):
                     (metadata_json, event.event_id,)
                 )
 
+                stream_order = event.internal_metadata.stream_ordering
+                state_group_id = context.state_group or context.new_state_group_id
+                self._simple_insert_txn(
+                    txn,
+                    table="ex_outlier_stream",
+                    values={
+                        "event_stream_ordering": stream_order,
+                        "event_id": event.event_id,
+                        "state_group": state_group_id,
+                    }
+                )
+
                 sql = (
                     "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
@@ -359,7 +367,8 @@ class EventsStore(SQLBaseStore):
                 event
                 for event, _ in events_and_contexts
                 if event.type == EventTypes.Member
-            ]
+            ],
+            backfilled=backfilled,
         )
 
         def event_dict(event):
@@ -431,10 +440,9 @@ class EventsStore(SQLBaseStore):
             txn, [event for event, _ in events_and_contexts]
         )
 
-        state_events_and_contexts = filter(
-            lambda i: i[0].is_state(),
-            events_and_contexts,
-        )
+        state_events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0].is_state()
+        ]
 
         state_values = []
         for event, context in state_events_and_contexts:
@@ -472,32 +480,44 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
-        if is_new_state:
-            for event, _ in state_events_and_contexts:
-                if not context.rejected:
-                    txn.call_after(
-                        self._get_current_state_for_key.invalidate,
-                        (event.room_id, event.type, event.state_key,)
-                    )
+        if backfilled:
+            # Backfilled events come before the current state so we don't need
+            # to update the current state table
+            return
 
-                    if event.type in [EventTypes.Name, EventTypes.Aliases]:
-                        txn.call_after(
-                            self.get_room_name_and_aliases.invalidate,
-                            (event.room_id,)
-                        )
-
-                    self._simple_upsert_txn(
-                        txn,
-                        "current_state_events",
-                        keyvalues={
-                            "room_id": event.room_id,
-                            "type": event.type,
-                            "state_key": event.state_key,
-                        },
-                        values={
-                            "event_id": event.event_id,
-                        }
-                    )
+        for event, _ in state_events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                # Outlier events shouldn't clobber the current state.
+                continue
+
+            if context.rejected:
+                # If the event failed it's auth checks then it shouldn't
+                # clobbler the current state.
+                continue
+
+            txn.call_after(
+                self._get_current_state_for_key.invalidate,
+                (event.room_id, event.type, event.state_key,)
+            )
+
+            if event.type in [EventTypes.Name, EventTypes.Aliases]:
+                txn.call_after(
+                    self.get_room_name_and_aliases.invalidate,
+                    (event.room_id,)
+                )
+
+            self._simple_upsert_txn(
+                txn,
+                "current_state_events",
+                keyvalues={
+                    "room_id": event.room_id,
+                    "type": event.type,
+                    "state_key": event.state_key,
+                },
+                values={
+                    "event_id": event.event_id,
+                }
+            )
 
         return
 
@@ -1086,10 +1106,7 @@ class EventsStore(SQLBaseStore):
 
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
-
-        # TODO: Fix race with the persit_event txn by using one of the
-        # stream id managers
-        return -self.min_stream_token
+        return -self._backfill_id_gen.get_current_token()
 
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
@@ -1110,8 +1127,34 @@ class EventsStore(SQLBaseStore):
             if last_forward_id != current_forward_id:
                 txn.execute(sql, (last_forward_id, current_forward_id, limit))
                 new_forward_events = txn.fetchall()
+
+                if len(new_forward_events) == limit:
+                    upper_bound = new_forward_events[-1][0]
+                else:
+                    upper_bound = current_forward_id
+
+                sql = (
+                    "SELECT event_stream_ordering FROM current_state_resets"
+                    " WHERE ? < event_stream_ordering"
+                    " AND event_stream_ordering <= ?"
+                    " ORDER BY event_stream_ordering ASC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                state_resets = txn.fetchall()
+
+                sql = (
+                    "SELECT event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
+                state_resets = []
+                forward_ex_outliers = []
 
             sql = (
                 "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
@@ -1128,8 +1171,35 @@ class EventsStore(SQLBaseStore):
             if last_backfill_id != current_backfill_id:
                 txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
                 new_backfill_events = txn.fetchall()
+
+                if len(new_backfill_events) == limit:
+                    upper_bound = new_backfill_events[-1][0]
+                else:
+                    upper_bound = current_backfill_id
+
+                sql = (
+                    "SELECT -event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (-last_backfill_id, -upper_bound))
+                backward_ex_outliers = txn.fetchall()
             else:
                 new_backfill_events = []
+                backward_ex_outliers = []
 
-            return (new_forward_events, new_backfill_events)
+            return AllNewEventsResult(
+                new_forward_events, new_backfill_events,
+                forward_ex_outliers, backward_ex_outliers,
+                state_resets,
+            )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+
+
+AllNewEventsResult = namedtuple("AllNewEventsResult", [
+    "new_forward_events", "new_backfill_events",
+    "forward_ex_outliers", "backward_ex_outliers",
+    "state_resets"
+])
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 3f29aad1e8..00833422af 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 30
+SCHEMA_VERSION = 31
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -53,6 +53,9 @@ class UpgradeDatabaseException(PrepareDatabaseException):
 def prepare_database(db_conn, database_engine, config):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
+
+    If `config` is None then prepare_database will assert that no upgrade is
+    necessary, *or* will create a fresh database if the database is empty.
     """
     try:
         cur = db_conn.cursor()
@@ -60,13 +63,18 @@ def prepare_database(db_conn, database_engine, config):
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(
-                cur, user_version, delta_files, upgraded, database_engine, config
-            )
-        else:
-            _setup_new_database(cur, database_engine, config)
 
-        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+            if config is None:
+                if user_version != SCHEMA_VERSION:
+                    # If we don't pass in a config file then we are expecting to
+                    # have already upgraded the DB.
+                    raise UpgradeDatabaseException("Database needs to be upgraded")
+            else:
+                _upgrade_existing_database(
+                    cur, user_version, delta_files, upgraded, database_engine, config
+                )
+        else:
+            _setup_new_database(cur, database_engine)
 
         cur.close()
         db_conn.commit()
@@ -75,7 +83,7 @@ def prepare_database(db_conn, database_engine, config):
         raise
 
 
-def _setup_new_database(cur, database_engine, config):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -148,12 +156,13 @@ def _setup_new_database(cur, database_engine, config):
         applied_delta_files=[],
         upgraded=False,
         database_engine=database_engine,
-        config=config,
+        config=None,
+        is_empty=True,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded, database_engine, config):
+                               upgraded, database_engine, config, is_empty=False):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -246,7 +255,9 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                         module_name, absolute_path, python_file
                     )
                 logger.debug("Running script %s", relative_path)
-                module.run_upgrade(cur, database_engine, config=config)
+                module.run_create(cur, database_engine)
+                if not is_empty:
+                    module.run_upgrade(cur, database_engine, config=config)
             elif ext == ".pyc":
                 # Sometimes .pyc files turn up anyway even though we've
                 # disabled their generation; e.g. from distribution package
@@ -361,36 +372,3 @@ def _get_or_create_schema_state(txn, database_engine):
         return current_version, applied_deltas, upgraded
 
     return None
-
-
-def prepare_sqlite3_database(db_conn):
-    """This function should be called before `prepare_database` on sqlite3
-    databases.
-
-    Since we changed the way we store the current schema version and handle
-    updates to schemas, we need a way to upgrade from the old method to the
-    new. This only affects sqlite databases since they were the only ones
-    supported at the time.
-    """
-    with db_conn:
-        schema_path = os.path.join(
-            dir_path, "schema", "schema_version.sql",
-        )
-        create_schema = read_schema(schema_path)
-        db_conn.executescript(create_schema)
-
-        c = db_conn.execute("SELECT * FROM schema_version")
-        rows = c.fetchall()
-        c.close()
-
-        if not rows:
-            c = db_conn.execute("PRAGMA user_version")
-            row = c.fetchone()
-            c.close()
-
-            if row and row[0]:
-                db_conn.execute(
-                    "REPLACE INTO schema_version (version, upgraded)"
-                    " VALUES (?,?)",
-                    (row[0], False)
-                )
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4cec31e316..59b4ef5ce6 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -68,7 +68,9 @@ class PresenceStore(SQLBaseStore):
                 self._update_presence_txn, stream_orderings, presence_states,
             )
 
-        defer.returnValue((stream_orderings[-1], self._presence_id_gen.get_max_token()))
+        defer.returnValue((
+            stream_orderings[-1], self._presence_id_gen.get_current_token()
+        ))
 
     def _update_presence_txn(self, txn, stream_orderings, presence_states):
         for stream_id, state in zip(stream_orderings, presence_states):
@@ -155,7 +157,7 @@ class PresenceStore(SQLBaseStore):
         defer.returnValue([UserPresenceState(**row) for row in rows])
 
     def get_current_presence_token(self):
-        return self._presence_id_gen.get_max_token()
+        return self._presence_id_gen.get_current_token()
 
     def allow_presence_visible(self, observed_localpart, observer_userid):
         return self._simple_insert(
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9dbad2fd5f..d2bf7f2aec 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -392,7 +392,7 @@ class PushRuleStore(SQLBaseStore):
         """Get the position of the push rules stream.
         Returns a pair of a stream id for the push_rules stream and the
         room stream ordering it corresponds to."""
-        return self._push_rules_stream_id_gen.get_max_token()
+        return self._push_rules_stream_id_gen.get_current_token()
 
     def have_push_rules_changed_for_user(self, user_id, last_id):
         if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 87b2ac5773..d1669c778a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -78,7 +78,7 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(rows)
 
     def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_max_token()
+        return self._pushers_id_gen.get_current_token()
 
     def get_all_updated_pushers(self, last_id, current_id, limit):
         def get_all_updated_pushers_txn(txn):
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b9d848eaa..7fdd84bbdc 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -31,7 +31,7 @@ class ReceiptsStore(SQLBaseStore):
         super(ReceiptsStore, self).__init__(hs)
 
         self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
         )
 
     @cached(num_args=2)
@@ -160,8 +160,8 @@ class ReceiptsStore(SQLBaseStore):
             "content": content,
         }])
 
-    @cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids",
-                num_args=3, inlineCallbacks=True)
+    @cachedList(cached_method_name="get_linearized_receipts_for_room",
+                list_name="room_ids", num_args=3, inlineCallbacks=True)
     def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
         if not room_ids:
             defer.returnValue({})
@@ -221,7 +221,7 @@ class ReceiptsStore(SQLBaseStore):
         defer.returnValue(results)
 
     def get_max_receipt_stream_id(self):
-        return self._receipts_id_gen.get_max_token()
+        return self._receipts_id_gen.get_current_token()
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
@@ -346,7 +346,7 @@ class ReceiptsStore(SQLBaseStore):
             room_id, receipt_type, user_id, event_ids, data
         )
 
-        max_persisted_id = self._stream_id_gen.get_max_token()
+        max_persisted_id = self._stream_id_gen.get_current_token()
 
         defer.returnValue((stream_id, max_persisted_id))
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index bd4eb88a92..1f71773aaa 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -319,7 +319,7 @@ class RegistrationStore(SQLBaseStore):
 
         defer.returnValue(res if res else False)
 
-    @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1,
+    @cachedList(cached_method_name="is_guest", list_name="user_ids", num_args=1,
                 inlineCallbacks=True)
     def are_guests(self, user_ids):
         sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
@@ -458,12 +458,15 @@ class RegistrationStore(SQLBaseStore):
         """
         Gets the 3pid's guest access token if exists, else saves access_token.
 
-        :param medium (str): Medium of the 3pid. Must be "email".
-        :param address (str): 3pid address.
-        :param access_token (str): The access token to persist if none is
-            already persisted.
-        :param inviter_user_id (str): User ID of the inviter.
-        :return (deferred str): Whichever access token is persisted at the end
+        Args:
+            medium (str): Medium of the 3pid. Must be "email".
+            address (str): 3pid address.
+            access_token (str): The access token to persist if none is
+                already persisted.
+            inviter_user_id (str): User ID of the inviter.
+
+        Returns:
+            deferred str: Whichever access token is persisted at the end
             of this function call.
         """
         def insert(txn):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 430b49c12e..66e7a40e3c 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -36,7 +36,7 @@ RoomsForUser = namedtuple(
 
 class RoomMemberStore(SQLBaseStore):
 
-    def _store_room_members_txn(self, txn, events):
+    def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
         """
         self._simple_insert_many_txn(
@@ -62,6 +62,64 @@ class RoomMemberStore(SQLBaseStore):
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering
             )
+            txn.call_after(
+                self.get_invited_rooms_for_user.invalidate, (event.state_key,)
+            )
+
+            # We update the local_invites table only if the event is "current",
+            # i.e., its something that has just happened.
+            # The only current event that can also be an outlier is if its an
+            # invite that has come in across federation.
+            is_new_state = not backfilled and (
+                not event.internal_metadata.is_outlier()
+                or event.internal_metadata.is_invite_from_remote()
+            )
+            is_mine = self.hs.is_mine_id(event.state_key)
+            if is_new_state and is_mine:
+                if event.membership == Membership.INVITE:
+                    self._simple_insert_txn(
+                        txn,
+                        table="local_invites",
+                        values={
+                            "event_id": event.event_id,
+                            "invitee": event.state_key,
+                            "inviter": event.sender,
+                            "room_id": event.room_id,
+                            "stream_id": event.internal_metadata.stream_ordering,
+                        }
+                    )
+                else:
+                    sql = (
+                        "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
+                        " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+                        " AND replaced_by is NULL"
+                    )
+
+                    txn.execute(sql, (
+                        event.internal_metadata.stream_ordering,
+                        event.event_id,
+                        event.room_id,
+                        event.state_key,
+                    ))
+
+    @defer.inlineCallbacks
+    def locally_reject_invite(self, user_id, room_id):
+        sql = (
+            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+            " AND replaced_by is NULL"
+        )
+
+        def f(txn, stream_ordering):
+            txn.execute(sql, (
+                stream_ordering,
+                True,
+                room_id,
+                user_id,
+            ))
+
+        with self._stream_id_gen.get_next() as stream_ordering:
+            yield self.runInteraction("locally_reject_invite", f, stream_ordering)
 
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
@@ -127,6 +185,24 @@ class RoomMemberStore(SQLBaseStore):
             user_id, [Membership.INVITE]
         )
 
+    @defer.inlineCallbacks
+    def get_invite_for_user_in_room(self, user_id, room_id):
+        """Gets the invite for the given user and room
+
+        Args:
+            user_id (str)
+            room_id (str)
+
+        Returns:
+            Deferred: Resolves to either a RoomsForUser or None if no invite was
+                found.
+        """
+        invites = yield self.get_invited_rooms_for_user(user_id)
+        for invite in invites:
+            if invite.room_id == room_id:
+                defer.returnValue(invite)
+        defer.returnValue(None)
+
     def get_leave_and_ban_events_for_user(self, user_id):
         """ Get all the leave events for a user
         Args:
@@ -163,29 +239,55 @@ class RoomMemberStore(SQLBaseStore):
 
     def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
                                                     membership_list):
-        where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
-            " OR ".join(["membership = ?" for _ in membership_list]),
-        )
 
-        args = [user_id]
-        args.extend(membership_list)
+        do_invite = Membership.INVITE in membership_list
+        membership_list = [m for m in membership_list if m != Membership.INVITE]
 
-        sql = (
-            "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
-            " FROM current_state_events as c"
-            " INNER JOIN room_memberships as m"
-            " ON m.event_id = c.event_id"
-            " INNER JOIN events as e"
-            " ON e.event_id = c.event_id"
-            " AND m.room_id = c.room_id"
-            " AND m.user_id = c.state_key"
-            " WHERE %s"
-        ) % (where_clause,)
+        results = []
+        if membership_list:
+            where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
+                " OR ".join(["membership = ?" for _ in membership_list]),
+            )
+
+            args = [user_id]
+            args.extend(membership_list)
+
+            sql = (
+                "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+                " FROM current_state_events as c"
+                " INNER JOIN room_memberships as m"
+                " ON m.event_id = c.event_id"
+                " INNER JOIN events as e"
+                " ON e.event_id = c.event_id"
+                " AND m.room_id = c.room_id"
+                " AND m.user_id = c.state_key"
+                " WHERE %s"
+            ) % (where_clause,)
+
+            txn.execute(sql, args)
+            results = [
+                RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+            ]
+
+        if do_invite:
+            sql = (
+                "SELECT i.room_id, inviter, i.event_id, e.stream_ordering"
+                " FROM local_invites as i"
+                " INNER JOIN events as e USING (event_id)"
+                " WHERE invitee = ? AND locally_rejected is NULL"
+                " AND replaced_by is NULL"
+            )
+
+            txn.execute(sql, (user_id,))
+            results.extend(RoomsForUser(
+                room_id=r["room_id"],
+                sender=r["inviter"],
+                event_id=r["event_id"],
+                stream_ordering=r["stream_ordering"],
+                membership=Membership.INVITE,
+            ) for r in self.cursor_to_dict(txn))
 
-        txn.execute(sql, args)
-        return [
-            RoomsForUser(**r) for r in self.cursor_to_dict(txn)
-        ]
+        return results
 
     @cached(max_entries=5000)
     def get_joined_hosts_for_room(self, room_id):
diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
index 5c40a77757..8755bb2e49 100644
--- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py
+++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
@@ -18,7 +18,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(cur, *args, **kwargs):
+def run_create(cur, *args, **kwargs):
     cur.execute("SELECT id, regex FROM application_services_regex")
     for row in cur.fetchall():
         try:
@@ -35,3 +35,7 @@ def run_upgrade(cur, *args, **kwargs):
                 "UPDATE application_services_regex SET regex=? WHERE id=?",
                 (new_regex, row[0])
             )
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py
index 29164732af..147496a38b 100644
--- a/synapse/storage/schema/delta/20/pushers.py
+++ b/synapse/storage/schema/delta/20/pushers.py
@@ -27,7 +27,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     logger.info("Porting pushers table...")
     cur.execute("""
         CREATE TABLE IF NOT EXISTS pushers2 (
@@ -74,3 +74,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
     cur.execute("DROP TABLE pushers")
     cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
     logger.info("Moved %d pushers to new table", count)
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index d3ff2b1779..4269ac69ad 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -43,7 +43,7 @@ SQLITE_TABLE = (
 )
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
         for statement in get_statements(POSTGRES_TABLE.splitlines()):
             cur.execute(statement)
@@ -76,3 +76,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         sql = database_engine.convert_param_style(sql)
 
         cur.execute(sql, ("event_search", progress_json))
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py
index f8c16391a2..71b12a2731 100644
--- a/synapse/storage/schema/delta/27/ts.py
+++ b/synapse/storage/schema/delta/27/ts.py
@@ -27,7 +27,7 @@ ALTER_TABLE = (
 )
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     for statement in get_statements(ALTER_TABLE.splitlines()):
         cur.execute(statement)
 
@@ -55,3 +55,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         sql = database_engine.convert_param_style(sql)
 
         cur.execute(sql, ("event_origin_server_ts", progress_json))
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 4f6e9dd540..b417e3ac08 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -18,7 +18,7 @@ from synapse.storage.appservice import ApplicationServiceStore
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(cur, database_engine, config, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     # NULL indicates user was not registered by an appservice.
     try:
         cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
@@ -26,6 +26,8 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
         # Maybe we already added the column? Hope so...
         pass
 
+
+def run_upgrade(cur, database_engine, config, *args, **kwargs):
     cur.execute("SELECT name FROM users")
     rows = cur.fetchall()
 
diff --git a/synapse/storage/schema/delta/30/state_stream.sql b/synapse/storage/schema/delta/30/state_stream.sql
new file mode 100644
index 0000000000..706fe1dcf4
--- /dev/null
+++ b/synapse/storage/schema/delta/30/state_stream.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * The positions in the event stream_ordering when the current_state was
+ * replaced by the state at the event.
+ */
+
+CREATE TABLE IF NOT EXISTS current_state_resets(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL
+);
+
+/* The outlier events that have aquired a state group typically through
+ * backfill. This is tracked separately to the events table, as assigning a
+ * state group change the position of the existing event in the stream
+ * ordering.
+ * However since a stream_ordering is assigned in persist_event for the
+ * (event, state) pair, we can use that stream_ordering to identify when
+ * the new state was assigned for the event.
+ */
+CREATE TABLE IF NOT EXISTS ex_outlier_stream(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
+    event_id TEXT NOT NULL,
+    state_group BIGINT NOT NULL
+);
diff --git a/synapse/storage/schema/delta/31/invites.sql b/synapse/storage/schema/delta/31/invites.sql
new file mode 100644
index 0000000000..2c57846d5a
--- /dev/null
+++ b/synapse/storage/schema/delta/31/invites.sql
@@ -0,0 +1,42 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE local_invites(
+    stream_id BIGINT NOT NULL,
+    inviter TEXT NOT NULL,
+    invitee TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    locally_rejected TEXT,
+    replaced_by TEXT
+);
+
+-- Insert all invites for local users into new `invites` table
+INSERT INTO local_invites SELECT
+        stream_ordering as stream_id,
+        sender as inviter,
+        state_key as invitee,
+        event_id,
+        room_id,
+        NULL as locally_rejected,
+        NULL as replaced_by
+    FROM events
+    NATURAL JOIN current_state_events
+    NATURAL JOIN room_memberships
+    WHERE membership = 'invite'  AND state_key IN (SELECT name FROM users);
+
+CREATE INDEX local_invites_id ON local_invites(stream_id);
+CREATE INDEX local_invites_for_user_idx ON local_invites(invitee, locally_rejected, replaced_by, room_id);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 7fc9a4f264..c5d2a3a6df 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -249,11 +249,14 @@ class StateStore(SQLBaseStore):
         """
         Get the state dict corresponding to a particular event
 
-        :param str event_id: event whose state should be returned
-        :param list[(str, str)]|None types: List of (type, state_key) tuples
-            which are used to filter the state fetched. May be None, which
-            matches any key
-        :return: a deferred dict from (type, state_key) -> state_event
+        Args:
+            event_id(str): event whose state should be returned
+            types(list[(str, str)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. May be None, which
+                matches any key
+
+        Returns:
+            A deferred dict from (type, state_key) -> state_event
         """
         state_map = yield self.get_state_for_events([event_id], types)
         defer.returnValue(state_map[event_id])
@@ -270,8 +273,8 @@ class StateStore(SQLBaseStore):
             desc="_get_state_group_for_event",
         )
 
-    @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
-                num_args=1, inlineCallbacks=True)
+    @cachedList(cached_method_name="_get_state_group_for_event",
+                list_name="event_ids", num_args=1, inlineCallbacks=True)
     def _get_state_group_for_events(self, event_ids):
         """Returns mapping event_id -> state_group
         """
@@ -458,4 +461,4 @@ class StateStore(SQLBaseStore):
         )
 
     def get_state_stream_token(self):
-        return self._state_groups_id_gen.get_max_token()
+        return self._state_groups_id_gen.get_current_token()
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index cf84938be5..76bcd9cd00 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -539,7 +539,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
-        token = yield self._stream_id_gen.get_max_token()
+        token = yield self._stream_id_gen.get_current_token()
         if direction != 'b':
             defer.returnValue("s%d" % (token,))
         else:
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index a0e6b42b30..9da23f34cb 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -30,7 +30,7 @@ class TagsStore(SQLBaseStore):
         Returns:
             A deferred int.
         """
-        return self._account_data_id_gen.get_max_token()
+        return self._account_data_id_gen.get_current_token()
 
     @cached()
     def get_tags_for_user(self, user_id):
@@ -200,7 +200,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -222,7 +222,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_revision_txn(self, txn, user_id, room_id, next_id):
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index a02dfc7d58..f69f1cdad4 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -21,7 +21,7 @@ import threading
 class IdGenerator(object):
     def __init__(self, db_conn, table, column):
         self._lock = threading.Lock()
-        self._next_id = _load_max_id(db_conn, table, column)
+        self._next_id = _load_current_id(db_conn, table, column)
 
     def get_next(self):
         with self._lock:
@@ -29,12 +29,16 @@ class IdGenerator(object):
             return self._next_id
 
 
-def _load_max_id(db_conn, table, column):
+def _load_current_id(db_conn, table, column, step=1):
     cur = db_conn.cursor()
-    cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    if step == 1:
+        cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    else:
+        cur.execute("SELECT MIN(%s) FROM %s" % (column, table,))
     val, = cur.fetchone()
     cur.close()
-    return int(val) if val else 1
+    current_id = int(val) if val else step
+    return (max if step > 0 else min)(current_id, step)
 
 
 class StreamIdGenerator(object):
@@ -45,17 +49,32 @@ class StreamIdGenerator(object):
     all ids less than or equal to it have completed. This handles the fact that
     persistence of events can complete out of order.
 
+    Args:
+        db_conn(connection):  A database connection to use to fetch the
+            initial value of the generator from.
+        table(str): A database table to read the initial value of the id
+            generator from.
+        column(str): The column of the database table to read the initial
+            value from the id generator from.
+        extra_tables(list): List of pairs of database tables and columns to
+            use to source the initial value of the generator from. The value
+            with the largest magnitude is used.
+        step(int): which direction the stream ids grow in. +1 to grow
+            upwards, -1 to grow downwards.
+
     Usage:
         with stream_id_gen.get_next() as stream_id:
             # ... persist event ...
     """
-    def __init__(self, db_conn, table, column, extra_tables=[]):
+    def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+        assert step != 0
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._step = step
+        self._current = _load_current_id(db_conn, table, column, step)
         for table, column in extra_tables:
-            self._current_max = max(
-                self._current_max,
-                _load_max_id(db_conn, table, column)
+            self._current = (max if step > 0 else min)(
+                self._current,
+                _load_current_id(db_conn, table, column, step)
             )
         self._unfinished_ids = deque()
 
@@ -66,8 +85,8 @@ class StreamIdGenerator(object):
                 # ... persist event ...
         """
         with self._lock:
-            self._current_max += 1
-            next_id = self._current_max
+            self._current += self._step
+            next_id = self._current
 
             self._unfinished_ids.append(next_id)
 
@@ -88,8 +107,12 @@ class StreamIdGenerator(object):
                 # ... persist events ...
         """
         with self._lock:
-            next_ids = range(self._current_max + 1, self._current_max + n + 1)
-            self._current_max += n
+            next_ids = range(
+                self._current + self._step,
+                self._current + self._step * (n + 1),
+                self._step
+            )
+            self._current += n
 
             for next_id in next_ids:
                 self._unfinished_ids.append(next_id)
@@ -105,15 +128,15 @@ class StreamIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
         with self._lock:
             if self._unfinished_ids:
-                return self._unfinished_ids[0] - 1
+                return self._unfinished_ids[0] - self._step
 
-            return self._current_max
+            return self._current
 
 
 class ChainedIdGenerator(object):
@@ -125,7 +148,7 @@ class ChainedIdGenerator(object):
     def __init__(self, chained_generator, db_conn, table, column):
         self.chained_generator = chained_generator
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._current_max = _load_current_id(db_conn, table, column)
         self._unfinished_ids = deque()
 
     def get_next(self):
@@ -137,7 +160,7 @@ class ChainedIdGenerator(object):
         with self._lock:
             self._current_max += 1
             next_id = self._current_max
-            chained_id = self.chained_generator.get_max_token()
+            chained_id = self.chained_generator.get_current_token()
 
             self._unfinished_ids.append((next_id, chained_id))
 
@@ -151,7 +174,7 @@ class ChainedIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
@@ -160,4 +183,4 @@ class ChainedIdGenerator(object):
                 stream_id, chained_id = self._unfinished_ids[0]
                 return (stream_id - 1, chained_id)
 
-            return (self._current_max, self.chained_generator.get_max_token())
+            return (self._current_max, self.chained_generator.get_current_token())