summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-06-09 14:21:23 +0100
committerErik Johnston <erik@matrix.org>2016-06-09 14:21:23 +0100
commitba0406d10da32ebebf4185f01841f236371e0ae8 (patch)
tree70f492094b7fb962a8161bd2304c6846b3ac3f40 /synapse/storage
parentMerge pull request #801 from ruma/readme-history-storage (diff)
parentChange CHANGELOG (diff)
downloadsynapse-ba0406d10da32ebebf4185f01841f236371e0ae8.tar.xz
Merge branch 'release-v0.16.0' of github.com:matrix-org/synapse v0.16.0
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py131
-rw-r--r--synapse/storage/_base.py48
-rw-r--r--synapse/storage/account_data.py54
-rw-r--r--synapse/storage/appservice.py126
-rw-r--r--synapse/storage/background_updates.py3
-rw-r--r--synapse/storage/client_ips.py68
-rw-r--r--synapse/storage/engines/__init__.py6
-rw-r--r--synapse/storage/engines/postgres.py8
-rw-r--r--synapse/storage/engines/sqlite3.py13
-rw-r--r--synapse/storage/event_federation.py16
-rw-r--r--synapse/storage/event_push_actions.py211
-rw-r--r--synapse/storage/events.py750
-rw-r--r--synapse/storage/media_repository.py57
-rw-r--r--synapse/storage/openid.py32
-rw-r--r--synapse/storage/prepare_database.py78
-rw-r--r--synapse/storage/presence.py17
-rw-r--r--synapse/storage/push_rule.py69
-rw-r--r--synapse/storage/pusher.py187
-rw-r--r--synapse/storage/receipts.py86
-rw-r--r--synapse/storage/registration.py40
-rw-r--r--synapse/storage/room.py80
-rw-r--r--synapse/storage/roommember.py206
-rw-r--r--synapse/storage/schema/delta/14/upgrade_appservice_db.py6
-rw-r--r--synapse/storage/schema/delta/20/pushers.py6
-rw-r--r--synapse/storage/schema/delta/25/fts.py6
-rw-r--r--synapse/storage/schema/delta/27/ts.py6
-rw-r--r--synapse/storage/schema/delta/30/as_users.py8
-rw-r--r--synapse/storage/schema/delta/30/state_stream.sql38
-rw-r--r--synapse/storage/schema/delta/31/invites.sql42
-rw-r--r--synapse/storage/schema/delta/31/local_media_repository_url_cache.sql27
-rw-r--r--synapse/storage/schema/delta/31/pushers.py79
-rw-r--r--synapse/storage/schema/delta/31/pushers_index.sql18
-rw-r--r--synapse/storage/schema/delta/31/search_update.py65
-rw-r--r--synapse/storage/schema/delta/32/events.sql16
-rw-r--r--synapse/storage/schema/delta/32/openid.sql9
-rw-r--r--synapse/storage/schema/delta/32/pusher_throttle.sql23
-rw-r--r--synapse/storage/schema/delta/32/remove_indices.sql38
-rw-r--r--synapse/storage/schema/delta/32/reports.sql25
-rw-r--r--synapse/storage/search.py121
-rw-r--r--synapse/storage/signatures.py25
-rw-r--r--synapse/storage/state.py82
-rw-r--r--synapse/storage/stream.py126
-rw-r--r--synapse/storage/tags.py6
-rw-r--r--synapse/storage/transactions.py165
-rw-r--r--synapse/storage/util/id_generators.py63
45 files changed, 2255 insertions, 1031 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 250ba536ea..e93c3de66c 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
-from ._base import Cache
+from ._base import LoggingTransaction
 from .directory import DirectoryStore
 from .events import EventsStore
 from .presence import PresenceStore, UserPresenceState
@@ -44,6 +44,8 @@ from .receipts import ReceiptsStore
 from .search import SearchStore
 from .tags import TagsStore
 from .account_data import AccountDataStore
+from .openid import OpenIdStore
+from .client_ips import ClientIpStore
 
 from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
 
@@ -57,12 +59,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-# Number of msec of granularity to store the user IP 'last seen' time. Smaller
-# times give more inserts into the database even for readonly API hits
-# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
-
-
 class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore,
                 PresenceStore, TransactionStore,
@@ -81,29 +77,22 @@ class DataStore(RoomMemberStore, RoomStore,
                 SearchStore,
                 TagsStore,
                 AccountDataStore,
-                EventPushActionsStore
+                EventPushActionsStore,
+                OpenIdStore,
+                ClientIpStore,
                 ):
 
     def __init__(self, db_conn, hs):
         self.hs = hs
+        self._clock = hs.get_clock()
         self.database_engine = hs.database_engine
 
-        cur = db_conn.cursor()
-        try:
-            cur.execute("SELECT MIN(stream_ordering) FROM events",)
-            rows = cur.fetchall()
-            self.min_stream_token = rows[0][0] if rows and rows[0] and rows[0][0] else -1
-            self.min_stream_token = min(self.min_stream_token, -1)
-        finally:
-            cur.close()
-
-        self.client_ip_last_seen = Cache(
-            name="client_ip_last_seen",
-            keylen=4,
-        )
-
         self._stream_id_gen = StreamIdGenerator(
-            db_conn, "events", "stream_ordering"
+            db_conn, "events", "stream_ordering",
+            extra_tables=[("local_invites", "stream_id")]
+        )
+        self._backfill_id_gen = StreamIdGenerator(
+            db_conn, "events", "stream_ordering", step=-1
         )
         self._receipts_id_gen = StreamIdGenerator(
             db_conn, "receipts_linearized", "stream_id"
@@ -116,9 +105,10 @@ class DataStore(RoomMemberStore, RoomStore,
         )
 
         self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
-        self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
+        self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
         self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
+        self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
         self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
         self._push_rules_stream_id_gen = ChainedIdGenerator(
@@ -129,7 +119,7 @@ class DataStore(RoomMemberStore, RoomStore,
             extra_tables=[("deleted_pushers", "stream_id")],
         )
 
-        events_max = self._stream_id_gen.get_max_token()
+        events_max = self._stream_id_gen.get_current_token()
         event_cache_prefill, min_event_val = self._get_cache_dict(
             db_conn, "events",
             entity_column="room_id",
@@ -145,18 +135,18 @@ class DataStore(RoomMemberStore, RoomStore,
             "MembershipStreamChangeCache", events_max,
         )
 
-        account_max = self._account_data_id_gen.get_max_token()
+        account_max = self._account_data_id_gen.get_current_token()
         self._account_data_stream_cache = StreamChangeCache(
             "AccountDataAndTagsChangeCache", account_max,
         )
 
-        self.__presence_on_startup = self._get_active_presence(db_conn)
+        self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self._get_cache_dict(
             db_conn, "presence_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._presence_id_gen.get_max_token(),
+            max_value=self._presence_id_gen.get_current_token(),
         )
         self.presence_stream_cache = StreamChangeCache(
             "PresenceStreamChangeCache", min_presence_val,
@@ -167,7 +157,7 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "push_rules_stream",
             entity_column="user_id",
             stream_column="stream_id",
-            max_value=self._push_rules_stream_id_gen.get_max_token()[0],
+            max_value=self._push_rules_stream_id_gen.get_current_token()[0],
         )
 
         self.push_rules_stream_cache = StreamChangeCache(
@@ -175,46 +165,26 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=push_rules_prefill,
         )
 
+        cur = LoggingTransaction(
+            db_conn.cursor(),
+            name="_find_stream_orderings_for_times_txn",
+            database_engine=self.database_engine,
+            after_callbacks=[]
+        )
+        self._find_stream_orderings_for_times_txn(cur)
+        cur.close()
+
+        self.find_stream_orderings_looping_call = self._clock.looping_call(
+            self._find_stream_orderings_for_times, 60 * 60 * 1000
+        )
+
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
-        active_on_startup = self.__presence_on_startup
-        self.__presence_on_startup = None
+        active_on_startup = self._presence_on_startup
+        self._presence_on_startup = None
         return active_on_startup
 
-    def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value):
-        # Fetch a mapping of room_id -> max stream position for "recent" rooms.
-        # It doesn't really matter how many we get, the StreamChangeCache will
-        # do the right thing to ensure it respects the max size of cache.
-        sql = (
-            "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
-            " WHERE %(stream)s > ? - 100000"
-            " GROUP BY %(entity)s"
-        ) % {
-            "table": table,
-            "entity": entity_column,
-            "stream": stream_column,
-        }
-
-        sql = self.database_engine.convert_param_style(sql)
-
-        txn = db_conn.cursor()
-        txn.execute(sql, (int(max_value),))
-        rows = txn.fetchall()
-        txn.close()
-
-        cache = {
-            row[0]: int(row[1])
-            for row in rows
-        }
-
-        if cache:
-            min_val = min(cache.values())
-        else:
-            min_val = max_value
-
-        return cache, min_val
-
     def _get_active_presence(self, db_conn):
         """Fetch non-offline presence from the database so that we can register
         the appropriate time outs.
@@ -238,39 +208,6 @@ class DataStore(RoomMemberStore, RoomStore,
         return [UserPresenceState(**row) for row in rows]
 
     @defer.inlineCallbacks
-    def insert_client_ip(self, user, access_token, ip, user_agent):
-        now = int(self._clock.time_msec())
-        key = (user.to_string(), access_token, ip)
-
-        try:
-            last_seen = self.client_ip_last_seen.get(key)
-        except KeyError:
-            last_seen = None
-
-        # Rate-limited inserts
-        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
-            defer.returnValue(None)
-
-        self.client_ip_last_seen.prefill(key, now)
-
-        # It's safe not to lock here: a) no unique constraint,
-        # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
-        yield self._simple_upsert(
-            "user_ips",
-            keyvalues={
-                "user_id": user.to_string(),
-                "access_token": access_token,
-                "ip": ip,
-                "user_agent": user_agent,
-            },
-            values={
-                "last_seen": now,
-            },
-            desc="insert_client_ip",
-            lock=False,
-        )
-
-    @defer.inlineCallbacks
     def count_daily_users(self):
         """
         Counts the number of users who used this homeserver in the last 24 hours.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b75b79df36..32c6677d47 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -152,8 +152,8 @@ class SQLBaseStore(object):
 
     def __init__(self, hs):
         self.hs = hs
-        self._db_pool = hs.get_db_pool()
         self._clock = hs.get_clock()
+        self._db_pool = hs.get_db_pool()
 
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
@@ -453,7 +453,9 @@ class SQLBaseStore(object):
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
             insertion_values (dict): key/values to use when inserting
-        Returns: A deferred
+        Returns:
+            Deferred(bool): True if a new entry was created, False if an
+                existing one was updated.
         """
         return self.runInteraction(
             desc,
@@ -498,6 +500,10 @@ class SQLBaseStore(object):
             )
             txn.execute(sql, allvalues.values())
 
+            return True
+        else:
+            return False
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False, desc="_simple_select_one"):
         """Executes a SELECT query on the named table, which is expected to
@@ -810,11 +816,39 @@ class SQLBaseStore(object):
 
         return txn.execute(sql, keyvalues.values())
 
-    def get_next_stream_id(self):
-        with self._next_stream_id_lock:
-            i = self._next_stream_id
-            self._next_stream_id += 1
-            return i
+    def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
+                        max_value):
+        # Fetch a mapping of room_id -> max stream position for "recent" rooms.
+        # It doesn't really matter how many we get, the StreamChangeCache will
+        # do the right thing to ensure it respects the max size of cache.
+        sql = (
+            "SELECT %(entity)s, MAX(%(stream)s) FROM %(table)s"
+            " WHERE %(stream)s > ? - 100000"
+            " GROUP BY %(entity)s"
+        ) % {
+            "table": table,
+            "entity": entity_column,
+            "stream": stream_column,
+        }
+
+        sql = self.database_engine.convert_param_style(sql)
+
+        txn = db_conn.cursor()
+        txn.execute(sql, (int(max_value),))
+        rows = txn.fetchall()
+        txn.close()
+
+        cache = {
+            row[0]: int(row[1])
+            for row in rows
+        }
+
+        if cache:
+            min_val = min(cache.values())
+        else:
+            min_val = max_value
+
+        return cache, min_val
 
 
 class _RollbackButIsFineException(Exception):
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index faddefe219..ec7e8d40d2 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -16,6 +16,8 @@
 from ._base import SQLBaseStore
 from twisted.internet import defer
 
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+
 import ujson as json
 import logging
 
@@ -24,6 +26,7 @@ logger = logging.getLogger(__name__)
 
 class AccountDataStore(SQLBaseStore):
 
+    @cached()
     def get_account_data_for_user(self, user_id):
         """Get all the client account_data for a user.
 
@@ -60,6 +63,47 @@ class AccountDataStore(SQLBaseStore):
             "get_account_data_for_user", get_account_data_for_user_txn
         )
 
+    @cachedInlineCallbacks(num_args=2)
+    def get_global_account_data_by_type_for_user(self, data_type, user_id):
+        """
+        Returns:
+            Deferred: A dict
+        """
+        result = yield self._simple_select_one_onecol(
+            table="account_data",
+            keyvalues={
+                "user_id": user_id,
+                "account_data_type": data_type,
+            },
+            retcol="content",
+            desc="get_global_account_data_by_type_for_user",
+            allow_none=True,
+        )
+
+        if result:
+            defer.returnValue(json.loads(result))
+        else:
+            defer.returnValue(None)
+
+    @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
+                num_args=2, list_name="user_ids", inlineCallbacks=True)
+    def get_global_account_data_by_type_for_users(self, data_type, user_ids):
+        rows = yield self._simple_select_many_batch(
+            table="account_data",
+            column="user_id",
+            iterable=user_ids,
+            keyvalues={
+                "account_data_type": data_type,
+            },
+            retcols=("user_id", "content",),
+            desc="get_global_account_data_by_type_for_users",
+        )
+
+        defer.returnValue({
+            row["user_id"]: json.loads(row["content"]) if row["content"] else None
+            for row in rows
+        })
+
     def get_account_data_for_room(self, user_id, room_id):
         """Get all the client account_data for a user for a room.
 
@@ -193,6 +237,7 @@ class AccountDataStore(SQLBaseStore):
                 self._account_data_stream_cache.entity_has_changed,
                 user_id, next_id,
             )
+            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
             self._update_max_stream_id(txn, next_id)
 
         with self._account_data_id_gen.get_next() as next_id:
@@ -200,7 +245,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_room_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -232,6 +277,11 @@ class AccountDataStore(SQLBaseStore):
                 self._account_data_stream_cache.entity_has_changed,
                 user_id, next_id,
             )
+            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
+            txn.call_after(
+                self.get_global_account_data_by_type_for_user.invalidate,
+                (account_data_type, user_id,)
+            )
             self._update_max_stream_id(txn, next_id)
 
         with self._account_data_id_gen.get_next() as next_id:
@@ -239,7 +289,7 @@ class AccountDataStore(SQLBaseStore):
                 "add_user_account_data", add_account_data_txn, next_id
             )
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_max_stream_id(self, txn, next_id):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 371600eebb..d1ee533fac 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -13,16 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import urllib
-import yaml
 import simplejson as json
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.appservice import ApplicationService, AppServiceTransaction
-from synapse.config._base import ConfigError
+from synapse.appservice import AppServiceTransaction
+from synapse.config.appservice import load_appservices
 from synapse.storage.roommember import RoomsForUser
-from synapse.types import UserID
 from ._base import SQLBaseStore
 
 
@@ -34,7 +31,7 @@ class ApplicationServiceStore(SQLBaseStore):
     def __init__(self, hs):
         super(ApplicationServiceStore, self).__init__(hs)
         self.hostname = hs.hostname
-        self.services_cache = ApplicationServiceStore.load_appservices(
+        self.services_cache = load_appservices(
             hs.hostname,
             hs.config.app_service_config_files
         )
@@ -144,102 +141,6 @@ class ApplicationServiceStore(SQLBaseStore):
 
         return rooms_for_user_matching_user_id
 
-    @classmethod
-    def _load_appservice(cls, hostname, as_info, config_filename):
-        required_string_fields = [
-            "id", "url", "as_token", "hs_token", "sender_localpart"
-        ]
-        for field in required_string_fields:
-            if not isinstance(as_info.get(field), basestring):
-                raise KeyError("Required string field: '%s' (%s)" % (
-                    field, config_filename,
-                ))
-
-        localpart = as_info["sender_localpart"]
-        if urllib.quote(localpart) != localpart:
-            raise ValueError(
-                "sender_localpart needs characters which are not URL encoded."
-            )
-        user = UserID(localpart, hostname)
-        user_id = user.to_string()
-
-        # namespace checks
-        if not isinstance(as_info.get("namespaces"), dict):
-            raise KeyError("Requires 'namespaces' object.")
-        for ns in ApplicationService.NS_LIST:
-            # specific namespaces are optional
-            if ns in as_info["namespaces"]:
-                # expect a list of dicts with exclusive and regex keys
-                for regex_obj in as_info["namespaces"][ns]:
-                    if not isinstance(regex_obj, dict):
-                        raise ValueError(
-                            "Expected namespace entry in %s to be an object,"
-                            " but got %s", ns, regex_obj
-                        )
-                    if not isinstance(regex_obj.get("regex"), basestring):
-                        raise ValueError(
-                            "Missing/bad type 'regex' key in %s", regex_obj
-                        )
-                    if not isinstance(regex_obj.get("exclusive"), bool):
-                        raise ValueError(
-                            "Missing/bad type 'exclusive' key in %s", regex_obj
-                        )
-        return ApplicationService(
-            token=as_info["as_token"],
-            url=as_info["url"],
-            namespaces=as_info["namespaces"],
-            hs_token=as_info["hs_token"],
-            sender=user_id,
-            id=as_info["id"],
-        )
-
-    @classmethod
-    def load_appservices(cls, hostname, config_files):
-        """Returns a list of Application Services from the config files."""
-        if not isinstance(config_files, list):
-            logger.warning(
-                "Expected %s to be a list of AS config files.", config_files
-            )
-            return []
-
-        # Dicts of value -> filename
-        seen_as_tokens = {}
-        seen_ids = {}
-
-        appservices = []
-
-        for config_file in config_files:
-            try:
-                with open(config_file, 'r') as f:
-                    appservice = ApplicationServiceStore._load_appservice(
-                        hostname, yaml.load(f), config_file
-                    )
-                    if appservice.id in seen_ids:
-                        raise ConfigError(
-                            "Cannot reuse ID across application services: "
-                            "%s (files: %s, %s)" % (
-                                appservice.id, config_file, seen_ids[appservice.id],
-                            )
-                        )
-                    seen_ids[appservice.id] = config_file
-                    if appservice.token in seen_as_tokens:
-                        raise ConfigError(
-                            "Cannot reuse as_token across application services: "
-                            "%s (files: %s, %s)" % (
-                                appservice.token,
-                                config_file,
-                                seen_as_tokens[appservice.token],
-                            )
-                        )
-                    seen_as_tokens[appservice.token] = config_file
-                    logger.info("Loaded application service: %s", appservice)
-                    appservices.append(appservice)
-            except Exception as e:
-                logger.error("Failed to load appservice from '%s'", config_file)
-                logger.exception(e)
-                raise
-        return appservices
-
 
 class ApplicationServiceTransactionStore(SQLBaseStore):
 
@@ -397,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             dict(txn_id=txn_id, as_id=service.id)
         )
 
+    @defer.inlineCallbacks
     def get_oldest_unsent_txn(self, service):
         """Get the oldest transaction which has not been sent for this
         service.
@@ -407,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             A Deferred which resolves to an AppServiceTransaction or
             None.
         """
-        return self.runInteraction(
+        entry = yield self.runInteraction(
             "get_oldest_unsent_appservice_txn",
             self._get_oldest_unsent_txn,
             service
         )
 
+        if not entry:
+            defer.returnValue(None)
+
+        event_ids = json.loads(entry["event_ids"])
+
+        events = yield self._get_events(event_ids)
+
+        defer.returnValue(AppServiceTransaction(
+            service=service, id=entry["txn_id"], events=events
+        ))
+
     def _get_oldest_unsent_txn(self, txn, service):
         # Monotonically increasing txn ids, so just select the smallest
         # one in the txns table (we delete them when they are sent)
@@ -427,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
 
         entry = rows[0]
 
-        event_ids = json.loads(entry["event_ids"])
-        events = self._get_events_txn(txn, event_ids)
-
-        return AppServiceTransaction(
-            service=service, id=entry["txn_id"], events=events
-        )
+        return entry
 
     def _get_last_txn(self, txn, service_id):
         txn.execute(
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 49904046cf..66a995157d 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -173,11 +173,12 @@ class BackgroundUpdateStore(SQLBaseStore):
 
         logger.info(
             "Updating %r. Updated %r items in %rms."
-            " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
+            " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
             update_name, items_updated, duration_ms,
             performance.total_items_per_ms(),
             performance.average_items_per_ms(),
             performance.total_item_count,
+            batch_size,
         )
 
         performance.update(items_updated, duration_ms)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
new file mode 100644
index 0000000000..a90990e006
--- /dev/null
+++ b/synapse/storage/client_ips.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore, Cache
+
+from twisted.internet import defer
+
+
+# Number of msec of granularity to store the user IP 'last seen' time. Smaller
+# times give more inserts into the database even for readonly API hits
+# 120 seconds == 2 minutes
+LAST_SEEN_GRANULARITY = 120 * 1000
+
+
+class ClientIpStore(SQLBaseStore):
+
+    def __init__(self, hs):
+        self.client_ip_last_seen = Cache(
+            name="client_ip_last_seen",
+            keylen=4,
+        )
+
+        super(ClientIpStore, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def insert_client_ip(self, user, access_token, ip, user_agent):
+        now = int(self._clock.time_msec())
+        key = (user.to_string(), access_token, ip)
+
+        try:
+            last_seen = self.client_ip_last_seen.get(key)
+        except KeyError:
+            last_seen = None
+
+        # Rate-limited inserts
+        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+            defer.returnValue(None)
+
+        self.client_ip_last_seen.prefill(key, now)
+
+        # It's safe not to lock here: a) no unique constraint,
+        # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
+        yield self._simple_upsert(
+            "user_ips",
+            keyvalues={
+                "user_id": user.to_string(),
+                "access_token": access_token,
+                "ip": ip,
+                "user_agent": user_agent,
+            },
+            values={
+                "last_seen": now,
+            },
+            desc="insert_client_ip",
+            lock=False,
+        )
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index a48230b93f..7bb5de1fe7 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -26,13 +26,13 @@ SUPPORTED_MODULE = {
 }
 
 
-def create_engine(config):
-    name = config.database_config["name"]
+def create_engine(database_config):
+    name = database_config["name"]
     engine_class = SUPPORTED_MODULE.get(name, None)
 
     if engine_class:
         module = importlib.import_module(name)
-        return engine_class(module, config=config)
+        return engine_class(module)
 
     raise RuntimeError(
         "Unsupported database engine '%s'" % (name,)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index a09685b4df..c2290943b4 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,18 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import prepare_database
-
 from ._base import IncorrectDatabaseSetup
 
 
 class PostgresEngine(object):
     single_threaded = False
 
-    def __init__(self, database_module, config):
+    def __init__(self, database_module):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
-        self.config = config
 
     def check_database(self, txn):
         txn.execute("SHOW SERVER_ENCODING")
@@ -44,9 +41,6 @@ class PostgresEngine(object):
             self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
         )
 
-    def prepare_database(self, db_conn):
-        prepare_database(db_conn, self, config=self.config)
-
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             return error.pgcode in ["40001", "40P01"]
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 522b905949..14203aa500 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,9 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.prepare_database import (
-    prepare_database, prepare_sqlite3_database
-)
+from synapse.storage.prepare_database import prepare_database
 
 import struct
 
@@ -23,9 +21,8 @@ import struct
 class Sqlite3Engine(object):
     single_threaded = True
 
-    def __init__(self, database_module, config):
+    def __init__(self, database_module):
         self.module = database_module
-        self.config = config
 
     def check_database(self, txn):
         pass
@@ -34,13 +31,9 @@ class Sqlite3Engine(object):
         return sql
 
     def on_new_connection(self, db_conn):
-        self.prepare_database(db_conn)
+        prepare_database(db_conn, self, config=None)
         db_conn.create_function("rank", 1, _rank)
 
-    def prepare_database(self, db_conn):
-        prepare_sqlite3_database(db_conn)
-        prepare_database(db_conn, self, config=self.config)
-
     def is_deadlock(self, error):
         return False
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3489315e0d..0827946207 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -163,6 +163,22 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )
 
+    @defer.inlineCallbacks
+    def get_max_depth_of_events(self, event_ids):
+        sql = (
+            "SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
+        ) % (",".join(["?"] * len(event_ids)),)
+
+        rows = yield self._execute(
+            "get_max_depth_of_events", None,
+            sql, *event_ids
+        )
+
+        if rows:
+            defer.returnValue(rows[0][0])
+        else:
+            defer.returnValue(1)
+
     def _get_min_depth_interaction(self, txn, room_id):
         min_depth = self._simple_select_one_onecol_txn(
             txn,
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index dc5830450a..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,10 +24,15 @@ logger = logging.getLogger(__name__)
 
 
 class EventPushActionsStore(SQLBaseStore):
+    def __init__(self, hs):
+        self.stream_ordering_month_ago = None
+        super(EventPushActionsStore, self).__init__(hs)
+
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
-        :param event: the event set actions for
-        :param tuples: list of tuples of (user_id, actions)
+        Args:
+            event: the event set actions for
+            tuples: list of tuples of (user_id, actions)
         """
         values = []
         for uid, actions in tuples:
@@ -99,6 +104,121 @@ class EventPushActionsStore(SQLBaseStore):
         )
         defer.returnValue(ret)
 
+    @defer.inlineCallbacks
+    def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
+        def f(txn):
+            sql = (
+                "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
+                " stream_ordering >= ? AND stream_ordering <= ?"
+            )
+            txn.execute(sql, (min_stream_ordering, max_stream_ordering))
+            return [r[0] for r in txn.fetchall()]
+        ret = yield self.runInteraction("get_push_action_users_in_range", f)
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_unread_push_actions_for_user_in_range(self, user_id,
+                                                  min_stream_ordering,
+                                                  max_stream_ordering=None,
+                                                  limit=20):
+        def get_after_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
+                "e.received_ts "
+                "FROM ("
+                "   SELECT room_id, user_id, "
+                "       max(topological_ordering) as topological_ordering, "
+                "       max(stream_ordering) as stream_ordering "
+                "       FROM events"
+                "   NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
+                "   GROUP BY room_id, user_id"
+                ") AS rl,"
+                " event_push_actions AS ep"
+                " INNER JOIN events AS e USING (room_id, event_id)"
+                " WHERE"
+                "   ep.room_id = rl.room_id"
+                "   AND ("
+                "       ep.topological_ordering > rl.topological_ordering"
+                "       OR ("
+                "           ep.topological_ordering = rl.topological_ordering"
+                "           AND ep.stream_ordering > rl.stream_ordering"
+                "       )"
+                "   )"
+                "   AND ep.stream_ordering > ?"
+                "   AND ep.user_id = ?"
+                "   AND ep.user_id = rl.user_id"
+            )
+            args = [min_stream_ordering, user_id]
+            if max_stream_ordering is not None:
+                sql += " AND ep.stream_ordering <= ?"
+                args.append(max_stream_ordering)
+            sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+            args.append(limit)
+            txn.execute(sql, args)
+            return txn.fetchall()
+        after_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range", get_after_receipt
+        )
+
+        def get_no_receipt(txn):
+            sql = (
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                " e.received_ts"
+                " FROM event_push_actions AS ep"
+                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+                " WHERE ep.room_id not in ("
+                "   SELECT room_id FROM events NATURAL JOIN receipts_linearized"
+                "   WHERE receipt_type = 'm.read' AND user_id = ?"
+                "   GROUP BY room_id"
+                ") AND ep.user_id = ? AND ep.stream_ordering > ?"
+            )
+            args = [user_id, user_id, min_stream_ordering]
+            if max_stream_ordering is not None:
+                sql += " AND ep.stream_ordering <= ?"
+                args.append(max_stream_ordering)
+            sql += " ORDER BY ep.stream_ordering ASC"
+            txn.execute(sql, args)
+            return txn.fetchall()
+        no_read_receipt = yield self.runInteraction(
+            "get_unread_push_actions_for_user_in_range", get_no_receipt
+        )
+
+        defer.returnValue([
+            {
+                "event_id": row[0],
+                "room_id": row[1],
+                "stream_ordering": row[2],
+                "actions": json.loads(row[3]),
+                "received_ts": row[4],
+            } for row in after_read_receipt + no_read_receipt
+        ])
+
+    @defer.inlineCallbacks
+    def get_time_of_last_push_action_before(self, stream_ordering):
+        def f(txn):
+            sql = (
+                "SELECT e.received_ts"
+                " FROM event_push_actions AS ep"
+                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
+                " WHERE ep.stream_ordering > ?"
+                " ORDER BY ep.stream_ordering ASC"
+                " LIMIT 1"
+            )
+            txn.execute(sql, (stream_ordering,))
+            return txn.fetchone()
+        result = yield self.runInteraction("get_time_of_last_push_action_before", f)
+        defer.returnValue(result[0] if result else None)
+
+    @defer.inlineCallbacks
+    def get_latest_push_action_stream_ordering(self):
+        def f(txn):
+            txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
+            return txn.fetchone()
+        result = yield self.runInteraction(
+            "get_latest_push_action_stream_ordering", f
+        )
+        defer.returnValue(result[0] or 0)
+
     def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
         # Sad that we have to blow away the cache for the whole room here
         txn.call_after(
@@ -110,6 +230,93 @@ class EventPushActionsStore(SQLBaseStore):
             (room_id, event_id)
         )
 
+    def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+                                            topological_ordering):
+        """
+        Purges old, stale push actions for a user and room before a given
+        topological_ordering
+        Args:
+            txn: The transcation
+            room_id: Room ID to delete from
+            user_id: user ID to delete for
+            topological_ordering: The lowest topological ordering which will
+                                  not be deleted.
+        """
+        txn.call_after(
+            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            (room_id, user_id, )
+        )
+
+        # We need to join on the events table to get the received_ts for
+        # event_push_actions and sqlite won't let us use a join in a delete so
+        # we can't just delete where received_ts < x. Furthermore we can
+        # only identify event_push_actions by a tuple of room_id, event_id
+        # we we can't use a subquery.
+        # Instead, we look up the stream ordering for the last event in that
+        # room received before the threshold time and delete event_push_actions
+        # in the room with a stream_odering before that.
+        txn.execute(
+            "DELETE FROM event_push_actions "
+            " WHERE user_id = ? AND room_id = ? AND "
+            " topological_ordering < ? AND stream_ordering < ?",
+            (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+        )
+
+    @defer.inlineCallbacks
+    def _find_stream_orderings_for_times(self):
+        yield self.runInteraction(
+            "_find_stream_orderings_for_times",
+            self._find_stream_orderings_for_times_txn
+        )
+
+    def _find_stream_orderings_for_times_txn(self, txn):
+        logger.info("Searching for stream ordering 1 month ago")
+        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 month ago: it's %d",
+            self.stream_ordering_month_ago
+        )
+
+    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+        """
+        Find the stream_ordering of the first event that was received after
+        a given timestamp. This is relatively slow as there is no index on
+        received_ts but we can then use this to delete push actions before
+        this.
+
+        received_ts must necessarily be in the same order as stream_ordering
+        and stream_ordering is indexed, so we manually binary search using
+        stream_ordering
+        """
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        max_stream_ordering = txn.fetchone()[0]
+
+        if max_stream_ordering is None:
+            return 0
+
+        range_start = 0
+        range_end = max_stream_ordering
+
+        sql = (
+            "SELECT received_ts FROM events"
+            " WHERE stream_ordering > ?"
+            " ORDER BY stream_ordering"
+            " LIMIT 1"
+        )
+
+        while range_end - range_start > 1:
+            middle = int((range_end + range_start) / 2)
+            txn.execute(sql, (middle,))
+            middle_ts = txn.fetchone()[0]
+            if ts > middle_ts:
+                range_start = middle
+            else:
+                range_end = middle
+
+        return range_end
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5233430028..6d978ffcd5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,12 +19,17 @@ from twisted.internet import defer, reactor
 from synapse.events import FrozenEvent, USE_FROZEN_DICTS
 from synapse.events.utils import prune_event
 
+from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
 from canonicaljson import encode_canonical_json
-from contextlib import contextmanager
+from collections import deque, namedtuple
+
+import synapse
+import synapse.metrics
+
 
 import logging
 import math
@@ -33,6 +38,10 @@ import ujson as json
 logger = logging.getLogger(__name__)
 
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
 def encode_json(json_object):
     if USE_FROZEN_DICTS:
         # ujson doesn't like frozen_dicts
@@ -50,76 +59,237 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+class _EventPeristenceQueue(object):
+    """Queues up events so that they can be persisted in bulk with only one
+    concurrent transaction per room.
+    """
+
+    _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", (
+        "events_and_contexts", "current_state", "backfilled", "deferred",
+    ))
+
+    def __init__(self):
+        self._event_persist_queues = {}
+        self._currently_persisting_rooms = set()
+
+    def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state):
+        """Add events to the queue, with the given persist_event options.
+        """
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+        if queue:
+            end_item = queue[-1]
+            if end_item.current_state or current_state:
+                # We perist events with current_state set to True one at a time
+                pass
+            if end_item.backfilled == backfilled:
+                end_item.events_and_contexts.extend(events_and_contexts)
+                return end_item.deferred.observe()
+
+        deferred = ObservableDeferred(defer.Deferred())
+
+        queue.append(self._EventPersistQueueItem(
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+            current_state=current_state,
+            deferred=deferred,
+        ))
+
+        return deferred.observe()
+
+    def handle_queue(self, room_id, per_item_callback):
+        """Attempts to handle the queue for a room if not already being handled.
+
+        The given callback will be invoked with for each item in the queue,1
+        of type _EventPersistQueueItem. The per_item_callback will continuously
+        be called with new items, unless the queue becomnes empty. The return
+        value of the function will be given to the deferreds waiting on the item,
+        exceptions will be passed to the deferres as well.
+
+        This function should therefore be called whenever anything is added
+        to the queue.
+
+        If another callback is currently handling the queue then it will not be
+        invoked.
+        """
+
+        if room_id in self._currently_persisting_rooms:
+            return
+
+        self._currently_persisting_rooms.add(room_id)
+
+        @defer.inlineCallbacks
+        def handle_queue_loop():
+            try:
+                queue = self._get_drainining_queue(room_id)
+                for item in queue:
+                    try:
+                        ret = yield per_item_callback(item)
+                        item.deferred.callback(ret)
+                    except Exception as e:
+                        item.deferred.errback(e)
+            finally:
+                queue = self._event_persist_queues.pop(room_id, None)
+                if queue:
+                    self._event_persist_queues[room_id] = queue
+                self._currently_persisting_rooms.discard(room_id)
+
+        preserve_fn(handle_queue_loop)()
+
+    def _get_drainining_queue(self, room_id):
+        queue = self._event_persist_queues.setdefault(room_id, deque())
+
+        try:
+            while True:
+                yield queue.popleft()
+        except IndexError:
+            # Queue has been drained.
+            pass
+
+
+_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
+
+
 class EventsStore(SQLBaseStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
 
     def __init__(self, hs):
         super(EventsStore, self).__init__(hs)
+        self._clock = hs.get_clock()
         self.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
 
+        self._event_persist_queue = _EventPeristenceQueue()
+
+    def persist_events(self, events_and_contexts, backfilled=False):
+        """
+        Write events to the database
+        Args:
+            events_and_contexts: list of tuples of (event, context)
+            backfilled: ?
+        """
+        partitioned = {}
+        for event, ctx in events_and_contexts:
+            partitioned.setdefault(event.room_id, []).append((event, ctx))
+
+        deferreds = []
+        for room_id, evs_ctxs in partitioned.items():
+            d = self._event_persist_queue.add_to_queue(
+                room_id, evs_ctxs,
+                backfilled=backfilled,
+                current_state=None,
+            )
+            deferreds.append(d)
+
+        for room_id in partitioned.keys():
+            self._maybe_start_persisting(room_id)
+
+        return defer.gatherResults(deferreds, consumeErrors=True)
+
     @defer.inlineCallbacks
-    def persist_events(self, events_and_contexts, backfilled=False,
-                       is_new_state=True):
+    @log_function
+    def persist_event(self, event, context, current_state=None, backfilled=False):
+        deferred = self._event_persist_queue.add_to_queue(
+            event.room_id, [(event, context)],
+            backfilled=backfilled,
+            current_state=current_state,
+        )
+
+        self._maybe_start_persisting(event.room_id)
+
+        yield deferred
+
+        max_persisted_id = yield self._stream_id_gen.get_current_token()
+        defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
+
+    def _maybe_start_persisting(self, room_id):
+        @defer.inlineCallbacks
+        def persisting_queue(item):
+            if item.current_state:
+                for event, context in item.events_and_contexts:
+                    # There should only ever be one item in
+                    # events_and_contexts when current_state is
+                    # not None
+                    yield self._persist_event(
+                        event, context,
+                        current_state=item.current_state,
+                        backfilled=item.backfilled,
+                    )
+            else:
+                yield self._persist_events(
+                    item.events_and_contexts,
+                    backfilled=item.backfilled,
+                )
+
+        self._event_persist_queue.handle_queue(room_id, persisting_queue)
+
+    @defer.inlineCallbacks
+    def _persist_events(self, events_and_contexts, backfilled=False):
         if not events_and_contexts:
             return
 
         if backfilled:
-            start = self.min_stream_token - 1
-            self.min_stream_token -= len(events_and_contexts) + 1
-            stream_orderings = range(start, self.min_stream_token, -1)
-
-            @contextmanager
-            def stream_ordering_manager():
-                yield stream_orderings
-            stream_ordering_manager = stream_ordering_manager()
+            stream_ordering_manager = self._backfill_id_gen.get_next_mult(
+                len(events_and_contexts)
+            )
         else:
             stream_ordering_manager = self._stream_id_gen.get_next_mult(
                 len(events_and_contexts)
             )
 
+        state_group_id_manager = self._state_groups_id_gen.get_next_mult(
+            len(events_and_contexts)
+        )
         with stream_ordering_manager as stream_orderings:
-            for (event, _), stream in zip(events_and_contexts, stream_orderings):
-                event.internal_metadata.stream_ordering = stream
-
-            chunks = [
-                events_and_contexts[x:x + 100]
-                for x in xrange(0, len(events_and_contexts), 100)
-            ]
-
-            for chunk in chunks:
-                # We can't easily parallelize these since different chunks
-                # might contain the same event. :(
-                yield self.runInteraction(
-                    "persist_events",
-                    self._persist_events_txn,
-                    events_and_contexts=chunk,
-                    backfilled=backfilled,
-                    is_new_state=is_new_state,
-                )
+            with state_group_id_manager as state_group_ids:
+                for (event, context), stream, state_group_id in zip(
+                    events_and_contexts, stream_orderings, state_group_ids
+                ):
+                    event.internal_metadata.stream_ordering = stream
+                    # Assign a state group_id in case a new id is needed for
+                    # this context. In theory we only need to assign this
+                    # for contexts that have current_state and aren't outliers
+                    # but that make the code more complicated. Assigning an ID
+                    # per event only causes the state_group_ids to grow as fast
+                    # as the stream_ordering so in practise shouldn't be a problem.
+                    context.new_state_group_id = state_group_id
+
+                chunks = [
+                    events_and_contexts[x:x + 100]
+                    for x in xrange(0, len(events_and_contexts), 100)
+                ]
+
+                for chunk in chunks:
+                    # We can't easily parallelize these since different chunks
+                    # might contain the same event. :(
+                    yield self.runInteraction(
+                        "persist_events",
+                        self._persist_events_txn,
+                        events_and_contexts=chunk,
+                        backfilled=backfilled,
+                    )
+                    persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event, context,
-                      is_new_state=True, current_state=None):
+    def _persist_event(self, event, context, current_state=None, backfilled=False):
         try:
             with self._stream_id_gen.get_next() as stream_ordering:
-                event.internal_metadata.stream_ordering = stream_ordering
-                yield self.runInteraction(
-                    "persist_event",
-                    self._persist_event_txn,
-                    event=event,
-                    context=context,
-                    is_new_state=is_new_state,
-                    current_state=current_state,
-                )
+                with self._state_groups_id_gen.get_next() as state_group_id:
+                    event.internal_metadata.stream_ordering = stream_ordering
+                    context.new_state_group_id = state_group_id
+                    yield self.runInteraction(
+                        "persist_event",
+                        self._persist_event_txn,
+                        event=event,
+                        context=context,
+                        current_state=current_state,
+                        backfilled=backfilled,
+                    )
+                    persist_event_counter.inc()
         except _RollbackButIsFineException:
             pass
 
-        max_persisted_id = yield self._stream_id_gen.get_max_token()
-        defer.returnValue((stream_ordering, max_persisted_id))
-
     @defer.inlineCallbacks
     def get_event(self, event_id, check_redacted=True,
                   get_prev_content=False, allow_rejected=False,
@@ -177,8 +347,7 @@ class EventsStore(SQLBaseStore):
         defer.returnValue({e.event_id: e for e in events})
 
     @log_function
-    def _persist_event_txn(self, txn, event, context,
-                           is_new_state=True, current_state=None):
+    def _persist_event_txn(self, txn, event, context, current_state, backfilled=False):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
@@ -186,7 +355,16 @@ class EventsStore(SQLBaseStore):
             txn.call_after(self.get_rooms_for_user.invalidate_all)
             txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
-            txn.call_after(self.get_room_name_and_aliases, event.room_id)
+            txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
+
+            # Add an entry to the current_state_resets table to record the point
+            # where we clobbered the current state
+            stream_order = event.internal_metadata.stream_ordering
+            self._simple_insert_txn(
+                txn,
+                table="current_state_resets",
+                values={"event_stream_ordering": stream_order}
+            )
 
             self._simple_delete_txn(
                 txn,
@@ -209,13 +387,11 @@ class EventsStore(SQLBaseStore):
         return self._persist_events_txn(
             txn,
             [(event, context)],
-            backfilled=False,
-            is_new_state=is_new_state,
+            backfilled=backfilled,
         )
 
     @log_function
-    def _persist_events_txn(self, txn, events_and_contexts, backfilled,
-                            is_new_state=True):
+    def _persist_events_txn(self, txn, events_and_contexts, backfilled):
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -282,9 +458,7 @@ class EventsStore(SQLBaseStore):
 
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
-                self._store_state_groups_txn(
-                    txn, event, context,
-                )
+                self._store_mult_state_groups_txn(txn, ((event, context),))
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
@@ -299,6 +473,18 @@ class EventsStore(SQLBaseStore):
                     (metadata_json, event.event_id,)
                 )
 
+                stream_order = event.internal_metadata.stream_ordering
+                state_group_id = context.state_group or context.new_state_group_id
+                self._simple_insert_txn(
+                    txn,
+                    table="ex_outlier_stream",
+                    values={
+                        "event_stream_ordering": stream_order,
+                        "event_id": event.event_id,
+                        "state_group": state_group_id,
+                    }
+                )
+
                 sql = (
                     "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
@@ -310,19 +496,14 @@ class EventsStore(SQLBaseStore):
 
                 self._update_extremeties(txn, [event])
 
-        events_and_contexts = filter(
-            lambda ec: ec[0] not in to_remove,
-            events_and_contexts
-        )
+        events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0] not in to_remove
+        ]
 
         if not events_and_contexts:
             return
 
-        self._store_mult_state_groups_txn(txn, [
-            (event, context)
-            for event, context in events_and_contexts
-            if not event.internal_metadata.is_outlier()
-        ])
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
 
         self._handle_mult_prev_events(
             txn,
@@ -349,7 +530,8 @@ class EventsStore(SQLBaseStore):
                 event
                 for event, _ in events_and_contexts
                 if event.type == EventTypes.Member
-            ]
+            ],
+            backfilled=backfilled,
         )
 
         def event_dict(event):
@@ -393,6 +575,7 @@ class EventsStore(SQLBaseStore):
                     "outlier": event.internal_metadata.is_outlier(),
                     "content": encode_json(event.content).decode("UTF-8"),
                     "origin_server_ts": int(event.origin_server_ts),
+                    "received_ts": self._clock.time_msec(),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -421,10 +604,9 @@ class EventsStore(SQLBaseStore):
             txn, [event for event, _ in events_and_contexts]
         )
 
-        state_events_and_contexts = filter(
-            lambda i: i[0].is_state(),
-            events_and_contexts,
-        )
+        state_events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0].is_state()
+        ]
 
         state_values = []
         for event, context in state_events_and_contexts:
@@ -462,35 +644,88 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
-        if is_new_state:
-            for event, _ in state_events_and_contexts:
-                if not context.rejected:
-                    txn.call_after(
-                        self._get_current_state_for_key.invalidate,
-                        (event.room_id, event.type, event.state_key,)
-                    )
+        self._add_to_cache(txn, events_and_contexts)
 
-                    if event.type in [EventTypes.Name, EventTypes.Aliases]:
-                        txn.call_after(
-                            self.get_room_name_and_aliases.invalidate,
-                            (event.room_id,)
-                        )
+        if backfilled:
+            # Backfilled events come before the current state so we don't need
+            # to update the current state table
+            return
 
-                    self._simple_upsert_txn(
-                        txn,
-                        "current_state_events",
-                        keyvalues={
-                            "room_id": event.room_id,
-                            "type": event.type,
-                            "state_key": event.state_key,
-                        },
-                        values={
-                            "event_id": event.event_id,
-                        }
-                    )
+        for event, _ in state_events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                # Outlier events shouldn't clobber the current state.
+                continue
+
+            if context.rejected:
+                # If the event failed it's auth checks then it shouldn't
+                # clobbler the current state.
+                continue
+
+            txn.call_after(
+                self._get_current_state_for_key.invalidate,
+                (event.room_id, event.type, event.state_key,)
+            )
+
+            if event.type in [EventTypes.Name, EventTypes.Aliases]:
+                txn.call_after(
+                    self.get_room_name_and_aliases.invalidate,
+                    (event.room_id,)
+                )
+
+            self._simple_upsert_txn(
+                txn,
+                "current_state_events",
+                keyvalues={
+                    "room_id": event.room_id,
+                    "type": event.type,
+                    "state_key": event.state_key,
+                },
+                values={
+                    "event_id": event.event_id,
+                }
+            )
 
         return
 
+    def _add_to_cache(self, txn, events_and_contexts):
+        to_prefill = []
+
+        rows = []
+        N = 200
+        for i in range(0, len(events_and_contexts), N):
+            ev_map = {
+                e[0].event_id: e[0]
+                for e in events_and_contexts[i:i + N]
+            }
+            if not ev_map:
+                break
+
+            sql = (
+                "SELECT "
+                " e.event_id as event_id, "
+                " r.redacts as redacts,"
+                " rej.event_id as rejects "
+                " FROM events as e"
+                " LEFT JOIN rejections as rej USING (event_id)"
+                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+                " WHERE e.event_id IN (%s)"
+            ) % (",".join(["?"] * len(ev_map)),)
+
+            txn.execute(sql, ev_map.keys())
+            rows = self.cursor_to_dict(txn)
+            for row in rows:
+                event = ev_map[row["event_id"]]
+                if not row["rejects"] and not row["redacts"]:
+                    to_prefill.append(_EventCacheEntry(
+                        event=event,
+                        redacted_event=None,
+                    ))
+
+        def prefill():
+            for cache_entry in to_prefill:
+                self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+        txn.call_after(prefill)
+
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         txn.call_after(self._invalidate_get_event_cache, event.redacts)
@@ -499,6 +734,22 @@ class EventsStore(SQLBaseStore):
             (event.event_id, event.redacts)
         )
 
+    @defer.inlineCallbacks
+    def have_events_in_timeline(self, event_ids):
+        """Given a list of event ids, check if we have already processed and
+        stored them as non outliers.
+        """
+        rows = yield self._simple_select_many_batch(
+            table="events",
+            retcols=("event_id",),
+            column="event_id",
+            iterable=list(event_ids),
+            keyvalues={"outlier": False},
+            desc="have_events_in_timeline",
+        )
+
+        defer.returnValue(set(r["event_id"] for r in rows))
+
     def have_events(self, event_ids):
         """Given a list of event ids, check if we have already processed them.
 
@@ -540,100 +791,65 @@ class EventsStore(SQLBaseStore):
         event_id_list = event_ids
         event_ids = set(event_ids)
 
-        event_map = self._get_events_from_cache(
+        event_entry_map = self._get_events_from_cache(
             event_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
             allow_rejected=allow_rejected,
         )
 
-        missing_events_ids = [e for e in event_ids if e not in event_map]
+        missing_events_ids = [e for e in event_ids if e not in event_entry_map]
 
         if missing_events_ids:
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
                 check_redacted=check_redacted,
-                get_prev_content=get_prev_content,
                 allow_rejected=allow_rejected,
             )
 
-            event_map.update(missing_events)
-
-        defer.returnValue([
-            event_map[e_id] for e_id in event_id_list
-            if e_id in event_map and event_map[e_id]
-        ])
-
-    def _get_events_txn(self, txn, event_ids, check_redacted=True,
-                        get_prev_content=False, allow_rejected=False):
-        if not event_ids:
-            return []
-
-        event_map = self._get_events_from_cache(
-            event_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
+            event_entry_map.update(missing_events)
 
-        missing_events_ids = [e for e in event_ids if e not in event_map]
+        events = []
+        for event_id in event_id_list:
+            entry = event_entry_map.get(event_id, None)
+            if not entry:
+                continue
 
-        if not missing_events_ids:
-            return [
-                event_map[e_id] for e_id in event_ids
-                if e_id in event_map and event_map[e_id]
-            ]
+            if allow_rejected or not entry.event.rejected_reason:
+                if check_redacted and entry.redacted_event:
+                    event = entry.redacted_event
+                else:
+                    event = entry.event
 
-        missing_events = self._fetch_events_txn(
-            txn,
-            missing_events_ids,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
+                events.append(event)
 
-        event_map.update(missing_events)
+                if get_prev_content:
+                    if "replaces_state" in event.unsigned:
+                        prev = yield self.get_event(
+                            event.unsigned["replaces_state"],
+                            get_prev_content=False,
+                            allow_none=True,
+                        )
+                        if prev:
+                            event.unsigned = dict(event.unsigned)
+                            event.unsigned["prev_content"] = prev.content
+                            event.unsigned["prev_sender"] = prev.sender
 
-        return [
-            event_map[e_id] for e_id in event_ids
-            if e_id in event_map and event_map[e_id]
-        ]
+        defer.returnValue(events)
 
     def _invalidate_get_event_cache(self, event_id):
-        for check_redacted in (False, True):
-            for get_prev_content in (False, True):
-                self._get_event_cache.invalidate(
-                    (event_id, check_redacted, get_prev_content)
-                )
+            self._get_event_cache.invalidate((event_id,))
 
-    def _get_event_txn(self, txn, event_id, check_redacted=True,
-                       get_prev_content=False, allow_rejected=False):
-
-        events = self._get_events_txn(
-            txn, [event_id],
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-            allow_rejected=allow_rejected,
-        )
-
-        return events[0] if events else None
-
-    def _get_events_from_cache(self, events, check_redacted, get_prev_content,
-                               allow_rejected):
+    def _get_events_from_cache(self, events, allow_rejected):
         event_map = {}
 
         for event_id in events:
-            try:
-                ret = self._get_event_cache.get(
-                    (event_id, check_redacted, get_prev_content,)
-                )
+            ret = self._get_event_cache.get((event_id,), None)
+            if not ret:
+                continue
 
-                if allow_rejected or not ret.rejected_reason:
-                    event_map[event_id] = ret
-                else:
-                    event_map[event_id] = None
-            except KeyError:
-                pass
+            if allow_rejected or not ret.event.rejected_reason:
+                event_map[event_id] = ret
+            else:
+                event_map[event_id] = None
 
         return event_map
 
@@ -704,8 +920,7 @@ class EventsStore(SQLBaseStore):
                         reactor.callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
-    def _enqueue_events(self, events, check_redacted=True,
-                        get_prev_content=False, allow_rejected=False):
+    def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
         """Fetches events from the database using the _event_fetch_list. This
         allows batch and bulk fetching of events - it allows us to fetch events
         without having to create a new transaction for each request for events.
@@ -743,8 +958,6 @@ class EventsStore(SQLBaseStore):
             [
                 preserve_fn(self._get_event_from_row)(
                     row["internal_metadata"], row["json"], row["redacts"],
-                    check_redacted=check_redacted,
-                    get_prev_content=get_prev_content,
                     rejected_reason=row["rejects"],
                 )
                 for row in rows
@@ -753,7 +966,7 @@ class EventsStore(SQLBaseStore):
         )
 
         defer.returnValue({
-            e.event_id: e
+            e.event.event_id: e
             for e in res if e
         })
 
@@ -783,37 +996,8 @@ class EventsStore(SQLBaseStore):
 
         return rows
 
-    def _fetch_events_txn(self, txn, events, check_redacted=True,
-                          get_prev_content=False, allow_rejected=False):
-        if not events:
-            return {}
-
-        rows = self._fetch_event_rows(
-            txn, events,
-        )
-
-        if not allow_rejected:
-            rows[:] = [r for r in rows if not r["rejects"]]
-
-        res = [
-            self._get_event_from_row_txn(
-                txn,
-                row["internal_metadata"], row["json"], row["redacts"],
-                check_redacted=check_redacted,
-                get_prev_content=get_prev_content,
-                rejected_reason=row["rejects"],
-            )
-            for row in rows
-        ]
-
-        return {
-            r.event_id: r
-            for r in res
-        }
-
     @defer.inlineCallbacks
     def _get_event_from_row(self, internal_metadata, js, redacted,
-                            check_redacted=True, get_prev_content=False,
                             rejected_reason=None):
         d = json.loads(js)
         internal_metadata = json.loads(internal_metadata)
@@ -823,26 +1007,27 @@ class EventsStore(SQLBaseStore):
                 table="rejections",
                 keyvalues={"event_id": rejected_reason},
                 retcol="reason",
-                desc="_get_event_from_row",
+                desc="_get_event_from_row_rejected_reason",
             )
 
-        ev = FrozenEvent(
+        original_ev = FrozenEvent(
             d,
             internal_metadata_dict=internal_metadata,
             rejected_reason=rejected_reason,
         )
 
-        if check_redacted and redacted:
-            ev = prune_event(ev)
+        redacted_event = None
+        if redacted:
+            redacted_event = prune_event(original_ev)
 
             redaction_id = yield self._simple_select_one_onecol(
                 table="redactions",
-                keyvalues={"redacts": ev.event_id},
+                keyvalues={"redacts": redacted_event.event_id},
                 retcol="event_id",
-                desc="_get_event_from_row",
+                desc="_get_event_from_row_redactions",
             )
 
-            ev.unsigned["redacted_by"] = redaction_id
+            redacted_event.unsigned["redacted_by"] = redaction_id
             # Get the redaction event.
 
             because = yield self.get_event(
@@ -854,86 +1039,16 @@ class EventsStore(SQLBaseStore):
             if because:
                 # It's fine to do add the event directly, since get_pdu_json
                 # will serialise this field correctly
-                ev.unsigned["redacted_because"] = because
-
-        if get_prev_content and "replaces_state" in ev.unsigned:
-            prev = yield self.get_event(
-                ev.unsigned["replaces_state"],
-                get_prev_content=False,
-                allow_none=True,
-            )
-            if prev:
-                ev.unsigned["prev_content"] = prev.content
-                ev.unsigned["prev_sender"] = prev.sender
+                redacted_event.unsigned["redacted_because"] = because
 
-        self._get_event_cache.prefill(
-            (ev.event_id, check_redacted, get_prev_content), ev
+        cache_entry = _EventCacheEntry(
+            event=original_ev,
+            redacted_event=redacted_event,
         )
 
-        defer.returnValue(ev)
+        self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
-    def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
-                                check_redacted=True, get_prev_content=False,
-                                rejected_reason=None):
-        d = json.loads(js)
-        internal_metadata = json.loads(internal_metadata)
-
-        if rejected_reason:
-            rejected_reason = self._simple_select_one_onecol_txn(
-                txn,
-                table="rejections",
-                keyvalues={"event_id": rejected_reason},
-                retcol="reason",
-            )
-
-        ev = FrozenEvent(
-            d,
-            internal_metadata_dict=internal_metadata,
-            rejected_reason=rejected_reason,
-        )
-
-        if check_redacted and redacted:
-            ev = prune_event(ev)
-
-            redaction_id = self._simple_select_one_onecol_txn(
-                txn,
-                table="redactions",
-                keyvalues={"redacts": ev.event_id},
-                retcol="event_id",
-            )
-
-            ev.unsigned["redacted_by"] = redaction_id
-            # Get the redaction event.
-
-            because = self._get_event_txn(
-                txn,
-                redaction_id,
-                check_redacted=False
-            )
-
-            if because:
-                ev.unsigned["redacted_because"] = because
-
-        if get_prev_content and "replaces_state" in ev.unsigned:
-            prev = self._get_event_txn(
-                txn,
-                ev.unsigned["replaces_state"],
-                get_prev_content=False,
-            )
-            if prev:
-                ev.unsigned["prev_content"] = prev.content
-                ev.unsigned["prev_sender"] = prev.sender
-
-        self._get_event_cache.prefill(
-            (ev.event_id, check_redacted, get_prev_content), ev
-        )
-
-        return ev
-
-    def _parse_events_txn(self, txn, rows):
-        event_ids = [r["event_id"] for r in rows]
-
-        return self._get_events_txn(txn, event_ids)
+        defer.returnValue(cache_entry)
 
     @defer.inlineCallbacks
     def count_daily_messages(self):
@@ -1076,45 +1191,106 @@ class EventsStore(SQLBaseStore):
 
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
-
-        # TODO: Fix race with the persit_event txn by using one of the
-        # stream id managers
-        return -self.min_stream_token
+        return -self._backfill_id_gen.get_current_token()
 
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
         """Get all the new events that have arrived at the server either as
         new events or as backfilled events"""
+        have_backfill_events = last_backfill_id != current_backfill_id
+        have_forward_events = last_forward_id != current_forward_id
+
+        if not have_backfill_events and not have_forward_events:
+            return defer.succeed(AllNewEventsResult([], [], [], [], []))
+
         def get_all_new_events_txn(txn):
             sql = (
-                "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+                "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
                 " FROM events as e"
                 " JOIN event_json as ej"
                 " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " LEFT JOIN event_to_state_groups as eg"
+                " ON e.event_id = eg.event_id"
                 " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
                 " ORDER BY e.stream_ordering ASC"
                 " LIMIT ?"
             )
-            if last_forward_id != current_forward_id:
+            if have_forward_events:
                 txn.execute(sql, (last_forward_id, current_forward_id, limit))
                 new_forward_events = txn.fetchall()
+
+                if len(new_forward_events) == limit:
+                    upper_bound = new_forward_events[-1][0]
+                else:
+                    upper_bound = current_forward_id
+
+                sql = (
+                    "SELECT event_stream_ordering FROM current_state_resets"
+                    " WHERE ? < event_stream_ordering"
+                    " AND event_stream_ordering <= ?"
+                    " ORDER BY event_stream_ordering ASC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                state_resets = txn.fetchall()
+
+                sql = (
+                    "SELECT event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (last_forward_id, upper_bound))
+                forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
+                state_resets = []
+                forward_ex_outliers = []
 
             sql = (
-                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
+                " eg.state_group"
                 " FROM events as e"
                 " JOIN event_json as ej"
                 " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " LEFT JOIN event_to_state_groups as eg"
+                " ON e.event_id = eg.event_id"
                 " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
                 " ORDER BY e.stream_ordering DESC"
                 " LIMIT ?"
             )
-            if last_backfill_id != current_backfill_id:
+            if have_backfill_events:
                 txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
                 new_backfill_events = txn.fetchall()
+
+                if len(new_backfill_events) == limit:
+                    upper_bound = new_backfill_events[-1][0]
+                else:
+                    upper_bound = current_backfill_id
+
+                sql = (
+                    "SELECT -event_stream_ordering, event_id, state_group"
+                    " FROM ex_outlier_stream"
+                    " WHERE ? > event_stream_ordering"
+                    " AND event_stream_ordering >= ?"
+                    " ORDER BY event_stream_ordering DESC"
+                )
+                txn.execute(sql, (-last_backfill_id, -upper_bound))
+                backward_ex_outliers = txn.fetchall()
             else:
                 new_backfill_events = []
+                backward_ex_outliers = []
 
-            return (new_forward_events, new_backfill_events)
+            return AllNewEventsResult(
+                new_forward_events, new_backfill_events,
+                forward_ex_outliers, backward_ex_outliers,
+                state_resets,
+            )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+
+
+AllNewEventsResult = namedtuple("AllNewEventsResult", [
+    "new_forward_events", "new_backfill_events",
+    "forward_ex_outliers", "backward_ex_outliers",
+    "state_resets"
+])
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 9d3ba32478..a820fcf07f 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -25,7 +25,7 @@ class MediaRepositoryStore(SQLBaseStore):
     def get_local_media(self, media_id):
         """Get the metadata for a local piece of media
         Returns:
-            None if the meia_id doesn't exist.
+            None if the media_id doesn't exist.
         """
         return self._simple_select_one(
             "local_media_repository",
@@ -50,6 +50,61 @@ class MediaRepositoryStore(SQLBaseStore):
             desc="store_local_media",
         )
 
+    def get_url_cache(self, url, ts):
+        """Get the media_id and ts for a cached URL as of the given timestamp
+        Returns:
+            None if the URL isn't cached.
+        """
+        def get_url_cache_txn(txn):
+            # get the most recently cached result (relative to the given ts)
+            sql = (
+                "SELECT response_code, etag, expires, og, media_id, download_ts"
+                " FROM local_media_repository_url_cache"
+                " WHERE url = ? AND download_ts <= ?"
+                " ORDER BY download_ts DESC LIMIT 1"
+            )
+            txn.execute(sql, (url, ts))
+            row = txn.fetchone()
+
+            if not row:
+                # ...or if we've requested a timestamp older than the oldest
+                # copy in the cache, return the oldest copy (if any)
+                sql = (
+                    "SELECT response_code, etag, expires, og, media_id, download_ts"
+                    " FROM local_media_repository_url_cache"
+                    " WHERE url = ? AND download_ts > ?"
+                    " ORDER BY download_ts ASC LIMIT 1"
+                )
+                txn.execute(sql, (url, ts))
+                row = txn.fetchone()
+
+            if not row:
+                return None
+
+            return dict(zip((
+                'response_code', 'etag', 'expires', 'og', 'media_id', 'download_ts'
+            ), row))
+
+        return self.runInteraction(
+            "get_url_cache", get_url_cache_txn
+        )
+
+    def store_url_cache(self, url, response_code, etag, expires, og, media_id,
+                        download_ts):
+        return self._simple_insert(
+            "local_media_repository_url_cache",
+            {
+                "url": url,
+                "response_code": response_code,
+                "etag": etag,
+                "expires": expires,
+                "og": og,
+                "media_id": media_id,
+                "download_ts": download_ts,
+            },
+            desc="store_url_cache",
+        )
+
     def get_local_media_thumbnails(self, media_id):
         return self._simple_select_list(
             "local_media_repository_thumbnails",
diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py
new file mode 100644
index 0000000000..5dabb607bd
--- /dev/null
+++ b/synapse/storage/openid.py
@@ -0,0 +1,32 @@
+from ._base import SQLBaseStore
+
+
+class OpenIdStore(SQLBaseStore):
+    def insert_open_id_token(self, token, ts_valid_until_ms, user_id):
+        return self._simple_insert(
+            table="open_id_tokens",
+            values={
+                "token": token,
+                "ts_valid_until_ms": ts_valid_until_ms,
+                "user_id": user_id,
+            },
+            desc="insert_open_id_token"
+        )
+
+    def get_user_id_for_open_id_token(self, token, ts_now_ms):
+        def get_user_id_for_token_txn(txn):
+            sql = (
+                "SELECT user_id FROM open_id_tokens"
+                " WHERE token = ? AND ? <= ts_valid_until_ms"
+            )
+
+            txn.execute(sql, (token, ts_now_ms))
+
+            rows = txn.fetchall()
+            if not rows:
+                return None
+            else:
+                return rows[0][0]
+        return self.runInteraction(
+            "get_user_id_for_token", get_user_id_for_token_txn
+        )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 3f29aad1e8..c8487c8838 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,23 +25,11 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 30
+SCHEMA_VERSION = 32
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
 
-def read_schema(path):
-    """ Read the named database schema.
-
-    Args:
-        path: Path of the database schema.
-    Returns:
-        A string containing the database schema.
-    """
-    with open(path) as schema_file:
-        return schema_file.read()
-
-
 class PrepareDatabaseException(Exception):
     pass
 
@@ -53,6 +41,9 @@ class UpgradeDatabaseException(PrepareDatabaseException):
 def prepare_database(db_conn, database_engine, config):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
+
+    If `config` is None then prepare_database will assert that no upgrade is
+    necessary, *or* will create a fresh database if the database is empty.
     """
     try:
         cur = db_conn.cursor()
@@ -60,13 +51,18 @@ def prepare_database(db_conn, database_engine, config):
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(
-                cur, user_version, delta_files, upgraded, database_engine, config
-            )
-        else:
-            _setup_new_database(cur, database_engine, config)
 
-        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+            if config is None:
+                if user_version != SCHEMA_VERSION:
+                    # If we don't pass in a config file then we are expecting to
+                    # have already upgraded the DB.
+                    raise UpgradeDatabaseException("Database needs to be upgraded")
+            else:
+                _upgrade_existing_database(
+                    cur, user_version, delta_files, upgraded, database_engine, config
+                )
+        else:
+            _setup_new_database(cur, database_engine)
 
         cur.close()
         db_conn.commit()
@@ -75,7 +71,7 @@ def prepare_database(db_conn, database_engine, config):
         raise
 
 
-def _setup_new_database(cur, database_engine, config):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -148,12 +144,13 @@ def _setup_new_database(cur, database_engine, config):
         applied_delta_files=[],
         upgraded=False,
         database_engine=database_engine,
-        config=config,
+        config=None,
+        is_empty=True,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded, database_engine, config):
+                               upgraded, database_engine, config, is_empty=False):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -246,7 +243,9 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                         module_name, absolute_path, python_file
                     )
                 logger.debug("Running script %s", relative_path)
-                module.run_upgrade(cur, database_engine, config=config)
+                module.run_create(cur, database_engine)
+                if not is_empty:
+                    module.run_upgrade(cur, database_engine, config=config)
             elif ext == ".pyc":
                 # Sometimes .pyc files turn up anyway even though we've
                 # disabled their generation; e.g. from distribution package
@@ -361,36 +360,3 @@ def _get_or_create_schema_state(txn, database_engine):
         return current_version, applied_deltas, upgraded
 
     return None
-
-
-def prepare_sqlite3_database(db_conn):
-    """This function should be called before `prepare_database` on sqlite3
-    databases.
-
-    Since we changed the way we store the current schema version and handle
-    updates to schemas, we need a way to upgrade from the old method to the
-    new. This only affects sqlite databases since they were the only ones
-    supported at the time.
-    """
-    with db_conn:
-        schema_path = os.path.join(
-            dir_path, "schema", "schema_version.sql",
-        )
-        create_schema = read_schema(schema_path)
-        db_conn.executescript(create_schema)
-
-        c = db_conn.execute("SELECT * FROM schema_version")
-        rows = c.fetchall()
-        c.close()
-
-        if not rows:
-            c = db_conn.execute("PRAGMA user_version")
-            row = c.fetchone()
-            c.close()
-
-            if row and row[0]:
-                db_conn.execute(
-                    "REPLACE INTO schema_version (version, upgraded)"
-                    " VALUES (?,?)",
-                    (row[0], False)
-                )
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 4cec31e316..3fab57a7e8 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -68,7 +68,9 @@ class PresenceStore(SQLBaseStore):
                 self._update_presence_txn, stream_orderings, presence_states,
             )
 
-        defer.returnValue((stream_orderings[-1], self._presence_id_gen.get_max_token()))
+        defer.returnValue((
+            stream_orderings[-1], self._presence_id_gen.get_current_token()
+        ))
 
     def _update_presence_txn(self, txn, stream_orderings, presence_states):
         for stream_id, state in zip(stream_orderings, presence_states):
@@ -147,6 +149,7 @@ class PresenceStore(SQLBaseStore):
                 "status_msg",
                 "currently_active",
             ),
+            desc="get_presence_for_users",
         )
 
         for row in rows:
@@ -155,7 +158,7 @@ class PresenceStore(SQLBaseStore):
         defer.returnValue([UserPresenceState(**row) for row in rows])
 
     def get_current_presence_token(self):
-        return self._presence_id_gen.get_max_token()
+        return self._presence_id_gen.get_current_token()
 
     def allow_presence_visible(self, observed_localpart, observer_userid):
         return self._simple_insert(
@@ -174,16 +177,6 @@ class PresenceStore(SQLBaseStore):
             desc="disallow_presence_visible",
         )
 
-    def is_presence_visible(self, observed_localpart, observer_userid):
-        return self._simple_select_one(
-            table="presence_allow_inbound",
-            keyvalues={"observed_user_id": observed_localpart,
-                       "observer_user_id": observer_userid},
-            retcols=["observed_user_id"],
-            allow_none=True,
-            desc="is_presence_visible",
-        )
-
     def add_presence_list_pending(self, observer_localpart, observed_userid):
         return self._simple_insert(
             table="presence_list",
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 9dbad2fd5f..786d6f6d67 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+from synapse.push.baserules import list_with_base_rules
 from twisted.internet import defer
 
 import logging
@@ -23,8 +24,31 @@ import simplejson as json
 logger = logging.getLogger(__name__)
 
 
+def _load_rules(rawrules, enabled_map):
+    ruleslist = []
+    for rawrule in rawrules:
+        rule = dict(rawrule)
+        rule["conditions"] = json.loads(rawrule["conditions"])
+        rule["actions"] = json.loads(rawrule["actions"])
+        ruleslist.append(rule)
+
+    # We're going to be mutating this a lot, so do a deep copy
+    rules = list(list_with_base_rules(ruleslist))
+
+    for i, rule in enumerate(rules):
+        rule_id = rule['rule_id']
+        if rule_id in enabled_map:
+            if rule.get('enabled', True) != bool(enabled_map[rule_id]):
+                # Rules are cached across users.
+                rule = dict(rule)
+                rule['enabled'] = bool(enabled_map[rule_id])
+                rules[i] = rule
+
+    return rules
+
+
 class PushRuleStore(SQLBaseStore):
-    @cachedInlineCallbacks()
+    @cachedInlineCallbacks(lru=True)
     def get_push_rules_for_user(self, user_id):
         rows = yield self._simple_select_list(
             table="push_rules",
@@ -42,9 +66,13 @@ class PushRuleStore(SQLBaseStore):
             key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
         )
 
-        defer.returnValue(rows)
+        enabled_map = yield self.get_push_rules_enabled_for_user(user_id)
 
-    @cachedInlineCallbacks()
+        rules = _load_rules(rows, enabled_map)
+
+        defer.returnValue(rules)
+
+    @cachedInlineCallbacks(lru=True)
     def get_push_rules_enabled_for_user(self, user_id):
         results = yield self._simple_select_list(
             table="push_rules_enable",
@@ -60,12 +88,16 @@ class PushRuleStore(SQLBaseStore):
             r['rule_id']: False if r['enabled'] == 0 else True for r in results
         })
 
-    @defer.inlineCallbacks
+    @cachedList(cached_method_name="get_push_rules_for_user",
+                list_name="user_ids", num_args=1, inlineCallbacks=True)
     def bulk_get_push_rules(self, user_ids):
         if not user_ids:
             defer.returnValue({})
 
-        results = {}
+        results = {
+            user_id: []
+            for user_id in user_ids
+        }
 
         rows = yield self._simple_select_many_batch(
             table="push_rules",
@@ -75,18 +107,32 @@ class PushRuleStore(SQLBaseStore):
             desc="bulk_get_push_rules",
         )
 
-        rows.sort(key=lambda e: (-e["priority_class"], -e["priority"]))
+        rows.sort(
+            key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))
+        )
 
         for row in rows:
             results.setdefault(row['user_name'], []).append(row)
+
+        enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids)
+
+        for user_id, rules in results.items():
+            results[user_id] = _load_rules(
+                rules, enabled_map_by_user.get(user_id, {})
+            )
+
         defer.returnValue(results)
 
-    @defer.inlineCallbacks
+    @cachedList(cached_method_name="get_push_rules_enabled_for_user",
+                list_name="user_ids", num_args=1, inlineCallbacks=True)
     def bulk_get_push_rules_enabled(self, user_ids):
         if not user_ids:
             defer.returnValue({})
 
-        results = {}
+        results = {
+            user_id: {}
+            for user_id in user_ids
+        }
 
         rows = yield self._simple_select_many_batch(
             table="push_rules_enable",
@@ -96,7 +142,8 @@ class PushRuleStore(SQLBaseStore):
             desc="bulk_get_push_rules_enabled",
         )
         for row in rows:
-            results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
+            enabled = bool(row['enabled'])
+            results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
         defer.returnValue(results)
 
     @defer.inlineCallbacks
@@ -392,7 +439,7 @@ class PushRuleStore(SQLBaseStore):
         """Get the position of the push rules stream.
         Returns a pair of a stream id for the push_rules stream and the
         room stream ordering it corresponds to."""
-        return self._push_rules_stream_id_gen.get_max_token()
+        return self._push_rules_stream_id_gen.get_current_token()
 
     def have_push_rules_changed_for_user(self, user_id, last_id):
         if not self.push_rules_stream_cache.has_entity_changed(user_id, last_id):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 87b2ac5773..a7d7c54d7e 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 
 from canonicaljson import encode_canonical_json
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
+
 import logging
 import simplejson as json
 import types
@@ -48,23 +50,46 @@ class PusherStore(SQLBaseStore):
         return rows
 
     @defer.inlineCallbacks
-    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
-        def r(txn):
-            sql = (
-                "SELECT * FROM pushers"
-                " WHERE app_id = ? AND pushkey = ?"
-            )
+    def user_has_pusher(self, user_id):
+        ret = yield self._simple_select_one_onecol(
+            "pushers", {"user_name": user_id}, "id", allow_none=True
+        )
+        defer.returnValue(ret is not None)
 
-            txn.execute(sql, (app_id, pushkey,))
-            rows = self.cursor_to_dict(txn)
+    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
+        return self.get_pushers_by({
+            "app_id": app_id,
+            "pushkey": pushkey,
+        })
 
-            return self._decode_pushers_rows(rows)
+    def get_pushers_by_user_id(self, user_id):
+        return self.get_pushers_by({
+            "user_name": user_id,
+        })
 
-        rows = yield self.runInteraction(
-            "get_pushers_by_app_id_and_pushkey", r
+    @defer.inlineCallbacks
+    def get_pushers_by(self, keyvalues):
+        ret = yield self._simple_select_list(
+            "pushers", keyvalues,
+            [
+                "id",
+                "user_name",
+                "access_token",
+                "profile_tag",
+                "kind",
+                "app_id",
+                "app_display_name",
+                "device_display_name",
+                "pushkey",
+                "ts",
+                "lang",
+                "data",
+                "last_stream_ordering",
+                "last_success",
+                "failing_since",
+            ], desc="get_pushers_by"
         )
-
-        defer.returnValue(rows)
+        defer.returnValue(self._decode_pushers_rows(ret))
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
@@ -78,9 +103,12 @@ class PusherStore(SQLBaseStore):
         defer.returnValue(rows)
 
     def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_max_token()
+        return self._pushers_id_gen.get_current_token()
 
     def get_all_updated_pushers(self, last_id, current_id, limit):
+        if last_id == current_id:
+            return defer.succeed(([], []))
+
         def get_all_updated_pushers_txn(txn):
             sql = (
                 "SELECT id, user_name, access_token, profile_tag, kind,"
@@ -107,35 +135,76 @@ class PusherStore(SQLBaseStore):
             "get_all_updated_pushers", get_all_updated_pushers_txn
         )
 
+    @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
+    def get_if_user_has_pusher(self, user_id):
+        result = yield self._simple_select_many_batch(
+            table='pushers',
+            keyvalues={
+                'user_name': 'user_id',
+            },
+            retcol='user_name',
+            desc='get_if_user_has_pusher',
+            allow_none=True,
+        )
+
+        defer.returnValue(bool(result))
+
+    @cachedList(cached_method_name="get_if_user_has_pusher",
+                list_name="user_ids", num_args=1, inlineCallbacks=True)
+    def get_if_users_have_pushers(self, user_ids):
+        rows = yield self._simple_select_many_batch(
+            table='pushers',
+            column='user_name',
+            iterable=user_ids,
+            retcols=['user_name'],
+            desc='get_if_users_have_pushers'
+        )
+
+        result = {user_id: False for user_id in user_ids}
+        result.update({r['user_name']: True for r in rows})
+
+        defer.returnValue(result)
+
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
-                   pushkey, pushkey_ts, lang, data, profile_tag=""):
+                   pushkey, pushkey_ts, lang, data, last_stream_ordering,
+                   profile_tag=""):
         with self._pushers_id_gen.get_next() as stream_id:
-            yield self._simple_upsert(
-                "pushers",
-                dict(
-                    app_id=app_id,
-                    pushkey=pushkey,
-                    user_name=user_id,
-                ),
-                dict(
-                    access_token=access_token,
-                    kind=kind,
-                    app_display_name=app_display_name,
-                    device_display_name=device_display_name,
-                    ts=pushkey_ts,
-                    lang=lang,
-                    data=encode_canonical_json(data),
-                    profile_tag=profile_tag,
-                    id=stream_id,
-                ),
-                desc="add_pusher",
-            )
+            def f(txn):
+                newly_inserted = self._simple_upsert_txn(
+                    txn,
+                    "pushers",
+                    {
+                        "app_id": app_id,
+                        "pushkey": pushkey,
+                        "user_name": user_id,
+                    },
+                    {
+                        "access_token": access_token,
+                        "kind": kind,
+                        "app_display_name": app_display_name,
+                        "device_display_name": device_display_name,
+                        "ts": pushkey_ts,
+                        "lang": lang,
+                        "data": encode_canonical_json(data),
+                        "last_stream_ordering": last_stream_ordering,
+                        "profile_tag": profile_tag,
+                        "id": stream_id,
+                    },
+                )
+                if newly_inserted:
+                    # get_if_user_has_pusher only cares if the user has
+                    # at least *one* pusher.
+                    txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
+            yield self.runInteraction("add_pusher", f)
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
+            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+
             self._simple_delete_one_txn(
                 txn,
                 "pushers",
@@ -147,28 +216,35 @@ class PusherStore(SQLBaseStore):
                 {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
                 {"stream_id": stream_id},
             )
+
         with self._pushers_id_gen.get_next() as stream_id:
             yield self.runInteraction(
                 "delete_pusher", delete_pusher_txn, stream_id
             )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token(self, app_id, pushkey, user_id, last_token):
+    def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
+                                           last_stream_ordering):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token},
-            desc="update_pusher_last_token",
+            {'last_stream_ordering': last_stream_ordering},
+            desc="update_pusher_last_stream_ordering",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, app_id, pushkey, user_id,
-                                             last_token, last_success):
+    def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
+                                                       user_id,
+                                                       last_stream_ordering,
+                                                       last_success):
         yield self._simple_update_one(
             "pushers",
             {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
-            {'last_token': last_token, 'last_success': last_success},
-            desc="update_pusher_last_token_and_success",
+            {
+                'last_stream_ordering': last_stream_ordering,
+                'last_success': last_success
+            },
+            desc="update_pusher_last_stream_ordering_and_success",
         )
 
     @defer.inlineCallbacks
@@ -180,3 +256,30 @@ class PusherStore(SQLBaseStore):
             {'failing_since': failing_since},
             desc="update_pusher_failing_since",
         )
+
+    @defer.inlineCallbacks
+    def get_throttle_params_by_room(self, pusher_id):
+        res = yield self._simple_select_list(
+            "pusher_throttle",
+            {"pusher": pusher_id},
+            ["room_id", "last_sent_ts", "throttle_ms"],
+            desc="get_throttle_params_by_room"
+        )
+
+        params_by_room = {}
+        for row in res:
+            params_by_room[row["room_id"]] = {
+                "last_sent_ts": row["last_sent_ts"],
+                "throttle_ms": row["throttle_ms"]
+            }
+
+        defer.returnValue(params_by_room)
+
+    @defer.inlineCallbacks
+    def set_throttle_params(self, pusher_id, room_id, params):
+        yield self._simple_upsert(
+            "pusher_throttle",
+            {"pusher": pusher_id, "room_id": room_id},
+            params,
+            desc="set_throttle_params"
+        )
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b9d848eaa..8c26f39fbb 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -31,9 +31,29 @@ class ReceiptsStore(SQLBaseStore):
         super(ReceiptsStore, self).__init__(hs)
 
         self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token()
+            "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
         )
 
+    @cachedInlineCallbacks()
+    def get_users_with_read_receipts_in_room(self, room_id):
+        receipts = yield self.get_receipts_for_room(room_id, "m.read")
+        defer.returnValue(set(r['user_id'] for r in receipts))
+
+    def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type,
+                                                    user_id):
+        if receipt_type != "m.read":
+            return
+
+        # Returns an ObservableDeferred
+        res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
+
+        if res and res.called and user_id in res.result:
+            # We'd only be adding to the set, so no point invalidating if the
+            # user is already there
+            return
+
+        self.get_users_with_read_receipts_in_room.invalidate((room_id,))
+
     @cached(num_args=2)
     def get_receipts_for_room(self, room_id, receipt_type):
         return self._simple_select_list(
@@ -100,7 +120,7 @@ class ReceiptsStore(SQLBaseStore):
 
         defer.returnValue([ev for res in results.values() for ev in res])
 
-    @cachedInlineCallbacks(num_args=3, max_entries=5000)
+    @cachedInlineCallbacks(num_args=3, max_entries=5000, lru=True, tree=True)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.
 
@@ -160,8 +180,8 @@ class ReceiptsStore(SQLBaseStore):
             "content": content,
         }])
 
-    @cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids",
-                num_args=3, inlineCallbacks=True)
+    @cachedList(cached_method_name="get_linearized_receipts_for_room",
+                list_name="room_ids", num_args=3, inlineCallbacks=True)
     def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
         if not room_ids:
             defer.returnValue({})
@@ -221,7 +241,7 @@ class ReceiptsStore(SQLBaseStore):
         defer.returnValue(results)
 
     def get_max_receipt_stream_id(self):
-        return self._receipts_id_gen.get_max_token()
+        return self._receipts_id_gen.get_current_token()
 
     def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
                                       user_id, event_id, data, stream_id):
@@ -229,10 +249,14 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
         txn.call_after(
+            self._invalidate_get_users_with_receipts_in_room,
+            room_id, receipt_type, user_id,
+        )
+        txn.call_after(
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed,
@@ -244,6 +268,17 @@ class ReceiptsStore(SQLBaseStore):
             (user_id, room_id, receipt_type)
         )
 
+        res = self._simple_select_one_txn(
+            txn,
+            table="events",
+            retcols=["topological_ordering", "stream_ordering"],
+            keyvalues={"event_id": event_id},
+            allow_none=True
+        )
+
+        topological_ordering = int(res["topological_ordering"]) if res else None
+        stream_ordering = int(res["stream_ordering"]) if res else None
+
         # We don't want to clobber receipts for more recent events, so we
         # have to compare orderings of existing receipts
         sql = (
@@ -255,16 +290,7 @@ class ReceiptsStore(SQLBaseStore):
         txn.execute(sql, (room_id, receipt_type, user_id))
         results = txn.fetchall()
 
-        if results:
-            res = self._simple_select_one_txn(
-                txn,
-                table="events",
-                retcols=["topological_ordering", "stream_ordering"],
-                keyvalues={"event_id": event_id},
-            )
-            topological_ordering = int(res["topological_ordering"])
-            stream_ordering = int(res["stream_ordering"])
-
+        if results and topological_ordering:
             for to, so, _ in results:
                 if int(to) > topological_ordering:
                     return False
@@ -294,6 +320,14 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
+        if receipt_type == "m.read" and topological_ordering:
+            self._remove_old_push_actions_before_txn(
+                txn,
+                room_id=room_id,
+                user_id=user_id,
+                topological_ordering=topological_ordering,
+            )
+
         return True
 
     @defer.inlineCallbacks
@@ -346,7 +380,7 @@ class ReceiptsStore(SQLBaseStore):
             room_id, receipt_type, user_id, event_ids, data
         )
 
-        max_persisted_id = self._stream_id_gen.get_max_token()
+        max_persisted_id = self._stream_id_gen.get_current_token()
 
         defer.returnValue((stream_id, max_persisted_id))
 
@@ -364,10 +398,14 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
         txn.call_after(
+            self._invalidate_get_users_with_receipts_in_room,
+            room_id, receipt_type, user_id,
+        )
+        txn.call_after(
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(self.get_linearized_receipts_for_room.invalidate_all)
+        txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
 
         self._simple_delete_txn(
             txn,
@@ -390,16 +428,22 @@ class ReceiptsStore(SQLBaseStore):
             }
         )
 
-    def get_all_updated_receipts(self, last_id, current_id, limit):
+    def get_all_updated_receipts(self, last_id, current_id, limit=None):
+        if last_id == current_id:
+            return defer.succeed([])
+
         def get_all_updated_receipts_txn(txn):
             sql = (
                 "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
                 " FROM receipts_linearized"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
-                " LIMIT ?"
             )
-            txn.execute(sql, (last_id, current_id, limit))
+            args = [last_id, current_id]
+            if limit is not None:
+                sql += " LIMIT ?"
+                args.append(limit)
+            txn.execute(sql, args)
 
             return txn.fetchall()
         return self.runInteraction(
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index bd4eb88a92..bda84a744a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -20,7 +20,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError, Codes
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 
 class RegistrationStore(SQLBaseStore):
@@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore):
             make_guest,
             appservice_id
         )
+        self.get_user_by_id.invalidate((user_id,))
         self.is_guest.invalidate((user_id,))
 
     def _register(
@@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore):
                 (next_id, user_id, token,)
             )
 
+    @cached()
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
             table="users",
@@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore):
         }, {
             'password_hash': password_hash
         })
+        self.get_user_by_id.invalidate((user_id,))
 
     @defer.inlineCallbacks
     def user_delete_access_tokens(self, user_id, except_token_ids=[]):
@@ -319,26 +322,6 @@ class RegistrationStore(SQLBaseStore):
 
         defer.returnValue(res if res else False)
 
-    @cachedList(cache=is_guest.cache, list_name="user_ids", num_args=1,
-                inlineCallbacks=True)
-    def are_guests(self, user_ids):
-        sql = "SELECT name, is_guest FROM users WHERE name IN (%s)" % (
-            ",".join("?" for _ in user_ids),
-        )
-
-        rows = yield self._execute(
-            "are_guests", self.cursor_to_dict, sql, *user_ids
-        )
-
-        result = {user_id: False for user_id in user_ids}
-
-        result.update({
-            row["name"]: bool(row["is_guest"])
-            for row in rows
-        })
-
-        defer.returnValue(result)
-
     def _query_for_auth(self, txn, token):
         sql = (
             "SELECT users.name, users.is_guest, access_tokens.id as token_id"
@@ -458,12 +441,15 @@ class RegistrationStore(SQLBaseStore):
         """
         Gets the 3pid's guest access token if exists, else saves access_token.
 
-        :param medium (str): Medium of the 3pid. Must be "email".
-        :param address (str): 3pid address.
-        :param access_token (str): The access token to persist if none is
-            already persisted.
-        :param inviter_user_id (str): User ID of the inviter.
-        :return (deferred str): Whichever access token is persisted at the end
+        Args:
+            medium (str): Medium of the 3pid. Must be "email".
+            address (str): 3pid address.
+            access_token (str): The access token to persist if none is
+                already persisted.
+            inviter_user_id (str): User ID of the inviter.
+
+        Returns:
+            deferred str: Whichever access token is persisted at the end
             of this function call.
         """
         def insert(txn):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 9be977f387..97f9f1929c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -23,6 +23,7 @@ from .engines import PostgresEngine, Sqlite3Engine
 
 import collections
 import logging
+import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -169,47 +170,84 @@ class RoomStore(SQLBaseStore):
     def _store_event_search_txn(self, txn, event, key, value):
         if isinstance(self.database_engine, PostgresEngine):
             sql = (
-                "INSERT INTO event_search (event_id, room_id, key, vector)"
-                " VALUES (?,?,?,to_tsvector('english', ?))"
+                "INSERT INTO event_search"
+                " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
+                " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
+            )
+            txn.execute(
+                sql,
+                (
+                    event.event_id, event.room_id, key, value,
+                    event.internal_metadata.stream_ordering,
+                    event.origin_server_ts,
+                )
             )
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (
                 "INSERT INTO event_search (event_id, room_id, key, value)"
                 " VALUES (?,?,?,?)"
             )
+            txn.execute(sql, (event.event_id, event.room_id, key, value,))
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
 
-        txn.execute(sql, (event.event_id, event.room_id, key, value,))
-
     @cachedInlineCallbacks()
     def get_room_name_and_aliases(self, room_id):
-        def f(txn):
+        def get_room_name(txn):
             sql = (
-                "SELECT event_id FROM current_state_events "
-                "WHERE room_id = ? "
+                "SELECT name FROM room_names"
+                " INNER JOIN current_state_events USING (room_id, event_id)"
+                " WHERE room_id = ?"
+                " LIMIT 1"
             )
 
-            sql += " AND ((type = 'm.room.name' AND state_key = '')"
-            sql += " OR type = 'm.room.aliases')"
-
             txn.execute(sql, (room_id,))
-            results = self.cursor_to_dict(txn)
+            rows = txn.fetchall()
+            if rows:
+                return rows[0][0]
+            else:
+                return None
+
+            return [row[0] for row in txn.fetchall()]
 
-            return self._parse_events_txn(txn, results)
+        def get_room_aliases(txn):
+            sql = (
+                "SELECT content FROM current_state_events"
+                " INNER JOIN events USING (room_id, event_id)"
+                " WHERE room_id = ?"
+            )
+            txn.execute(sql, (room_id,))
+            return [row[0] for row in txn.fetchall()]
 
-        events = yield self.runInteraction("get_room_name_and_aliases", f)
+        name = yield self.runInteraction("get_room_name", get_room_name)
+        alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
 
-        name = None
         aliases = []
 
-        for e in events:
-            if e.type == 'm.room.name':
-                if 'name' in e.content:
-                    name = e.content['name']
-            elif e.type == 'm.room.aliases':
-                if 'aliases' in e.content:
-                    aliases.extend(e.content['aliases'])
+        for c in alias_contents:
+            try:
+                content = json.loads(c)
+            except:
+                continue
+
+            aliases.extend(content.get('aliases', []))
 
         defer.returnValue((name, aliases))
+
+    def add_event_report(self, room_id, event_id, user_id, reason, content,
+                         received_ts):
+        next_id = self._event_reports_id_gen.get_next()
+        return self._simple_insert(
+            table="event_reports",
+            values={
+                "id": next_id,
+                "received_ts": received_ts,
+                "room_id": room_id,
+                "event_id": event_id,
+                "user_id": user_id,
+                "reason": reason,
+                "content": json.dumps(content),
+            },
+            desc="add_event_report"
+        )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 430b49c12e..8bd693be72 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -21,7 +21,7 @@ from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 from synapse.api.constants import Membership
-from synapse.types import UserID
+from synapse.types import get_domain_from_id
 
 import logging
 
@@ -36,7 +36,7 @@ RoomsForUser = namedtuple(
 
 class RoomMemberStore(SQLBaseStore):
 
-    def _store_room_members_txn(self, txn, events):
+    def _store_room_members_txn(self, txn, events, backfilled):
         """Store a room member in the database.
         """
         self._simple_insert_many_txn(
@@ -62,27 +62,65 @@ class RoomMemberStore(SQLBaseStore):
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering
             )
+            txn.call_after(
+                self.get_invited_rooms_for_user.invalidate, (event.state_key,)
+            )
 
-    def get_room_member(self, user_id, room_id):
-        """Retrieve the current state of a room member.
+            # We update the local_invites table only if the event is "current",
+            # i.e., its something that has just happened.
+            # The only current event that can also be an outlier is if its an
+            # invite that has come in across federation.
+            is_new_state = not backfilled and (
+                not event.internal_metadata.is_outlier()
+                or event.internal_metadata.is_invite_from_remote()
+            )
+            is_mine = self.hs.is_mine_id(event.state_key)
+            if is_new_state and is_mine:
+                if event.membership == Membership.INVITE:
+                    self._simple_insert_txn(
+                        txn,
+                        table="local_invites",
+                        values={
+                            "event_id": event.event_id,
+                            "invitee": event.state_key,
+                            "inviter": event.sender,
+                            "room_id": event.room_id,
+                            "stream_id": event.internal_metadata.stream_ordering,
+                        }
+                    )
+                else:
+                    sql = (
+                        "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
+                        " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+                        " AND replaced_by is NULL"
+                    )
+
+                    txn.execute(sql, (
+                        event.internal_metadata.stream_ordering,
+                        event.event_id,
+                        event.room_id,
+                        event.state_key,
+                    ))
 
-        Args:
-            user_id (str): The member's user ID.
-            room_id (str): The room the member is in.
-        Returns:
-            Deferred: Results in a MembershipEvent or None.
-        """
-        return self.runInteraction(
-            "get_room_member",
-            self._get_members_events_txn,
-            room_id,
-            user_id=user_id,
-        ).addCallback(
-            self._get_events
-        ).addCallback(
-            lambda events: events[0] if events else None
+    @defer.inlineCallbacks
+    def locally_reject_invite(self, user_id, room_id):
+        sql = (
+            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
+            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
+            " AND replaced_by is NULL"
         )
 
+        def f(txn, stream_ordering):
+            txn.execute(sql, (
+                stream_ordering,
+                True,
+                room_id,
+                user_id,
+            ))
+
+        with self._stream_id_gen.get_next() as stream_ordering:
+            yield self.runInteraction("locally_reject_invite", f, stream_ordering)
+
     @cached(max_entries=5000)
     def get_users_in_room(self, room_id):
         def f(txn):
@@ -96,24 +134,6 @@ class RoomMemberStore(SQLBaseStore):
             return [r["user_id"] for r in rows]
         return self.runInteraction("get_users_in_room", f)
 
-    def get_room_members(self, room_id, membership=None):
-        """Retrieve the current room member list for a room.
-
-        Args:
-            room_id (str): The room to get the list of members.
-            membership (synapse.api.constants.Membership): The filter to apply
-            to this list, or None to return all members with some state
-            associated with this room.
-        Returns:
-            list of namedtuples representing the members in this room.
-        """
-        return self.runInteraction(
-            "get_room_members",
-            self._get_members_events_txn,
-            room_id,
-            membership=membership,
-        ).addCallback(self._get_events)
-
     @cached()
     def get_invited_rooms_for_user(self, user_id):
         """ Get all the rooms the user is invited to
@@ -127,18 +147,23 @@ class RoomMemberStore(SQLBaseStore):
             user_id, [Membership.INVITE]
         )
 
-    def get_leave_and_ban_events_for_user(self, user_id):
-        """ Get all the leave events for a user
+    @defer.inlineCallbacks
+    def get_invite_for_user_in_room(self, user_id, room_id):
+        """Gets the invite for the given user and room
+
         Args:
-            user_id (str): The user ID.
+            user_id (str)
+            room_id (str)
+
         Returns:
-            A deferred list of event objects.
+            Deferred: Resolves to either a RoomsForUser or None if no invite was
+                found.
         """
-        return self.get_rooms_for_user_where_membership_is(
-            user_id, (Membership.LEAVE, Membership.BAN)
-        ).addCallback(lambda leaves: self._get_events([
-            leave.event_id for leave in leaves
-        ]))
+        invites = yield self.get_invited_rooms_for_user(user_id)
+        for invite in invites:
+            if invite.room_id == room_id:
+                defer.returnValue(invite)
+        defer.returnValue(None)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -163,57 +188,60 @@ class RoomMemberStore(SQLBaseStore):
 
     def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
                                                     membership_list):
-        where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
-            " OR ".join(["membership = ?" for _ in membership_list]),
-        )
 
-        args = [user_id]
-        args.extend(membership_list)
+        do_invite = Membership.INVITE in membership_list
+        membership_list = [m for m in membership_list if m != Membership.INVITE]
 
-        sql = (
-            "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
-            " FROM current_state_events as c"
-            " INNER JOIN room_memberships as m"
-            " ON m.event_id = c.event_id"
-            " INNER JOIN events as e"
-            " ON e.event_id = c.event_id"
-            " AND m.room_id = c.room_id"
-            " AND m.user_id = c.state_key"
-            " WHERE %s"
-        ) % (where_clause,)
+        results = []
+        if membership_list:
+            where_clause = "user_id = ? AND (%s) AND forgotten = 0" % (
+                " OR ".join(["membership = ?" for _ in membership_list]),
+            )
 
-        txn.execute(sql, args)
-        return [
-            RoomsForUser(**r) for r in self.cursor_to_dict(txn)
-        ]
+            args = [user_id]
+            args.extend(membership_list)
 
-    @cached(max_entries=5000)
-    def get_joined_hosts_for_room(self, room_id):
-        return self.runInteraction(
-            "get_joined_hosts_for_room",
-            self._get_joined_hosts_for_room_txn,
-            room_id,
-        )
+            sql = (
+                "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+                " FROM current_state_events as c"
+                " INNER JOIN room_memberships as m"
+                " ON m.event_id = c.event_id"
+                " INNER JOIN events as e"
+                " ON e.event_id = c.event_id"
+                " AND m.room_id = c.room_id"
+                " AND m.user_id = c.state_key"
+                " WHERE %s"
+            ) % (where_clause,)
+
+            txn.execute(sql, args)
+            results = [
+                RoomsForUser(**r) for r in self.cursor_to_dict(txn)
+            ]
 
-    def _get_joined_hosts_for_room_txn(self, txn, room_id):
-        rows = self._get_members_rows_txn(
-            txn,
-            room_id, membership=Membership.JOIN
-        )
+        if do_invite:
+            sql = (
+                "SELECT i.room_id, inviter, i.event_id, e.stream_ordering"
+                " FROM local_invites as i"
+                " INNER JOIN events as e USING (event_id)"
+                " WHERE invitee = ? AND locally_rejected is NULL"
+                " AND replaced_by is NULL"
+            )
 
-        joined_domains = set(
-            UserID.from_string(r["user_id"]).domain
-            for r in rows
-        )
+            txn.execute(sql, (user_id,))
+            results.extend(RoomsForUser(
+                room_id=r["room_id"],
+                sender=r["inviter"],
+                event_id=r["event_id"],
+                stream_ordering=r["stream_ordering"],
+                membership=Membership.INVITE,
+            ) for r in self.cursor_to_dict(txn))
 
-        return joined_domains
+        return results
 
-    def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
-        rows = self._get_members_rows_txn(
-            txn,
-            room_id, membership, user_id,
-        )
-        return [r["event_id"] for r in rows]
+    @cachedInlineCallbacks(max_entries=5000)
+    def get_joined_hosts_for_room(self, room_id):
+        user_ids = yield self.get_users_in_room(room_id)
+        defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids))
 
     def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
         where_clause = "c.room_id = ?"
diff --git a/synapse/storage/schema/delta/14/upgrade_appservice_db.py b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
index 5c40a77757..8755bb2e49 100644
--- a/synapse/storage/schema/delta/14/upgrade_appservice_db.py
+++ b/synapse/storage/schema/delta/14/upgrade_appservice_db.py
@@ -18,7 +18,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(cur, *args, **kwargs):
+def run_create(cur, *args, **kwargs):
     cur.execute("SELECT id, regex FROM application_services_regex")
     for row in cur.fetchall():
         try:
@@ -35,3 +35,7 @@ def run_upgrade(cur, *args, **kwargs):
                 "UPDATE application_services_regex SET regex=? WHERE id=?",
                 (new_regex, row[0])
             )
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/20/pushers.py b/synapse/storage/schema/delta/20/pushers.py
index 29164732af..147496a38b 100644
--- a/synapse/storage/schema/delta/20/pushers.py
+++ b/synapse/storage/schema/delta/20/pushers.py
@@ -27,7 +27,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     logger.info("Porting pushers table...")
     cur.execute("""
         CREATE TABLE IF NOT EXISTS pushers2 (
@@ -74,3 +74,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
     cur.execute("DROP TABLE pushers")
     cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
     logger.info("Moved %d pushers to new table", count)
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
index d3ff2b1779..4269ac69ad 100644
--- a/synapse/storage/schema/delta/25/fts.py
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -43,7 +43,7 @@ SQLITE_TABLE = (
 )
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     if isinstance(database_engine, PostgresEngine):
         for statement in get_statements(POSTGRES_TABLE.splitlines()):
             cur.execute(statement)
@@ -76,3 +76,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         sql = database_engine.convert_param_style(sql)
 
         cur.execute(sql, ("event_search", progress_json))
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/27/ts.py b/synapse/storage/schema/delta/27/ts.py
index f8c16391a2..71b12a2731 100644
--- a/synapse/storage/schema/delta/27/ts.py
+++ b/synapse/storage/schema/delta/27/ts.py
@@ -27,7 +27,7 @@ ALTER_TABLE = (
 )
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     for statement in get_statements(ALTER_TABLE.splitlines()):
         cur.execute(statement)
 
@@ -55,3 +55,7 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
         sql = database_engine.convert_param_style(sql)
 
         cur.execute(sql, ("event_origin_server_ts", progress_json))
+
+
+def run_upgrade(*args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 4f6e9dd540..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,13 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from synapse.storage.appservice import ApplicationServiceStore
+from synapse.config.appservice import load_appservices
 
 
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(cur, database_engine, config, *args, **kwargs):
+def run_create(cur, database_engine, *args, **kwargs):
     # NULL indicates user was not registered by an appservice.
     try:
         cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
@@ -26,6 +26,8 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
         # Maybe we already added the column? Hope so...
         pass
 
+
+def run_upgrade(cur, database_engine, config, *args, **kwargs):
     cur.execute("SELECT name FROM users")
     rows = cur.fetchall()
 
@@ -36,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
         logger.warning("Could not get app_service_config_files from config")
         pass
 
-    appservices = ApplicationServiceStore.load_appservices(
+    appservices = load_appservices(
         config.server_name, config_files
     )
 
diff --git a/synapse/storage/schema/delta/30/state_stream.sql b/synapse/storage/schema/delta/30/state_stream.sql
new file mode 100644
index 0000000000..706fe1dcf4
--- /dev/null
+++ b/synapse/storage/schema/delta/30/state_stream.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * The positions in the event stream_ordering when the current_state was
+ * replaced by the state at the event.
+ */
+
+CREATE TABLE IF NOT EXISTS current_state_resets(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL
+);
+
+/* The outlier events that have aquired a state group typically through
+ * backfill. This is tracked separately to the events table, as assigning a
+ * state group change the position of the existing event in the stream
+ * ordering.
+ * However since a stream_ordering is assigned in persist_event for the
+ * (event, state) pair, we can use that stream_ordering to identify when
+ * the new state was assigned for the event.
+ */
+CREATE TABLE IF NOT EXISTS ex_outlier_stream(
+    event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
+    event_id TEXT NOT NULL,
+    state_group BIGINT NOT NULL
+);
diff --git a/synapse/storage/schema/delta/31/invites.sql b/synapse/storage/schema/delta/31/invites.sql
new file mode 100644
index 0000000000..2c57846d5a
--- /dev/null
+++ b/synapse/storage/schema/delta/31/invites.sql
@@ -0,0 +1,42 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE local_invites(
+    stream_id BIGINT NOT NULL,
+    inviter TEXT NOT NULL,
+    invitee TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    locally_rejected TEXT,
+    replaced_by TEXT
+);
+
+-- Insert all invites for local users into new `invites` table
+INSERT INTO local_invites SELECT
+        stream_ordering as stream_id,
+        sender as inviter,
+        state_key as invitee,
+        event_id,
+        room_id,
+        NULL as locally_rejected,
+        NULL as replaced_by
+    FROM events
+    NATURAL JOIN current_state_events
+    NATURAL JOIN room_memberships
+    WHERE membership = 'invite'  AND state_key IN (SELECT name FROM users);
+
+CREATE INDEX local_invites_id ON local_invites(stream_id);
+CREATE INDEX local_invites_for_user_idx ON local_invites(invitee, locally_rejected, replaced_by, room_id);
diff --git a/synapse/storage/schema/delta/31/local_media_repository_url_cache.sql b/synapse/storage/schema/delta/31/local_media_repository_url_cache.sql
new file mode 100644
index 0000000000..9efb4280eb
--- /dev/null
+++ b/synapse/storage/schema/delta/31/local_media_repository_url_cache.sql
@@ -0,0 +1,27 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE local_media_repository_url_cache(
+    url TEXT,              -- the URL being cached
+    response_code INTEGER, -- the HTTP response code of this download attempt
+    etag TEXT,             -- the etag header of this response
+    expires INTEGER,       -- the number of ms this response was valid for
+    og TEXT,               -- cache of the OG metadata of this URL as JSON
+    media_id TEXT,         -- the media_id, if any, of the URL's content in the repo
+    download_ts BIGINT     -- the timestamp of this download attempt
+);
+
+CREATE INDEX local_media_repository_url_cache_by_url_download_ts
+    ON local_media_repository_url_cache(url, download_ts);
diff --git a/synapse/storage/schema/delta/31/pushers.py b/synapse/storage/schema/delta/31/pushers.py
new file mode 100644
index 0000000000..93367fa09e
--- /dev/null
+++ b/synapse/storage/schema/delta/31/pushers.py
@@ -0,0 +1,79 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Change the last_token to last_stream_ordering now that pushers no longer
+# listen on an event stream but instead select out of the event_push_actions
+# table.
+
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def token_to_stream_ordering(token):
+    return int(token[1:].split('_')[0])
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    logger.info("Porting pushers table, delta 31...")
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS pushers2 (
+          id BIGINT PRIMARY KEY,
+          user_name TEXT NOT NULL,
+          access_token BIGINT DEFAULT NULL,
+          profile_tag VARCHAR(32) NOT NULL,
+          kind VARCHAR(8) NOT NULL,
+          app_id VARCHAR(64) NOT NULL,
+          app_display_name VARCHAR(64) NOT NULL,
+          device_display_name VARCHAR(128) NOT NULL,
+          pushkey TEXT NOT NULL,
+          ts BIGINT NOT NULL,
+          lang VARCHAR(8),
+          data TEXT,
+          last_stream_ordering INTEGER,
+          last_success BIGINT,
+          failing_since BIGINT,
+          UNIQUE (app_id, pushkey, user_name)
+        )
+    """)
+    cur.execute("""SELECT
+        id, user_name, access_token, profile_tag, kind,
+        app_id, app_display_name, device_display_name,
+        pushkey, ts, lang, data, last_token, last_success,
+        failing_since
+        FROM pushers
+    """)
+    count = 0
+    for row in cur.fetchall():
+        row = list(row)
+        row[12] = token_to_stream_ordering(row[12])
+        cur.execute(database_engine.convert_param_style("""
+            INSERT into pushers2 (
+            id, user_name, access_token, profile_tag, kind,
+            app_id, app_display_name, device_display_name,
+            pushkey, ts, lang, data, last_stream_ordering, last_success,
+            failing_since
+            ) values (%s)""" % (','.join(['?' for _ in range(len(row))]))),
+            row
+        )
+        count += 1
+    cur.execute("DROP TABLE pushers")
+    cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
+    logger.info("Moved %d pushers to new table", count)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/31/pushers_index.sql b/synapse/storage/schema/delta/31/pushers_index.sql
new file mode 100644
index 0000000000..9027bccc69
--- /dev/null
+++ b/synapse/storage/schema/delta/31/pushers_index.sql
@@ -0,0 +1,18 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE INDEX event_push_actions_stream_ordering on event_push_actions(
+     stream_ordering, user_id
+ );
diff --git a/synapse/storage/schema/delta/31/search_update.py b/synapse/storage/schema/delta/31/search_update.py
new file mode 100644
index 0000000000..470ae0c005
--- /dev/null
+++ b/synapse/storage/schema/delta/31/search_update.py
@@ -0,0 +1,65 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
+import logging
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+ALTER_TABLE = """
+ALTER TABLE event_search ADD COLUMN origin_server_ts BIGINT;
+ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+    if not isinstance(database_engine, PostgresEngine):
+        return
+
+    for statement in get_statements(ALTER_TABLE.splitlines()):
+        cur.execute(statement)
+
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
+
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
+
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+            "have_added_indexes": False,
+        }
+        progress_json = ujson.dumps(progress)
+
+        sql = (
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
+
+        sql = database_engine.convert_param_style(sql)
+
+        cur.execute(sql, ("event_search_order", progress_json))
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    pass
diff --git a/synapse/storage/schema/delta/32/events.sql b/synapse/storage/schema/delta/32/events.sql
new file mode 100644
index 0000000000..1dd0f9e170
--- /dev/null
+++ b/synapse/storage/schema/delta/32/events.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE events ADD COLUMN received_ts BIGINT;
diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql
new file mode 100644
index 0000000000..36f37b11c8
--- /dev/null
+++ b/synapse/storage/schema/delta/32/openid.sql
@@ -0,0 +1,9 @@
+
+CREATE TABLE open_id_tokens (
+    token TEXT NOT NULL PRIMARY KEY,
+    ts_valid_until_ms bigint NOT NULL,
+    user_id TEXT NOT NULL,
+    UNIQUE (token)
+);
+
+CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms);
diff --git a/synapse/storage/schema/delta/32/pusher_throttle.sql b/synapse/storage/schema/delta/32/pusher_throttle.sql
new file mode 100644
index 0000000000..d86d30c13c
--- /dev/null
+++ b/synapse/storage/schema/delta/32/pusher_throttle.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE pusher_throttle(
+    pusher BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    last_sent_ts BIGINT,
+    throttle_ms BIGINT,
+    PRIMARY KEY (pusher, room_id)
+);
diff --git a/synapse/storage/schema/delta/32/remove_indices.sql b/synapse/storage/schema/delta/32/remove_indices.sql
new file mode 100644
index 0000000000..f859be46a6
--- /dev/null
+++ b/synapse/storage/schema/delta/32/remove_indices.sql
@@ -0,0 +1,38 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- The following indices are redundant, other indices are equivalent or
+-- supersets
+DROP INDEX IF EXISTS events_room_id; -- Prefix of events_room_stream
+DROP INDEX IF EXISTS events_order; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_topological_ordering; -- Prefix of events_order_topo_stream_room
+DROP INDEX IF EXISTS events_stream_ordering; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_to_state_groups_id; -- Duplicate of PRIMARY KEY
+DROP INDEX IF EXISTS event_push_actions_room_id_event_id_user_id_profile_tag; -- Duplicate of UNIQUE CONSTRAINT
+
+DROP INDEX IF EXISTS event_destinations_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS st_extrem_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_content_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_signatures_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS event_edge_hashes_id; -- Prefix of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS redactions_event_id; -- Duplicate of UNIQUE CONSTRAINT
+DROP INDEX IF EXISTS room_hosts_room_id; -- Prefix of UNIQUE CONSTRAINT
+
+-- The following indices were unused
+DROP INDEX IF EXISTS remote_media_cache_thumbnails_media_id;
+DROP INDEX IF EXISTS evauth_edges_auth_id;
+DROP INDEX IF EXISTS presence_stream_state;
diff --git a/synapse/storage/schema/delta/32/reports.sql b/synapse/storage/schema/delta/32/reports.sql
new file mode 100644
index 0000000000..d13609776f
--- /dev/null
+++ b/synapse/storage/schema/delta/32/reports.sql
@@ -0,0 +1,25 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE event_reports(
+    id BIGINT NOT NULL PRIMARY KEY,
+    received_ts BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    reason TEXT,
+    content TEXT
+);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 59ac7f424c..12941d1775 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 import logging
 import re
+import ujson as json
 
 
 logger = logging.getLogger(__name__)
@@ -29,12 +30,17 @@ logger = logging.getLogger(__name__)
 class SearchStore(BackgroundUpdateStore):
 
     EVENT_SEARCH_UPDATE_NAME = "event_search"
+    EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
 
     def __init__(self, hs):
         super(SearchStore, self).__init__(hs)
         self.register_background_update_handler(
             self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
         )
+        self.register_background_update_handler(
+            self.EVENT_SEARCH_ORDER_UPDATE_NAME,
+            self._background_reindex_search_order
+        )
 
     @defer.inlineCallbacks
     def _background_reindex_search(self, progress, batch_size):
@@ -47,7 +53,7 @@ class SearchStore(BackgroundUpdateStore):
 
         def reindex_search_txn(txn):
             sql = (
-                "SELECT stream_ordering, event_id FROM events"
+                "SELECT stream_ordering, event_id, room_id, type, content FROM events"
                 " WHERE ? <= stream_ordering AND stream_ordering < ?"
                 " AND (%s)"
                 " ORDER BY stream_ordering DESC"
@@ -56,28 +62,30 @@ class SearchStore(BackgroundUpdateStore):
 
             txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
 
-            rows = txn.fetchall()
+            rows = self.cursor_to_dict(txn)
             if not rows:
                 return 0
 
-            min_stream_id = rows[-1][0]
-            event_ids = [row[1] for row in rows]
-
-            events = self._get_events_txn(txn, event_ids)
+            min_stream_id = rows[-1]["stream_ordering"]
 
             event_search_rows = []
-            for event in events:
+            for row in rows:
                 try:
-                    event_id = event.event_id
-                    room_id = event.room_id
-                    content = event.content
-                    if event.type == "m.room.message":
+                    event_id = row["event_id"]
+                    room_id = row["room_id"]
+                    etype = row["type"]
+                    try:
+                        content = json.loads(row["content"])
+                    except:
+                        continue
+
+                    if etype == "m.room.message":
                         key = "content.body"
                         value = content["body"]
-                    elif event.type == "m.room.topic":
+                    elif etype == "m.room.topic":
                         key = "content.topic"
                         value = content["topic"]
-                    elif event.type == "m.room.name":
+                    elif etype == "m.room.name":
                         key = "content.name"
                         value = content["name"]
                 except (KeyError, AttributeError):
@@ -132,6 +140,82 @@ class SearchStore(BackgroundUpdateStore):
         defer.returnValue(result)
 
     @defer.inlineCallbacks
+    def _background_reindex_search_order(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+        have_added_index = progress['have_added_indexes']
+
+        if not have_added_index:
+            def create_index(conn):
+                conn.rollback()
+                conn.set_session(autocommit=True)
+                c = conn.cursor()
+
+                # We create with NULLS FIRST so that when we search *backwards*
+                # we get the ones with non null origin_server_ts *first*
+                c.execute(
+                    "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
+                    "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
+                )
+                c.execute(
+                    "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
+                    "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
+                )
+                conn.set_session(autocommit=False)
+
+            yield self.runWithConnection(create_index)
+
+            pg = dict(progress)
+            pg["have_added_indexes"] = True
+
+            yield self.runInteraction(
+                self.EVENT_SEARCH_ORDER_UPDATE_NAME,
+                self._background_update_progress_txn,
+                self.EVENT_SEARCH_ORDER_UPDATE_NAME, pg,
+            )
+
+        def reindex_search_txn(txn):
+            sql = (
+                "UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
+                " origin_server_ts = e.origin_server_ts"
+                " FROM events AS e"
+                " WHERE e.event_id = es.event_id"
+                " AND ? <= e.stream_ordering AND e.stream_ordering < ?"
+                " RETURNING es.stream_ordering"
+            )
+
+            min_stream_id = max_stream_id - batch_size
+            txn.execute(sql, (min_stream_id, max_stream_id))
+            rows = txn.fetchall()
+
+            if min_stream_id < target_min_stream_id:
+                # We've recached the end.
+                return len(rows), False
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows),
+                "have_added_indexes": True,
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress
+            )
+
+            return len(rows), True
+
+        num_rows, finished = yield self.runInteraction(
+            self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
+        )
+
+        if not finished:
+            yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME)
+
+        defer.returnValue(num_rows)
+
+    @defer.inlineCallbacks
     def search_msgs(self, room_ids, search_term, keys):
         """Performs a full text search over events with given keys.
 
@@ -310,7 +394,6 @@ class SearchStore(BackgroundUpdateStore):
                 "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank,"
                 " origin_server_ts, stream_ordering, room_id, event_id"
                 " FROM event_search"
-                " NATURAL JOIN events"
                 " WHERE vector @@ to_tsquery('english', ?) AND "
             )
             args = [search_query, search_query] + args
@@ -355,7 +438,15 @@ class SearchStore(BackgroundUpdateStore):
 
         # We add an arbitrary limit here to ensure we don't try to pull the
         # entire table from the database.
-        sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
+        if isinstance(self.database_engine, PostgresEngine):
+            sql += (
+                " ORDER BY origin_server_ts DESC NULLS LAST,"
+                " stream_ordering DESC NULLS LAST LIMIT ?"
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
+        else:
+            raise Exception("Unrecognized database engine")
 
         args.append(limit)
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index b10f2a5787..ea6823f18d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -19,17 +19,24 @@ from ._base import SQLBaseStore
 
 from unpaddedbase64 import encode_base64
 from synapse.crypto.event_signing import compute_event_reference_hash
+from synapse.util.caches.descriptors import cached, cachedList
 
 
 class SignatureStore(SQLBaseStore):
     """Persistence for event signatures and hashes"""
 
+    @cached(lru=True)
+    def get_event_reference_hash(self, event_id):
+        return self._get_event_reference_hashes_txn(event_id)
+
+    @cachedList(cached_method_name="get_event_reference_hash",
+                list_name="event_ids", num_args=1)
     def get_event_reference_hashes(self, event_ids):
         def f(txn):
-            return [
-                self._get_event_reference_hashes_txn(txn, ev)
-                for ev in event_ids
-            ]
+            return {
+                event_id: self._get_event_reference_hashes_txn(txn, event_id)
+                for event_id in event_ids
+            }
 
         return self.runInteraction(
             "get_event_reference_hashes",
@@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore):
         hashes = yield self.get_event_reference_hashes(
             event_ids
         )
-        hashes = [
-            {
+        hashes = {
+            e_id: {
                 k: encode_base64(v) for k, v in h.items()
                 if k == "sha256"
             }
-            for h in hashes
-        ]
+            for e_id, h in hashes.items()
+        }
 
-        defer.returnValue(zip(event_ids, hashes))
+        defer.returnValue(hashes.items())
 
     def _get_event_reference_hashes_txn(self, txn, event_id):
         """Get all the hashes for a given PDU.
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 02cefdff26..5b743db67a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -64,12 +64,12 @@ class StateStore(SQLBaseStore):
             for group, state_map in group_to_state.items()
         })
 
-    def _store_state_groups_txn(self, txn, event, context):
-        return self._store_mult_state_groups_txn(txn, [(event, context)])
-
     def _store_mult_state_groups_txn(self, txn, events_and_contexts):
         state_groups = {}
         for event, context in events_and_contexts:
+            if event.internal_metadata.is_outlier():
+                continue
+
             if context.current_state is None:
                 continue
 
@@ -82,7 +82,8 @@ class StateStore(SQLBaseStore):
             if event.is_state():
                 state_events[(event.type, event.state_key)] = event
 
-            state_group = self._state_groups_id_gen.get_next()
+            state_group = context.new_state_group_id
+
             self._simple_insert_txn(
                 txn,
                 table="state_groups",
@@ -114,11 +115,10 @@ class StateStore(SQLBaseStore):
             table="event_to_state_groups",
             values=[
                 {
-                    "state_group": state_groups[event.event_id],
-                    "event_id": event.event_id,
+                    "state_group": state_group_id,
+                    "event_id": event_id,
                 }
-                for event, context in events_and_contexts
-                if context.current_state is not None
+                for event_id, state_group_id in state_groups.items()
             ],
         )
 
@@ -174,6 +174,12 @@ class StateStore(SQLBaseStore):
             return [r[0] for r in results]
         return self.runInteraction("get_current_state_for_key", f)
 
+    @cached(num_args=2, lru=True, max_entries=1000)
+    def _get_state_group_from_group(self, group, types):
+        raise NotImplementedError()
+
+    @cachedList(cached_method_name="_get_state_group_from_group",
+                list_name="groups", num_args=2, inlineCallbacks=True)
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
         """
@@ -201,18 +207,23 @@ class StateStore(SQLBaseStore):
             txn.execute(sql, args)
             rows = self.cursor_to_dict(txn)
 
-            results = {}
+            results = {group: {} for group in groups}
             for row in rows:
                 key = (row["type"], row["state_key"])
-                results.setdefault(row["state_group"], {})[key] = row["event_id"]
+                results[row["state_group"]][key] = row["event_id"]
             return results
 
+        results = {}
+
         chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
         for chunk in chunks:
-            return self.runInteraction(
+            res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
                 f, chunk
             )
+            results.update(res)
+
+        defer.returnValue(results)
 
     @defer.inlineCallbacks
     def get_state_for_events(self, event_ids, types):
@@ -249,11 +260,14 @@ class StateStore(SQLBaseStore):
         """
         Get the state dict corresponding to a particular event
 
-        :param str event_id: event whose state should be returned
-        :param list[(str, str)]|None types: List of (type, state_key) tuples
-            which are used to filter the state fetched. May be None, which
-            matches any key
-        :return: a deferred dict from (type, state_key) -> state_event
+        Args:
+            event_id(str): event whose state should be returned
+            types(list[(str, str)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. May be None, which
+                matches any key
+
+        Returns:
+            A deferred dict from (type, state_key) -> state_event
         """
         state_map = yield self.get_state_for_events([event_id], types)
         defer.returnValue(state_map[event_id])
@@ -270,8 +284,8 @@ class StateStore(SQLBaseStore):
             desc="_get_state_group_for_event",
         )
 
-    @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
-                num_args=1, inlineCallbacks=True)
+    @cachedList(cached_method_name="_get_state_group_for_event",
+                list_name="event_ids", num_args=1, inlineCallbacks=True)
     def _get_state_group_for_events(self, event_ids):
         """Returns mapping event_id -> state_group
         """
@@ -356,6 +370,8 @@ class StateStore(SQLBaseStore):
         a `state_key` of None matches all state_keys. If `types` is None then
         all events are returned.
         """
+        if types:
+            types = frozenset(types)
         results = {}
         missing_groups = []
         if types is not None:
@@ -429,3 +445,33 @@ class StateStore(SQLBaseStore):
             }
 
         defer.returnValue(results)
+
+    def get_all_new_state_groups(self, last_id, current_id, limit):
+        def get_all_new_state_groups_txn(txn):
+            sql = (
+                "SELECT id, room_id, event_id FROM state_groups"
+                " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            groups = txn.fetchall()
+
+            if not groups:
+                return ([], [])
+
+            lower_bound = groups[0][0]
+            upper_bound = groups[-1][0]
+            sql = (
+                "SELECT state_group, type, state_key, event_id"
+                " FROM state_groups_state"
+                " WHERE ? <= state_group AND state_group <= ?"
+            )
+
+            txn.execute(sql, (lower_bound, upper_bound))
+            state_group_state = txn.fetchall()
+            return (groups, state_group_state)
+        return self.runInteraction(
+            "get_all_new_state_groups", get_all_new_state_groups_txn
+        )
+
+    def get_state_stream_token(self):
+        return self._state_groups_id_gen.get_current_token()
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index cf84938be5..b9ad965fd6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore):
                         return True
                 return False
 
-            ret = self._get_events_txn(
-                txn,
-                # apply the filter on the room id list
-                [
-                    r["event_id"] for r in rows
-                    if app_service_interested(r)
-                ],
-                get_prev_content=True
-            )
+            return [r for r in rows if app_service_interested(r)]
 
-            self._set_before_and_after(ret, rows)
+        rows = yield self.runInteraction("get_appservice_room_stream", f)
 
-            if rows:
-                key = "s%d" % max(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = to_key
+        ret = yield self._get_events(
+            [r["event_id"] for r in rows],
+            get_prev_content=True
+        )
 
-            return ret, key
+        self._set_before_and_after(ret, rows, topo_order=from_id is None)
 
-        results = yield self.runInteraction("get_appservice_room_stream", f)
-        defer.returnValue(results)
+        if rows:
+            key = "s%d" % max(r["stream_ordering"] for r in rows)
+        else:
+            # Assume we didn't get anything because there was nothing to
+            # get.
+            key = to_key
+
+        defer.returnValue((ret, key))
 
     @defer.inlineCallbacks
     def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@@ -303,96 +299,6 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue(ret)
 
-    def get_room_events_stream(
-        self,
-        user_id,
-        from_key,
-        to_key,
-        limit=0,
-        is_guest=False,
-        room_ids=None
-    ):
-        room_ids = room_ids or []
-        room_ids = [r for r in room_ids]
-        if is_guest:
-            current_room_membership_sql = (
-                "SELECT c.room_id FROM history_visibility AS h"
-                " INNER JOIN current_state_events AS c"
-                " ON h.event_id = c.event_id"
-                " WHERE c.room_id IN (%s)"
-                " AND h.history_visibility = 'world_readable'" % (
-                    ",".join(map(lambda _: "?", room_ids))
-                )
-            )
-            current_room_membership_args = room_ids
-        else:
-            current_room_membership_sql = (
-                "SELECT m.room_id FROM room_memberships as m "
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id AND c.state_key = m.user_id"
-                " WHERE m.user_id = ? AND m.membership = 'join'"
-            )
-            current_room_membership_args = [user_id]
-
-        # We also want to get any membership events about that user, e.g.
-        # invites or leave notifications.
-        membership_sql = (
-            "SELECT m.event_id FROM room_memberships as m "
-            "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
-            "WHERE m.user_id = ? "
-        )
-        membership_args = [user_id]
-
-        if limit:
-            limit = max(limit, MAX_STREAM_SIZE)
-        else:
-            limit = MAX_STREAM_SIZE
-
-        # From and to keys should be integers from ordering.
-        from_id = RoomStreamToken.parse_stream_token(from_key)
-        to_id = RoomStreamToken.parse_stream_token(to_key)
-
-        if from_key == to_key:
-            return defer.succeed(([], to_key))
-
-        sql = (
-            "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
-            "(e.outlier = ? AND (room_id IN (%(current)s)) OR "
-            "(event_id IN (%(invites)s))) "
-            "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
-            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
-        ) % {
-            "current": current_room_membership_sql,
-            "invites": membership_sql,
-            "limit": limit
-        }
-
-        def f(txn):
-            args = ([False] + current_room_membership_args + membership_args +
-                    [from_id.stream, to_id.stream])
-            txn.execute(sql, args)
-
-            rows = self.cursor_to_dict(txn)
-
-            ret = self._get_events_txn(
-                txn,
-                [r["event_id"] for r in rows],
-                get_prev_content=True
-            )
-
-            self._set_before_and_after(ret, rows)
-
-            if rows:
-                key = "s%d" % max(r["stream_ordering"] for r in rows)
-            else:
-                # Assume we didn't get anything because there was nothing to
-                # get.
-                key = to_key
-
-            return ret, key
-
-        return self.runInteraction("get_room_events_stream", f)
-
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
                              direction='b', limit=-1):
@@ -539,7 +445,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_room_events_max_id(self, direction='f'):
-        token = yield self._stream_id_gen.get_max_token()
+        token = yield self._stream_id_gen.get_current_token()
         if direction != 'b':
             defer.returnValue("s%d" % (token,))
         else:
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index a0e6b42b30..9da23f34cb 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -30,7 +30,7 @@ class TagsStore(SQLBaseStore):
         Returns:
             A deferred int.
         """
-        return self._account_data_id_gen.get_max_token()
+        return self._account_data_id_gen.get_current_token()
 
     @cached()
     def get_tags_for_user(self, user_id):
@@ -200,7 +200,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -222,7 +222,7 @@ class TagsStore(SQLBaseStore):
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
     def _update_revision_txn(self, txn, user_id, room_id, next_id):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index d338dfcf0a..6c7481a728 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -16,16 +16,56 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
 
+from twisted.internet import defer, reactor
+
 from canonicaljson import encode_canonical_json
+
+from collections import namedtuple
+
+import itertools
 import logging
 
 logger = logging.getLogger(__name__)
 
 
+_TransactionRow = namedtuple(
+    "_TransactionRow", (
+        "id", "transaction_id", "destination", "ts", "response_code",
+        "response_json",
+    )
+)
+
+_UpdateTransactionRow = namedtuple(
+    "_TransactionRow", (
+        "response_code", "response_json",
+    )
+)
+
+
 class TransactionStore(SQLBaseStore):
     """A collection of queries for handling PDUs.
     """
 
+    def __init__(self, hs):
+        super(TransactionStore, self).__init__(hs)
+
+        # New transactions that are currently in flights
+        self.inflight_transactions = {}
+
+        # Newly delievered transactions that *weren't* persisted while in flight
+        self.new_delivered_transactions = {}
+
+        # Newly delivered transactions that *were* persisted while in flight
+        self.update_delivered_transactions = {}
+
+        self.last_transaction = {}
+
+        reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
+        hs.get_clock().looping_call(
+            self._persist_in_mem_txns,
+            1000,
+        )
+
     def get_received_txn_response(self, transaction_id, origin):
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
@@ -108,17 +148,30 @@ class TransactionStore(SQLBaseStore):
             list: A list of previous transaction ids.
         """
 
-        return self.runInteraction(
-            "prep_send_transaction",
-            self._prep_send_transaction,
-            transaction_id, destination, origin_server_ts
+        auto_id = self._transaction_id_gen.get_next()
+
+        txn_row = _TransactionRow(
+            id=auto_id,
+            transaction_id=transaction_id,
+            destination=destination,
+            ts=origin_server_ts,
+            response_code=0,
+            response_json=None,
         )
 
-    def _prep_send_transaction(self, txn, transaction_id, destination,
-                               origin_server_ts):
+        self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row
 
-        next_id = self._transaction_id_gen.get_next()
+        prev_txn = self.last_transaction.get(destination)
+        if prev_txn:
+            return defer.succeed(prev_txn)
+        else:
+            return self.runInteraction(
+                "_get_prevs_txn",
+                self._get_prevs_txn,
+                destination,
+            )
 
+    def _get_prevs_txn(self, txn, destination):
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
@@ -133,23 +186,6 @@ class TransactionStore(SQLBaseStore):
 
         prev_txns = [r["transaction_id"] for r in results]
 
-        # Actually add the new transaction to the sent_transactions table.
-
-        self._simple_insert_txn(
-            txn,
-            table="sent_transactions",
-            values={
-                "id": next_id,
-                "transaction_id": transaction_id,
-                "destination": destination,
-                "ts": origin_server_ts,
-                "response_code": 0,
-                "response_json": None,
-            }
-        )
-
-        # TODO Update the tx id -> pdu id mapping
-
         return prev_txns
 
     def delivered_txn(self, transaction_id, destination, code, response_dict):
@@ -161,27 +197,23 @@ class TransactionStore(SQLBaseStore):
             code (int)
             response_json (str)
         """
-        return self.runInteraction(
-            "delivered_txn",
-            self._delivered_txn,
-            transaction_id, destination, code,
-            buffer(encode_canonical_json(response_dict)),
-        )
 
-    def _delivered_txn(self, txn, transaction_id, destination,
-                       code, response_json):
-        self._simple_update_one_txn(
-            txn,
-            table="sent_transactions",
-            keyvalues={
-                "transaction_id": transaction_id,
-                "destination": destination,
-            },
-            updatevalues={
-                "response_code": code,
-                "response_json": None,  # For now, don't persist response_json
-            }
-        )
+        txn_row = self.inflight_transactions.get(
+            destination, {}
+        ).pop(transaction_id, None)
+
+        self.last_transaction[destination] = transaction_id
+
+        if txn_row:
+            d = self.new_delivered_transactions.setdefault(destination, {})
+            d[transaction_id] = txn_row._replace(
+                response_code=code,
+                response_json=None,  # For now, don't persist response
+            )
+        else:
+            d = self.update_delivered_transactions.setdefault(destination, {})
+            # For now, don't persist response
+            d[transaction_id] = _UpdateTransactionRow(code, None)
 
     def get_transactions_after(self, transaction_id, destination):
         """Get all transactions after a given local transaction_id.
@@ -305,3 +337,48 @@ class TransactionStore(SQLBaseStore):
 
         txn.execute(query, (self._clock.time_msec(),))
         return self.cursor_to_dict(txn)
+
+    @defer.inlineCallbacks
+    def _persist_in_mem_txns(self):
+        try:
+            inflight = self.inflight_transactions
+            new_delivered = self.new_delivered_transactions
+            update_delivered = self.update_delivered_transactions
+
+            self.inflight_transactions = {}
+            self.new_delivered_transactions = {}
+            self.update_delivered_transactions = {}
+
+            full_rows = [
+                row._asdict()
+                for txn_map in itertools.chain(inflight.values(), new_delivered.values())
+                for row in txn_map.values()
+            ]
+
+            def f(txn):
+                if full_rows:
+                    self._simple_insert_many_txn(
+                        txn=txn,
+                        table="sent_transactions",
+                        values=full_rows
+                    )
+
+                for dest, txn_map in update_delivered.items():
+                    for txn_id, update_row in txn_map.items():
+                        self._simple_update_one_txn(
+                            txn,
+                            table="sent_transactions",
+                            keyvalues={
+                                "transaction_id": txn_id,
+                                "destination": dest,
+                            },
+                            updatevalues={
+                                "response_code": update_row.response_code,
+                                "response_json": None,  # For now, don't persist response
+                            }
+                        )
+
+            if full_rows or update_delivered:
+                yield self.runInteraction("_persist_in_mem_txns", f)
+        except:
+            logger.exception("Failed to persist transactions!")
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index a02dfc7d58..46cf93ff87 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -21,7 +21,7 @@ import threading
 class IdGenerator(object):
     def __init__(self, db_conn, table, column):
         self._lock = threading.Lock()
-        self._next_id = _load_max_id(db_conn, table, column)
+        self._next_id = _load_current_id(db_conn, table, column)
 
     def get_next(self):
         with self._lock:
@@ -29,12 +29,16 @@ class IdGenerator(object):
             return self._next_id
 
 
-def _load_max_id(db_conn, table, column):
+def _load_current_id(db_conn, table, column, step=1):
     cur = db_conn.cursor()
-    cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    if step == 1:
+        cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
+    else:
+        cur.execute("SELECT MIN(%s) FROM %s" % (column, table,))
     val, = cur.fetchone()
     cur.close()
-    return int(val) if val else 1
+    current_id = int(val) if val else step
+    return (max if step > 0 else min)(current_id, step)
 
 
 class StreamIdGenerator(object):
@@ -45,17 +49,32 @@ class StreamIdGenerator(object):
     all ids less than or equal to it have completed. This handles the fact that
     persistence of events can complete out of order.
 
+    Args:
+        db_conn(connection):  A database connection to use to fetch the
+            initial value of the generator from.
+        table(str): A database table to read the initial value of the id
+            generator from.
+        column(str): The column of the database table to read the initial
+            value from the id generator from.
+        extra_tables(list): List of pairs of database tables and columns to
+            use to source the initial value of the generator from. The value
+            with the largest magnitude is used.
+        step(int): which direction the stream ids grow in. +1 to grow
+            upwards, -1 to grow downwards.
+
     Usage:
         with stream_id_gen.get_next() as stream_id:
             # ... persist event ...
     """
-    def __init__(self, db_conn, table, column, extra_tables=[]):
+    def __init__(self, db_conn, table, column, extra_tables=[], step=1):
+        assert step != 0
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._step = step
+        self._current = _load_current_id(db_conn, table, column, step)
         for table, column in extra_tables:
-            self._current_max = max(
-                self._current_max,
-                _load_max_id(db_conn, table, column)
+            self._current = (max if step > 0 else min)(
+                self._current,
+                _load_current_id(db_conn, table, column, step)
             )
         self._unfinished_ids = deque()
 
@@ -66,8 +85,8 @@ class StreamIdGenerator(object):
                 # ... persist event ...
         """
         with self._lock:
-            self._current_max += 1
-            next_id = self._current_max
+            self._current += self._step
+            next_id = self._current
 
             self._unfinished_ids.append(next_id)
 
@@ -88,8 +107,12 @@ class StreamIdGenerator(object):
                 # ... persist events ...
         """
         with self._lock:
-            next_ids = range(self._current_max + 1, self._current_max + n + 1)
-            self._current_max += n
+            next_ids = range(
+                self._current + self._step,
+                self._current + self._step * (n + 1),
+                self._step
+            )
+            self._current += n * self._step
 
             for next_id in next_ids:
                 self._unfinished_ids.append(next_id)
@@ -105,15 +128,15 @@ class StreamIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
         with self._lock:
             if self._unfinished_ids:
-                return self._unfinished_ids[0] - 1
+                return self._unfinished_ids[0] - self._step
 
-            return self._current_max
+            return self._current
 
 
 class ChainedIdGenerator(object):
@@ -125,7 +148,7 @@ class ChainedIdGenerator(object):
     def __init__(self, chained_generator, db_conn, table, column):
         self.chained_generator = chained_generator
         self._lock = threading.Lock()
-        self._current_max = _load_max_id(db_conn, table, column)
+        self._current_max = _load_current_id(db_conn, table, column)
         self._unfinished_ids = deque()
 
     def get_next(self):
@@ -137,7 +160,7 @@ class ChainedIdGenerator(object):
         with self._lock:
             self._current_max += 1
             next_id = self._current_max
-            chained_id = self.chained_generator.get_max_token()
+            chained_id = self.chained_generator.get_current_token()
 
             self._unfinished_ids.append((next_id, chained_id))
 
@@ -151,7 +174,7 @@ class ChainedIdGenerator(object):
 
         return manager()
 
-    def get_max_token(self):
+    def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
         """
@@ -160,4 +183,4 @@ class ChainedIdGenerator(object):
                 stream_id, chained_id = self._unfinished_ids[0]
                 return (stream_id - 1, chained_id)
 
-            return (self._current_max, self.chained_generator.get_max_token())
+            return (self._current_max, self.chained_generator.get_current_token())