summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py407
-rw-r--r--synapse/storage/_base.py193
-rw-r--r--synapse/storage/background_updates.py256
-rw-r--r--synapse/storage/engines/postgres.py2
-rw-r--r--synapse/storage/engines/sqlite3.py31
-rw-r--r--synapse/storage/event_federation.py94
-rw-r--r--synapse/storage/events.py108
-rw-r--r--synapse/storage/filtering.py4
-rw-r--r--synapse/storage/keys.py2
-rw-r--r--synapse/storage/prepare_database.py395
-rw-r--r--synapse/storage/pusher.py6
-rw-r--r--synapse/storage/registration.py102
-rw-r--r--synapse/storage/room.py106
-rw-r--r--synapse/storage/roommember.py47
-rw-r--r--synapse/storage/schema/delta/23/drop_state_index.sql16
-rw-r--r--synapse/storage/schema/delta/23/refresh_tokens.sql21
-rw-r--r--synapse/storage/schema/delta/24/stats_reporting.sql22
-rw-r--r--synapse/storage/schema/delta/25/00background_updates.sql21
-rw-r--r--synapse/storage/schema/delta/25/fts.py78
-rw-r--r--synapse/storage/schema/delta/25/guest_access.sql25
-rw-r--r--synapse/storage/schema/delta/25/history_visibility.sql25
-rw-r--r--synapse/storage/schema/delta/25/tags.sql38
-rw-r--r--synapse/storage/search.py307
-rw-r--r--synapse/storage/signatures.py114
-rw-r--r--synapse/storage/state.py31
-rw-r--r--synapse/storage/stream.py224
-rw-r--r--synapse/storage/tags.py216
-rw-r--r--synapse/storage/transactions.py52
28 files changed, 2022 insertions, 921 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f154b1c8ae..e7443f2838 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -40,24 +40,16 @@ from .filtering import FilteringStore
 from .end_to_end_keys import EndToEndKeyStore
 
 from .receipts import ReceiptsStore
+from .search import SearchStore
+from .tags import TagsStore
 
 
-import fnmatch
-import imp
 import logging
-import os
-import re
 
 
 logger = logging.getLogger(__name__)
 
 
-# Remember to update this number every time a change is made to database
-# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 22
-
-dir_path = os.path.abspath(os.path.dirname(__file__))
-
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
 # times give more inserts into the database even for readonly API hits
 # 120 seconds == 2 minutes
@@ -79,6 +71,8 @@ class DataStore(RoomMemberStore, RoomStore,
                 EventsStore,
                 ReceiptsStore,
                 EndToEndKeyStore,
+                SearchStore,
+                TagsStore,
                 ):
 
     def __init__(self, hs):
@@ -94,9 +88,9 @@ class DataStore(RoomMemberStore, RoomStore,
         )
 
     @defer.inlineCallbacks
-    def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
+    def insert_client_ip(self, user, access_token, ip, user_agent):
         now = int(self._clock.time_msec())
-        key = (user.to_string(), access_token, device_id, ip)
+        key = (user.to_string(), access_token, ip)
 
         try:
             last_seen = self.client_ip_last_seen.get(key)
@@ -120,389 +114,44 @@ class DataStore(RoomMemberStore, RoomStore,
                 "user_agent": user_agent,
             },
             values={
-                "device_id": device_id,
                 "last_seen": now,
             },
             desc="insert_client_ip",
             lock=False,
         )
 
+    @defer.inlineCallbacks
+    def count_daily_users(self):
+        """
+        Counts the number of users who used this homeserver in the last 24 hours.
+        """
+        def _count_users(txn):
+            txn.execute(
+                "SELECT COUNT(DISTINCT user_id) AS users"
+                " FROM user_ips"
+                " WHERE last_seen > ?",
+                # This is close enough to a day for our purposes.
+                (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
+            )
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
+
     def get_user_ip_and_agents(self, user):
         return self._simple_select_list(
             table="user_ips",
             keyvalues={"user_id": user.to_string()},
             retcols=[
-                "device_id", "access_token", "ip", "user_agent", "last_seen"
+                "access_token", "ip", "user_agent", "last_seen"
             ],
             desc="get_user_ip_and_agents",
         )
 
 
-def read_schema(path):
-    """ Read the named database schema.
-
-    Args:
-        path: Path of the database schema.
-    Returns:
-        A string containing the database schema.
-    """
-    with open(path) as schema_file:
-        return schema_file.read()
-
-
-class PrepareDatabaseException(Exception):
-    pass
-
-
-class UpgradeDatabaseException(PrepareDatabaseException):
-    pass
-
-
-def prepare_database(db_conn, database_engine):
-    """Prepares a database for usage. Will either create all necessary tables
-    or upgrade from an older schema version.
-    """
-    try:
-        cur = db_conn.cursor()
-        version_info = _get_or_create_schema_state(cur, database_engine)
-
-        if version_info:
-            user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(
-                cur, user_version, delta_files, upgraded, database_engine
-            )
-        else:
-            _setup_new_database(cur, database_engine)
-
-        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
-
-        cur.close()
-        db_conn.commit()
-    except:
-        db_conn.rollback()
-        raise
-
-
-def _setup_new_database(cur, database_engine):
-    """Sets up the database by finding a base set of "full schemas" and then
-    applying any necessary deltas.
-
-    The "full_schemas" directory has subdirectories named after versions. This
-    function searches for the highest version less than or equal to
-    `SCHEMA_VERSION` and executes all .sql files in that directory.
-
-    The function will then apply all deltas for all versions after the base
-    version.
-
-    Example directory structure:
-
-        schema/
-            delta/
-                ...
-            full_schemas/
-                3/
-                    test.sql
-                    ...
-                11/
-                    foo.sql
-                    bar.sql
-                ...
-
-    In the example foo.sql and bar.sql would be run, and then any delta files
-    for versions strictly greater than 11.
-    """
-    current_dir = os.path.join(dir_path, "schema", "full_schemas")
-    directory_entries = os.listdir(current_dir)
-
-    valid_dirs = []
-    pattern = re.compile(r"^\d+(\.sql)?$")
-    for filename in directory_entries:
-        match = pattern.match(filename)
-        abs_path = os.path.join(current_dir, filename)
-        if match and os.path.isdir(abs_path):
-            ver = int(match.group(0))
-            if ver <= SCHEMA_VERSION:
-                valid_dirs.append((ver, abs_path))
-        else:
-            logger.warn("Unexpected entry in 'full_schemas': %s", filename)
-
-    if not valid_dirs:
-        raise PrepareDatabaseException(
-            "Could not find a suitable base set of full schemas"
-        )
-
-    max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
-
-    logger.debug("Initialising schema v%d", max_current_ver)
-
-    directory_entries = os.listdir(sql_dir)
-
-    for filename in fnmatch.filter(directory_entries, "*.sql"):
-        sql_loc = os.path.join(sql_dir, filename)
-        logger.debug("Applying schema %s", sql_loc)
-        executescript(cur, sql_loc)
-
-    cur.execute(
-        database_engine.convert_param_style(
-            "INSERT INTO schema_version (version, upgraded)"
-            " VALUES (?,?)"
-        ),
-        (max_current_ver, False,)
-    )
-
-    _upgrade_existing_database(
-        cur,
-        current_version=max_current_ver,
-        applied_delta_files=[],
-        upgraded=False,
-        database_engine=database_engine,
-    )
-
-
-def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded, database_engine):
-    """Upgrades an existing database.
-
-    Delta files can either be SQL stored in *.sql files, or python modules
-    in *.py.
-
-    There can be multiple delta files per version. Synapse will keep track of
-    which delta files have been applied, and will apply any that haven't been
-    even if there has been no version bump. This is useful for development
-    where orthogonal schema changes may happen on separate branches.
-
-    Different delta files for the same version *must* be orthogonal and give
-    the same result when applied in any order. No guarantees are made on the
-    order of execution of these scripts.
-
-    This is a no-op of current_version == SCHEMA_VERSION.
-
-    Example directory structure:
-
-        schema/
-            delta/
-                11/
-                    foo.sql
-                    ...
-                12/
-                    foo.sql
-                    bar.py
-                ...
-            full_schemas/
-                ...
-
-    In the example, if current_version is 11, then foo.sql will be run if and
-    only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
-    some arbitrary order.
-
-    Args:
-        cur (Cursor)
-        current_version (int): The current version of the schema.
-        applied_delta_files (list): A list of deltas that have already been
-            applied.
-        upgraded (bool): Whether the current version was generated by having
-            applied deltas or from full schema file. If `True` the function
-            will never apply delta files for the given `current_version`, since
-            the current_version wasn't generated by applying those delta files.
-    """
-
-    if current_version > SCHEMA_VERSION:
-        raise ValueError(
-            "Cannot use this database as it is too " +
-            "new for the server to understand"
-        )
-
-    start_ver = current_version
-    if not upgraded:
-        start_ver += 1
-
-    logger.debug("applied_delta_files: %s", applied_delta_files)
-
-    for v in range(start_ver, SCHEMA_VERSION + 1):
-        logger.debug("Upgrading schema to v%d", v)
-
-        delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
-
-        try:
-            directory_entries = os.listdir(delta_dir)
-        except OSError:
-            logger.exception("Could not open delta dir for version %d", v)
-            raise UpgradeDatabaseException(
-                "Could not open delta dir for version %d" % (v,)
-            )
-
-        directory_entries.sort()
-        for file_name in directory_entries:
-            relative_path = os.path.join(str(v), file_name)
-            logger.debug("Found file: %s", relative_path)
-            if relative_path in applied_delta_files:
-                continue
-
-            absolute_path = os.path.join(
-                dir_path, "schema", "delta", relative_path,
-            )
-            root_name, ext = os.path.splitext(file_name)
-            if ext == ".py":
-                # This is a python upgrade module. We need to import into some
-                # package and then execute its `run_upgrade` function.
-                module_name = "synapse.storage.v%d_%s" % (
-                    v, root_name
-                )
-                with open(absolute_path) as python_file:
-                    module = imp.load_source(
-                        module_name, absolute_path, python_file
-                    )
-                logger.debug("Running script %s", relative_path)
-                module.run_upgrade(cur, database_engine)
-            elif ext == ".pyc":
-                # Sometimes .pyc files turn up anyway even though we've
-                # disabled their generation; e.g. from distribution package
-                # installers. Silently skip it
-                pass
-            elif ext == ".sql":
-                # A plain old .sql file, just read and execute it
-                logger.debug("Applying schema %s", relative_path)
-                executescript(cur, absolute_path)
-            else:
-                # Not a valid delta file.
-                logger.warn(
-                    "Found directory entry that did not end in .py or"
-                    " .sql: %s",
-                    relative_path,
-                )
-                continue
-
-            # Mark as done.
-            cur.execute(
-                database_engine.convert_param_style(
-                    "INSERT INTO applied_schema_deltas (version, file)"
-                    " VALUES (?,?)",
-                ),
-                (v, relative_path)
-            )
-
-            cur.execute("DELETE FROM schema_version")
-            cur.execute(
-                database_engine.convert_param_style(
-                    "INSERT INTO schema_version (version, upgraded)"
-                    " VALUES (?,?)",
-                ),
-                (v, True)
-            )
-
-
-def get_statements(f):
-    statement_buffer = ""
-    in_comment = False  # If we're in a /* ... */ style comment
-
-    for line in f:
-        line = line.strip()
-
-        if in_comment:
-            # Check if this line contains an end to the comment
-            comments = line.split("*/", 1)
-            if len(comments) == 1:
-                continue
-            line = comments[1]
-            in_comment = False
-
-        # Remove inline block comments
-        line = re.sub(r"/\*.*\*/", " ", line)
-
-        # Does this line start a comment?
-        comments = line.split("/*", 1)
-        if len(comments) > 1:
-            line = comments[0]
-            in_comment = True
-
-        # Deal with line comments
-        line = line.split("--", 1)[0]
-        line = line.split("//", 1)[0]
-
-        # Find *all* semicolons. We need to treat first and last entry
-        # specially.
-        statements = line.split(";")
-
-        # We must prepend statement_buffer to the first statement
-        first_statement = "%s %s" % (
-            statement_buffer.strip(),
-            statements[0].strip()
-        )
-        statements[0] = first_statement
-
-        # Every entry, except the last, is a full statement
-        for statement in statements[:-1]:
-            yield statement.strip()
-
-        # The last entry did *not* end in a semicolon, so we store it for the
-        # next semicolon we find
-        statement_buffer = statements[-1].strip()
-
-
-def executescript(txn, schema_path):
-    with open(schema_path, 'r') as f:
-        for statement in get_statements(f):
-            txn.execute(statement)
-
-
-def _get_or_create_schema_state(txn, database_engine):
-    # Bluntly try creating the schema_version tables.
-    schema_path = os.path.join(
-        dir_path, "schema", "schema_version.sql",
-    )
-    executescript(txn, schema_path)
-
-    txn.execute("SELECT version, upgraded FROM schema_version")
-    row = txn.fetchone()
-    current_version = int(row[0]) if row else None
-    upgraded = bool(row[1]) if row else None
-
-    if current_version:
-        txn.execute(
-            database_engine.convert_param_style(
-                "SELECT file FROM applied_schema_deltas WHERE version >= ?"
-            ),
-            (current_version,)
-        )
-        applied_deltas = [d for d, in txn.fetchall()]
-        return current_version, applied_deltas, upgraded
-
-    return None
-
-
-def prepare_sqlite3_database(db_conn):
-    """This function should be called before `prepare_database` on sqlite3
-    databases.
-
-    Since we changed the way we store the current schema version and handle
-    updates to schemas, we need a way to upgrade from the old method to the
-    new. This only affects sqlite databases since they were the only ones
-    supported at the time.
-    """
-    with db_conn:
-        schema_path = os.path.join(
-            dir_path, "schema", "schema_version.sql",
-        )
-        create_schema = read_schema(schema_path)
-        db_conn.executescript(create_schema)
-
-        c = db_conn.execute("SELECT * FROM schema_version")
-        rows = c.fetchall()
-        c.close()
-
-        if not rows:
-            c = db_conn.execute("PRAGMA user_version")
-            row = c.fetchone()
-            c.close()
-
-            if row and row[0]:
-                db_conn.execute(
-                    "REPLACE INTO schema_version (version, upgraded)"
-                    " VALUES (?,?)",
-                    (row[0], False)
-                )
-
-
 def are_all_users_on_domain(txn, database_engine, domain):
     sql = database_engine.convert_param_style(
         "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d976e17786..218e708054 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -25,8 +25,6 @@ from util.id_generators import IdGenerator, StreamIdGenerator
 
 from twisted.internet import defer
 
-from collections import namedtuple
-
 import sys
 import time
 import threading
@@ -181,6 +179,7 @@ class SQLBaseStore(object):
         self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
         self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
         self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+        self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self)
         self._pushers_id_gen = IdGenerator("pushers", "id", self)
         self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
         self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
@@ -375,9 +374,6 @@ class SQLBaseStore(object):
 
         return self.runInteraction(desc, interaction)
 
-    def _execute_and_decode(self, desc, query, *args):
-        return self._execute(desc, self.cursor_to_dict, query, *args)
-
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
@@ -523,7 +519,7 @@ class SQLBaseStore(object):
                                   allow_none=False,
                                   desc="_simple_select_one_onecol"):
         """Executes a SELECT query on the named table, which is expected to
-        return a single row, returning a single column from it."
+        return a single row, returning a single column from it.
 
         Args:
             table : string giving the table name
@@ -690,37 +686,6 @@ class SQLBaseStore(object):
 
         return dict(zip(retcols, row))
 
-    def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
-                                 retcols=None, allow_none=False,
-                                 desc="_simple_selectupdate_one"):
-        """ Combined SELECT then UPDATE."""
-        def func(txn):
-            ret = None
-            if retcols:
-                ret = self._simple_select_one_txn(
-                    txn,
-                    table=table,
-                    keyvalues=keyvalues,
-                    retcols=retcols,
-                    allow_none=allow_none,
-                )
-
-            if updatevalues:
-                self._simple_update_one_txn(
-                    txn,
-                    table=table,
-                    keyvalues=keyvalues,
-                    updatevalues=updatevalues,
-                )
-
-                # if txn.rowcount == 0:
-                #     raise StoreError(404, "No row found")
-                if txn.rowcount > 1:
-                    raise StoreError(500, "More than one row matched")
-
-            return ret
-        return self.runInteraction(desc, func)
-
     def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"):
         """Executes a DELETE query on the named table, expecting to delete a
         single row.
@@ -742,16 +707,6 @@ class SQLBaseStore(object):
                 raise StoreError(500, "more than one row matched")
         return self.runInteraction(desc, func)
 
-    def _simple_delete(self, table, keyvalues, desc="_simple_delete"):
-        """Executes a DELETE query on the named table.
-
-        Args:
-            table : string giving the table name
-            keyvalues : dict of column names and values to select the row with
-        """
-
-        return self.runInteraction(desc, self._simple_delete_txn)
-
     def _simple_delete_txn(self, txn, table, keyvalues):
         sql = "DELETE FROM %s WHERE %s" % (
             table,
@@ -760,24 +715,6 @@ class SQLBaseStore(object):
 
         return txn.execute(sql, keyvalues.values())
 
-    def _simple_max_id(self, table):
-        """Executes a SELECT query on the named table, expecting to return the
-        max value for the column "id".
-
-        Args:
-            table : string giving the table name
-        """
-        sql = "SELECT MAX(id) AS id FROM %s" % table
-
-        def func(txn):
-            txn.execute(sql)
-            max_id = self.cursor_to_dict(txn)[0]["id"]
-            if max_id is None:
-                return 0
-            return max_id
-
-        return self.runInteraction("_simple_max_id", func)
-
     def get_next_stream_id(self):
         with self._next_stream_id_lock:
             i = self._next_stream_id
@@ -790,129 +727,3 @@ class _RollbackButIsFineException(Exception):
     something went wrong.
     """
     pass
-
-
-class Table(object):
-    """ A base class used to store information about a particular table.
-    """
-
-    table_name = None
-    """ str: The name of the table """
-
-    fields = None
-    """ list: The field names """
-
-    EntryType = None
-    """ Type: A tuple type used to decode the results """
-
-    _select_where_clause = "SELECT %s FROM %s WHERE %s"
-    _select_clause = "SELECT %s FROM %s"
-    _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
-
-    @classmethod
-    def select_statement(cls, where_clause=None):
-        """
-        Args:
-            where_clause (str): The WHERE clause to use.
-
-        Returns:
-            str: An SQL statement to select rows from the table with the given
-            WHERE clause.
-        """
-        if where_clause:
-            return cls._select_where_clause % (
-                ", ".join(cls.fields),
-                cls.table_name,
-                where_clause
-            )
-        else:
-            return cls._select_clause % (
-                ", ".join(cls.fields),
-                cls.table_name,
-            )
-
-    @classmethod
-    def insert_statement(cls):
-        return cls._insert_clause % (
-            cls.table_name,
-            ", ".join(cls.fields),
-            ", ".join(["?"] * len(cls.fields)),
-        )
-
-    @classmethod
-    def decode_single_result(cls, results):
-        """ Given an iterable of tuples, return a single instance of
-            `EntryType` or None if the iterable is empty
-        Args:
-            results (list): The results list to convert to `EntryType`
-        Returns:
-            EntryType: An instance of `EntryType`
-        """
-        results = list(results)
-        if results:
-            return cls.EntryType(*results[0])
-        else:
-            return None
-
-    @classmethod
-    def decode_results(cls, results):
-        """ Given an iterable of tuples, return a list of `EntryType`
-        Args:
-            results (list): The results list to convert to `EntryType`
-
-        Returns:
-            list: A list of `EntryType`
-        """
-        return [cls.EntryType(*row) for row in results]
-
-    @classmethod
-    def get_fields_string(cls, prefix=None):
-        if prefix:
-            to_join = ("%s.%s" % (prefix, f) for f in cls.fields)
-        else:
-            to_join = cls.fields
-
-        return ", ".join(to_join)
-
-
-class JoinHelper(object):
-    """ Used to help do joins on tables by looking at the tables' fields and
-    creating a list of unique fields to use with SELECTs and a namedtuple
-    to dump the results into.
-
-    Attributes:
-        tables (list): List of `Table` classes
-        EntryType (type)
-    """
-
-    def __init__(self, *tables):
-        self.tables = tables
-
-        res = []
-        for table in self.tables:
-            res += [f for f in table.fields if f not in res]
-
-        self.EntryType = namedtuple("JoinHelperEntry", res)
-
-    def get_fields(self, **prefixes):
-        """Get a string representing a list of fields for use in SELECT
-        statements with the given prefixes applied to each.
-
-        For example::
-
-            JoinHelper(PdusTable, StateTable).get_fields(
-                PdusTable="pdus",
-                StateTable="state"
-            )
-        """
-        res = []
-        for field in self.EntryType._fields:
-            for table in self.tables:
-                if field in table.fields:
-                    res.append("%s.%s" % (prefixes[table.__name__], field))
-                    break
-
-        return ", ".join(res)
-
-    def decode_results(self, rows):
-        return [self.EntryType(*row) for row in rows]
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
new file mode 100644
index 0000000000..45fccc2e5e
--- /dev/null
+++ b/synapse/storage/background_updates.py
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+
+from twisted.internet import defer
+
+import ujson as json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class BackgroundUpdatePerformance(object):
+    """Tracks the how long a background update is taking to update its items"""
+
+    def __init__(self, name):
+        self.name = name
+        self.total_item_count = 0
+        self.total_duration_ms = 0
+        self.avg_item_count = 0
+        self.avg_duration_ms = 0
+
+    def update(self, item_count, duration_ms):
+        """Update the stats after doing an update"""
+        self.total_item_count += item_count
+        self.total_duration_ms += duration_ms
+
+        # Exponential moving averages for the number of items updated and
+        # the duration.
+        self.avg_item_count += 0.1 * (item_count - self.avg_item_count)
+        self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms)
+
+    def average_items_per_ms(self):
+        """An estimate of how long it takes to do a single update.
+        Returns:
+            A duration in ms as a float
+        """
+        if self.total_item_count == 0:
+            return None
+        else:
+            # Use the exponential moving average so that we can adapt to
+            # changes in how long the update process takes.
+            return float(self.avg_item_count) / float(self.avg_duration_ms)
+
+    def total_items_per_ms(self):
+        """An estimate of how long it takes to do a single update.
+        Returns:
+            A duration in ms as a float
+        """
+        if self.total_item_count == 0:
+            return None
+        else:
+            return float(self.total_item_count) / float(self.total_duration_ms)
+
+
+class BackgroundUpdateStore(SQLBaseStore):
+    """ Background updates are updates to the database that run in the
+    background. Each update processes a batch of data at once. We attempt to
+    limit the impact of each update by monitoring how long each batch takes to
+    process and autotuning the batch size.
+    """
+
+    MINIMUM_BACKGROUND_BATCH_SIZE = 100
+    DEFAULT_BACKGROUND_BATCH_SIZE = 100
+    BACKGROUND_UPDATE_INTERVAL_MS = 1000
+    BACKGROUND_UPDATE_DURATION_MS = 100
+
+    def __init__(self, hs):
+        super(BackgroundUpdateStore, self).__init__(hs)
+        self._background_update_performance = {}
+        self._background_update_queue = []
+        self._background_update_handlers = {}
+        self._background_update_timer = None
+
+    @defer.inlineCallbacks
+    def start_doing_background_updates(self):
+        while True:
+            if self._background_update_timer is not None:
+                return
+
+            sleep = defer.Deferred()
+            self._background_update_timer = self._clock.call_later(
+                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
+            )
+            try:
+                yield sleep
+            finally:
+                self._background_update_timer = None
+
+            try:
+                result = yield self.do_background_update(
+                    self.BACKGROUND_UPDATE_DURATION_MS
+                )
+            except:
+                logger.exception("Error doing update")
+
+            if result is None:
+                logger.info(
+                    "No more background updates to do."
+                    " Unscheduling background update task."
+                )
+                return
+
+    @defer.inlineCallbacks
+    def do_background_update(self, desired_duration_ms):
+        """Does some amount of work on a background update
+        Args:
+            desired_duration_ms(float): How long we want to spend
+                updating.
+        Returns:
+            A deferred that completes once some amount of work is done.
+            The deferred will have a value of None if there is currently
+            no more work to do.
+        """
+        if not self._background_update_queue:
+            updates = yield self._simple_select_list(
+                "background_updates",
+                keyvalues=None,
+                retcols=("update_name",),
+            )
+            for update in updates:
+                self._background_update_queue.append(update['update_name'])
+
+        if not self._background_update_queue:
+            defer.returnValue(None)
+
+        update_name = self._background_update_queue.pop(0)
+        self._background_update_queue.append(update_name)
+
+        update_handler = self._background_update_handlers[update_name]
+
+        performance = self._background_update_performance.get(update_name)
+
+        if performance is None:
+            performance = BackgroundUpdatePerformance(update_name)
+            self._background_update_performance[update_name] = performance
+
+        items_per_ms = performance.average_items_per_ms()
+
+        if items_per_ms is not None:
+            batch_size = int(desired_duration_ms * items_per_ms)
+            # Clamp the batch size so that we always make progress
+            batch_size = max(batch_size, self.MINIMUM_BACKGROUND_BATCH_SIZE)
+        else:
+            batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
+
+        progress_json = yield self._simple_select_one_onecol(
+            "background_updates",
+            keyvalues={"update_name": update_name},
+            retcol="progress_json"
+        )
+
+        progress = json.loads(progress_json)
+
+        time_start = self._clock.time_msec()
+        items_updated = yield update_handler(progress, batch_size)
+        time_stop = self._clock.time_msec()
+
+        duration_ms = time_stop - time_start
+
+        logger.info(
+            "Updating %r. Updated %r items in %rms."
+            " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r)",
+            update_name, items_updated, duration_ms,
+            performance.total_items_per_ms(),
+            performance.average_items_per_ms(),
+            performance.total_item_count,
+        )
+
+        performance.update(items_updated, duration_ms)
+
+        defer.returnValue(len(self._background_update_performance))
+
+    def register_background_update_handler(self, update_name, update_handler):
+        """Register a handler for doing a background update.
+
+        The handler should take two arguments:
+
+        * A dict of the current progress
+        * An integer count of the number of items to update in this batch.
+
+        The handler should return a deferred integer count of items updated.
+        The hander is responsible for updating the progress of the update.
+
+        Args:
+            update_name(str): The name of the update that this code handles.
+            update_handler(function): The function that does the update.
+        """
+        self._background_update_handlers[update_name] = update_handler
+
+    def start_background_update(self, update_name, progress):
+        """Starts a background update running.
+
+        Args:
+            update_name: The update to set running.
+            progress: The initial state of the progress of the update.
+
+        Returns:
+            A deferred that completes once the task has been added to the
+            queue.
+        """
+        # Clear the background update queue so that we will pick up the new
+        # task on the next iteration of do_background_update.
+        self._background_update_queue = []
+        progress_json = json.dumps(progress)
+
+        return self._simple_insert(
+            "background_updates",
+            {"update_name": update_name, "progress_json": progress_json}
+        )
+
+    def _end_background_update(self, update_name):
+        """Removes a completed background update task from the queue.
+
+        Args:
+            update_name(str): The name of the completed task to remove
+        Returns:
+            A deferred that completes once the task is removed.
+        """
+        self._background_update_queue = [
+            name for name in self._background_update_queue if name != update_name
+        ]
+        return self._simple_delete_one(
+            "background_updates", keyvalues={"update_name": update_name}
+        )
+
+    def _background_update_progress_txn(self, txn, update_name, progress):
+        """Update the progress of a background update
+
+        Args:
+            txn(cursor): The transaction.
+            update_name(str): The name of the background update task
+            progress(dict): The progress of the update.
+        """
+
+        progress_json = json.dumps(progress)
+
+        self._simple_update_one_txn(
+            txn,
+            "background_updates",
+            keyvalues={"update_name": update_name},
+            updatevalues={"progress_json": progress_json},
+        )
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 4a855ffd56..98d66e0a86 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import prepare_database
+from synapse.storage.prepare_database import prepare_database
 
 from ._base import IncorrectDatabaseSetup
 
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index d18e2808d1..a5a54ec011 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -13,7 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import prepare_database, prepare_sqlite3_database
+from synapse.storage.prepare_database import (
+    prepare_database, prepare_sqlite3_database
+)
+
+import struct
 
 
 class Sqlite3Engine(object):
@@ -30,6 +34,7 @@ class Sqlite3Engine(object):
 
     def on_new_connection(self, db_conn):
         self.prepare_database(db_conn)
+        db_conn.create_function("rank", 1, _rank)
 
     def prepare_database(self, db_conn):
         prepare_sqlite3_database(db_conn)
@@ -43,3 +48,27 @@ class Sqlite3Engine(object):
 
     def lock_table(self, txn, table):
         return
+
+
+# Following functions taken from: https://github.com/coleifer/peewee
+
+def _parse_match_info(buf):
+    bufsize = len(buf)
+    return [struct.unpack('@I', buf[i:i+4])[0] for i in range(0, bufsize, 4)]
+
+
+def _rank(raw_match_info):
+    """Handle match_info called w/default args 'pcx' - based on the example rank
+    function http://sqlite.org/fts3.html#appendix_a
+    """
+    match_info = _parse_match_info(raw_match_info)
+    score = 0.0
+    p, c = match_info[:2]
+    for phrase_num in range(p):
+        phrase_info_idx = 2 + (phrase_num * c * 3)
+        for col_num in range(c):
+            col_idx = phrase_info_idx + (col_num * 3)
+            x1, x2 = match_info[col_idx:col_idx + 2]
+            if x1 > 0:
+                score += float(x1) / x2
+    return score
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 7cb314dee8..6d4421dd8f 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached
-from syutil.base64util import encode_base64
+from unpaddedbase64 import encode_base64
 
 import logging
 from Queue import PriorityQueue, Empty
@@ -154,98 +154,6 @@ class EventFederationStore(SQLBaseStore):
 
         return results
 
-    def _get_latest_state_in_room(self, txn, room_id, type, state_key):
-        event_ids = self._simple_select_onecol_txn(
-            txn,
-            table="state_forward_extremities",
-            keyvalues={
-                "room_id": room_id,
-                "type": type,
-                "state_key": state_key,
-            },
-            retcol="event_id",
-        )
-
-        results = []
-        for event_id in event_ids:
-            hashes = self._get_event_reference_hashes_txn(txn, event_id)
-            prev_hashes = {
-                k: encode_base64(v) for k, v in hashes.items()
-                if k == "sha256"
-            }
-            results.append((event_id, prev_hashes))
-
-        return results
-
-    def _get_prev_events(self, txn, event_id):
-        results = self._get_prev_events_and_state(
-            txn,
-            event_id,
-            is_state=0,
-        )
-
-        return [(e_id, h, ) for e_id, h, _ in results]
-
-    def _get_prev_state(self, txn, event_id):
-        results = self._get_prev_events_and_state(
-            txn,
-            event_id,
-            is_state=True,
-        )
-
-        return [(e_id, h, ) for e_id, h, _ in results]
-
-    def _get_prev_events_and_state(self, txn, event_id, is_state=None):
-        keyvalues = {
-            "event_id": event_id,
-        }
-
-        if is_state is not None:
-            keyvalues["is_state"] = bool(is_state)
-
-        res = self._simple_select_list_txn(
-            txn,
-            table="event_edges",
-            keyvalues=keyvalues,
-            retcols=["prev_event_id", "is_state"],
-        )
-
-        hashes = self._get_prev_event_hashes_txn(txn, event_id)
-
-        results = []
-        for d in res:
-            edge_hash = self._get_event_reference_hashes_txn(txn, d["prev_event_id"])
-            edge_hash.update(hashes.get(d["prev_event_id"], {}))
-            prev_hashes = {
-                k: encode_base64(v)
-                for k, v in edge_hash.items()
-                if k == "sha256"
-            }
-            results.append((d["prev_event_id"], prev_hashes, d["is_state"]))
-
-        return results
-
-    def _get_auth_events(self, txn, event_id):
-        auth_ids = self._simple_select_onecol_txn(
-            txn,
-            table="event_auth",
-            keyvalues={
-                "event_id": event_id,
-            },
-            retcol="auth_id",
-        )
-
-        results = []
-        for auth_id in auth_ids:
-            hashes = self._get_event_reference_hashes_txn(txn, auth_id)
-            prev_hashes = {
-                k: encode_base64(v) for k, v in hashes.items()
-                if k == "sha256"
-            }
-            results.append((auth_id, prev_hashes))
-
-        return results
-
     def get_min_depth(self, room_id):
         """ For hte given room, get the minimum depth we have seen for it.
         """
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8774b3b388..5d35ca90b9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from _base import SQLBaseStore, _RollbackButIsFineException
 
 from twisted.internet import defer, reactor
@@ -24,15 +23,23 @@ from synapse.util.logcontext import preserve_context_over_deferred
 from synapse.util.logutils import log_function
 from synapse.api.constants import EventTypes
 
-from syutil.jsonutil import encode_json
+from canonicaljson import encode_canonical_json
 from contextlib import contextmanager
 
 import logging
+import math
 import ujson as json
 
 logger = logging.getLogger(__name__)
 
 
+def encode_json(json_object):
+    if USE_FROZEN_DICTS:
+        # ujson doesn't like frozen_dicts
+        return encode_canonical_json(json_object)
+    else:
+        return json.dumps(json_object, ensure_ascii=False)
+
 # These values are used in the `enqueus_event` and `_do_fetch` methods to
 # control how we batch/bulk fetch events from the database.
 # The values are plucked out of thing air to make initial sync run faster
@@ -253,8 +260,7 @@ class EventsStore(SQLBaseStore):
                 )
 
                 metadata_json = encode_json(
-                    event.internal_metadata.get_dict(),
-                    using_frozen_dicts=USE_FROZEN_DICTS
+                    event.internal_metadata.get_dict()
                 ).decode("UTF-8")
 
                 sql = (
@@ -301,8 +307,14 @@ class EventsStore(SQLBaseStore):
                 self._store_room_name_txn(txn, event)
             elif event.type == EventTypes.Topic:
                 self._store_room_topic_txn(txn, event)
+            elif event.type == EventTypes.Message:
+                self._store_room_message_txn(txn, event)
             elif event.type == EventTypes.Redaction:
                 self._store_redaction(txn, event)
+            elif event.type == EventTypes.RoomHistoryVisibility:
+                self._store_history_visibility_txn(txn, event)
+            elif event.type == EventTypes.GuestAccess:
+                self._store_guest_access_txn(txn, event)
 
         self._store_room_members_txn(
             txn,
@@ -331,12 +343,9 @@ class EventsStore(SQLBaseStore):
                     "event_id": event.event_id,
                     "room_id": event.room_id,
                     "internal_metadata": encode_json(
-                        event.internal_metadata.get_dict(),
-                        using_frozen_dicts=USE_FROZEN_DICTS
-                    ).decode("UTF-8"),
-                    "json": encode_json(
-                        event_dict(event), using_frozen_dicts=USE_FROZEN_DICTS
+                        event.internal_metadata.get_dict()
                     ).decode("UTF-8"),
+                    "json": encode_json(event_dict(event)).decode("UTF-8"),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -355,9 +364,7 @@ class EventsStore(SQLBaseStore):
                     "type": event.type,
                     "processed": True,
                     "outlier": event.internal_metadata.is_outlier(),
-                    "content": encode_json(
-                        event.content, using_frozen_dicts=USE_FROZEN_DICTS
-                    ).decode("UTF-8"),
+                    "content": encode_json(event.content).decode("UTF-8"),
                 }
                 for event, _ in events_and_contexts
             ],
@@ -824,7 +831,8 @@ class EventsStore(SQLBaseStore):
                 allow_none=True,
             )
             if prev:
-                ev.unsigned["prev_content"] = prev.get_dict()["content"]
+                ev.unsigned["prev_content"] = prev.content
+                ev.unsigned["prev_sender"] = prev.sender
 
         self._get_event_cache.prefill(
             (ev.event_id, check_redacted, get_prev_content), ev
@@ -881,7 +889,8 @@ class EventsStore(SQLBaseStore):
                 get_prev_content=False,
             )
             if prev:
-                ev.unsigned["prev_content"] = prev.get_dict()["content"]
+                ev.unsigned["prev_content"] = prev.content
+                ev.unsigned["prev_sender"] = prev.sender
 
         self._get_event_cache.prefill(
             (ev.event_id, check_redacted, get_prev_content), ev
@@ -889,18 +898,69 @@ class EventsStore(SQLBaseStore):
 
         return ev
 
-    def _parse_events(self, rows):
-        return self.runInteraction(
-            "_parse_events", self._parse_events_txn, rows
-        )
-
     def _parse_events_txn(self, txn, rows):
         event_ids = [r["event_id"] for r in rows]
 
         return self._get_events_txn(txn, event_ids)
 
-    def _has_been_redacted_txn(self, txn, event):
-        sql = "SELECT event_id FROM redactions WHERE redacts = ?"
-        txn.execute(sql, (event.event_id,))
-        result = txn.fetchone()
-        return result[0] if result else None
+    @defer.inlineCallbacks
+    def count_daily_messages(self):
+        """
+        Returns an estimate of the number of messages sent in the last day.
+
+        If it has been significantly less or more than one day since the last
+        call to this function, it will return None.
+        """
+        def _count_messages(txn):
+            now = self.hs.get_clock().time()
+
+            txn.execute(
+                "SELECT reported_stream_token, reported_time FROM stats_reporting"
+            )
+            last_reported = self.cursor_to_dict(txn)
+
+            txn.execute(
+                "SELECT stream_ordering"
+                " FROM events"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT 1"
+            )
+            now_reporting = self.cursor_to_dict(txn)
+            if not now_reporting:
+                return None
+            now_reporting = now_reporting[0]["stream_ordering"]
+
+            txn.execute("DELETE FROM stats_reporting")
+            txn.execute(
+                "INSERT INTO stats_reporting"
+                " (reported_stream_token, reported_time)"
+                " VALUES (?, ?)",
+                (now_reporting, now,)
+            )
+
+            if not last_reported:
+                return None
+
+            # Close enough to correct for our purposes.
+            yesterday = (now - 24 * 60 * 60)
+            if math.fabs(yesterday - last_reported[0]["reported_time"]) > 60 * 60:
+                return None
+
+            txn.execute(
+                "SELECT COUNT(*) as messages"
+                " FROM events NATURAL JOIN event_json"
+                " WHERE json like '%m.room.message%'"
+                " AND stream_ordering > ?"
+                " AND stream_ordering <= ?",
+                (
+                    last_reported[0]["reported_stream_token"],
+                    now_reporting,
+                )
+            )
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+            return rows[0]["messages"]
+
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 8800116570..fcd43c7fdd 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -34,10 +34,10 @@ class FilteringStore(SQLBaseStore):
             desc="get_user_filter",
         )
 
-        defer.returnValue(json.loads(def_json))
+        defer.returnValue(json.loads(str(def_json).decode("utf-8")))
 
     def add_user_filter(self, user_localpart, user_filter):
-        def_json = json.dumps(user_filter)
+        def_json = json.dumps(user_filter).encode("utf-8")
 
         # Need an atomic transaction to SELECT the maximal ID so far then
         # INSERT a new one
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index ffd6daa880..344cacdc75 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -19,7 +19,7 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
 from twisted.internet import defer
 
 import OpenSSL
-from syutil.crypto.signing_key import decode_verify_key_bytes
+from signedjson.key import decode_verify_key_bytes
 import hashlib
 
 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
new file mode 100644
index 0000000000..1a74d6e360
--- /dev/null
+++ b/synapse/storage/prepare_database.py
@@ -0,0 +1,395 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import fnmatch
+import imp
+import logging
+import os
+import re
+
+
+logger = logging.getLogger(__name__)
+
+
+# Remember to update this number every time a change is made to database
+# schema files, so the users will be informed on server restarts.
+SCHEMA_VERSION = 25
+
+dir_path = os.path.abspath(os.path.dirname(__file__))
+
+
+def read_schema(path):
+    """ Read the named database schema.
+
+    Args:
+        path: Path of the database schema.
+    Returns:
+        A string containing the database schema.
+    """
+    with open(path) as schema_file:
+        return schema_file.read()
+
+
+class PrepareDatabaseException(Exception):
+    pass
+
+
+class UpgradeDatabaseException(PrepareDatabaseException):
+    pass
+
+
+def prepare_database(db_conn, database_engine):
+    """Prepares a database for usage. Will either create all necessary tables
+    or upgrade from an older schema version.
+    """
+    try:
+        cur = db_conn.cursor()
+        version_info = _get_or_create_schema_state(cur, database_engine)
+
+        if version_info:
+            user_version, delta_files, upgraded = version_info
+            _upgrade_existing_database(
+                cur, user_version, delta_files, upgraded, database_engine
+            )
+        else:
+            _setup_new_database(cur, database_engine)
+
+        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+
+        cur.close()
+        db_conn.commit()
+    except:
+        db_conn.rollback()
+        raise
+
+
+def _setup_new_database(cur, database_engine):
+    """Sets up the database by finding a base set of "full schemas" and then
+    applying any necessary deltas.
+
+    The "full_schemas" directory has subdirectories named after versions. This
+    function searches for the highest version less than or equal to
+    `SCHEMA_VERSION` and executes all .sql files in that directory.
+
+    The function will then apply all deltas for all versions after the base
+    version.
+
+    Example directory structure:
+
+        schema/
+            delta/
+                ...
+            full_schemas/
+                3/
+                    test.sql
+                    ...
+                11/
+                    foo.sql
+                    bar.sql
+                ...
+
+    In the example foo.sql and bar.sql would be run, and then any delta files
+    for versions strictly greater than 11.
+    """
+    current_dir = os.path.join(dir_path, "schema", "full_schemas")
+    directory_entries = os.listdir(current_dir)
+
+    valid_dirs = []
+    pattern = re.compile(r"^\d+(\.sql)?$")
+    for filename in directory_entries:
+        match = pattern.match(filename)
+        abs_path = os.path.join(current_dir, filename)
+        if match and os.path.isdir(abs_path):
+            ver = int(match.group(0))
+            if ver <= SCHEMA_VERSION:
+                valid_dirs.append((ver, abs_path))
+        else:
+            logger.warn("Unexpected entry in 'full_schemas': %s", filename)
+
+    if not valid_dirs:
+        raise PrepareDatabaseException(
+            "Could not find a suitable base set of full schemas"
+        )
+
+    max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
+
+    logger.debug("Initialising schema v%d", max_current_ver)
+
+    directory_entries = os.listdir(sql_dir)
+
+    for filename in fnmatch.filter(directory_entries, "*.sql"):
+        sql_loc = os.path.join(sql_dir, filename)
+        logger.debug("Applying schema %s", sql_loc)
+        executescript(cur, sql_loc)
+
+    cur.execute(
+        database_engine.convert_param_style(
+            "INSERT INTO schema_version (version, upgraded)"
+            " VALUES (?,?)"
+        ),
+        (max_current_ver, False,)
+    )
+
+    _upgrade_existing_database(
+        cur,
+        current_version=max_current_ver,
+        applied_delta_files=[],
+        upgraded=False,
+        database_engine=database_engine,
+    )
+
+
+def _upgrade_existing_database(cur, current_version, applied_delta_files,
+                               upgraded, database_engine):
+    """Upgrades an existing database.
+
+    Delta files can either be SQL stored in *.sql files, or python modules
+    in *.py.
+
+    There can be multiple delta files per version. Synapse will keep track of
+    which delta files have been applied, and will apply any that haven't been
+    even if there has been no version bump. This is useful for development
+    where orthogonal schema changes may happen on separate branches.
+
+    Different delta files for the same version *must* be orthogonal and give
+    the same result when applied in any order. No guarantees are made on the
+    order of execution of these scripts.
+
+    This is a no-op of current_version == SCHEMA_VERSION.
+
+    Example directory structure:
+
+        schema/
+            delta/
+                11/
+                    foo.sql
+                    ...
+                12/
+                    foo.sql
+                    bar.py
+                ...
+            full_schemas/
+                ...
+
+    In the example, if current_version is 11, then foo.sql will be run if and
+    only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
+    some arbitrary order.
+
+    Args:
+        cur (Cursor)
+        current_version (int): The current version of the schema.
+        applied_delta_files (list): A list of deltas that have already been
+            applied.
+        upgraded (bool): Whether the current version was generated by having
+            applied deltas or from full schema file. If `True` the function
+            will never apply delta files for the given `current_version`, since
+            the current_version wasn't generated by applying those delta files.
+    """
+
+    if current_version > SCHEMA_VERSION:
+        raise ValueError(
+            "Cannot use this database as it is too " +
+            "new for the server to understand"
+        )
+
+    start_ver = current_version
+    if not upgraded:
+        start_ver += 1
+
+    logger.debug("applied_delta_files: %s", applied_delta_files)
+
+    for v in range(start_ver, SCHEMA_VERSION + 1):
+        logger.debug("Upgrading schema to v%d", v)
+
+        delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
+
+        try:
+            directory_entries = os.listdir(delta_dir)
+        except OSError:
+            logger.exception("Could not open delta dir for version %d", v)
+            raise UpgradeDatabaseException(
+                "Could not open delta dir for version %d" % (v,)
+            )
+
+        directory_entries.sort()
+        for file_name in directory_entries:
+            relative_path = os.path.join(str(v), file_name)
+            logger.debug("Found file: %s", relative_path)
+            if relative_path in applied_delta_files:
+                continue
+
+            absolute_path = os.path.join(
+                dir_path, "schema", "delta", relative_path,
+            )
+            root_name, ext = os.path.splitext(file_name)
+            if ext == ".py":
+                # This is a python upgrade module. We need to import into some
+                # package and then execute its `run_upgrade` function.
+                module_name = "synapse.storage.v%d_%s" % (
+                    v, root_name
+                )
+                with open(absolute_path) as python_file:
+                    module = imp.load_source(
+                        module_name, absolute_path, python_file
+                    )
+                logger.debug("Running script %s", relative_path)
+                module.run_upgrade(cur, database_engine)
+            elif ext == ".pyc":
+                # Sometimes .pyc files turn up anyway even though we've
+                # disabled their generation; e.g. from distribution package
+                # installers. Silently skip it
+                pass
+            elif ext == ".sql":
+                # A plain old .sql file, just read and execute it
+                logger.debug("Applying schema %s", relative_path)
+                executescript(cur, absolute_path)
+            else:
+                # Not a valid delta file.
+                logger.warn(
+                    "Found directory entry that did not end in .py or"
+                    " .sql: %s",
+                    relative_path,
+                )
+                continue
+
+            # Mark as done.
+            cur.execute(
+                database_engine.convert_param_style(
+                    "INSERT INTO applied_schema_deltas (version, file)"
+                    " VALUES (?,?)",
+                ),
+                (v, relative_path)
+            )
+
+            cur.execute("DELETE FROM schema_version")
+            cur.execute(
+                database_engine.convert_param_style(
+                    "INSERT INTO schema_version (version, upgraded)"
+                    " VALUES (?,?)",
+                ),
+                (v, True)
+            )
+
+
+def get_statements(f):
+    statement_buffer = ""
+    in_comment = False  # If we're in a /* ... */ style comment
+
+    for line in f:
+        line = line.strip()
+
+        if in_comment:
+            # Check if this line contains an end to the comment
+            comments = line.split("*/", 1)
+            if len(comments) == 1:
+                continue
+            line = comments[1]
+            in_comment = False
+
+        # Remove inline block comments
+        line = re.sub(r"/\*.*\*/", " ", line)
+
+        # Does this line start a comment?
+        comments = line.split("/*", 1)
+        if len(comments) > 1:
+            line = comments[0]
+            in_comment = True
+
+        # Deal with line comments
+        line = line.split("--", 1)[0]
+        line = line.split("//", 1)[0]
+
+        # Find *all* semicolons. We need to treat first and last entry
+        # specially.
+        statements = line.split(";")
+
+        # We must prepend statement_buffer to the first statement
+        first_statement = "%s %s" % (
+            statement_buffer.strip(),
+            statements[0].strip()
+        )
+        statements[0] = first_statement
+
+        # Every entry, except the last, is a full statement
+        for statement in statements[:-1]:
+            yield statement.strip()
+
+        # The last entry did *not* end in a semicolon, so we store it for the
+        # next semicolon we find
+        statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+    with open(schema_path, 'r') as f:
+        for statement in get_statements(f):
+            txn.execute(statement)
+
+
+def _get_or_create_schema_state(txn, database_engine):
+    # Bluntly try creating the schema_version tables.
+    schema_path = os.path.join(
+        dir_path, "schema", "schema_version.sql",
+    )
+    executescript(txn, schema_path)
+
+    txn.execute("SELECT version, upgraded FROM schema_version")
+    row = txn.fetchone()
+    current_version = int(row[0]) if row else None
+    upgraded = bool(row[1]) if row else None
+
+    if current_version:
+        txn.execute(
+            database_engine.convert_param_style(
+                "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+            ),
+            (current_version,)
+        )
+        applied_deltas = [d for d, in txn.fetchall()]
+        return current_version, applied_deltas, upgraded
+
+    return None
+
+
+def prepare_sqlite3_database(db_conn):
+    """This function should be called before `prepare_database` on sqlite3
+    databases.
+
+    Since we changed the way we store the current schema version and handle
+    updates to schemas, we need a way to upgrade from the old method to the
+    new. This only affects sqlite databases since they were the only ones
+    supported at the time.
+    """
+    with db_conn:
+        schema_path = os.path.join(
+            dir_path, "schema", "schema_version.sql",
+        )
+        create_schema = read_schema(schema_path)
+        db_conn.executescript(create_schema)
+
+        c = db_conn.execute("SELECT * FROM schema_version")
+        rows = c.fetchall()
+        c.close()
+
+        if not rows:
+            c = db_conn.execute("PRAGMA user_version")
+            row = c.fetchone()
+            c.close()
+
+            if row and row[0]:
+                db_conn.execute(
+                    "REPLACE INTO schema_version (version, upgraded)"
+                    " VALUES (?,?)",
+                    (row[0], False)
+                )
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 08ea62681b..345c4e1104 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 
-from syutil.jsonutil import encode_canonical_json
+from canonicaljson import encode_canonical_json
 
 import logging
 import simplejson as json
@@ -149,5 +149,5 @@ class PusherStore(SQLBaseStore):
         )
 
 
-class PushersTable(Table):
+class PushersTable(object):
     table_name = "pushers"
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 586628579d..2e5eddd259 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -51,6 +51,28 @@ class RegistrationStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
+    def add_refresh_token_to_user(self, user_id, token):
+        """Adds a refresh token for the given user.
+
+        Args:
+            user_id (str): The user ID.
+            token (str): The new refresh token to add.
+        Raises:
+            StoreError if there was a problem adding this.
+        """
+        next_id = yield self._refresh_tokens_id_gen.get_next()
+
+        yield self._simple_insert(
+            "refresh_tokens",
+            {
+                "id": next_id,
+                "user_id": user_id,
+                "token": token
+            },
+            desc="add_refresh_token_to_user",
+        )
+
+    @defer.inlineCallbacks
     def register(self, user_id, token, password_hash):
         """Attempts to register an account.
 
@@ -80,13 +102,14 @@ class RegistrationStore(SQLBaseStore):
                 400, "User ID already taken.", errcode=Codes.USER_IN_USE
             )
 
-        # it's possible for this to get a conflict, but only for a single user
-        # since tokens are namespaced based on their user ID
-        txn.execute(
-            "INSERT INTO access_tokens(id, user_id, token)"
-            " VALUES (?,?,?)",
-            (next_id, user_id, token,)
-        )
+        if token:
+            # it's possible for this to get a conflict, but only for a single user
+            # since tokens are namespaced based on their user ID
+            txn.execute(
+                "INSERT INTO access_tokens(id, user_id, token)"
+                " VALUES (?,?,?)",
+                (next_id, user_id, token,)
+            )
 
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
@@ -146,26 +169,65 @@ class RegistrationStore(SQLBaseStore):
             user_id
         )
         for r in rows:
-            self.get_user_by_token.invalidate((r,))
+            self.get_user_by_access_token.invalidate((r,))
 
     @cached()
-    def get_user_by_token(self, token):
+    def get_user_by_access_token(self, token):
         """Get a user from the given access token.
 
         Args:
             token (str): The access token of a user.
         Returns:
-            dict: Including the name (user_id), device_id and whether they are
-                an admin.
+            dict: Including the name (user_id) and the ID of their access token.
         Raises:
             StoreError if no user was found.
         """
         return self.runInteraction(
-            "get_user_by_token",
+            "get_user_by_access_token",
             self._query_for_auth,
             token
         )
 
+    def exchange_refresh_token(self, refresh_token, token_generator):
+        """Exchange a refresh token for a new access token and refresh token.
+
+        Doing so invalidates the old refresh token - refresh tokens are single
+        use.
+
+        Args:
+            token (str): The refresh token of a user.
+            token_generator (fn: str -> str): Function which, when given a
+                user ID, returns a unique refresh token for that user. This
+                function must never return the same value twice.
+        Returns:
+            tuple of (user_id, refresh_token)
+        Raises:
+            StoreError if no user was found with that refresh token.
+        """
+        return self.runInteraction(
+            "exchange_refresh_token",
+            self._exchange_refresh_token,
+            refresh_token,
+            token_generator
+        )
+
+    def _exchange_refresh_token(self, txn, old_token, token_generator):
+        sql = "SELECT user_id FROM refresh_tokens WHERE token = ?"
+        txn.execute(sql, (old_token,))
+        rows = self.cursor_to_dict(txn)
+        if not rows:
+            raise StoreError(403, "Did not recognize refresh token")
+        user_id = rows[0]["user_id"]
+
+        # TODO(danielwh): Maybe perform a validation on the macaroon that
+        # macaroon.user_id == user_id.
+
+        new_token = token_generator(user_id)
+        sql = "UPDATE refresh_tokens SET token = ? WHERE token = ?"
+        txn.execute(sql, (new_token, old_token,))
+
+        return user_id, new_token
+
     @defer.inlineCallbacks
     def is_server_admin(self, user):
         res = yield self._simple_select_one_onecol(
@@ -180,8 +242,7 @@ class RegistrationStore(SQLBaseStore):
 
     def _query_for_auth(self, txn, token):
         sql = (
-            "SELECT users.name, users.admin,"
-            " access_tokens.device_id, access_tokens.id as token_id"
+            "SELECT users.name, access_tokens.id as token_id"
             " FROM users"
             " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
@@ -229,3 +290,16 @@ class RegistrationStore(SQLBaseStore):
         if ret:
             defer.returnValue(ret['user_id'])
         defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def count_all_users(self):
+        """Counts all users registered on the homeserver."""
+        def _count_users(txn):
+            txn.execute("SELECT COUNT(*) AS users FROM users")
+            rows = self.cursor_to_dict(txn)
+            if rows:
+                return rows[0]["users"]
+            return 0
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5e07b7e0e5..4f08df478c 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -19,6 +19,7 @@ from synapse.api.errors import StoreError
 
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from .engines import PostgresEngine, Sqlite3Engine
 
 import collections
 import logging
@@ -98,34 +99,39 @@ class RoomStore(SQLBaseStore):
         """
 
         def f(txn):
-            topic_subquery = (
-                "SELECT topics.event_id as event_id, "
-                "topics.room_id as room_id, topic "
-                "FROM topics "
-                "INNER JOIN current_state_events as c "
-                "ON c.event_id = topics.event_id "
-            )
-
-            name_subquery = (
-                "SELECT room_names.event_id as event_id, "
-                "room_names.room_id as room_id, name "
-                "FROM room_names "
-                "INNER JOIN current_state_events as c "
-                "ON c.event_id = room_names.event_id "
-            )
+            def subquery(table_name, column_name=None):
+                column_name = column_name or table_name
+                return (
+                    "SELECT %(table_name)s.event_id as event_id, "
+                    "%(table_name)s.room_id as room_id, %(column_name)s "
+                    "FROM %(table_name)s "
+                    "INNER JOIN current_state_events as c "
+                    "ON c.event_id = %(table_name)s.event_id " % {
+                        "column_name": column_name,
+                        "table_name": table_name,
+                    }
+                )
 
-            # We use non printing ascii character US (\x1F) as a separator
             sql = (
-                "SELECT r.room_id, max(n.name), max(t.topic)"
+                "SELECT"
+                "    r.room_id,"
+                "    max(n.name),"
+                "    max(t.topic),"
+                "    max(v.history_visibility),"
+                "    max(g.guest_access)"
                 " FROM rooms AS r"
                 " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
                 " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
+                " LEFT JOIN (%(history_visibility)s) AS v ON v.room_id = r.room_id"
+                " LEFT JOIN (%(guest_access)s) AS g ON g.room_id = r.room_id"
                 " WHERE r.is_public = ?"
-                " GROUP BY r.room_id"
-            ) % {
-                "topic": topic_subquery,
-                "name": name_subquery,
-            }
+                " GROUP BY r.room_id" % {
+                    "topic": subquery("topics", "topic"),
+                    "name": subquery("room_names", "name"),
+                    "history_visibility": subquery("history_visibility"),
+                    "guest_access": subquery("guest_access"),
+                }
+            )
 
             txn.execute(sql, (is_public,))
 
@@ -155,10 +161,12 @@ class RoomStore(SQLBaseStore):
                 "room_id": r[0],
                 "name": r[1],
                 "topic": r[2],
-                "aliases": r[3],
+                "world_readable": r[3] == "world_readable",
+                "guest_can_join": r[4] == "can_join",
+                "aliases": r[5],
             }
             for r in rows
-            if r[3]  # We only return rooms that have at least one alias.
+            if r[5]  # We only return rooms that have at least one alias.
         ]
 
         defer.returnValue(ret)
@@ -175,6 +183,10 @@ class RoomStore(SQLBaseStore):
                 },
             )
 
+            self._store_event_search_txn(
+                txn, event, "content.topic", event.content["topic"]
+            )
+
     def _store_room_name_txn(self, txn, event):
         if hasattr(event, "content") and "name" in event.content:
             self._simple_insert_txn(
@@ -187,6 +199,52 @@ class RoomStore(SQLBaseStore):
                 }
             )
 
+            self._store_event_search_txn(
+                txn, event, "content.name", event.content["name"]
+            )
+
+    def _store_room_message_txn(self, txn, event):
+        if hasattr(event, "content") and "body" in event.content:
+            self._store_event_search_txn(
+                txn, event, "content.body", event.content["body"]
+            )
+
+    def _store_history_visibility_txn(self, txn, event):
+        self._store_content_index_txn(txn, event, "history_visibility")
+
+    def _store_guest_access_txn(self, txn, event):
+        self._store_content_index_txn(txn, event, "guest_access")
+
+    def _store_content_index_txn(self, txn, event, key):
+        if hasattr(event, "content") and key in event.content:
+            sql = (
+                "INSERT INTO %(key)s"
+                " (event_id, room_id, %(key)s)"
+                " VALUES (?, ?, ?)" % {"key": key}
+            )
+            txn.execute(sql, (
+                event.event_id,
+                event.room_id,
+                event.content[key]
+            ))
+
+    def _store_event_search_txn(self, txn, event, key, value):
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "INSERT INTO event_search (event_id, room_id, key, vector)"
+                " VALUES (?,?,?,to_tsvector('english', ?))"
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = (
+                "INSERT INTO event_search (event_id, room_id, key, value)"
+                " VALUES (?,?,?,?)"
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        txn.execute(sql, (event.event_id, event.room_id, key, value,))
+
     @cachedInlineCallbacks()
     def get_room_name_and_aliases(self, room_id):
         def f(txn):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8eee2dfbcc..ae1ad56d9a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 
 RoomsForUser = namedtuple(
     "RoomsForUser",
-    ("room_id", "sender", "membership")
+    ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
 
@@ -110,6 +110,33 @@ class RoomMemberStore(SQLBaseStore):
             membership=membership,
         ).addCallback(self._get_events)
 
+    def get_invites_for_user(self, user_id):
+        """ Get all the invite events for a user
+        Args:
+            user_id (str): The user ID.
+        Returns:
+            A deferred list of event objects.
+        """
+
+        return self.get_rooms_for_user_where_membership_is(
+            user_id, [Membership.INVITE]
+        ).addCallback(lambda invites: self._get_events([
+            invites.event_id for invite in invites
+        ]))
+
+    def get_leave_and_ban_events_for_user(self, user_id):
+        """ Get all the leave events for a user
+        Args:
+            user_id (str): The user ID.
+        Returns:
+            A deferred list of event objects.
+        """
+        return self.get_rooms_for_user_where_membership_is(
+            user_id, (Membership.LEAVE, Membership.BAN)
+        ).addCallback(lambda leaves: self._get_events([
+            leave.event_id for leave in leaves
+        ]))
+
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
         matches one in the membership list.
@@ -141,11 +168,13 @@ class RoomMemberStore(SQLBaseStore):
         args.extend(membership_list)
 
         sql = (
-            "SELECT m.room_id, m.sender, m.membership"
-            " FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
+            "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
+            " FROM current_state_events as c"
+            " INNER JOIN room_memberships as m"
+            " ON m.event_id = c.event_id"
+            " INNER JOIN events as e"
+            " ON e.event_id = c.event_id"
+            " AND m.room_id = c.room_id"
             " AND m.user_id = c.state_key"
             " WHERE %s"
         ) % (where_clause,)
@@ -176,12 +205,6 @@ class RoomMemberStore(SQLBaseStore):
 
         return joined_domains
 
-    def _get_members_query(self, where_clause, where_values):
-        return self.runInteraction(
-            "get_members_query", self._get_members_events_txn,
-            where_clause, where_values
-        ).addCallbacks(self._get_events)
-
     def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
         rows = self._get_members_rows_txn(
             txn,
diff --git a/synapse/storage/schema/delta/23/drop_state_index.sql b/synapse/storage/schema/delta/23/drop_state_index.sql
new file mode 100644
index 0000000000..07d0ea5cb2
--- /dev/null
+++ b/synapse/storage/schema/delta/23/drop_state_index.sql
@@ -0,0 +1,16 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+DROP INDEX IF EXISTS state_groups_state_tuple;
diff --git a/synapse/storage/schema/delta/23/refresh_tokens.sql b/synapse/storage/schema/delta/23/refresh_tokens.sql
new file mode 100644
index 0000000000..437b1ac1be
--- /dev/null
+++ b/synapse/storage/schema/delta/23/refresh_tokens.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS refresh_tokens(
+    id INTEGER PRIMARY KEY,
+    token TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    UNIQUE (token)
+);
diff --git a/synapse/storage/schema/delta/24/stats_reporting.sql b/synapse/storage/schema/delta/24/stats_reporting.sql
new file mode 100644
index 0000000000..e9165d2917
--- /dev/null
+++ b/synapse/storage/schema/delta/24/stats_reporting.sql
@@ -0,0 +1,22 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Should only ever contain one row
+CREATE TABLE IF NOT EXISTS stats_reporting(
+  -- The stream ordering token which was most recently reported as stats
+  reported_stream_token INTEGER,
+  -- The time (seconds since epoch) stats were most recently reported
+  reported_time BIGINT
+);
diff --git a/synapse/storage/schema/delta/25/00background_updates.sql b/synapse/storage/schema/delta/25/00background_updates.sql
new file mode 100644
index 0000000000..41a9b59b1b
--- /dev/null
+++ b/synapse/storage/schema/delta/25/00background_updates.sql
@@ -0,0 +1,21 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS background_updates(
+    update_name TEXT NOT NULL, -- The name of the background update.
+    progress_json TEXT NOT NULL, -- The current progress of the update as JSON.
+    CONSTRAINT background_updates_uniqueness UNIQUE (update_name)
+);
diff --git a/synapse/storage/schema/delta/25/fts.py b/synapse/storage/schema/delta/25/fts.py
new file mode 100644
index 0000000000..5239d69073
--- /dev/null
+++ b/synapse/storage/schema/delta/25/fts.py
@@ -0,0 +1,78 @@
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.storage.prepare_database import get_statements
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import ujson
+
+logger = logging.getLogger(__name__)
+
+
+POSTGRES_TABLE = """
+CREATE TABLE IF NOT EXISTS event_search (
+    event_id TEXT,
+    room_id TEXT,
+    sender TEXT,
+    key TEXT,
+    vector tsvector
+);
+
+CREATE INDEX event_search_fts_idx ON event_search USING gin(vector);
+CREATE INDEX event_search_ev_idx ON event_search(event_id);
+CREATE INDEX event_search_ev_ridx ON event_search(room_id);
+"""
+
+
+SQLITE_TABLE = (
+    "CREATE VIRTUAL TABLE IF NOT EXISTS event_search"
+    " USING fts4 ( event_id, room_id, sender, key, value )"
+)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    if isinstance(database_engine, PostgresEngine):
+        for statement in get_statements(POSTGRES_TABLE.splitlines()):
+            cur.execute(statement)
+    elif isinstance(database_engine, Sqlite3Engine):
+        cur.execute(SQLITE_TABLE)
+    else:
+        raise Exception("Unrecognized database engine")
+
+    cur.execute("SELECT MIN(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    min_stream_id = rows[0][0]
+
+    cur.execute("SELECT MAX(stream_ordering) FROM events")
+    rows = cur.fetchall()
+    max_stream_id = rows[0][0]
+
+    if min_stream_id is not None and max_stream_id is not None:
+        progress = {
+            "target_min_stream_id_inclusive": min_stream_id,
+            "max_stream_id_exclusive": max_stream_id + 1,
+            "rows_inserted": 0,
+        }
+        progress_json = ujson.dumps(progress)
+
+        sql = (
+            "INSERT into background_updates (update_name, progress_json)"
+            " VALUES (?, ?)"
+        )
+
+        sql = database_engine.convert_param_style(sql)
+
+        cur.execute(sql, ("event_search", progress_json))
diff --git a/synapse/storage/schema/delta/25/guest_access.sql b/synapse/storage/schema/delta/25/guest_access.sql
new file mode 100644
index 0000000000..bdb90e7118
--- /dev/null
+++ b/synapse/storage/schema/delta/25/guest_access.sql
@@ -0,0 +1,25 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This is a manual index of guest_access content of state events,
+ * so that we can join on them in SELECT statements.
+ */
+CREATE TABLE IF NOT EXISTS guest_access(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    guest_access TEXT NOT NULL,
+    UNIQUE (event_id)
+);
diff --git a/synapse/storage/schema/delta/25/history_visibility.sql b/synapse/storage/schema/delta/25/history_visibility.sql
new file mode 100644
index 0000000000..532cb05151
--- /dev/null
+++ b/synapse/storage/schema/delta/25/history_visibility.sql
@@ -0,0 +1,25 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This is a manual index of history_visibility content of state events,
+ * so that we can join on them in SELECT statements.
+ */
+CREATE TABLE IF NOT EXISTS history_visibility(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    history_visibility TEXT NOT NULL,
+    UNIQUE (event_id)
+);
diff --git a/synapse/storage/schema/delta/25/tags.sql b/synapse/storage/schema/delta/25/tags.sql
new file mode 100644
index 0000000000..527424c998
--- /dev/null
+++ b/synapse/storage/schema/delta/25/tags.sql
@@ -0,0 +1,38 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE IF NOT EXISTS room_tags(
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    tag     TEXT NOT NULL,  -- The name of the tag.
+    content TEXT NOT NULL,  -- The JSON content of the tag.
+    CONSTRAINT room_tag_uniqueness UNIQUE (user_id, room_id, tag)
+);
+
+CREATE TABLE IF NOT EXISTS room_tags_revisions (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL, -- The current version of the room tags.
+    CONSTRAINT room_tag_revisions_uniqueness UNIQUE (user_id, room_id)
+);
+
+CREATE TABLE IF NOT EXISTS private_user_data_max_stream_id(
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id  BIGINT NOT NULL,
+    CHECK (Lock='X')
+);
+
+INSERT INTO private_user_data_max_stream_id (stream_id) VALUES (0);
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
new file mode 100644
index 0000000000..380270b009
--- /dev/null
+++ b/synapse/storage/search.py
@@ -0,0 +1,307 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from .background_updates import BackgroundUpdateStore
+from synapse.api.errors import SynapseError
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class SearchStore(BackgroundUpdateStore):
+
+    EVENT_SEARCH_UPDATE_NAME = "event_search"
+
+    def __init__(self, hs):
+        super(SearchStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
+        )
+
+    @defer.inlineCallbacks
+    def _background_reindex_search(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+        TYPES = ["m.room.name", "m.room.message", "m.room.topic"]
+
+        def reindex_search_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " AND (%s)"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+            event_ids = [row[1] for row in rows]
+
+            events = self._get_events_txn(txn, event_ids)
+
+            event_search_rows = []
+            for event in events:
+                try:
+                    event_id = event.event_id
+                    room_id = event.room_id
+                    content = event.content
+                    if event.type == "m.room.message":
+                        key = "content.body"
+                        value = content["body"]
+                    elif event.type == "m.room.topic":
+                        key = "content.topic"
+                        value = content["topic"]
+                    elif event.type == "m.room.name":
+                        key = "content.name"
+                        value = content["name"]
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                event_search_rows.append((event_id, room_id, key, value))
+
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = (
+                    "INSERT INTO event_search (event_id, room_id, key, vector)"
+                    " VALUES (?,?,?,to_tsvector('english', ?))"
+                )
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                sql = (
+                    "INSERT INTO event_search (event_id, room_id, key, value)"
+                    " VALUES (?,?,?,?)"
+                )
+            else:
+                # This should be unreachable.
+                raise Exception("Unrecognized database engine")
+
+            for index in range(0, len(event_search_rows), INSERT_CLUMP_SIZE):
+                clump = event_search_rows[index:index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(event_search_rows)
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_SEARCH_UPDATE_NAME, progress
+            )
+
+            return len(event_search_rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def search_msgs(self, room_ids, search_term, keys):
+        """Performs a full text search over events with given keys.
+
+        Args:
+            room_ids (list): List of room ids to search in
+            search_term (str): Search term to search for
+            keys (list): List of keys to search in, currently supports
+                "content.body", "content.name", "content.topic"
+
+        Returns:
+            list of dicts
+        """
+        clauses = []
+        args = []
+
+        # Make sure we don't explode because the person is in too many rooms.
+        # We filter the results below regardless.
+        if len(room_ids) < 500:
+            clauses.append(
+                "room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)
+            )
+            args.extend(room_ids)
+
+        local_clauses = []
+        for key in keys:
+            local_clauses.append("key = ?")
+            args.append(key)
+
+        clauses.append(
+            "(%s)" % (" OR ".join(local_clauses),)
+        )
+
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "SELECT ts_rank_cd(vector, query) AS rank, room_id, event_id"
+                " FROM plainto_tsquery('english', ?) as query, event_search"
+                " WHERE vector @@ query"
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            sql = (
+                "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
+                " FROM event_search"
+                " WHERE value MATCH ?"
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        for clause in clauses:
+            sql += " AND " + clause
+
+        # We add an arbitrary limit here to ensure we don't try to pull the
+        # entire table from the database.
+        sql += " ORDER BY rank DESC LIMIT 500"
+
+        results = yield self._execute(
+            "search_msgs", self.cursor_to_dict, sql, *([search_term] + args)
+        )
+
+        results = filter(lambda row: row["room_id"] in room_ids, results)
+
+        events = yield self._get_events([r["event_id"] for r in results])
+
+        event_map = {
+            ev.event_id: ev
+            for ev in events
+        }
+
+        defer.returnValue([
+            {
+                "event": event_map[r["event_id"]],
+                "rank": r["rank"],
+            }
+            for r in results
+            if r["event_id"] in event_map
+        ])
+
+    @defer.inlineCallbacks
+    def search_room(self, room_id, search_term, keys, limit, pagination_token=None):
+        """Performs a full text search over events with given keys.
+
+        Args:
+            room_id (str): The room_id to search in
+            search_term (str): Search term to search for
+            keys (list): List of keys to search in, currently supports
+                "content.body", "content.name", "content.topic"
+            pagination_token (str): A pagination token previously returned
+
+        Returns:
+            list of dicts
+        """
+        clauses = []
+        args = [search_term, room_id]
+
+        local_clauses = []
+        for key in keys:
+            local_clauses.append("key = ?")
+            args.append(key)
+
+        clauses.append(
+            "(%s)" % (" OR ".join(local_clauses),)
+        )
+
+        if pagination_token:
+            try:
+                topo, stream = pagination_token.split(",")
+                topo = int(topo)
+                stream = int(stream)
+            except:
+                raise SynapseError(400, "Invalid pagination token")
+
+            clauses.append(
+                "(topological_ordering < ?"
+                " OR (topological_ordering = ? AND stream_ordering < ?))"
+            )
+            args.extend([topo, topo, stream])
+
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = (
+                "SELECT ts_rank_cd(vector, query) as rank,"
+                " topological_ordering, stream_ordering, room_id, event_id"
+                " FROM plainto_tsquery('english', ?) as query, event_search"
+                " NATURAL JOIN events"
+                " WHERE vector @@ query AND room_id = ?"
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            # We use CROSS JOIN here to ensure we use the right indexes.
+            # https://sqlite.org/optoverview.html#crossjoin
+            #
+            # We want to use the full text search index on event_search to
+            # extract all possible matches first, then lookup those matches
+            # in the events table to get the topological ordering. We need
+            # to use the indexes in this order because sqlite refuses to
+            # MATCH unless it uses the full text search index
+            sql = (
+                "SELECT rank(matchinfo) as rank, room_id, event_id,"
+                " topological_ordering, stream_ordering"
+                " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
+                " FROM event_search"
+                " WHERE value MATCH ?"
+                " )"
+                " CROSS JOIN events USING (event_id)"
+                " WHERE room_id = ?"
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        for clause in clauses:
+            sql += " AND " + clause
+
+        # We add an arbitrary limit here to ensure we don't try to pull the
+        # entire table from the database.
+        sql += " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+
+        args.append(limit)
+
+        results = yield self._execute(
+            "search_rooms", self.cursor_to_dict, sql, *args
+        )
+
+        events = yield self._get_events([r["event_id"] for r in results])
+
+        event_map = {
+            ev.event_id: ev
+            for ev in events
+        }
+
+        defer.returnValue([
+            {
+                "event": event_map[r["event_id"]],
+                "rank": r["rank"],
+                "pagination_token": "%s,%s" % (
+                    r["topological_ordering"], r["stream_ordering"]
+                ),
+            }
+            for r in results
+            if r["event_id"] in event_map
+        ])
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 4f15e534b4..b070be504d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -17,48 +17,13 @@ from twisted.internet import defer
 
 from _base import SQLBaseStore
 
-from syutil.base64util import encode_base64
+from unpaddedbase64 import encode_base64
 from synapse.crypto.event_signing import compute_event_reference_hash
 
 
 class SignatureStore(SQLBaseStore):
     """Persistence for event signatures and hashes"""
 
-    def _get_event_content_hashes_txn(self, txn, event_id):
-        """Get all the hashes for a given Event.
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-        Returns:
-            A dict of algorithm -> hash.
-        """
-        query = (
-            "SELECT algorithm, hash"
-            " FROM event_content_hashes"
-            " WHERE event_id = ?"
-        )
-        txn.execute(query, (event_id, ))
-        return dict(txn.fetchall())
-
-    def _store_event_content_hash_txn(self, txn, event_id, algorithm,
-                                      hash_bytes):
-        """Store a hash for a Event
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-            algorithm (str): Hashing algorithm.
-            hash_bytes (bytes): Hash function output bytes.
-        """
-        self._simple_insert_txn(
-            txn,
-            "event_content_hashes",
-            {
-                "event_id": event_id,
-                "algorithm": algorithm,
-                "hash": buffer(hash_bytes),
-            },
-        )
-
     def get_event_reference_hashes(self, event_ids):
         def f(txn):
             return [
@@ -123,80 +88,3 @@ class SignatureStore(SQLBaseStore):
             table="event_reference_hashes",
             values=vals,
         )
-
-    def _get_event_signatures_txn(self, txn, event_id):
-        """Get all the signatures for a given PDU.
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-        Returns:
-            A dict of sig name -> dict(key_id -> signature_bytes)
-        """
-        query = (
-            "SELECT signature_name, key_id, signature"
-            " FROM event_signatures"
-            " WHERE event_id = ? "
-        )
-        txn.execute(query, (event_id, ))
-        rows = txn.fetchall()
-
-        res = {}
-
-        for name, key, sig in rows:
-            res.setdefault(name, {})[key] = sig
-
-        return res
-
-    def _store_event_signature_txn(self, txn, event_id, signature_name, key_id,
-                                   signature_bytes):
-        """Store a signature from the origin server for a PDU.
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-            origin (str): origin of the Event.
-            key_id (str): Id for the signing key.
-            signature (bytes): The signature.
-        """
-        self._simple_insert_txn(
-            txn,
-            "event_signatures",
-            {
-                "event_id": event_id,
-                "signature_name": signature_name,
-                "key_id": key_id,
-                "signature": buffer(signature_bytes),
-            },
-        )
-
-    def _get_prev_event_hashes_txn(self, txn, event_id):
-        """Get all the hashes for previous PDUs of a PDU
-        Args:
-            txn (cursor):
-            event_id (str): Id for the Event.
-        Returns:
-            dict of (pdu_id, origin) -> dict of algorithm -> hash_bytes.
-        """
-        query = (
-            "SELECT prev_event_id, algorithm, hash"
-            " FROM event_edge_hashes"
-            " WHERE event_id = ?"
-        )
-        txn.execute(query, (event_id, ))
-        results = {}
-        for prev_event_id, algorithm, hash_bytes in txn.fetchall():
-            hashes = results.setdefault(prev_event_id, {})
-            hashes[algorithm] = hash_bytes
-        return results
-
-    def _store_prev_event_hash_txn(self, txn, event_id, prev_event_id,
-                                   algorithm, hash_bytes):
-        self._simple_insert_txn(
-            txn,
-            "event_edge_hashes",
-            {
-                "event_id": event_id,
-                "prev_event_id": prev_event_id,
-                "algorithm": algorithm,
-                "hash": buffer(hash_bytes),
-            },
-        )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 9630efcfcc..80e9b63f50 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -20,8 +20,6 @@ from synapse.util.caches.descriptors import (
 
 from twisted.internet import defer
 
-from synapse.util.stringutils import random_string
-
 import logging
 
 logger = logging.getLogger(__name__)
@@ -56,7 +54,7 @@ class StateStore(SQLBaseStore):
             defer.returnValue({})
 
         event_to_groups = yield self._get_state_group_for_events(
-            room_id, event_ids,
+            event_ids,
         )
 
         groups = set(event_to_groups.values())
@@ -210,13 +208,12 @@ class StateStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_state_for_events(self, room_id, event_ids, types):
+    def get_state_for_events(self, event_ids, types):
         """Given a list of event_ids and type tuples, return a list of state
         dicts for each event. The state dicts will only have the type/state_keys
         that are in the `types` list.
 
         Args:
-            room_id (str)
             event_ids (list)
             types (list): List of (type, state_key) tuples which are used to
                 filter the state fetched. `state_key` may be None, which matches
@@ -227,7 +224,7 @@ class StateStore(SQLBaseStore):
             The dicts are mappings from (type, state_key) -> state_events
         """
         event_to_groups = yield self._get_state_group_for_events(
-            room_id, event_ids,
+            event_ids,
         )
 
         groups = set(event_to_groups.values())
@@ -240,6 +237,20 @@ class StateStore(SQLBaseStore):
 
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
+    @defer.inlineCallbacks
+    def get_state_for_event(self, event_id, types=None):
+        """
+        Get the state dict corresponding to a particular event
+
+        :param str event_id: event whose state should be returned
+        :param list[(str, str)]|None types: List of (type, state_key) tuples
+            which are used to filter the state fetched. May be None, which
+            matches any key
+        :return: a deferred dict from (type, state_key) -> state_event
+        """
+        state_map = yield self.get_state_for_events([event_id], types)
+        defer.returnValue(state_map[event_id])
+
     @cached(num_args=2, lru=True, max_entries=10000)
     def _get_state_group_for_event(self, room_id, event_id):
         return self._simple_select_one_onecol(
@@ -253,8 +264,8 @@ class StateStore(SQLBaseStore):
         )
 
     @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
-                num_args=2)
-    def _get_state_group_for_events(self, room_id, event_ids):
+                num_args=1)
+    def _get_state_group_for_events(self, event_ids):
         """Returns mapping event_id -> state_group
         """
         def f(txn):
@@ -428,7 +439,3 @@ class StateStore(SQLBaseStore):
             }
 
         defer.returnValue(results)
-
-
-def _make_group_id(clock):
-    return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d7fe423f5a..be8ba76aae 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -23,7 +23,7 @@ paginate bacwards.
 
 This is implemented by keeping two ordering columns: stream_ordering and
 topological_ordering. Stream ordering is basically insertion/received order
-(except for events from backfill requests). The topolgical_ordering is a
+(except for events from backfill requests). The topological_ordering is a
 weak ordering of events based on the pdu graph.
 
 This means that we have to have two different types of tokens, depending on
@@ -158,16 +158,40 @@ class StreamStore(SQLBaseStore):
         defer.returnValue(results)
 
     @log_function
-    def get_room_events_stream(self, user_id, from_key, to_key, room_id,
-                               limit=0, with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
-        current_room_membership_sql = (
-            "SELECT m.room_id FROM room_memberships as m "
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id AND c.state_key = m.user_id"
-            " WHERE m.user_id = ? AND m.membership = 'join'"
-        )
+    def get_room_events_stream(
+        self,
+        user_id,
+        from_key,
+        to_key,
+        limit=0,
+        is_guest=False,
+        room_ids=None
+    ):
+        room_ids = room_ids or []
+        room_ids = [r for r in room_ids]
+        if is_guest:
+            current_room_membership_sql = (
+                "SELECT c.room_id FROM history_visibility AS h"
+                " INNER JOIN current_state_events AS c"
+                " ON h.event_id = c.event_id"
+                " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % (
+                    ",".join(map(lambda _: "?", room_ids))
+                )
+            )
+            current_room_membership_args = room_ids
+        else:
+            current_room_membership_sql = (
+                "SELECT m.room_id FROM room_memberships as m "
+                " INNER JOIN current_state_events as c"
+                " ON m.event_id = c.event_id AND c.state_key = m.user_id"
+                " WHERE m.user_id = ? AND m.membership = 'join'"
+            )
+            current_room_membership_args = [user_id]
+            if room_ids:
+                current_room_membership_sql += " AND m.room_id in (%s)" % (
+                    ",".join(map(lambda _: "?", room_ids))
+                )
+                current_room_membership_args = [user_id] + room_ids
 
         # We also want to get any membership events about that user, e.g.
         # invites or leave notifications.
@@ -176,6 +200,7 @@ class StreamStore(SQLBaseStore):
             "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
             "WHERE m.user_id = ? "
         )
+        membership_args = [user_id]
 
         if limit:
             limit = max(limit, MAX_STREAM_SIZE)
@@ -202,7 +227,9 @@ class StreamStore(SQLBaseStore):
         }
 
         def f(txn):
-            txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,))
+            args = ([False] + current_room_membership_args + membership_args +
+                    [from_id.stream, to_id.stream])
+            txn.execute(sql, args)
 
             rows = self.cursor_to_dict(txn)
 
@@ -227,10 +254,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1,
-                             with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
+                             direction='b', limit=-1):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -302,7 +326,6 @@ class StreamStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=4)
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
-        # TODO (erikj): Handle compressed feedback
 
         end_token = RoomStreamToken.parse_stream_token(end_token)
 
@@ -379,6 +402,38 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
+    def get_stream_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "s%d" stream token.
+        """
+        return self._simple_select_one_onecol(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcol="stream_ordering",
+        ).addCallback(lambda row: "s%d" % (row,))
+
+    def get_topological_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "t%d-%d" topological token.
+        """
+        return self._simple_select_one(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcols=("stream_ordering", "topological_ordering"),
+        ).addCallback(lambda row: "t%d-%d" % (
+            row["topological_ordering"], row["stream_ordering"],)
+        )
+
     def _get_max_topological_txn(self, txn):
         txn.execute(
             "SELECT MAX(topological_ordering) FROM events"
@@ -410,3 +465,138 @@ class StreamStore(SQLBaseStore):
             internal = event.internal_metadata
             internal.before = str(RoomStreamToken(topo, stream - 1))
             internal.after = str(RoomStreamToken(topo, stream))
+
+    @defer.inlineCallbacks
+    def get_events_around(self, room_id, event_id, before_limit, after_limit):
+        """Retrieve events and pagination tokens around a given event in a
+        room.
+
+        Args:
+            room_id (str)
+            event_id (str)
+            before_limit (int)
+            after_limit (int)
+
+        Returns:
+            dict
+        """
+
+        results = yield self.runInteraction(
+            "get_events_around", self._get_events_around_txn,
+            room_id, event_id, before_limit, after_limit
+        )
+
+        events_before = yield self._get_events(
+            [e for e in results["before"]["event_ids"]],
+            get_prev_content=True
+        )
+
+        events_after = yield self._get_events(
+            [e for e in results["after"]["event_ids"]],
+            get_prev_content=True
+        )
+
+        defer.returnValue({
+            "events_before": events_before,
+            "events_after": events_after,
+            "start": results["before"]["token"],
+            "end": results["after"]["token"],
+        })
+
+    def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+        """Retrieves event_ids and pagination tokens around a given event in a
+        room.
+
+        Args:
+            room_id (str)
+            event_id (str)
+            before_limit (int)
+            after_limit (int)
+
+        Returns:
+            dict
+        """
+
+        results = self._simple_select_one_txn(
+            txn,
+            "events",
+            keyvalues={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            retcols=["stream_ordering", "topological_ordering"],
+        )
+
+        stream_ordering = results["stream_ordering"]
+        topological_ordering = results["topological_ordering"]
+
+        query_before = (
+            "SELECT topological_ordering, stream_ordering, event_id FROM events"
+            " WHERE room_id = ? AND (topological_ordering < ?"
+            " OR (topological_ordering = ? AND stream_ordering < ?))"
+            " ORDER BY topological_ordering DESC, stream_ordering DESC"
+            " LIMIT ?"
+        )
+
+        query_after = (
+            "SELECT topological_ordering, stream_ordering, event_id FROM events"
+            " WHERE room_id = ? AND (topological_ordering > ?"
+            " OR (topological_ordering = ? AND stream_ordering > ?))"
+            " ORDER BY topological_ordering ASC, stream_ordering ASC"
+            " LIMIT ?"
+        )
+
+        txn.execute(
+            query_before,
+            (
+                room_id, topological_ordering, topological_ordering,
+                stream_ordering, before_limit,
+            )
+        )
+
+        rows = self.cursor_to_dict(txn)
+        events_before = [r["event_id"] for r in rows]
+
+        if rows:
+            start_token = str(RoomStreamToken(
+                rows[0]["topological_ordering"],
+                rows[0]["stream_ordering"] - 1,
+            ))
+        else:
+            start_token = str(RoomStreamToken(
+                topological_ordering,
+                stream_ordering - 1,
+            ))
+
+        txn.execute(
+            query_after,
+            (
+                room_id, topological_ordering, topological_ordering,
+                stream_ordering, after_limit,
+            )
+        )
+
+        rows = self.cursor_to_dict(txn)
+        events_after = [r["event_id"] for r in rows]
+
+        if rows:
+            end_token = str(RoomStreamToken(
+                rows[-1]["topological_ordering"],
+                rows[-1]["stream_ordering"],
+            ))
+        else:
+            end_token = str(RoomStreamToken(
+                topological_ordering,
+                stream_ordering,
+            ))
+
+        return {
+            "before": {
+                "event_ids": events_before,
+                "token": start_token,
+            },
+            "after": {
+                "event_ids": events_after,
+                "token": end_token,
+            },
+        }
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
new file mode 100644
index 0000000000..bf695b7800
--- /dev/null
+++ b/synapse/storage/tags.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import SQLBaseStore
+from synapse.util.caches.descriptors import cached
+from twisted.internet import defer
+from .util.id_generators import StreamIdGenerator
+
+import ujson as json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class TagsStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(TagsStore, self).__init__(hs)
+
+        self._private_user_data_id_gen = StreamIdGenerator(
+            "private_user_data_max_stream_id", "stream_id"
+        )
+
+    def get_max_private_user_data_stream_id(self):
+        """Get the current max stream id for the private user data stream
+
+        Returns:
+            A deferred int.
+        """
+        return self._private_user_data_id_gen.get_max_token(self)
+
+    @cached()
+    def get_tags_for_user(self, user_id):
+        """Get all the tags for a user.
+
+
+        Args:
+            user_id(str): The user to get the tags for.
+        Returns:
+            A deferred dict mapping from room_id strings to lists of tag
+            strings.
+        """
+
+        deferred = self._simple_select_list(
+            "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"]
+        )
+
+        @deferred.addCallback
+        def tags_by_room(rows):
+            tags_by_room = {}
+            for row in rows:
+                room_tags = tags_by_room.setdefault(row["room_id"], {})
+                room_tags[row["tag"]] = json.loads(row["content"])
+            return tags_by_room
+
+        return deferred
+
+    @defer.inlineCallbacks
+    def get_updated_tags(self, user_id, stream_id):
+        """Get all the tags for the rooms where the tags have changed since the
+        given version
+
+        Args:
+            user_id(str): The user to get the tags for.
+            stream_id(int): The earliest update to get for the user.
+        Returns:
+            A deferred dict mapping from room_id strings to lists of tag
+            strings for all the rooms that changed since the stream_id token.
+        """
+        def get_updated_tags_txn(txn):
+            sql = (
+                "SELECT room_id from room_tags_revisions"
+                " WHERE user_id = ? AND stream_id > ?"
+            )
+            txn.execute(sql, (user_id, stream_id))
+            room_ids = [row[0] for row in txn.fetchall()]
+            return room_ids
+
+        room_ids = yield self.runInteraction(
+            "get_updated_tags", get_updated_tags_txn
+        )
+
+        results = {}
+        if room_ids:
+            tags_by_room = yield self.get_tags_for_user(user_id)
+            for room_id in room_ids:
+                results[room_id] = tags_by_room.get(room_id, {})
+
+        defer.returnValue(results)
+
+    def get_tags_for_room(self, user_id, room_id):
+        """Get all the tags for the given room
+        Args:
+            user_id(str): The user to get tags for
+            room_id(str): The room to get tags for
+        Returns:
+            A deferred list of string tags.
+        """
+        return self._simple_select_list(
+            table="room_tags",
+            keyvalues={"user_id": user_id, "room_id": room_id},
+            retcols=("tag", "content"),
+            desc="get_tags_for_room",
+        ).addCallback(lambda rows: {
+            row["tag"]: json.loads(row["content"]) for row in rows
+        })
+
+    @defer.inlineCallbacks
+    def add_tag_to_room(self, user_id, room_id, tag, content):
+        """Add a tag to a room for a user.
+        Args:
+            user_id(str): The user to add a tag for.
+            room_id(str): The room to add a tag for.
+            tag(str): The tag name to add.
+            content(dict): A json object to associate with the tag.
+        Returns:
+            A deferred that completes once the tag has been added.
+        """
+        content_json = json.dumps(content)
+
+        def add_tag_txn(txn, next_id):
+            self._simple_upsert_txn(
+                txn,
+                table="room_tags",
+                keyvalues={
+                    "user_id": user_id,
+                    "room_id": room_id,
+                    "tag": tag,
+                },
+                values={
+                    "content": content_json,
+                }
+            )
+            self._update_revision_txn(txn, user_id, room_id, next_id)
+
+        with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+            yield self.runInteraction("add_tag", add_tag_txn, next_id)
+
+        self.get_tags_for_user.invalidate((user_id,))
+
+        result = yield self._private_user_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def remove_tag_from_room(self, user_id, room_id, tag):
+        """Remove a tag from a room for a user.
+        Returns:
+            A deferred that completes once the tag has been removed
+        """
+        def remove_tag_txn(txn, next_id):
+            sql = (
+                "DELETE FROM room_tags "
+                " WHERE user_id = ? AND room_id = ? AND tag = ?"
+            )
+            txn.execute(sql, (user_id, room_id, tag))
+            self._update_revision_txn(txn, user_id, room_id, next_id)
+
+        with (yield self._private_user_data_id_gen.get_next(self)) as next_id:
+            yield self.runInteraction("remove_tag", remove_tag_txn, next_id)
+
+        self.get_tags_for_user.invalidate((user_id,))
+
+        result = yield self._private_user_data_id_gen.get_max_token(self)
+        defer.returnValue(result)
+
+    def _update_revision_txn(self, txn, user_id, room_id, next_id):
+        """Update the latest revision of the tags for the given user and room.
+
+        Args:
+            txn: The database cursor
+            user_id(str): The ID of the user.
+            room_id(str): The ID of the room.
+            next_id(int): The the revision to advance to.
+        """
+
+        update_max_id_sql = (
+            "UPDATE private_user_data_max_stream_id"
+            " SET stream_id = ?"
+            " WHERE stream_id < ?"
+        )
+        txn.execute(update_max_id_sql, (next_id, next_id))
+
+        update_sql = (
+            "UPDATE room_tags_revisions"
+            " SET stream_id = ?"
+            " WHERE user_id = ?"
+            " AND room_id = ?"
+        )
+        txn.execute(update_sql, (next_id, user_id, room_id))
+
+        if txn.rowcount == 0:
+            insert_sql = (
+                "INSERT INTO room_tags_revisions (user_id, room_id, stream_id)"
+                " VALUES (?, ?, ?)"
+            )
+            try:
+                txn.execute(insert_sql, (user_id, room_id, next_id))
+            except self.database_engine.module.IntegrityError:
+                # Ignore insertion errors. It doesn't matter if the row wasn't
+                # inserted because if two updates happend concurrently the one
+                # with the higher stream_id will not be reported to a client
+                # unless the previous update has completed. It doesn't matter
+                # which stream_id ends up in the table, as long as it is higher
+                # than the id that the client has.
+                pass
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index c8c7e6591a..ad099775eb 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -18,7 +18,7 @@ from synapse.util.caches.descriptors import cached
 
 from collections import namedtuple
 
-from syutil.jsonutil import encode_canonical_json
+from canonicaljson import encode_canonical_json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -59,7 +59,7 @@ class TransactionStore(SQLBaseStore):
             allow_none=True,
         )
 
-        if result and result.response_code:
+        if result and result["response_code"]:
             return result["response_code"], result["response_json"]
         else:
             return None
@@ -253,16 +253,6 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
-        # As this is the new value, we might as well prefill the cache
-        self.get_destination_retry_timings.prefill(
-            destination,
-            {
-                "destination": destination,
-                "retry_last_ts": retry_last_ts,
-                "retry_interval": retry_interval
-            },
-        )
-
         # XXX: we could chose to not bother persisting this if our cache thinks
         # this is a NOOP
         return self.runInteraction(
@@ -275,31 +265,25 @@ class TransactionStore(SQLBaseStore):
 
     def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-        query = (
-            "UPDATE destinations"
-            " SET retry_last_ts = ?, retry_interval = ?"
-            " WHERE destination = ?"
-        )
+        txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
 
-        txn.execute(
-            query,
-            (
-                retry_last_ts, retry_interval, destination,
-            )
+        self._simple_upsert_txn(
+            txn,
+            "destinations",
+            keyvalues={
+                "destination": destination,
+            },
+            values={
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval,
+            },
+            insertion_values={
+                "destination": destination,
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval,
+            }
         )
 
-        if txn.rowcount == 0:
-            # destination wasn't already in table. Insert it.
-            self._simple_insert_txn(
-                txn,
-                table="destinations",
-                values={
-                    "destination": destination,
-                    "retry_last_ts": retry_last_ts,
-                    "retry_interval": retry_interval,
-                }
-            )
-
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.