summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/api/auth.py54
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/api/errors.py2
-rwxr-xr-xsynapse/app/homeserver.py51
-rw-r--r--synapse/config/_base.py7
-rw-r--r--synapse/config/captcha.py7
-rw-r--r--synapse/config/database.py30
-rw-r--r--synapse/config/email.py42
-rw-r--r--synapse/config/homeserver.py3
-rw-r--r--synapse/config/logger.py1
-rw-r--r--synapse/handlers/__init__.py4
-rw-r--r--synapse/handlers/_base.py15
-rw-r--r--synapse/handlers/auth.py277
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/handlers/identity.py88
-rw-r--r--synapse/handlers/login.py63
-rw-r--r--synapse/handlers/message.py28
-rw-r--r--synapse/handlers/presence.py14
-rw-r--r--synapse/handlers/register.py118
-rw-r--r--synapse/handlers/room.py22
-rw-r--r--synapse/http/client.py2
-rw-r--r--synapse/http/server.py22
-rw-r--r--synapse/notifier.py2
-rw-r--r--synapse/push/__init__.py10
-rw-r--r--synapse/push/pusherpool.py84
-rw-r--r--synapse/rest/client/v1/base.py2
-rw-r--r--synapse/rest/client/v1/pusher.py16
-rw-r--r--synapse/rest/client/v2_alpha/__init__.py8
-rw-r--r--synapse/rest/client/v2_alpha/_base.py22
-rw-r--r--synapse/rest/client/v2_alpha/account.py159
-rw-r--r--synapse/rest/client/v2_alpha/auth.py190
-rw-r--r--synapse/rest/client/v2_alpha/register.py183
-rw-r--r--synapse/server.py10
-rw-r--r--synapse/storage/__init__.py146
-rw-r--r--synapse/storage/_base.py164
-rw-r--r--synapse/storage/appservice.py4
-rw-r--r--synapse/storage/directory.py8
-rw-r--r--synapse/storage/engines/__init__.py37
-rw-r--r--synapse/storage/engines/postgres.py47
-rw-r--r--synapse/storage/engines/sqlite3.py43
-rw-r--r--synapse/storage/event_federation.py35
-rw-r--r--synapse/storage/events.py160
-rw-r--r--synapse/storage/keys.py16
-rw-r--r--synapse/storage/presence.py1
-rw-r--r--synapse/storage/push_rule.py4
-rw-r--r--synapse/storage/pusher.py117
-rw-r--r--synapse/storage/registration.py119
-rw-r--r--synapse/storage/room.py39
-rw-r--r--synapse/storage/roommember.py148
-rw-r--r--synapse/storage/schema/delta/12/v12.sql34
-rw-r--r--synapse/storage/schema/delta/13/v13.sql7
-rw-r--r--synapse/storage/schema/delta/15/appservice_txns.sql11
-rw-r--r--synapse/storage/schema/delta/15/presence_indices.sql2
-rw-r--r--synapse/storage/schema/delta/15/v15.sql25
-rw-r--r--synapse/storage/schema/delta/16/events_order_index.sql4
-rw-r--r--synapse/storage/schema/delta/16/remote_media_cache_index.sql2
-rw-r--r--synapse/storage/schema/delta/16/remove_duplicates.sql9
-rw-r--r--synapse/storage/schema/delta/16/room_alias_index.sql3
-rw-r--r--synapse/storage/schema/delta/16/unique_constraints.sql80
-rw-r--r--synapse/storage/schema/delta/16/users.sql56
-rw-r--r--synapse/storage/schema/full_schemas/11/event_edges.sql42
-rw-r--r--synapse/storage/schema/full_schemas/11/event_signatures.sql34
-rw-r--r--synapse/storage/schema/full_schemas/11/im.sql62
-rw-r--r--synapse/storage/schema/full_schemas/11/keys.sql12
-rw-r--r--synapse/storage/schema/full_schemas/11/media_repository.sql21
-rw-r--r--synapse/storage/schema/full_schemas/11/presence.sql19
-rw-r--r--synapse/storage/schema/full_schemas/11/profiles.sql5
-rw-r--r--synapse/storage/schema/full_schemas/11/redactions.sql6
-rw-r--r--synapse/storage/schema/full_schemas/11/room_aliases.sql3
-rw-r--r--synapse/storage/schema/full_schemas/11/state.sql17
-rw-r--r--synapse/storage/schema/full_schemas/11/transactions.sql33
-rw-r--r--synapse/storage/schema/full_schemas/11/users.sql20
-rw-r--r--synapse/storage/schema/full_schemas/16/application_services.sql48
-rw-r--r--synapse/storage/schema/full_schemas/16/event_edges.sql89
-rw-r--r--synapse/storage/schema/full_schemas/16/event_signatures.sql55
-rw-r--r--synapse/storage/schema/full_schemas/16/im.sql128
-rw-r--r--synapse/storage/schema/full_schemas/16/keys.sql31
-rw-r--r--synapse/storage/schema/full_schemas/16/media_repository.sql68
-rw-r--r--synapse/storage/schema/full_schemas/16/presence.sql40
-rw-r--r--synapse/storage/schema/full_schemas/16/profiles.sql20
-rw-r--r--synapse/storage/schema/full_schemas/16/push.sql73
-rw-r--r--synapse/storage/schema/full_schemas/16/redactions.sql22
-rw-r--r--synapse/storage/schema/full_schemas/16/room_aliases.sql29
-rw-r--r--synapse/storage/schema/full_schemas/16/state.sql40
-rw-r--r--synapse/storage/schema/full_schemas/16/transactions.sql63
-rw-r--r--synapse/storage/schema/full_schemas/16/users.sql42
-rw-r--r--synapse/storage/schema/schema_version.sql9
-rw-r--r--synapse/storage/signatures.py6
-rw-r--r--synapse/storage/state.py13
-rw-r--r--synapse/storage/stream.py51
-rw-r--r--synapse/storage/transactions.py213
-rw-r--r--synapse/storage/util/__init__.py14
-rw-r--r--synapse/storage/util/id_generators.py131
-rw-r--r--synapse/util/lrucache.py20
-rw-r--r--synapse/util/retryutils.py2
95 files changed, 3332 insertions, 1011 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index a21120b313..77322a5c10 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -18,7 +18,7 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership, JoinRules
-from synapse.api.errors import AuthError, StoreError, Codes, SynapseError
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.types import UserID, ClientInfo
@@ -40,6 +40,7 @@ class Auth(object):
         self.hs = hs
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
+        self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
 
     def check(self, event, auth_events):
         """ Checks if this event is correctly authed.
@@ -222,6 +223,13 @@ class Auth(object):
             elif target_in_room:  # the target is already in the room.
                 raise AuthError(403, "%s is already in the room." %
                                      target_user_id)
+            else:
+                invite_level = self._get_named_level(auth_events, "invite", 0)
+
+                if user_level < invite_level:
+                    raise AuthError(
+                        403, "You cannot invite user %s." % target_user_id
+                    )
         elif Membership.JOIN == membership:
             # Joins are valid iff caller == target and they were:
             # invited: They are accepting the invitation
@@ -362,7 +370,10 @@ class Auth(object):
 
             defer.returnValue((user, ClientInfo(device_id, token_id)))
         except KeyError:
-            raise AuthError(403, "Missing access token.")
+            raise AuthError(
+                self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token.",
+                errcode=Codes.MISSING_TOKEN
+            )
 
     @defer.inlineCallbacks
     def get_user_by_token(self, token):
@@ -376,21 +387,20 @@ class Auth(object):
         Raises:
             AuthError if no user by that token exists or the token is invalid.
         """
-        try:
-            ret = yield self.store.get_user_by_token(token)
-            if not ret:
-                raise StoreError(400, "Unknown token")
-            user_info = {
-                "admin": bool(ret.get("admin", False)),
-                "device_id": ret.get("device_id"),
-                "user": UserID.from_string(ret.get("name")),
-                "token_id": ret.get("token_id", None),
-            }
+        ret = yield self.store.get_user_by_token(token)
+        if not ret:
+            raise AuthError(
+                self.TOKEN_NOT_FOUND_HTTP_STATUS, "Unrecognised access token.",
+                errcode=Codes.UNKNOWN_TOKEN
+            )
+        user_info = {
+            "admin": bool(ret.get("admin", False)),
+            "device_id": ret.get("device_id"),
+            "user": UserID.from_string(ret.get("name")),
+            "token_id": ret.get("token_id", None),
+        }
 
-            defer.returnValue(user_info)
-        except StoreError:
-            raise AuthError(403, "Unrecognised access token.",
-                            errcode=Codes.UNKNOWN_TOKEN)
+        defer.returnValue(user_info)
 
     @defer.inlineCallbacks
     def get_appservice_by_req(self, request):
@@ -398,11 +408,16 @@ class Auth(object):
             token = request.args["access_token"][0]
             service = yield self.store.get_app_service_by_token(token)
             if not service:
-                raise AuthError(403, "Unrecognised access token.",
-                                errcode=Codes.UNKNOWN_TOKEN)
+                raise AuthError(
+                    self.TOKEN_NOT_FOUND_HTTP_STATUS,
+                    "Unrecognised access token.",
+                    errcode=Codes.UNKNOWN_TOKEN
+                )
             defer.returnValue(service)
         except KeyError:
-            raise AuthError(403, "Missing access token.")
+            raise AuthError(
+                self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token."
+            )
 
     def is_server_admin(self, user):
         return self.store.is_server_admin(user)
@@ -561,6 +576,7 @@ class Auth(object):
             ("ban", []),
             ("redact", []),
             ("kick", []),
+            ("invite", []),
         ]
 
         old_list = current_state.content.get("users")
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index b16bf4247d..d8a18ee87b 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -59,6 +59,9 @@ class LoginType(object):
     EMAIL_URL = u"m.login.email.url"
     EMAIL_IDENTITY = u"m.login.email.identity"
     RECAPTCHA = u"m.login.recaptcha"
+    DUMMY = u"m.login.dummy"
+
+    # Only for C/S API v1
     APPLICATION_SERVICE = u"m.login.application_service"
     SHARED_SECRET = u"org.matrix.login.shared_secret"
 
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 72d2bd5b4c..0b3320e62c 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -31,6 +31,7 @@ class Codes(object):
     BAD_PAGINATION = "M_BAD_PAGINATION"
     UNKNOWN = "M_UNKNOWN"
     NOT_FOUND = "M_NOT_FOUND"
+    MISSING_TOKEN = "M_MISSING_TOKEN"
     UNKNOWN_TOKEN = "M_UNKNOWN_TOKEN"
     LIMIT_EXCEEDED = "M_LIMIT_EXCEEDED"
     CAPTCHA_NEEDED = "M_CAPTCHA_NEEDED"
@@ -38,6 +39,7 @@ class Codes(object):
     MISSING_PARAM = "M_MISSING_PARAM"
     TOO_LARGE = "M_TOO_LARGE"
     EXCLUSIVE = "M_EXCLUSIVE"
+    THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
 
 
 class CodeMessageException(RuntimeError):
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e681941612..bedf797bd2 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -17,8 +17,9 @@
 import sys
 sys.dont_write_bytecode = True
 
+from synapse.storage.engines import create_engine
 from synapse.storage import (
-    prepare_database, prepare_sqlite3_database, UpgradeDatabaseException,
+    are_all_users_on_domain, UpgradeDatabaseException,
 )
 
 from synapse.server import HomeServer
@@ -59,9 +60,9 @@ import os
 import re
 import resource
 import subprocess
-import sqlite3
 
-logger = logging.getLogger(__name__)
+
+logger = logging.getLogger("synapse.app.homeserver")
 
 
 class SynapseHomeServer(HomeServer):
@@ -108,13 +109,11 @@ class SynapseHomeServer(HomeServer):
             return None
 
     def build_db_pool(self):
+        name = self.db_config["name"]
+
         return adbapi.ConnectionPool(
-            "sqlite3", self.get_db_name(),
-            check_same_thread=False,
-            cp_min=1,
-            cp_max=1,
-            cp_openfun=prepare_database,  # Prepare the database for each conn
-                                          # so that :memory: sqlite works
+            name,
+            **self.db_config.get("args", {})
         )
 
     def create_resource_tree(self, redirect_root_to_web_client):
@@ -247,6 +246,21 @@ class SynapseHomeServer(HomeServer):
             )
             logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
 
+    def run_startup_checks(self, db_conn, database_engine):
+        all_users_native = are_all_users_on_domain(
+            db_conn.cursor(), database_engine, self.hostname
+        )
+        if not all_users_native:
+            sys.stderr.write(
+                "\n"
+                "******************************************************\n"
+                "Found users in database not native to %s!\n"
+                "You cannot changed a synapse server_name after it's been configured\n"
+                "******************************************************\n"
+                "\n" % (self.hostname,)
+            )
+            sys.exit(1)
+
 
 def get_version_string():
     try:
@@ -358,15 +372,20 @@ def setup(config_options):
 
     tls_context_factory = context_factory.ServerContextFactory(config)
 
+    database_engine = create_engine(config.database_config["name"])
+    config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
+
     hs = SynapseHomeServer(
         config.server_name,
         domain_with_port=domain_with_port,
         upload_dir=os.path.abspath("uploads"),
         db_name=config.database_path,
+        db_config=config.database_config,
         tls_context_factory=tls_context_factory,
         config=config,
         content_addr=config.content_addr,
         version_string=version_string,
+        database_engine=database_engine,
     )
 
     hs.create_resource_tree(
@@ -378,9 +397,17 @@ def setup(config_options):
     logger.info("Preparing database: %s...", db_name)
 
     try:
-        with sqlite3.connect(db_name) as db_conn:
-            prepare_sqlite3_database(db_conn)
-            prepare_database(db_conn)
+        db_conn = database_engine.module.connect(
+            **{
+                k: v for k, v in config.database_config.get("args", {}).items()
+                if not k.startswith("cp_")
+            }
+        )
+
+        database_engine.prepare_database(db_conn)
+        hs.run_startup_checks(db_conn, database_engine)
+
+        db_conn.commit()
     except UpgradeDatabaseException:
         sys.stderr.write(
             "\nFailed to upgrade database.\n"
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 6fd086a471..b59f4e45e2 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -158,9 +158,10 @@ class Config(object):
                         and value is not None):
                     config[key] = value
             with open(config_args.config_path, "w") as config_file:
-                # TODO(paul) it would be lovely if we wrote out vim- and emacs-
-                #   style mode markers into the file, to hint to people that
-                #   this is a YAML file.
+                # TODO(mark/paul) We might want to output emacs-style mode
+                # markers as well as vim-style mode markers into the file,
+                # to further hint to people this is a YAML file.
+                config_file.write("# vim:ft=yaml\n")
                 yaml.dump(config, config_file, default_flow_style=False)
             print (
                 "A config file has been generated in %s for server name"
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index 7e21c7414d..07fbfadc0f 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -20,6 +20,7 @@ class CaptchaConfig(Config):
     def __init__(self, args):
         super(CaptchaConfig, self).__init__(args)
         self.recaptcha_private_key = args.recaptcha_private_key
+        self.recaptcha_public_key = args.recaptcha_public_key
         self.enable_registration_captcha = args.enable_registration_captcha
         self.captcha_ip_origin_is_x_forwarded = (
             args.captcha_ip_origin_is_x_forwarded
@@ -31,8 +32,12 @@ class CaptchaConfig(Config):
         super(CaptchaConfig, cls).add_arguments(parser)
         group = parser.add_argument_group("recaptcha")
         group.add_argument(
+            "--recaptcha-public-key", type=str, default="YOUR_PUBLIC_KEY",
+            help="This Home Server's ReCAPTCHA public key."
+        )
+        group.add_argument(
             "--recaptcha-private-key", type=str, default="YOUR_PRIVATE_KEY",
-            help="The matching private key for the web client's public key."
+            help="This Home Server's ReCAPTCHA private key."
         )
         group.add_argument(
             "--enable-registration-captcha", type=bool, default=False,
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 87efe54645..190d119df4 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -15,6 +15,7 @@
 
 from ._base import Config
 import os
+import yaml
 
 
 class DatabaseConfig(Config):
@@ -26,18 +27,45 @@ class DatabaseConfig(Config):
             self.database_path = self.abspath(args.database_path)
         self.event_cache_size = self.parse_size(args.event_cache_size)
 
+        if args.database_config:
+            with open(args.database_config) as f:
+                self.database_config = yaml.safe_load(f)
+        else:
+            self.database_config = {
+                "name": "sqlite3",
+                "args": {
+                    "database": self.database_path,
+                },
+            }
+
+        name = self.database_config.get("name", None)
+        if name == "psycopg2":
+            pass
+        elif name == "sqlite3":
+            self.database_config.setdefault("args", {}).update({
+                "cp_min": 1,
+                "cp_max": 1,
+                "check_same_thread": False,
+            })
+        else:
+            raise RuntimeError("Unsupported database type '%s'" % (name,))
+
     @classmethod
     def add_arguments(cls, parser):
         super(DatabaseConfig, cls).add_arguments(parser)
         db_group = parser.add_argument_group("database")
         db_group.add_argument(
             "-d", "--database-path", default="homeserver.db",
-            help="The database name."
+            metavar="SQLITE_DATABASE_PATH", help="The database name."
         )
         db_group.add_argument(
             "--event-cache-size", default="100K",
             help="Number of events to cache in memory."
         )
+        db_group.add_argument(
+            "--database-config", default=None,
+            help="Location of the database configuration file."
+        )
 
     @classmethod
     def generate_config(cls, args, config_dir_path):
diff --git a/synapse/config/email.py b/synapse/config/email.py
deleted file mode 100644
index f0854f8c37..0000000000
--- a/synapse/config/email.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014, 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ._base import Config
-
-
-class EmailConfig(Config):
-
-    def __init__(self, args):
-        super(EmailConfig, self).__init__(args)
-        self.email_from_address = args.email_from_address
-        self.email_smtp_server = args.email_smtp_server
-
-    @classmethod
-    def add_arguments(cls, parser):
-        super(EmailConfig, cls).add_arguments(parser)
-        email_group = parser.add_argument_group("email")
-        email_group.add_argument(
-            "--email-from-address",
-            default="FROM@EXAMPLE.COM",
-            help="The address to send emails from (e.g. for password resets)."
-        )
-        email_group.add_argument(
-            "--email-smtp-server",
-            default="",
-            help=(
-                "The SMTP server to send emails from (e.g. for password"
-                " resets)."
-            )
-        )
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 967a0f45d6..1c8ff38465 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -20,7 +20,6 @@ from .database import DatabaseConfig
 from .ratelimiting import RatelimitConfig
 from .repository import ContentRepositoryConfig
 from .captcha import CaptchaConfig
-from .email import EmailConfig
 from .voip import VoipConfig
 from .registration import RegistrationConfig
 from .metrics import MetricsConfig
@@ -30,7 +29,7 @@ from .key import KeyConfig
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
-                       EmailConfig, VoipConfig, RegistrationConfig,
+                       VoipConfig, RegistrationConfig,
                        MetricsConfig, AppServiceConfig, KeyConfig,):
     pass
 
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 63c8e36930..247b324816 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -78,7 +78,6 @@ class LoggingConfig(Config):
             handler.addFilter(LoggingContextFilter(request=""))
 
             logger.addHandler(handler)
-            logger.info("Test")
         else:
             with open(self.log_config, 'r') as f:
                 logging.config.dictConfig(yaml.load(f))
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 0c51d615ec..685792dbdc 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -30,6 +30,8 @@ from .typing import TypingNotificationHandler
 from .admin import AdminHandler
 from .appservice import ApplicationServicesHandler
 from .sync import SyncHandler
+from .auth import AuthHandler
+from .identity import IdentityHandler
 
 
 class Handlers(object):
@@ -64,3 +66,5 @@ class Handlers(object):
             )
         )
         self.sync_handler = SyncHandler(hs)
+        self.auth_handler = AuthHandler(hs)
+        self.identity_handler = IdentityHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 48816a242d..dffb033fbd 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -16,7 +16,6 @@
 from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError, SynapseError
-from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID
@@ -58,8 +57,6 @@ class BaseHandler(object):
 
     @defer.inlineCallbacks
     def _create_new_client_event(self, builder):
-        yield run_on_reactor()
-
         latest_ret = yield self.store.get_latest_events_in_room(
             builder.room_id,
         )
@@ -101,8 +98,6 @@ class BaseHandler(object):
     @defer.inlineCallbacks
     def handle_new_client_event(self, event, context, extra_destinations=[],
                                 extra_users=[], suppress_auth=False):
-        yield run_on_reactor()
-
         # We now need to go and hit out to wherever we need to hit out to.
 
         if not suppress_auth:
@@ -143,7 +138,9 @@ class BaseHandler(object):
                 )
 
         # Don't block waiting on waking up all the listeners.
-        d = self.notifier.on_new_room_event(event, extra_users=extra_users)
+        notify_d = self.notifier.on_new_room_event(
+            event, extra_users=extra_users
+        )
 
         def log_failure(f):
             logger.warn(
@@ -151,8 +148,10 @@ class BaseHandler(object):
                 event.event_id, f.value
             )
 
-        d.addErrback(log_failure)
+        notify_d.addErrback(log_failure)
 
-        yield federation_handler.handle_new_event(
+        fed_d = federation_handler.handle_new_event(
             event, destinations=destinations,
         )
+
+        fed_d.addErrback(log_failure)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
new file mode 100644
index 0000000000..2e8009d3c3
--- /dev/null
+++ b/synapse/handlers/auth.py
@@ -0,0 +1,277 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.constants import LoginType
+from synapse.types import UserID
+from synapse.api.errors import LoginError, Codes
+from synapse.http.client import SimpleHttpClient
+from synapse.util.async import run_on_reactor
+
+from twisted.web.client import PartialDownloadError
+
+import logging
+import bcrypt
+import simplejson
+
+import synapse.util.stringutils as stringutils
+
+
+logger = logging.getLogger(__name__)
+
+
+class AuthHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(AuthHandler, self).__init__(hs)
+        self.checkers = {
+            LoginType.PASSWORD: self._check_password_auth,
+            LoginType.RECAPTCHA: self._check_recaptcha,
+            LoginType.EMAIL_IDENTITY: self._check_email_identity,
+            LoginType.DUMMY: self._check_dummy_auth,
+        }
+        self.sessions = {}
+
+    @defer.inlineCallbacks
+    def check_auth(self, flows, clientdict, clientip=None):
+        """
+        Takes a dictionary sent by the client in the login / registration
+        protocol and handles the login flow.
+
+        Args:
+            flows: list of list of stages
+            authdict: The dictionary from the client root level, not the
+                      'auth' key: this method prompts for auth if none is sent.
+        Returns:
+            A tuple of authed, dict, dict where authed is true if the client
+            has successfully completed an auth flow. If it is true, the first
+            dict contains the authenticated credentials of each stage.
+
+            If authed is false, the first dictionary is the server response to
+            the login request and should be passed back to the client.
+
+            In either case, the second dict contains the parameters for this
+            request (which may have been given only in a previous call).
+        """
+
+        authdict = None
+        sid = None
+        if clientdict and 'auth' in clientdict:
+            authdict = clientdict['auth']
+            del clientdict['auth']
+            if 'session' in authdict:
+                sid = authdict['session']
+        sess = self._get_session_info(sid)
+
+        if len(clientdict) > 0:
+            # This was designed to allow the client to omit the parameters
+            # and just supply the session in subsequent calls so it split
+            # auth between devices by just sharing the session, (eg. so you
+            # could continue registration from your phone having clicked the
+            # email auth link on there). It's probably too open to abuse
+            # because it lets unauthenticated clients store arbitrary objects
+            # on a home server.
+            # sess['clientdict'] = clientdict
+            # self._save_session(sess)
+            pass
+        elif 'clientdict' in sess:
+            clientdict = sess['clientdict']
+
+        if not authdict:
+            defer.returnValue(
+                (False, self._auth_dict_for_flows(flows, sess), clientdict)
+            )
+
+        if 'creds' not in sess:
+            sess['creds'] = {}
+        creds = sess['creds']
+
+        # check auth type currently being presented
+        if 'type' in authdict:
+            if authdict['type'] not in self.checkers:
+                raise LoginError(400, "", Codes.UNRECOGNIZED)
+            result = yield self.checkers[authdict['type']](authdict, clientip)
+            if result:
+                creds[authdict['type']] = result
+                self._save_session(sess)
+
+        for f in flows:
+            if len(set(f) - set(creds.keys())) == 0:
+                logger.info("Auth completed with creds: %r", creds)
+                self._remove_session(sess)
+                defer.returnValue((True, creds, clientdict))
+
+        ret = self._auth_dict_for_flows(flows, sess)
+        ret['completed'] = creds.keys()
+        defer.returnValue((False, ret, clientdict))
+
+    @defer.inlineCallbacks
+    def add_oob_auth(self, stagetype, authdict, clientip):
+        """
+        Adds the result of out-of-band authentication into an existing auth
+        session. Currently used for adding the result of fallback auth.
+        """
+        if stagetype not in self.checkers:
+            raise LoginError(400, "", Codes.MISSING_PARAM)
+        if 'session' not in authdict:
+            raise LoginError(400, "", Codes.MISSING_PARAM)
+
+        sess = self._get_session_info(
+            authdict['session']
+        )
+        if 'creds' not in sess:
+            sess['creds'] = {}
+        creds = sess['creds']
+
+        result = yield self.checkers[stagetype](authdict, clientip)
+        if result:
+            creds[stagetype] = result
+            self._save_session(sess)
+            defer.returnValue(True)
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def _check_password_auth(self, authdict, _):
+        if "user" not in authdict or "password" not in authdict:
+            raise LoginError(400, "", Codes.MISSING_PARAM)
+
+        user = authdict["user"]
+        password = authdict["password"]
+        if not user.startswith('@'):
+            user = UserID.create(user, self.hs.hostname).to_string()
+
+        user_info = yield self.store.get_user_by_id(user_id=user)
+        if not user_info:
+            logger.warn("Attempted to login as %s but they do not exist", user)
+            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+        stored_hash = user_info[0]["password_hash"]
+        if bcrypt.checkpw(password, stored_hash):
+            defer.returnValue(user)
+        else:
+            logger.warn("Failed password login for user %s", user)
+            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+    @defer.inlineCallbacks
+    def _check_recaptcha(self, authdict, clientip):
+        try:
+            user_response = authdict["response"]
+        except KeyError:
+            # Client tried to provide captcha but didn't give the parameter:
+            # bad request.
+            raise LoginError(
+                400, "Captcha response is required",
+                errcode=Codes.CAPTCHA_NEEDED
+            )
+
+        logger.info(
+            "Submitting recaptcha response %s with remoteip %s",
+            user_response, clientip
+        )
+
+        # TODO: get this from the homeserver rather than creating a new one for
+        # each request
+        try:
+            client = SimpleHttpClient(self.hs)
+            data = yield client.post_urlencoded_get_json(
+                "https://www.google.com/recaptcha/api/siteverify",
+                args={
+                    'secret': self.hs.config.recaptcha_private_key,
+                    'response': user_response,
+                    'remoteip': clientip,
+                }
+            )
+        except PartialDownloadError as pde:
+            # Twisted is silly
+            data = pde.response
+        resp_body = simplejson.loads(data)
+        if 'success' in resp_body and resp_body['success']:
+            defer.returnValue(True)
+        raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+    @defer.inlineCallbacks
+    def _check_email_identity(self, authdict, _):
+        yield run_on_reactor()
+
+        if 'threepid_creds' not in authdict:
+            raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
+
+        threepid_creds = authdict['threepid_creds']
+        identity_handler = self.hs.get_handlers().identity_handler
+
+        logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
+        threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+
+        if not threepid:
+            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+        threepid['threepid_creds'] = authdict['threepid_creds']
+
+        defer.returnValue(threepid)
+
+    @defer.inlineCallbacks
+    def _check_dummy_auth(self, authdict, _):
+        yield run_on_reactor()
+        defer.returnValue(True)
+
+    def _get_params_recaptcha(self):
+        return {"public_key": self.hs.config.recaptcha_public_key}
+
+    def _auth_dict_for_flows(self, flows, session):
+        public_flows = []
+        for f in flows:
+            public_flows.append(f)
+
+        get_params = {
+            LoginType.RECAPTCHA: self._get_params_recaptcha,
+        }
+
+        params = {}
+
+        for f in public_flows:
+            for stage in f:
+                if stage in get_params and stage not in params:
+                    params[stage] = get_params[stage]()
+
+        return {
+            "session": session['id'],
+            "flows": [{"stages": f} for f in public_flows],
+            "params": params
+        }
+
+    def _get_session_info(self, session_id):
+        if session_id not in self.sessions:
+            session_id = None
+
+        if not session_id:
+            # create a new session
+            while session_id is None or session_id in self.sessions:
+                session_id = stringutils.random_string(24)
+            self.sessions[session_id] = {
+                "id": session_id,
+            }
+
+        return self.sessions[session_id]
+
+    def _save_session(self, session):
+        # TODO: Persistent storage
+        logger.debug("Saving session %s", session)
+        self.sessions[session["id"]] = session
+
+    def _remove_session(self, session):
+        logger.debug("Removing session %s", session)
+        del self.sessions[session["id"]]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8aceac28cf..98148c13d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -179,7 +179,7 @@ class FederationHandler(BaseHandler):
         # it's probably a good idea to mark it as not in retry-state
         # for sending (although this is a bit of a leap)
         retry_timings = yield self.store.get_destination_retry_timings(origin)
-        if (retry_timings and retry_timings.retry_last_ts):
+        if retry_timings and retry_timings["retry_last_ts"]:
             self.store.set_destination_retry_timings(origin, 0, 0)
 
         room = yield self.store.get_room(event.room_id)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
new file mode 100644
index 0000000000..ad8246b58c
--- /dev/null
+++ b/synapse/handlers/identity.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for interacting with Identity Servers"""
+from twisted.internet import defer
+
+from synapse.api.errors import (
+    CodeMessageException
+)
+from ._base import BaseHandler
+from synapse.http.client import SimpleHttpClient
+from synapse.util.async import run_on_reactor
+
+import json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class IdentityHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(IdentityHandler, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def threepid_from_creds(self, creds):
+        yield run_on_reactor()
+
+        # TODO: get this from the homeserver rather than creating a new one for
+        # each request
+        http_client = SimpleHttpClient(self.hs)
+        # XXX: make this configurable!
+        # trustedIdServers = ['matrix.org', 'localhost:8090']
+        trustedIdServers = ['matrix.org']
+        if not creds['id_server'] in trustedIdServers:
+            logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
+                        'credentials', creds['id_server'])
+            defer.returnValue(None)
+
+        data = {}
+        try:
+            data = yield http_client.get_json(
+                "https://%s%s" % (
+                    creds['id_server'],
+                    "/_matrix/identity/api/v1/3pid/getValidated3pid"
+                ),
+                {'sid': creds['sid'], 'client_secret': creds['client_secret']}
+            )
+        except CodeMessageException as e:
+            data = json.loads(e.msg)
+
+        if 'medium' in data:
+            defer.returnValue(data)
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def bind_threepid(self, creds, mxid):
+        yield run_on_reactor()
+        logger.debug("binding threepid %r to %s", creds, mxid)
+        http_client = SimpleHttpClient(self.hs)
+        data = None
+        try:
+            data = yield http_client.post_urlencoded_get_json(
+                "https://%s%s" % (
+                    creds['id_server'], "/_matrix/identity/api/v1/3pid/bind"
+                ),
+                {
+                    'sid': creds['sid'],
+                    'client_secret': creds['client_secret'],
+                    'mxid': mxid,
+                }
+            )
+            logger.debug("bound threepid %r to %s", creds, mxid)
+        except CodeMessageException as e:
+            data = json.loads(e.msg)
+        defer.returnValue(data)
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index 7447800460..91d87d503d 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -16,13 +16,9 @@
 from twisted.internet import defer
 
 from ._base import BaseHandler
-from synapse.api.errors import LoginError, Codes, CodeMessageException
-from synapse.http.client import SimpleHttpClient
-from synapse.util.emailutils import EmailException
-import synapse.util.emailutils as emailutils
+from synapse.api.errors import LoginError, Codes
 
 import bcrypt
-import json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -57,7 +53,7 @@ class LoginHandler(BaseHandler):
             logger.warn("Attempted to login as %s but they do not exist", user)
             raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
-        stored_hash = user_info[0]["password_hash"]
+        stored_hash = user_info["password_hash"]
         if bcrypt.checkpw(password, stored_hash):
             # generate an access token and store it.
             token = self.reg_handler._generate_token(user)
@@ -69,48 +65,19 @@ class LoginHandler(BaseHandler):
             raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
     @defer.inlineCallbacks
-    def reset_password(self, user_id, email):
-        is_valid = yield self._check_valid_association(user_id, email)
-        logger.info("reset_password user=%s email=%s valid=%s", user_id, email,
-                    is_valid)
-        if is_valid:
-            try:
-                # send an email out
-                emailutils.send_email(
-                    smtp_server=self.hs.config.email_smtp_server,
-                    from_addr=self.hs.config.email_from_address,
-                    to_addr=email,
-                    subject="Password Reset",
-                    body="TODO."
-                )
-            except EmailException as e:
-                logger.exception(e)
+    def set_password(self, user_id, newpassword, token_id=None):
+        password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt())
 
-    @defer.inlineCallbacks
-    def _check_valid_association(self, user_id, email):
-        identity = yield self._query_email(email)
-        if identity and "mxid" in identity:
-            if identity["mxid"] == user_id:
-                defer.returnValue(True)
-                return
-        defer.returnValue(False)
+        yield self.store.user_set_password_hash(user_id, password_hash)
+        yield self.store.user_delete_access_tokens_apart_from(user_id, token_id)
+        yield self.hs.get_pusherpool().remove_pushers_by_user_access_token(
+            user_id, token_id
+        )
+        yield self.store.flush_user(user_id)
 
     @defer.inlineCallbacks
-    def _query_email(self, email):
-        http_client = SimpleHttpClient(self.hs)
-        try:
-            data = yield http_client.get_json(
-                # TODO FIXME This should be configurable.
-                # XXX: ID servers need to use HTTPS
-                "http://%s%s" % (
-                    "matrix.org:8090", "/_matrix/identity/api/v1/lookup"
-                ),
-                {
-                    'medium': 'email',
-                    'address': email
-                }
-            )
-            defer.returnValue(data)
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
-            defer.returnValue(data)
+    def add_threepid(self, user_id, medium, address, validated_at):
+        yield self.store.user_add_threepid(
+            user_id, medium, address, validated_at,
+            self.hs.get_clock().time_msec()
+        )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b9685be7f..9667bb8674 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -274,7 +274,8 @@ class MessageHandler(BaseHandler):
         if limit is None:
             limit = 10
 
-        for event in room_list:
+        @defer.inlineCallbacks
+        def handle_room(event):
             d = {
                 "room_id": event.room_id,
                 "membership": event.membership,
@@ -290,12 +291,19 @@ class MessageHandler(BaseHandler):
             rooms_ret.append(d)
 
             if event.membership != Membership.JOIN:
-                continue
+                return
             try:
-                messages, token = yield self.store.get_recent_events_for_room(
-                    event.room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
+                (messages, token), current_state = yield defer.gatherResults(
+                    [
+                        self.store.get_recent_events_for_room(
+                            event.room_id,
+                            limit=limit,
+                            end_token=now_token.room_key,
+                        ),
+                        self.state_handler.get_current_state(
+                            event.room_id
+                        ),
+                    ]
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -311,9 +319,6 @@ class MessageHandler(BaseHandler):
                     "end": end_token.to_string(),
                 }
 
-                current_state = yield self.state_handler.get_current_state(
-                    event.room_id
-                )
                 d["state"] = [
                     serialize_event(c, time_now, as_client_event)
                     for c in current_state.values()
@@ -321,6 +326,11 @@ class MessageHandler(BaseHandler):
             except:
                 logger.exception("Failed to get snapshot")
 
+        yield defer.gatherResults(
+            [handle_room(e) for e in room_list],
+            consumeErrors=True
+        )
+
         ret = {
             "rooms": rooms_ret,
             "presence": presence,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 571eacd343..42cd528908 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -858,22 +858,24 @@ class PresenceEventSource(object):
 
         presence = self.hs.get_handlers().presence_handler
         cachemap = presence._user_cachemap
+
+        max_serial = presence._user_cachemap_latest_serial
+
         clock = self.clock
-        latest_serial = None
+        latest_serial = 0
 
         updates = []
         # TODO(paul): use a DeferredList ? How to limit concurrency.
         for observed_user in cachemap.keys():
             cached = cachemap[observed_user]
 
-            if cached.serial <= from_key:
+            if cached.serial <= from_key or cached.serial > max_serial:
                 continue
 
             if not (yield self.is_visible(observer_user, observed_user)):
                 continue
 
-            if latest_serial is None or cached.serial > latest_serial:
-                latest_serial = cached.serial
+            latest_serial = max(cached.serial, latest_serial)
             updates.append(cached.make_event(user=observed_user, clock=clock))
 
         # TODO(paul): limit
@@ -882,6 +884,10 @@ class PresenceEventSource(object):
             if serial < from_key:
                 break
 
+            if serial > max_serial:
+                continue
+
+            latest_serial = max(latest_serial, serial)
             for u in user_ids:
                 updates.append({
                     "type": "m.presence",
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c25e321099..7b68585a17 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -18,18 +18,15 @@ from twisted.internet import defer
 
 from synapse.types import UserID
 from synapse.api.errors import (
-    AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError,
-    CodeMessageException
+    AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
 )
 from ._base import BaseHandler
 import synapse.util.stringutils as stringutils
 from synapse.util.async import run_on_reactor
-from synapse.http.client import SimpleHttpClient
 from synapse.http.client import CaptchaServerHttpClient
 
 import base64
 import bcrypt
-import json
 import logging
 import urllib
 
@@ -45,6 +42,30 @@ class RegistrationHandler(BaseHandler):
         self.distributor.declare("registered_user")
 
     @defer.inlineCallbacks
+    def check_username(self, localpart):
+        yield run_on_reactor()
+
+        if urllib.quote(localpart) != localpart:
+            raise SynapseError(
+                400,
+                "User ID must only contain characters which do not"
+                " require URL encoding."
+            )
+
+        user = UserID(localpart, self.hs.hostname)
+        user_id = user.to_string()
+
+        yield self.check_user_id_is_valid(user_id)
+
+        u = yield self.store.get_user_by_id(user_id)
+        if u:
+            raise SynapseError(
+                400,
+                "User ID already taken.",
+                errcode=Codes.USER_IN_USE,
+            )
+
+    @defer.inlineCallbacks
     def register(self, localpart=None, password=None):
         """Registers a new client on the server.
 
@@ -64,18 +85,11 @@ class RegistrationHandler(BaseHandler):
             password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
 
         if localpart:
-            if localpart and urllib.quote(localpart) != localpart:
-                raise SynapseError(
-                    400,
-                    "User ID must only contain characters which do not"
-                    " require URL encoding."
-                )
+            yield self.check_username(localpart)
 
             user = UserID(localpart, self.hs.hostname)
             user_id = user.to_string()
 
-            yield self.check_user_id_is_valid(user_id)
-
             token = self._generate_token(user_id)
             yield self.store.register(
                 user_id=user_id,
@@ -157,7 +171,11 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def check_recaptcha(self, ip, private_key, challenge, response):
-        """Checks a recaptcha is correct."""
+        """
+        Checks a recaptcha is correct.
+
+        Used only by c/s api v1
+        """
 
         captcha_response = yield self._validate_captcha(
             ip,
@@ -176,13 +194,18 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def register_email(self, threepidCreds):
-        """Registers emails with an identity server."""
+        """
+        Registers emails with an identity server.
+
+        Used only by c/s api v1
+        """
 
         for c in threepidCreds:
             logger.info("validating theeepidcred sid %s on id server %s",
                         c['sid'], c['idServer'])
             try:
-                threepid = yield self._threepid_from_creds(c)
+                identity_handler = self.hs.get_handlers().identity_handler
+                threepid = yield identity_handler.threepid_from_creds(c)
             except:
                 logger.exception("Couldn't validate 3pid")
                 raise RegistrationError(400, "Couldn't validate 3pid")
@@ -194,12 +217,16 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def bind_emails(self, user_id, threepidCreds):
-        """Links emails with a user ID and informs an identity server."""
+        """Links emails with a user ID and informs an identity server.
+
+        Used only by c/s api v1
+        """
 
         # Now we have a matrix ID, bind it to the threepids we were given
         for c in threepidCreds:
+            identity_handler = self.hs.get_handlers().identity_handler
             # XXX: This should be a deferred list, shouldn't it?
-            yield self._bind_threepid(c, user_id)
+            yield identity_handler.bind_threepid(c, user_id)
 
     @defer.inlineCallbacks
     def check_user_id_is_valid(self, user_id):
@@ -227,61 +254,11 @@ class RegistrationHandler(BaseHandler):
         return "-" + stringutils.random_string(18)
 
     @defer.inlineCallbacks
-    def _threepid_from_creds(self, creds):
-        # TODO: get this from the homeserver rather than creating a new one for
-        # each request
-        http_client = SimpleHttpClient(self.hs)
-        # XXX: make this configurable!
-        trustedIdServers = ['matrix.org:8090', 'matrix.org']
-        if not creds['idServer'] in trustedIdServers:
-            logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
-                        'credentials', creds['idServer'])
-            defer.returnValue(None)
-
-        data = {}
-        try:
-            data = yield http_client.get_json(
-                # XXX: This should be HTTPS
-                "http://%s%s" % (
-                    creds['idServer'],
-                    "/_matrix/identity/api/v1/3pid/getValidated3pid"
-                ),
-                {'sid': creds['sid'], 'clientSecret': creds['clientSecret']}
-            )
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
-
-        if 'medium' in data:
-            defer.returnValue(data)
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
-    def _bind_threepid(self, creds, mxid):
-        yield
-        logger.debug("binding threepid")
-        http_client = SimpleHttpClient(self.hs)
-        data = None
-        try:
-            data = yield http_client.post_urlencoded_get_json(
-                # XXX: Change when ID servers are all HTTPS
-                "http://%s%s" % (
-                    creds['idServer'], "/_matrix/identity/api/v1/3pid/bind"
-                ),
-                {
-                    'sid': creds['sid'],
-                    'clientSecret': creds['clientSecret'],
-                    'mxid': mxid,
-                }
-            )
-            logger.debug("bound threepid")
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
-        defer.returnValue(data)
-
-    @defer.inlineCallbacks
     def _validate_captcha(self, ip_addr, private_key, challenge, response):
         """Validates the captcha provided.
 
+        Used only by c/s api v1
+
         Returns:
             dict: Containing 'valid'(bool) and 'error_url'(str) if invalid.
 
@@ -299,6 +276,9 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _submit_captcha(self, ip_addr, private_key, challenge, response):
+        """
+        Used only by c/s api v1
+        """
         # TODO: get this from the homeserver rather than creating a new one for
         # each request
         client = CaptchaServerHttpClient(self.hs)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f9fc4a9c98..47456a28e9 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -213,7 +213,8 @@ class RoomCreationHandler(BaseHandler):
                 "state_default": 50,
                 "ban": 50,
                 "kick": 50,
-                "redact": 50
+                "redact": 50,
+                "invite": 0,
             },
         )
 
@@ -311,25 +312,6 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(chunk_data)
 
     @defer.inlineCallbacks
-    def get_room_member(self, room_id, member_user_id, auth_user_id):
-        """Retrieve a room member from a room.
-
-        Args:
-            room_id : The room the member is in.
-            member_user_id : The member's user ID
-            auth_user_id : The user ID of the user making this request.
-        Returns:
-            The room member, or None if this member does not exist.
-        Raises:
-            SynapseError if something goes wrong.
-        """
-        yield self.auth.check_joined_room(room_id, auth_user_id)
-
-        member = yield self.store.get_room_member(user_id=member_user_id,
-                                                  room_id=room_id)
-        defer.returnValue(member)
-
-    @defer.inlineCallbacks
     def change_membership(self, event, context, do_auth=True):
         """ Change the membership status of a user in a room.
 
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 2ae1c4d3a4..e8a5dedab4 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -200,6 +200,8 @@ class CaptchaServerHttpClient(SimpleHttpClient):
     """
     Separate HTTP client for talking to google's captcha servers
     Only slightly special because accepts partial download responses
+
+    used only by c/s api v1
     """
 
     @defer.inlineCallbacks
diff --git a/synapse/http/server.py b/synapse/http/server.py
index b3706889ab..05636e683b 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -131,10 +131,10 @@ class HttpServer(object):
     """
 
     def register_path(self, method, path_pattern, callback):
-        """ Register a callback that get's fired if we receive a http request
+        """ Register a callback that gets fired if we receive a http request
         with the given method for a path that matches the given regex.
 
-        If the regex contains groups these get's passed to the calback via
+        If the regex contains groups these gets passed to the calback via
         an unpacked tuple.
 
         Args:
@@ -153,6 +153,13 @@ class JsonResource(HttpServer, resource.Resource):
     Resources.
 
     Register callbacks via register_path()
+
+    Callbacks can return a tuple of status code and a dict in which case the
+    the dict will automatically be sent to the client as a JSON object.
+
+    The JsonResource is primarily intended for returning JSON, but callbacks
+    may send something other than JSON, they may do so by using the methods
+    on the request object and instead returning None.
     """
 
     isLeaf = True
@@ -185,9 +192,8 @@ class JsonResource(HttpServer, resource.Resource):
             interface=self.hs.config.bind_host
         )
 
-    # Gets called by twisted
     def render(self, request):
-        """ This get's called by twisted every time someone sends us a request.
+        """ This gets called by twisted every time someone sends us a request.
         """
         self._async_render(request)
         return server.NOT_DONE_YET
@@ -195,7 +201,7 @@ class JsonResource(HttpServer, resource.Resource):
     @request_handler
     @defer.inlineCallbacks
     def _async_render(self, request):
-        """ This get's called by twisted every time someone sends us a request.
+        """ This gets called from render() every time someone sends us a request.
             This checks if anyone has registered a callback for that method and
             path.
         """
@@ -227,9 +233,11 @@ class JsonResource(HttpServer, resource.Resource):
                 urllib.unquote(u).decode("UTF-8") for u in m.groups()
             ]
 
-            code, response = yield callback(request, *args)
+            callback_return = yield callback(request, *args)
+            if callback_return is not None:
+                code, response = callback_return
+                self._send_response(request, code, response)
 
-            self._send_response(request, code, response)
             response_timer.inc_by(
                 self.clock.time_msec() - start, request.method, servlet_classname
             )
diff --git a/synapse/notifier.py b/synapse/notifier.py
index d750a6fcf7..ea854482b5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -98,7 +98,7 @@ class _NotificationListener(object):
             try:
                 notifier.clock.cancel_call_later(self.timer)
             except:
-                logger.exception("Failed to cancel notifier timer")
+                logger.warn("Failed to cancel notifier timer")
 
 
 class Notifier(object):
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 0727f772a5..5575c847f9 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -253,7 +253,8 @@ class Pusher(object):
                 self.user_name, config, timeout=0)
             self.last_token = chunk['end']
             self.store.update_pusher_last_token(
-                self.app_id, self.pushkey, self.last_token)
+                self.app_id, self.pushkey, self.user_name, self.last_token
+            )
             logger.info("Pusher %s for user %s starting from token %s",
                         self.pushkey, self.user_name, self.last_token)
 
@@ -314,7 +315,7 @@ class Pusher(object):
                                 pk
                             )
                             yield self.hs.get_pusherpool().remove_pusher(
-                                self.app_id, pk
+                                self.app_id, pk, self.user_name
                             )
 
             if not self.alive:
@@ -326,6 +327,7 @@ class Pusher(object):
                 self.store.update_pusher_last_token_and_success(
                     self.app_id,
                     self.pushkey,
+                    self.user_name,
                     self.last_token,
                     self.clock.time_msec()
                 )
@@ -334,6 +336,7 @@ class Pusher(object):
                     self.store.update_pusher_failing_since(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.failing_since)
             else:
                 if not self.failing_since:
@@ -341,6 +344,7 @@ class Pusher(object):
                     self.store.update_pusher_failing_since(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.failing_since
                     )
 
@@ -358,6 +362,7 @@ class Pusher(object):
                     self.store.update_pusher_last_token(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.last_token
                     )
 
@@ -365,6 +370,7 @@ class Pusher(object):
                     self.store.update_pusher_failing_since(
                         self.app_id,
                         self.pushkey,
+                        self.user_name,
                         self.failing_since
                     )
                 else:
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 90babd7224..58fe6ad747 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,10 +19,7 @@ from twisted.internet import defer
 from httppusher import HttpPusher
 from synapse.push import PusherConfigException
 
-from syutil.jsonutil import encode_canonical_json
-
 import logging
-import simplejson as json
 
 logger = logging.getLogger(__name__)
 
@@ -52,12 +49,10 @@ class PusherPool:
     @defer.inlineCallbacks
     def start(self):
         pushers = yield self.store.get_all_pushers()
-        for p in pushers:
-            p['data'] = json.loads(p['data'])
         self._start_pushers(pushers)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, profile_tag, kind, app_id,
+    def add_pusher(self, user_name, access_token, profile_tag, kind, app_id,
                    app_display_name, device_display_name, pushkey, lang, data):
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
@@ -71,7 +66,7 @@ class PusherPool:
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
             "pushkey": pushkey,
-            "pushkey_ts": self.hs.get_clock().time_msec(),
+            "ts": self.hs.get_clock().time_msec(),
             "lang": lang,
             "data": data,
             "last_token": None,
@@ -79,17 +74,50 @@ class PusherPool:
             "failing_since": None
         })
         yield self._add_pusher_to_store(
-            user_name, profile_tag, kind, app_id,
+            user_name, access_token, profile_tag, kind, app_id,
             app_display_name, device_display_name,
             pushkey, lang, data
         )
 
     @defer.inlineCallbacks
-    def _add_pusher_to_store(self, user_name, profile_tag, kind, app_id,
-                             app_display_name, device_display_name,
+    def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
+                                                      not_user_id):
+        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(
+            app_id, pushkey
+        )
+        for p in to_remove:
+            if p['user_name'] != not_user_id:
+                logger.info(
+                    "Removing pusher for app id %s, pushkey %s, user %s",
+                    app_id, pushkey, p['user_name']
+                )
+                self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+
+    @defer.inlineCallbacks
+    def remove_pushers_by_user_access_token(self, user_id, not_access_token_id):
+        all = yield self.store.get_all_pushers()
+        logger.info(
+            "Removing all pushers for user %s except access token %s",
+            user_id, not_access_token_id
+        )
+        for p in all:
+            if (
+                p['user_name'] == user_id and
+                p['access_token'] != not_access_token_id
+            ):
+                logger.info(
+                    "Removing pusher for app id %s, pushkey %s, user %s",
+                    p['app_id'], p['pushkey'], p['user_name']
+                )
+                self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+
+    @defer.inlineCallbacks
+    def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind,
+                             app_id, app_display_name, device_display_name,
                              pushkey, lang, data):
         yield self.store.add_pusher(
             user_name=user_name,
+            access_token=access_token,
             profile_tag=profile_tag,
             kind=kind,
             app_id=app_id,
@@ -98,9 +126,9 @@ class PusherPool:
             pushkey=pushkey,
             pushkey_ts=self.hs.get_clock().time_msec(),
             lang=lang,
-            data=encode_canonical_json(data).decode("UTF-8"),
+            data=data,
         )
-        self._refresh_pusher((app_id, pushkey))
+        self._refresh_pusher(app_id, pushkey, user_name)
 
     def _create_pusher(self, pusherdict):
         if pusherdict['kind'] == 'http':
@@ -112,7 +140,7 @@ class PusherPool:
                 app_display_name=pusherdict['app_display_name'],
                 device_display_name=pusherdict['device_display_name'],
                 pushkey=pusherdict['pushkey'],
-                pushkey_ts=pusherdict['pushkey_ts'],
+                pushkey_ts=pusherdict['ts'],
                 data=pusherdict['data'],
                 last_token=pusherdict['last_token'],
                 last_success=pusherdict['last_success'],
@@ -125,30 +153,42 @@ class PusherPool:
             )
 
     @defer.inlineCallbacks
-    def _refresh_pusher(self, app_id_pushkey):
-        p = yield self.store.get_pushers_by_app_id_and_pushkey(
-            app_id_pushkey
+    def _refresh_pusher(self, app_id, pushkey, user_name):
+        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
+            app_id, pushkey
         )
-        p['data'] = json.loads(p['data'])
 
-        self._start_pushers([p])
+        p = None
+        for r in resultlist:
+            if r['user_name'] == user_name:
+                p = r
+
+        if p:
+
+            self._start_pushers([p])
 
     def _start_pushers(self, pushers):
         logger.info("Starting %d pushers", len(pushers))
         for pusherdict in pushers:
             p = self._create_pusher(pusherdict)
             if p:
-                fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey'])
+                fullid = "%s:%s:%s" % (
+                    pusherdict['app_id'],
+                    pusherdict['pushkey'],
+                    pusherdict['user_name']
+                )
                 if fullid in self.pushers:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
                 p.start()
 
     @defer.inlineCallbacks
-    def remove_pusher(self, app_id, pushkey):
-        fullid = "%s:%s" % (app_id, pushkey)
+    def remove_pusher(self, app_id, pushkey, user_name):
+        fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
         if fullid in self.pushers:
             logger.info("Stopping pusher %s", fullid)
             self.pushers[fullid].stop()
             del self.pushers[fullid]
-        yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
+        yield self.store.delete_pusher_by_app_id_pushkey_user_name(
+            app_id, pushkey, user_name
+        )
diff --git a/synapse/rest/client/v1/base.py b/synapse/rest/client/v1/base.py
index 72332bdb10..504a5e432f 100644
--- a/synapse/rest/client/v1/base.py
+++ b/synapse/rest/client/v1/base.py
@@ -48,5 +48,5 @@ class ClientV1RestServlet(RestServlet):
         self.hs = hs
         self.handlers = hs.get_handlers()
         self.builder_factory = hs.get_event_builder_factory()
-        self.auth = hs.get_auth()
+        self.auth = hs.get_v1auth()
         self.txns = HttpTransactionStore()
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 6045e86f34..c83287c028 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -27,7 +27,7 @@ class PusherRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        user, _ = yield self.auth.get_user_by_req(request)
+        user, client = yield self.auth.get_user_by_req(request)
 
         content = _parse_json(request)
 
@@ -37,7 +37,7 @@ class PusherRestServlet(ClientV1RestServlet):
                 and 'kind' in content and
                 content['kind'] is None):
             yield pusher_pool.remove_pusher(
-                content['app_id'], content['pushkey']
+                content['app_id'], content['pushkey'], user_name=user.to_string()
             )
             defer.returnValue((200, {}))
 
@@ -51,9 +51,21 @@ class PusherRestServlet(ClientV1RestServlet):
             raise SynapseError(400, "Missing parameters: "+','.join(missing),
                                errcode=Codes.MISSING_PARAM)
 
+        append = False
+        if 'append' in content:
+            append = content['append']
+
+        if not append:
+            yield pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
+                app_id=content['app_id'],
+                pushkey=content['pushkey'],
+                not_user_id=user.to_string()
+            )
+
         try:
             yield pusher_pool.add_pusher(
                 user_name=user.to_string(),
+                access_token=client.token_id,
                 profile_tag=content['profile_tag'],
                 kind=content['kind'],
                 app_id=content['app_id'],
diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py
index bca65f2a6a..28d95b2729 100644
--- a/synapse/rest/client/v2_alpha/__init__.py
+++ b/synapse/rest/client/v2_alpha/__init__.py
@@ -15,7 +15,10 @@
 
 from . import (
     sync,
-    filter
+    filter,
+    account,
+    register,
+    auth
 )
 
 from synapse.http.server import JsonResource
@@ -32,3 +35,6 @@ class ClientV2AlphaRestResource(JsonResource):
     def register_servlets(client_resource, hs):
         sync.register_servlets(hs, client_resource)
         filter.register_servlets(hs, client_resource)
+        account.register_servlets(hs, client_resource)
+        register.register_servlets(hs, client_resource)
+        auth.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py
index 22dc5cb862..4540e8dcf7 100644
--- a/synapse/rest/client/v2_alpha/_base.py
+++ b/synapse/rest/client/v2_alpha/_base.py
@@ -17,9 +17,11 @@
 """
 
 from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX
+from synapse.api.errors import SynapseError
 import re
 
 import logging
+import simplejson
 
 
 logger = logging.getLogger(__name__)
@@ -36,3 +38,23 @@ def client_v2_pattern(path_regex):
         SRE_Pattern
     """
     return re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex)
+
+
+def parse_request_allow_empty(request):
+    content = request.content.read()
+    if content is None or content == '':
+        return None
+    try:
+        return simplejson.loads(content)
+    except simplejson.JSONDecodeError:
+        raise SynapseError(400, "Content not JSON.")
+
+
+def parse_json_dict_from_request(request):
+    try:
+        content = simplejson.loads(request.content.read())
+        if type(content) != dict:
+            raise SynapseError(400, "Content must be a JSON object.")
+        return content
+    except simplejson.JSONDecodeError:
+        raise SynapseError(400, "Content not JSON.")
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
new file mode 100644
index 0000000000..3e522ad39b
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -0,0 +1,159 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import LoginType
+from synapse.api.errors import LoginError, SynapseError, Codes
+from synapse.http.servlet import RestServlet
+from synapse.util.async import run_on_reactor
+
+from ._base import client_v2_pattern, parse_json_dict_from_request
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class PasswordRestServlet(RestServlet):
+    PATTERN = client_v2_pattern("/account/password")
+
+    def __init__(self, hs):
+        super(PasswordRestServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.auth_handler = hs.get_handlers().auth_handler
+        self.login_handler = hs.get_handlers().login_handler
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        yield run_on_reactor()
+
+        body = parse_json_dict_from_request(request)
+
+        authed, result, params = yield self.auth_handler.check_auth([
+            [LoginType.PASSWORD],
+            [LoginType.EMAIL_IDENTITY]
+        ], body)
+
+        if not authed:
+            defer.returnValue((401, result))
+
+        user_id = None
+
+        if LoginType.PASSWORD in result:
+            # if using password, they should also be logged in
+            auth_user, client = yield self.auth.get_user_by_req(request)
+            if auth_user.to_string() != result[LoginType.PASSWORD]:
+                raise LoginError(400, "", Codes.UNKNOWN)
+            user_id = auth_user.to_string()
+        elif LoginType.EMAIL_IDENTITY in result:
+            threepid = result[LoginType.EMAIL_IDENTITY]
+            if 'medium' not in threepid or 'address' not in threepid:
+                raise SynapseError(500, "Malformed threepid")
+            # if using email, we must know about the email they're authing with!
+            threepid_user = yield self.hs.get_datastore().get_user_by_threepid(
+                threepid['medium'], threepid['address']
+            )
+            if not threepid_user:
+                raise SynapseError(404, "Email address not found", Codes.NOT_FOUND)
+            user_id = threepid_user
+        else:
+            logger.error("Auth succeeded but no known type!", result.keys())
+            raise SynapseError(500, "", Codes.UNKNOWN)
+
+        if 'new_password' not in params:
+            raise SynapseError(400, "", Codes.MISSING_PARAM)
+        new_password = params['new_password']
+
+        yield self.login_handler.set_password(
+            user_id, new_password, None
+        )
+
+        defer.returnValue((200, {}))
+
+    def on_OPTIONS(self, _):
+        return 200, {}
+
+
+class ThreepidRestServlet(RestServlet):
+    PATTERN = client_v2_pattern("/account/3pid")
+
+    def __init__(self, hs):
+        super(ThreepidRestServlet, self).__init__()
+        self.hs = hs
+        self.login_handler = hs.get_handlers().login_handler
+        self.identity_handler = hs.get_handlers().identity_handler
+        self.auth = hs.get_auth()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        yield run_on_reactor()
+
+        auth_user, _ = yield self.auth.get_user_by_req(request)
+
+        threepids = yield self.hs.get_datastore().user_get_threepids(
+            auth_user.to_string()
+        )
+
+        defer.returnValue((200, {'threepids': threepids}))
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        yield run_on_reactor()
+
+        body = parse_json_dict_from_request(request)
+
+        if 'threePidCreds' not in body:
+            raise SynapseError(400, "Missing param", Codes.MISSING_PARAM)
+        threePidCreds = body['threePidCreds']
+
+        auth_user, client = yield self.auth.get_user_by_req(request)
+
+        threepid = yield self.identity_handler.threepid_from_creds(threePidCreds)
+
+        if not threepid:
+            raise SynapseError(
+                400, "Failed to auth 3pid", Codes.THREEPID_AUTH_FAILED
+            )
+
+        for reqd in ['medium', 'address', 'validated_at']:
+            if reqd not in threepid:
+                logger.warn("Couldn't add 3pid: invalid response from ID sevrer")
+                raise SynapseError(500, "Invalid response from ID Server")
+
+        yield self.login_handler.add_threepid(
+            auth_user.to_string(),
+            threepid['medium'],
+            threepid['address'],
+            threepid['validated_at'],
+        )
+
+        if 'bind' in body and body['bind']:
+            logger.debug(
+                "Binding emails %s to %s",
+                threepid, auth_user.to_string()
+            )
+            yield self.identity_handler.bind_threepid(
+                threePidCreds, auth_user.to_string()
+            )
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    PasswordRestServlet(hs).register(http_server)
+    ThreepidRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
new file mode 100644
index 0000000000..4c726f05f5
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -0,0 +1,190 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import LoginType
+from synapse.api.errors import SynapseError
+from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX
+from synapse.http.servlet import RestServlet
+
+from ._base import client_v2_pattern
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+RECAPTCHA_TEMPLATE = """
+<html>
+<head>
+<title>Authentication</title>
+<meta name='viewport' content='width=device-width, initial-scale=1,
+    user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+<script src="https://www.google.com/recaptcha/api.js"
+    async defer></script>
+<script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
+<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
+<script>
+function captchaDone() {
+    $('#registrationForm').submit();
+}
+</script>
+</head>
+<body>
+<form id="registrationForm" method="post" action="%(myurl)s">
+    <div>
+        <p>
+        Hello! We need to prevent computer programs and other automated
+        things from creating accounts on this server.
+        </p>
+        <p>
+        Please verify that you're not a robot.
+        </p>
+        <input type="hidden" name="session" value="%(session)s" />
+        <div class="g-recaptcha"
+            data-sitekey="%(sitekey)s"
+            data-callback="captchaDone">
+        </div>
+        <noscript>
+        <input type="submit" value="All Done" />
+        </noscript>
+        </div>
+    </div>
+</form>
+</body>
+</html>
+"""
+
+SUCCESS_TEMPLATE = """
+<html>
+<head>
+<title>Success!</title>
+<meta name='viewport' content='width=device-width, initial-scale=1,
+    user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+<link rel="stylesheet" href="/_matrix/static/client/register/style.css">
+<script>
+if (window.onAuthDone != undefined) {
+    window.onAuthDone();
+}
+</script>
+</head>
+<body>
+    <div>
+        <p>Thank you</p>
+        <p>You may now close this window and return to the application</p>
+    </div>
+</body>
+</html>
+"""
+
+
+class AuthRestServlet(RestServlet):
+    """
+    Handles Client / Server API authentication in any situations where it
+    cannot be handled in the normal flow (with requests to the same endpoint).
+    Current use is for web fallback auth.
+    """
+    PATTERN = client_v2_pattern("/auth/(?P<stagetype>[\w\.]*)/fallback/web")
+
+    def __init__(self, hs):
+        super(AuthRestServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.auth_handler = hs.get_handlers().auth_handler
+        self.registration_handler = hs.get_handlers().registration_handler
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, stagetype):
+        yield
+        if stagetype == LoginType.RECAPTCHA:
+            if ('session' not in request.args or
+                    len(request.args['session']) == 0):
+                raise SynapseError(400, "No session supplied")
+
+            session = request.args["session"][0]
+
+            html = RECAPTCHA_TEMPLATE % {
+                'session': session,
+                'myurl': "%s/auth/%s/fallback/web" % (
+                    CLIENT_V2_ALPHA_PREFIX, LoginType.RECAPTCHA
+                ),
+                'sitekey': self.hs.config.recaptcha_public_key,
+            }
+            html_bytes = html.encode("utf8")
+            request.setResponseCode(200)
+            request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+            request.setHeader(b"Server", self.hs.version_string)
+            request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
+
+            request.write(html_bytes)
+            request.finish()
+            defer.returnValue(None)
+        else:
+            raise SynapseError(404, "Unknown auth stage type")
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, stagetype):
+        yield
+        if stagetype == "m.login.recaptcha":
+            if ('g-recaptcha-response' not in request.args or
+                    len(request.args['g-recaptcha-response'])) == 0:
+                raise SynapseError(400, "No captcha response supplied")
+            if ('session' not in request.args or
+                    len(request.args['session'])) == 0:
+                raise SynapseError(400, "No session supplied")
+
+            session = request.args['session'][0]
+
+            authdict = {
+                'response': request.args['g-recaptcha-response'][0],
+                'session': session,
+            }
+
+            success = yield self.auth_handler.add_oob_auth(
+                LoginType.RECAPTCHA,
+                authdict,
+                self.hs.get_ip_from_request(request)
+            )
+
+            if success:
+                html = SUCCESS_TEMPLATE
+            else:
+                html = RECAPTCHA_TEMPLATE % {
+                    'session': session,
+                    'myurl': "%s/auth/%s/fallback/web" % (
+                        CLIENT_V2_ALPHA_PREFIX, LoginType.RECAPTCHA
+                    ),
+                    'sitekey': self.hs.config.recaptcha_public_key,
+                }
+            html_bytes = html.encode("utf8")
+            request.setResponseCode(200)
+            request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+            request.setHeader(b"Server", self.hs.version_string)
+            request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
+
+            request.write(html_bytes)
+            request.finish()
+
+            defer.returnValue(None)
+        else:
+            raise SynapseError(404, "Unknown auth stage type")
+
+    def on_OPTIONS(self, _):
+        return 200, {}
+
+
+def register_servlets(hs, http_server):
+    AuthRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
new file mode 100644
index 0000000000..3640fb4a29
--- /dev/null
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import LoginType
+from synapse.api.errors import SynapseError, Codes
+from synapse.http.servlet import RestServlet
+
+from ._base import client_v2_pattern, parse_request_allow_empty
+
+import logging
+import hmac
+from hashlib import sha1
+from synapse.util.async import run_on_reactor
+
+
+# We ought to be using hmac.compare_digest() but on older pythons it doesn't
+# exist. It's a _really minor_ security flaw to use plain string comparison
+# because the timing attack is so obscured by all the other code here it's
+# unlikely to make much difference
+if hasattr(hmac, "compare_digest"):
+    compare_digest = hmac.compare_digest
+else:
+    compare_digest = lambda a, b: a == b
+
+
+logger = logging.getLogger(__name__)
+
+
+class RegisterRestServlet(RestServlet):
+    PATTERN = client_v2_pattern("/register")
+
+    def __init__(self, hs):
+        super(RegisterRestServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.auth_handler = hs.get_handlers().auth_handler
+        self.registration_handler = hs.get_handlers().registration_handler
+        self.identity_handler = hs.get_handlers().identity_handler
+        self.login_handler = hs.get_handlers().login_handler
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        yield run_on_reactor()
+
+        body = parse_request_allow_empty(request)
+        if 'password' not in body:
+            raise SynapseError(400, "", Codes.MISSING_PARAM)
+
+        if 'username' in body:
+            desired_username = body['username']
+            yield self.registration_handler.check_username(desired_username)
+
+        is_using_shared_secret = False
+        is_application_server = False
+
+        service = None
+        if 'access_token' in request.args:
+            service = yield self.auth.get_appservice_by_req(request)
+
+        if self.hs.config.enable_registration_captcha:
+            flows = [
+                [LoginType.RECAPTCHA],
+                [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]
+            ]
+        else:
+            flows = [
+                [LoginType.DUMMY],
+                [LoginType.EMAIL_IDENTITY]
+            ]
+
+        if service:
+            is_application_server = True
+        elif 'mac' in body:
+            # Check registration-specific shared secret auth
+            if 'username' not in body:
+                raise SynapseError(400, "", Codes.MISSING_PARAM)
+            self._check_shared_secret_auth(
+                body['username'], body['mac']
+            )
+            is_using_shared_secret = True
+        else:
+            authed, result, params = yield self.auth_handler.check_auth(
+                flows, body, self.hs.get_ip_from_request(request)
+            )
+
+            if not authed:
+                defer.returnValue((401, result))
+
+        can_register = (
+            not self.hs.config.disable_registration
+            or is_application_server
+            or is_using_shared_secret
+        )
+        if not can_register:
+            raise SynapseError(403, "Registration has been disabled")
+
+        if 'password' not in params:
+            raise SynapseError(400, "", Codes.MISSING_PARAM)
+        desired_username = params['username'] if 'username' in params else None
+        new_password = params['password']
+
+        (user_id, token) = yield self.registration_handler.register(
+            localpart=desired_username,
+            password=new_password
+        )
+
+        if LoginType.EMAIL_IDENTITY in result:
+            threepid = result[LoginType.EMAIL_IDENTITY]
+
+            for reqd in ['medium', 'address', 'validated_at']:
+                if reqd not in threepid:
+                    logger.info("Can't add incomplete 3pid")
+                else:
+                    yield self.login_handler.add_threepid(
+                        user_id,
+                        threepid['medium'],
+                        threepid['address'],
+                        threepid['validated_at'],
+                    )
+
+            if 'bind_email' in params and params['bind_email']:
+                logger.info("bind_email specified: binding")
+
+                emailThreepid = result[LoginType.EMAIL_IDENTITY]
+                threepid_creds = emailThreepid['threepid_creds']
+                logger.debug("Binding emails %s to %s" % (
+                    emailThreepid, user_id
+                ))
+                yield self.identity_handler.bind_threepid(threepid_creds, user_id)
+            else:
+                logger.info("bind_email not specified: not binding email")
+
+        result = {
+            "user_id": user_id,
+            "access_token": token,
+            "home_server": self.hs.hostname,
+        }
+
+        defer.returnValue((200, result))
+
+    def on_OPTIONS(self, _):
+        return 200, {}
+
+    def _check_shared_secret_auth(self, username, mac):
+        if not self.hs.config.registration_shared_secret:
+            raise SynapseError(400, "Shared secret registration is not enabled")
+
+        user = username.encode("utf-8")
+
+        # str() because otherwise hmac complains that 'unicode' does not
+        # have the buffer interface
+        got_mac = str(mac)
+
+        want_mac = hmac.new(
+            key=self.hs.config.registration_shared_secret,
+            msg=user,
+            digestmod=sha1,
+        ).hexdigest()
+
+        if compare_digest(want_mac, got_mac):
+            return True
+        else:
+            raise SynapseError(
+                403, "HMAC incorrect",
+            )
+
+
+def register_servlets(hs, http_server):
+    RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index a602b425e3..d61a228c36 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -65,6 +65,7 @@ class BaseHomeServer(object):
         'replication_layer',
         'datastore',
         'handlers',
+        'v1auth',
         'auth',
         'rest_servlet_factory',
         'state_handler',
@@ -182,6 +183,15 @@ class HomeServer(BaseHomeServer):
     def build_auth(self):
         return Auth(self)
 
+    def build_v1auth(self):
+        orf = Auth(self)
+        # Matrix spec makes no reference to what HTTP status code is returned,
+        # but the V1 API uses 403 where it means 401, and the webclient
+        # relies on this behaviour, so V1 gets its own copy of the auth
+        # with backwards compat behaviour.
+        orf.TOKEN_NOT_FOUND_HTTP_STATUS = 403
+        return orf
+
     def build_state_handler(self):
         return StateHandler(self)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f4dec70393..61215bbc7b 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -51,7 +51,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 15
+SCHEMA_VERSION = 16
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -104,14 +104,16 @@ class DataStore(RoomMemberStore, RoomStore,
 
         self.client_ip_last_seen.prefill(*key + (now,))
 
-        yield self._simple_insert(
+        yield self._simple_upsert(
             "user_ips",
-            {
-                "user": user.to_string(),
+            keyvalues={
+                "user_id": user.to_string(),
                 "access_token": access_token,
-                "device_id": device_id,
                 "ip": ip,
                 "user_agent": user_agent,
+            },
+            values={
+                "device_id": device_id,
                 "last_seen": now,
             },
             desc="insert_client_ip",
@@ -120,7 +122,7 @@ class DataStore(RoomMemberStore, RoomStore,
     def get_user_ip_and_agents(self, user):
         return self._simple_select_list(
             table="user_ips",
-            keyvalues={"user": user.to_string()},
+            keyvalues={"user_id": user.to_string()},
             retcols=[
                 "device_id", "access_token", "ip", "user_agent", "last_seen"
             ],
@@ -148,21 +150,23 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn):
+def prepare_database(db_conn, database_engine):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
     """
     try:
         cur = db_conn.cursor()
-        version_info = _get_or_create_schema_state(cur)
+        version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
-            _upgrade_existing_database(cur, user_version, delta_files, upgraded)
+            _upgrade_existing_database(
+                cur, user_version, delta_files, upgraded, database_engine
+            )
         else:
-            _setup_new_database(cur)
+            _setup_new_database(cur, database_engine)
 
-        cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
+        # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
         cur.close()
         db_conn.commit()
@@ -171,7 +175,7 @@ def prepare_database(db_conn):
         raise
 
 
-def _setup_new_database(cur):
+def _setup_new_database(cur, database_engine):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -225,31 +229,30 @@ def _setup_new_database(cur):
 
     directory_entries = os.listdir(sql_dir)
 
-    sql_script = "BEGIN TRANSACTION;\n"
     for filename in fnmatch.filter(directory_entries, "*.sql"):
         sql_loc = os.path.join(sql_dir, filename)
         logger.debug("Applying schema %s", sql_loc)
-        sql_script += read_schema(sql_loc)
-        sql_script += "\n"
-    sql_script += "COMMIT TRANSACTION;"
-    cur.executescript(sql_script)
+        executescript(cur, sql_loc)
 
     cur.execute(
-        "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-        " VALUES (?,?)",
-        (max_current_ver, False)
+        database_engine.convert_param_style(
+            "INSERT INTO schema_version (version, upgraded)"
+            " VALUES (?,?)"
+        ),
+        (max_current_ver, False,)
     )
 
     _upgrade_existing_database(
         cur,
         current_version=max_current_ver,
         applied_delta_files=[],
-        upgraded=False
+        upgraded=False,
+        database_engine=database_engine,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded):
+                               upgraded, database_engine):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -305,6 +308,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
     if not upgraded:
         start_ver += 1
 
+    logger.debug("applied_delta_files: %s", applied_delta_files)
+
     for v in range(start_ver, SCHEMA_VERSION + 1):
         logger.debug("Upgrading schema to v%d", v)
 
@@ -321,6 +326,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
         directory_entries.sort()
         for file_name in directory_entries:
             relative_path = os.path.join(str(v), file_name)
+            logger.debug("Found file: %s", relative_path)
             if relative_path in applied_delta_files:
                 continue
 
@@ -342,9 +348,8 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                 module.run_upgrade(cur)
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
-                delta_schema = read_schema(absolute_path)
                 logger.debug("Applying schema %s", relative_path)
-                cur.executescript(delta_schema)
+                executescript(cur, absolute_path)
             else:
                 # Not a valid delta file.
                 logger.warn(
@@ -356,24 +361,82 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
 
             # Mark as done.
             cur.execute(
-                "INSERT INTO applied_schema_deltas (version, file)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "INSERT INTO applied_schema_deltas (version, file)"
+                    " VALUES (?,?)",
+                ),
                 (v, relative_path)
             )
 
             cur.execute(
-                "INSERT OR REPLACE INTO schema_version (version, upgraded)"
-                " VALUES (?,?)",
+                database_engine.convert_param_style(
+                    "REPLACE INTO schema_version (version, upgraded)"
+                    " VALUES (?,?)",
+                ),
                 (v, True)
             )
 
 
-def _get_or_create_schema_state(txn):
+def get_statements(f):
+    statement_buffer = ""
+    in_comment = False  # If we're in a /* ... */ style comment
+
+    for line in f:
+        line = line.strip()
+
+        if in_comment:
+            # Check if this line contains an end to the comment
+            comments = line.split("*/", 1)
+            if len(comments) == 1:
+                continue
+            line = comments[1]
+            in_comment = False
+
+        # Remove inline block comments
+        line = re.sub(r"/\*.*\*/", " ", line)
+
+        # Does this line start a comment?
+        comments = line.split("/*", 1)
+        if len(comments) > 1:
+            line = comments[0]
+            in_comment = True
+
+        # Deal with line comments
+        line = line.split("--", 1)[0]
+        line = line.split("//", 1)[0]
+
+        # Find *all* semicolons. We need to treat first and last entry
+        # specially.
+        statements = line.split(";")
+
+        # We must prepend statement_buffer to the first statement
+        first_statement = "%s %s" % (
+            statement_buffer.strip(),
+            statements[0].strip()
+        )
+        statements[0] = first_statement
+
+        # Every entry, except the last, is a full statement
+        for statement in statements[:-1]:
+            yield statement.strip()
+
+        # The last entry did *not* end in a semicolon, so we store it for the
+        # next semicolon we find
+        statement_buffer = statements[-1].strip()
+
+
+def executescript(txn, schema_path):
+    with open(schema_path, 'r') as f:
+        for statement in get_statements(f):
+            txn.execute(statement)
+
+
+def _get_or_create_schema_state(txn, database_engine):
+    # Bluntly try creating the schema_version tables.
     schema_path = os.path.join(
         dir_path, "schema", "schema_version.sql",
     )
-    create_schema = read_schema(schema_path)
-    txn.executescript(create_schema)
+    executescript(txn, schema_path)
 
     txn.execute("SELECT version, upgraded FROM schema_version")
     row = txn.fetchone()
@@ -382,10 +445,13 @@ def _get_or_create_schema_state(txn):
 
     if current_version:
         txn.execute(
-            "SELECT file FROM applied_schema_deltas WHERE version >= ?",
+            database_engine.convert_param_style(
+                "SELECT file FROM applied_schema_deltas WHERE version >= ?"
+            ),
             (current_version,)
         )
-        return current_version, txn.fetchall(), upgraded
+        applied_deltas = [d for d, in txn.fetchall()]
+        return current_version, applied_deltas, upgraded
 
     return None
 
@@ -417,7 +483,19 @@ def prepare_sqlite3_database(db_conn):
 
             if row and row[0]:
                 db_conn.execute(
-                    "INSERT OR REPLACE INTO schema_version (version, upgraded)"
+                    "REPLACE INTO schema_version (version, upgraded)"
                     " VALUES (?,?)",
                     (row[0], False)
                 )
+
+
+def are_all_users_on_domain(txn, database_engine, domain):
+    sql = database_engine.convert_param_style(
+        "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
+    )
+    pat = "%:" + domain
+    txn.execute(sql, (pat,))
+    num_not_matching = txn.fetchall()[0][0]
+    if num_not_matching == 0:
+        return True
+    return False
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e3e67d8e0d..6017c2a6e8 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -22,6 +22,8 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
 from synapse.util.lrucache import LruCache
 import synapse.metrics
 
+from util.id_generators import IdGenerator, StreamIdGenerator
+
 from twisted.internet import defer
 
 from collections import namedtuple, OrderedDict
@@ -145,11 +147,12 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name"]
+    __slots__ = ["txn", "name", "database_engine"]
 
-    def __init__(self, txn, name):
+    def __init__(self, txn, name, database_engine):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
+        object.__setattr__(self, "database_engine", database_engine)
 
     def __getattr__(self, name):
         return getattr(self.txn, name)
@@ -161,26 +164,32 @@ class LoggingTransaction(object):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
 
-        try:
-            if args and args[0]:
-                values = args[0]
+        sql = self.database_engine.convert_param_style(sql)
+
+        if args and args[0]:
+            args = list(args)
+            args[0] = [
+                self.database_engine.encode_parameter(a) for a in args[0]
+            ]
+            try:
                 sql_logger.debug(
-                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(values)),
+                    "[SQL values] {%s} " + ", ".join(("<%r>",) * len(args[0])),
                     self.name,
-                    *values
+                    *args[0]
                 )
-        except:
-            # Don't let logging failures stop SQL from working
-            pass
+            except:
+                # Don't let logging failures stop SQL from working
+                pass
 
         start = time.time() * 1000
+
         try:
             return self.txn.execute(
                 sql, *args, **kwargs
             )
-        except:
-                logger.exception("[SQL FAIL] {%s}", self.name)
-                raise
+        except Exception as e:
+            logger.debug("[SQL FAIL] {%s} %s", self.name, e)
+            raise
         finally:
             msecs = (time.time() * 1000) - start
             sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
@@ -245,6 +254,14 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
                                       max_entries=hs.config.event_cache_size)
 
+        self.database_engine = hs.database_engine
+
+        self._stream_id_gen = StreamIdGenerator()
+        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
+        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
+        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
+        self._pushers_id_gen = IdGenerator("pushers", "id", self)
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -281,8 +298,11 @@ class SQLBaseStore(object):
 
         start_time = time.time() * 1000
 
-        def inner_func(txn, *args, **kwargs):
+        def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
+                if self.database_engine.is_connection_closed(conn):
+                    conn.reconnect()
+
                 current_context.copy_to(context)
                 start = time.time() * 1000
                 txn_id = self._TXN_ID
@@ -296,9 +316,48 @@ class SQLBaseStore(object):
                 sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
                 transaction_logger.debug("[TXN START] {%s}", name)
                 try:
-                    return func(LoggingTransaction(txn, name), *args, **kwargs)
-                except:
-                    logger.exception("[TXN FAIL] {%s}", name)
+                    i = 0
+                    N = 5
+                    while True:
+                        try:
+                            txn = conn.cursor()
+                            return func(
+                                LoggingTransaction(txn, name, self.database_engine),
+                                *args, **kwargs
+                            )
+                        except self.database_engine.module.OperationalError as e:
+                            # This can happen if the database disappears mid
+                            # transaction.
+                            logger.warn(
+                                "[TXN OPERROR] {%s} %s %d/%d",
+                                name, e, i, N
+                            )
+                            if i < N:
+                                i += 1
+                                try:
+                                    conn.rollback()
+                                except self.database_engine.module.Error as e1:
+                                    logger.warn(
+                                        "[TXN EROLL] {%s} %s",
+                                        name, e1,
+                                    )
+                                continue
+                        except self.database_engine.module.DatabaseError as e:
+                            if self.database_engine.is_deadlock(e):
+                                logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
+                                if i < N:
+                                    i += 1
+                                    try:
+                                        conn.rollback()
+                                    except self.database_engine.module.Error as e1:
+                                        logger.warn(
+                                            "[TXN EROLL] {%s} %s",
+                                            name, e1,
+                                        )
+                                    continue
+                            raise
+                except Exception as e:
+                    logger.debug("[TXN FAIL] {%s} %s", name, e)
                     raise
                 finally:
                     end = time.time() * 1000
@@ -311,7 +370,7 @@ class SQLBaseStore(object):
                     sql_txn_timer.inc_by(duration, desc)
 
         with PreserveLoggingContext():
-            result = yield self._db_pool.runInteraction(
+            result = yield self._db_pool.runWithConnection(
                 inner_func, *args, **kwargs
             )
         defer.returnValue(result)
@@ -342,11 +401,11 @@ class SQLBaseStore(object):
             The result of decoder(results)
         """
         def interaction(txn):
-            cursor = txn.execute(query, args)
+            txn.execute(query, args)
             if decoder:
-                return decoder(cursor)
+                return decoder(txn)
             else:
-                return cursor.fetchall()
+                return txn.fetchall()
 
         return self.runInteraction(desc, interaction)
 
@@ -356,27 +415,29 @@ class SQLBaseStore(object):
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
-    def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
+    @defer.inlineCallbacks
+    def _simple_insert(self, table, values, or_ignore=False,
                        desc="_simple_insert"):
         """Executes an INSERT query on the named table.
 
         Args:
             table : string giving the table name
             values : dict of new column names and values for them
-            or_replace : bool; if True performs an INSERT OR REPLACE
         """
-        return self.runInteraction(
-            desc,
-            self._simple_insert_txn, table, values, or_replace=or_replace,
-            or_ignore=or_ignore,
-        )
+        try:
+            yield self.runInteraction(
+                desc,
+                self._simple_insert_txn, table, values,
+            )
+        except self.database_engine.module.IntegrityError:
+            # We have to do or_ignore flag at this layer, since we can't reuse
+            # a cursor after we receive an error from the db.
+            if not or_ignore:
+                raise
 
     @log_function
-    def _simple_insert_txn(self, txn, table, values, or_replace=False,
-                           or_ignore=False):
-        sql = "%s INTO %s (%s) VALUES(%s)" % (
-            ("INSERT OR REPLACE" if or_replace else
-             "INSERT OR IGNORE" if or_ignore else "INSERT"),
+    def _simple_insert_txn(self, txn, table, values):
+        sql = "INSERT INTO %s (%s) VALUES(%s)" % (
             table,
             ", ".join(k for k in values),
             ", ".join("?" for k in values)
@@ -388,22 +449,26 @@ class SQLBaseStore(object):
         )
 
         txn.execute(sql, values.values())
-        return txn.lastrowid
 
-    def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
+    def _simple_upsert(self, table, keyvalues, values,
+                       insertion_values={}, desc="_simple_upsert"):
         """
         Args:
             table (str): The table to upsert into
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
+            insertion_values (dict): key/values to use when inserting
         Returns: A deferred
         """
         return self.runInteraction(
             desc,
-            self._simple_upsert_txn, table, keyvalues, values
+            self._simple_upsert_txn, table, keyvalues, values, insertion_values,
         )
 
-    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+    def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
+        # We need to lock the table :(
+        self.database_engine.lock_table(txn, table)
+
         # Try to update
         sql = "UPDATE %s SET %s WHERE %s" % (
             table,
@@ -422,6 +487,7 @@ class SQLBaseStore(object):
             allvalues = {}
             allvalues.update(keyvalues)
             allvalues.update(values)
+            allvalues.update(insertion_values)
 
             sql = "INSERT INTO %s (%s) VALUES (%s)" % (
                 table,
@@ -489,8 +555,7 @@ class SQLBaseStore(object):
 
     def _simple_select_onecol_txn(self, txn, table, keyvalues, retcol):
         sql = (
-            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s "
-            "ORDER BY rowid asc"
+            "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
         ) % {
             "retcol": retcol,
             "table": table,
@@ -548,14 +613,14 @@ class SQLBaseStore(object):
             retcols : list of strings giving the names of the columns to return
         """
         if keyvalues:
-            sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s WHERE %s" % (
                 ", ".join(retcols),
                 table,
                 " AND ".join("%s = ?" % (k, ) for k in keyvalues)
             )
             txn.execute(sql, keyvalues.values())
         else:
-            sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+            sql = "SELECT %s FROM %s" % (
                 ", ".join(retcols),
                 table
             )
@@ -607,10 +672,10 @@ class SQLBaseStore(object):
 
     def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
                                allow_none=False):
-        select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+        select_sql = "SELECT %s FROM %s WHERE %s" % (
             ", ".join(retcols),
             table,
-            " AND ".join("%s = ?" % (k) for k in keyvalues)
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
         )
 
         txn.execute(select_sql, keyvalues.values())
@@ -648,6 +713,11 @@ class SQLBaseStore(object):
                     updatevalues=updatevalues,
                 )
 
+                # if txn.rowcount == 0:
+                #     raise StoreError(404, "No row found")
+                if txn.rowcount > 1:
+                    raise StoreError(500, "More than one row matched")
+
             return ret
         return self.runInteraction(desc, func)
 
@@ -860,6 +930,12 @@ class SQLBaseStore(object):
         result = txn.fetchone()
         return result[0] if result else None
 
+    def get_next_stream_id(self):
+        with self._next_stream_id_lock:
+            i = self._next_stream_id
+            self._next_stream_id += 1
+            return i
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
@@ -883,7 +959,7 @@ class Table(object):
 
     _select_where_clause = "SELECT %s FROM %s WHERE %s"
     _select_clause = "SELECT %s FROM %s"
-    _insert_clause = "INSERT OR REPLACE INTO %s (%s) VALUES (%s)"
+    _insert_clause = "REPLACE INTO %s (%s) VALUES (%s)"
 
     @classmethod
     def select_statement(cls, where_clause=None):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index f8cbb3f323..63d1af4e86 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -366,11 +366,11 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         new_txn_id = max(highest_txn_id, last_txn_id) + 1
 
         # Insert new txn into txn table
-        event_ids = [e.event_id for e in events]
+        event_ids = json.dumps([e.event_id for e in events])
         txn.execute(
             "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
             "VALUES(?,?,?)",
-            (service.id, new_txn_id, json.dumps(event_ids))
+            (service.id, new_txn_id, event_ids)
         )
         return AppServiceTransaction(
             service=service, id=new_txn_id, events=events
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 0199539fea..2b2bdf8615 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -21,8 +21,6 @@ from twisted.internet import defer
 
 from collections import namedtuple
 
-import sqlite3
-
 
 RoomAliasMapping = namedtuple(
     "RoomAliasMapping",
@@ -91,7 +89,7 @@ class DirectoryStore(SQLBaseStore):
                 },
                 desc="create_room_alias_association",
             )
-        except sqlite3.IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise SynapseError(
                 409, "Room alias %s already exists" % room_alias.to_string()
             )
@@ -120,12 +118,12 @@ class DirectoryStore(SQLBaseStore):
         defer.returnValue(room_id)
 
     def _delete_room_alias_txn(self, txn, room_alias):
-        cursor = txn.execute(
+        txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),)
         )
 
-        res = cursor.fetchone()
+        res = txn.fetchone()
         if res:
             room_id = res[0]
         else:
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
new file mode 100644
index 0000000000..eb76df7f01
--- /dev/null
+++ b/synapse/storage/engines/__init__.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .postgres import PostgresEngine
+from .sqlite3 import Sqlite3Engine
+
+import importlib
+
+
+SUPPORTED_MODULE = {
+    "sqlite3": Sqlite3Engine,
+    "psycopg2": PostgresEngine,
+}
+
+
+def create_engine(name):
+    engine_class = SUPPORTED_MODULE.get(name, None)
+
+    if engine_class:
+        module = importlib.import_module(name)
+        return engine_class(module)
+
+    raise RuntimeError(
+        "Unsupported database engine '%s'" % (name,)
+    )
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
new file mode 100644
index 0000000000..b8cca9b187
--- /dev/null
+++ b/synapse/storage/engines/postgres.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database
+
+
+class PostgresEngine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+        self.module.extensions.register_type(self.module.extensions.UNICODE)
+
+    def convert_param_style(self, sql):
+        return sql.replace("?", "%s")
+
+    def encode_parameter(self, param):
+        return param
+
+    def on_new_connection(self, db_conn):
+        db_conn.set_isolation_level(
+            self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
+        )
+
+    def prepare_database(self, db_conn):
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        if isinstance(error, self.module.DatabaseError):
+            return error.pgcode in ["40001", "40P01"]
+        return False
+
+    def is_connection_closed(self, conn):
+        return bool(conn)
+
+    def lock_table(self, txn, table):
+        txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
new file mode 100644
index 0000000000..f62d5d1205
--- /dev/null
+++ b/synapse/storage/engines/sqlite3.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage import prepare_database, prepare_sqlite3_database
+
+
+class Sqlite3Engine(object):
+    def __init__(self, database_module):
+        self.module = database_module
+
+    def convert_param_style(self, sql):
+        return sql
+
+    def encode_parameter(self, param):
+        return param
+
+    def on_new_connection(self, db_conn):
+        self.prepare_database(db_conn)
+
+    def prepare_database(self, db_conn):
+        prepare_sqlite3_database(db_conn)
+        prepare_database(db_conn, self)
+
+    def is_deadlock(self, error):
+        return False
+
+    def is_connection_closed(self, conn):
+        return False
+
+    def lock_table(self, txn, table):
+        return
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 032334bfd6..54a3c9d805 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -153,7 +153,7 @@ class EventFederationStore(SQLBaseStore):
         results = self._get_prev_events_and_state(
             txn,
             event_id,
-            is_state=1,
+            is_state=True,
         )
 
         return [(e_id, h, ) for e_id, h, _ in results]
@@ -164,7 +164,7 @@ class EventFederationStore(SQLBaseStore):
         }
 
         if is_state is not None:
-            keyvalues["is_state"] = is_state
+            keyvalues["is_state"] = bool(is_state)
 
         res = self._simple_select_list_txn(
             txn,
@@ -242,7 +242,6 @@ class EventFederationStore(SQLBaseStore):
                     "room_id": room_id,
                     "min_depth": depth,
                 },
-                or_replace=True,
             )
 
     def _handle_prev_events(self, txn, outlier, event_id, prev_events,
@@ -260,9 +259,8 @@ class EventFederationStore(SQLBaseStore):
                     "event_id": event_id,
                     "prev_event_id": e_id,
                     "room_id": room_id,
-                    "is_state": 0,
+                    "is_state": False,
                 },
-                or_ignore=True,
             )
 
         # Update the extremities table if this is not an outlier.
@@ -281,19 +279,19 @@ class EventFederationStore(SQLBaseStore):
             # We only insert as a forward extremity the new event if there are
             # no other events that reference it as a prev event
             query = (
-                "INSERT OR IGNORE INTO %(table)s (event_id, room_id) "
-                "SELECT ?, ? WHERE NOT EXISTS ("
-                "SELECT 1 FROM %(event_edges)s WHERE "
-                "prev_event_id = ? "
-                ")"
-            ) % {
-                "table": "event_forward_extremities",
-                "event_edges": "event_edges",
-            }
+                "SELECT 1 FROM event_edges WHERE prev_event_id = ?"
+            )
 
-            logger.debug("query: %s", query)
+            txn.execute(query, (event_id,))
 
-            txn.execute(query, (event_id, room_id, event_id))
+            if not txn.fetchone():
+                query = (
+                    "INSERT INTO event_forward_extremities"
+                    " (event_id, room_id)"
+                    " VALUES (?, ?)"
+                )
+
+                txn.execute(query, (event_id, room_id))
 
             # Insert all the prev_events as a backwards thing, they'll get
             # deleted in a second if they're incorrect anyway.
@@ -306,7 +304,6 @@ class EventFederationStore(SQLBaseStore):
                         "event_id": e_id,
                         "room_id": room_id,
                     },
-                    or_ignore=True,
                 )
 
             # Also delete from the backwards extremities table all ones that
@@ -400,7 +397,7 @@ class EventFederationStore(SQLBaseStore):
 
         query = (
             "SELECT prev_event_id FROM event_edges "
-            "WHERE room_id = ? AND event_id = ? AND is_state = 0 "
+            "WHERE room_id = ? AND event_id = ? AND is_state = ? "
             "LIMIT ?"
         )
 
@@ -409,7 +406,7 @@ class EventFederationStore(SQLBaseStore):
             for event_id in front:
                 txn.execute(
                     query,
-                    (room_id, event_id, limit - len(event_results))
+                    (room_id, event_id, False, limit - len(event_results))
                 )
 
                 for e_id, in txn.fetchall():
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2425f57f5f..a3c260ddc4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -52,7 +52,6 @@ class EventsStore(SQLBaseStore):
                 is_new_state=is_new_state,
                 current_state=current_state,
             )
-            self.get_room_events_max_id.invalidate()
         except _RollbackButIsFineException:
             pass
 
@@ -96,12 +95,22 @@ class EventsStore(SQLBaseStore):
         # Remove the any existing cache entries for the event_id
         self._invalidate_get_event_cache(event.event_id)
 
+        if stream_ordering is None:
+            with self._stream_id_gen.get_next_txn(txn) as stream_ordering:
+                return self._persist_event_txn(
+                    txn, event, context, backfilled,
+                    stream_ordering=stream_ordering,
+                    is_new_state=is_new_state,
+                    current_state=current_state,
+                )
+
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
-            txn.execute(
-                "DELETE FROM current_state_events WHERE room_id = ?",
-                (event.room_id,)
+            self._simple_delete_txn(
+                txn,
+                table="current_state_events",
+                keyvalues={"room_id": event.room_id},
             )
 
             for s in current_state:
@@ -114,7 +123,6 @@ class EventsStore(SQLBaseStore):
                         "type": s.type,
                         "state_key": s.state_key,
                     },
-                    or_replace=True,
                 )
 
         if event.is_state() and is_new_state:
@@ -128,7 +136,6 @@ class EventsStore(SQLBaseStore):
                         "type": event.type,
                         "state_key": event.state_key,
                     },
-                    or_replace=True,
                 )
 
                 for prev_state_id, _ in event.prev_state:
@@ -151,14 +158,6 @@ class EventsStore(SQLBaseStore):
                 event.depth
             )
 
-        self._handle_prev_events(
-            txn,
-            outlier=outlier,
-            event_id=event.event_id,
-            prev_events=event.prev_events,
-            room_id=event.room_id,
-        )
-
         have_persisted = self._simple_select_one_onecol_txn(
             txn,
             table="event_json",
@@ -169,7 +168,7 @@ class EventsStore(SQLBaseStore):
 
         metadata_json = encode_canonical_json(
             event.internal_metadata.get_dict()
-        )
+        ).decode("UTF-8")
 
         # If we have already persisted this event, we don't need to do any
         # more processing.
@@ -185,23 +184,29 @@ class EventsStore(SQLBaseStore):
                 )
                 txn.execute(
                     sql,
-                    (metadata_json.decode("UTF-8"), event.event_id,)
+                    (metadata_json, event.event_id,)
                 )
 
                 sql = (
-                    "UPDATE events SET outlier = 0"
+                    "UPDATE events SET outlier = ?"
                     " WHERE event_id = ?"
                 )
                 txn.execute(
                     sql,
-                    (event.event_id,)
+                    (False, event.event_id,)
                 )
             return
 
+        self._handle_prev_events(
+            txn,
+            outlier=outlier,
+            event_id=event.event_id,
+            prev_events=event.prev_events,
+            room_id=event.room_id,
+        )
+
         if event.type == EventTypes.Member:
             self._store_room_member_txn(txn, event)
-        elif event.type == EventTypes.Feedback:
-            self._store_feedback_txn(txn, event)
         elif event.type == EventTypes.Name:
             self._store_room_name_txn(txn, event)
         elif event.type == EventTypes.Topic:
@@ -224,10 +229,9 @@ class EventsStore(SQLBaseStore):
             values={
                 "event_id": event.event_id,
                 "room_id": event.room_id,
-                "internal_metadata": metadata_json.decode("UTF-8"),
+                "internal_metadata": metadata_json,
                 "json": encode_canonical_json(event_dict).decode("UTF-8"),
             },
-            or_replace=True,
         )
 
         content = encode_canonical_json(
@@ -245,9 +249,6 @@ class EventsStore(SQLBaseStore):
             "depth": event.depth,
         }
 
-        if stream_ordering is not None:
-            vals["stream_ordering"] = stream_ordering
-
         unrec = {
             k: v
             for k, v in event.get_dict().items()
@@ -264,25 +265,53 @@ class EventsStore(SQLBaseStore):
             unrec
         ).decode("UTF-8")
 
-        try:
-            self._simple_insert_txn(
-                txn,
-                "events",
-                vals,
-                or_replace=(not outlier),
-                or_ignore=bool(outlier),
-            )
-        except:
-            logger.warn(
-                "Failed to persist, probably duplicate: %s",
-                event.event_id,
-                exc_info=True,
+        sql = (
+            "INSERT INTO events"
+            " (stream_ordering, topological_ordering, event_id, type,"
+            " room_id, content, processed, outlier, depth)"
+            " VALUES (?,?,?,?,?,?,?,?,?)"
+        )
+
+        txn.execute(
+            sql,
+            (
+                stream_ordering, event.depth, event.event_id, event.type,
+                event.room_id, content, True, outlier, event.depth
             )
-            raise _RollbackButIsFineException("_persist_event")
+        )
 
         if context.rejected:
             self._store_rejections_txn(txn, event.event_id, context.rejected)
 
+        for hash_alg, hash_base64 in event.hashes.items():
+            hash_bytes = decode_base64(hash_base64)
+            self._store_event_content_hash_txn(
+                txn, event.event_id, hash_alg, hash_bytes,
+            )
+
+        for prev_event_id, prev_hashes in event.prev_events:
+            for alg, hash_base64 in prev_hashes.items():
+                hash_bytes = decode_base64(hash_base64)
+                self._store_prev_event_hash_txn(
+                    txn, event.event_id, prev_event_id, alg, hash_bytes
+                )
+
+        for auth_id, _ in event.auth_events:
+            self._simple_insert_txn(
+                txn,
+                table="event_auth",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "auth_id": auth_id,
+                },
+            )
+
+        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
+        self._store_event_reference_hash_txn(
+            txn, event.event_id, ref_alg, ref_hash_bytes
+        )
+
         if event.is_state():
             vals = {
                 "event_id": event.event_id,
@@ -301,18 +330,6 @@ class EventsStore(SQLBaseStore):
                 vals,
             )
 
-            if is_new_state and not context.rejected:
-                self._simple_insert_txn(
-                    txn,
-                    "current_state_events",
-                    {
-                        "event_id": event.event_id,
-                        "room_id": event.room_id,
-                        "type": event.type,
-                        "state_key": event.state_key,
-                    },
-                )
-
             for e_id, h in event.prev_state:
                 self._simple_insert_txn(
                     txn,
@@ -321,39 +338,24 @@ class EventsStore(SQLBaseStore):
                         "event_id": event.event_id,
                         "prev_event_id": e_id,
                         "room_id": event.room_id,
-                        "is_state": 1,
+                        "is_state": True,
                     },
                 )
 
-        for hash_alg, hash_base64 in event.hashes.items():
-            hash_bytes = decode_base64(hash_base64)
-            self._store_event_content_hash_txn(
-                txn, event.event_id, hash_alg, hash_bytes,
-            )
-
-        for prev_event_id, prev_hashes in event.prev_events:
-            for alg, hash_base64 in prev_hashes.items():
-                hash_bytes = decode_base64(hash_base64)
-                self._store_prev_event_hash_txn(
-                    txn, event.event_id, prev_event_id, alg, hash_bytes
+            if is_new_state and not context.rejected:
+                self._simple_upsert_txn(
+                    txn,
+                    "current_state_events",
+                    keyvalues={
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    },
+                    values={
+                        "event_id": event.event_id,
+                    }
                 )
 
-        for auth_id, _ in event.auth_events:
-            self._simple_insert_txn(
-                txn,
-                table="event_auth",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "auth_id": auth_id,
-                },
-            )
-
-        (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
-        self._store_event_reference_hash_txn(
-            txn, event.event_id, ref_alg, ref_hash_bytes
-        )
-
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         self._invalidate_get_event_cache(event.redacts)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 22b158d71e..cbe9339ccf 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -57,16 +57,18 @@ class KeyStore(SQLBaseStore):
             OpenSSL.crypto.FILETYPE_ASN1, tls_certificate
         )
         fingerprint = hashlib.sha256(tls_certificate_bytes).hexdigest()
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_tls_certificates",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "fingerprint": fingerprint,
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "tls_certificate": buffer(tls_certificate_bytes),
             },
-            or_ignore=True,
+            desc="store_server_certificate",
         )
 
     @defer.inlineCallbacks
@@ -107,16 +109,18 @@ class KeyStore(SQLBaseStore):
             ts_now_ms (int): The time now in milliseconds
             verification_key (VerifyKey): The NACL verify key.
         """
-        return self._simple_insert(
+        return self._simple_upsert(
             table="server_signature_keys",
-            values={
+            keyvalues={
                 "server_name": server_name,
                 "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
+            },
+            values={
                 "from_server": from_server,
                 "ts_added_ms": time_now_ms,
                 "verify_key": buffer(verify_key.encode()),
             },
-            or_ignore=True,
+            desc="store_server_verify_key",
         )
 
     def store_server_keys_json(self, server_name, key_id, from_server,
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 87fba55439..22ec94bc16 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
             values={"observed_user_id": observed_localpart,
                     "observer_user_id": observer_userid},
             desc="allow_presence_visible",
+            or_ignore=True,
         )
 
     def disallow_presence_visible(self, observed_localpart, observer_userid):
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index c47bdc2861..ee7718d5ed 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -154,7 +154,7 @@ class PushRuleStore(SQLBaseStore):
             txn.execute(sql, (user_name, priority_class, new_rule_priority))
 
         # now insert the new rule
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
@@ -183,7 +183,7 @@ class PushRuleStore(SQLBaseStore):
         new_rule['priority_class'] = priority_class
         new_rule['priority'] = new_prio
 
-        sql = "INSERT OR REPLACE INTO "+PushRuleTable.table_name+" ("
+        sql = "INSERT INTO "+PushRuleTable.table_name+" ("
         sql += ",".join(new_rule.keys())+") VALUES ("
         sql += ", ".join(["?" for _ in new_rule.keys()])+")"
 
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 000502b4ff..2582a1da66 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
-
 from ._base import SQLBaseStore, Table
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 
+from syutil.jsonutil import encode_canonical_json
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -27,93 +27,55 @@ logger = logging.getLogger(__name__)
 
 class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
-    def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
+    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
         sql = (
-            "SELECT id, user_name, kind, profile_tag, app_id,"
-            "app_display_name, device_display_name, pushkey, ts, data, "
-            "last_token, last_success, failing_since "
-            "FROM pushers "
+            "SELECT * FROM pushers "
             "WHERE app_id = ? AND pushkey = ?"
         )
 
-        rows = yield self._execute(
-            "get_pushers_by_app_id_and_pushkey", None, sql,
-            app_id_and_pushkey[0], app_id_and_pushkey[1]
+        rows = yield self._execute_and_decode(
+            "get_pushers_by_app_id_and_pushkey",
+            sql,
+            app_id, pushkey
         )
 
-        ret = [
-            {
-                "id": r[0],
-                "user_name": r[1],
-                "kind": r[2],
-                "profile_tag": r[3],
-                "app_id": r[4],
-                "app_display_name": r[5],
-                "device_display_name": r[6],
-                "pushkey": r[7],
-                "pushkey_ts": r[8],
-                "data": r[9],
-                "last_token": r[10],
-                "last_success": r[11],
-                "failing_since": r[12]
-            }
-            for r in rows
-        ]
-
-        defer.returnValue(ret[0])
+        defer.returnValue(rows)
 
     @defer.inlineCallbacks
     def get_all_pushers(self):
         sql = (
-            "SELECT id, user_name, kind, profile_tag, app_id,"
-            "app_display_name, device_display_name, pushkey, ts, data, "
-            "last_token, last_success, failing_since "
-            "FROM pushers"
+            "SELECT * FROM pushers"
         )
 
-        rows = yield self._execute("get_all_pushers", None, sql)
-
-        ret = [
-            {
-                "id": r[0],
-                "user_name": r[1],
-                "kind": r[2],
-                "profile_tag": r[3],
-                "app_id": r[4],
-                "app_display_name": r[5],
-                "device_display_name": r[6],
-                "pushkey": r[7],
-                "pushkey_ts": r[8],
-                "data": r[9],
-                "last_token": r[10],
-                "last_success": r[11],
-                "failing_since": r[12]
-            }
-            for r in rows
-        ]
-
-        defer.returnValue(ret)
+        rows = yield self._execute_and_decode("get_all_pushers", sql)
+
+        defer.returnValue(rows)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, profile_tag, kind, app_id,
+    def add_pusher(self, user_name, access_token, profile_tag, kind, app_id,
                    app_display_name, device_display_name,
                    pushkey, pushkey_ts, lang, data):
         try:
+            next_id = yield self._pushers_id_gen.get_next()
             yield self._simple_upsert(
                 PushersTable.table_name,
                 dict(
                     app_id=app_id,
                     pushkey=pushkey,
+                    user_name=user_name,
                 ),
                 dict(
-                    user_name=user_name,
+                    access_token=access_token,
                     kind=kind,
                     profile_tag=profile_tag,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
                     ts=pushkey_ts,
                     lang=lang,
-                    data=data
+                    data=encode_canonical_json(data).decode("UTF-8"),
+                ),
+                insertion_values=dict(
+                    id=next_id,
                 ),
                 desc="add_pusher",
             )
@@ -122,37 +84,38 @@ class PusherStore(SQLBaseStore):
             raise StoreError(500, "Problem creating pusher.")
 
     @defer.inlineCallbacks
-    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+    def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name):
         yield self._simple_delete_one(
             PushersTable.table_name,
-            {"app_id": app_id, "pushkey": pushkey},
-            desc="delete_pusher_by_app_id_pushkey",
+            {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name},
+            desc="delete_pusher_by_app_id_pushkey_user_name",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token(self, app_id, pushkey, last_token):
+    def update_pusher_last_token(self, app_id, pushkey, user_name, last_token):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'last_token': last_token},
             desc="update_pusher_last_token",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_last_token_and_success(self, app_id, pushkey,
+    def update_pusher_last_token_and_success(self, app_id, pushkey, user_name,
                                              last_token, last_success):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'last_token': last_token, 'last_success': last_success},
             desc="update_pusher_last_token_and_success",
         )
 
     @defer.inlineCallbacks
-    def update_pusher_failing_since(self, app_id, pushkey, failing_since):
+    def update_pusher_failing_since(self, app_id, pushkey, user_name,
+                                    failing_since):
         yield self._simple_update_one(
             PushersTable.table_name,
-            {'app_id': app_id, 'pushkey': pushkey},
+            {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
             {'failing_since': failing_since},
             desc="update_pusher_failing_since",
         )
@@ -160,21 +123,3 @@ class PusherStore(SQLBaseStore):
 
 class PushersTable(Table):
     table_name = "pushers"
-
-    fields = [
-        "id",
-        "user_name",
-        "kind",
-        "profile_tag",
-        "app_id",
-        "app_display_name",
-        "device_display_name",
-        "pushkey",
-        "pushkey_ts",
-        "data",
-        "last_token",
-        "last_success",
-        "failing_since"
-    ]
-
-    EntryType = collections.namedtuple("PusherEntry", fields)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index f24154f146..a986c4816e 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 
-from sqlite3 import IntegrityError
-
 from synapse.api.errors import StoreError, Codes
 
 from ._base import SQLBaseStore, cached
@@ -39,17 +37,13 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        row = yield self._simple_select_one(
-            "users", {"name": user_id}, ["id"],
-            desc="add_access_token_to_user",
-        )
-        if not row:
-            raise StoreError(400, "Bad user ID supplied.")
-        row_id = row["id"]
+        next_id = yield self._access_tokens_id_gen.get_next()
+
         yield self._simple_insert(
             "access_tokens",
             {
-                "user_id": row_id,
+                "id": next_id,
+                "user_id": user_id,
                 "token": token
             },
             desc="add_access_token_to_user",
@@ -74,32 +68,71 @@ class RegistrationStore(SQLBaseStore):
     def _register(self, txn, user_id, token, password_hash):
         now = int(self.clock.time())
 
+        next_id = self._access_tokens_id_gen.get_next_txn(txn)
+
         try:
             txn.execute("INSERT INTO users(name, password_hash, creation_ts) "
                         "VALUES (?,?,?)",
                         [user_id, password_hash, now])
-        except IntegrityError:
+        except self.database_engine.module.IntegrityError:
             raise StoreError(
                 400, "User ID already taken.", errcode=Codes.USER_IN_USE
             )
 
         # it's possible for this to get a conflict, but only for a single user
         # since tokens are namespaced based on their user ID
-        txn.execute("INSERT INTO access_tokens(user_id, token) " +
-                    "VALUES (?,?)", [txn.lastrowid, token])
+        txn.execute(
+            "INSERT INTO access_tokens(id, user_id, token)"
+            " VALUES (?,?,?)",
+            (next_id, user_id, token,)
+        )
 
     def get_user_by_id(self, user_id):
-        query = ("SELECT users.name, users.password_hash FROM users"
-                 " WHERE users.name = ?")
-        return self._execute(
-            "get_user_by_id", self.cursor_to_dict, query, user_id
+        return self._simple_select_one(
+            table="users",
+            keyvalues={
+                "name": user_id,
+            },
+            retcols=["name", "password_hash"],
+            allow_none=True,
+        )
+
+    @defer.inlineCallbacks
+    def user_set_password_hash(self, user_id, password_hash):
+        """
+        NB. This does *not* evict any cache because the one use for this
+            removes most of the entries subsequently anyway so it would be
+            pointless. Use flush_user separately.
+        """
+        yield self._simple_update_one('users', {
+            'name': user_id
+        }, {
+            'password_hash': password_hash
+        })
+
+    @defer.inlineCallbacks
+    def user_delete_access_tokens_apart_from(self, user_id, token_id):
+        rows = yield self.get_user_by_id(user_id)
+        if len(rows) == 0:
+            raise Exception("No such user!")
+
+        yield self._execute(
+            "delete_access_tokens_apart_from", None,
+            "DELETE FROM access_tokens WHERE user_id = ? AND id != ?",
+            rows[0]['id'], token_id
+        )
+
+    @defer.inlineCallbacks
+    def flush_user(self, user_id):
+        rows = yield self._execute(
+            'flush_user', None,
+            "SELECT token FROM access_tokens WHERE user_id = ?",
+            user_id
         )
+        for r in rows:
+            self.get_user_by_token.invalidate(r)
 
     @cached()
-    # TODO(paul): Currently there's no code to invalidate this cache. That
-    #   means if/when we ever add internal ways to invalidate access tokens or
-    #   change whether a user is a server admin, those will need to invoke
-    #      store.get_user_by_token.invalidate(token)
     def get_user_by_token(self, token):
         """Get a user from the given access token.
 
@@ -134,13 +167,49 @@ class RegistrationStore(SQLBaseStore):
             "SELECT users.name, users.admin,"
             " access_tokens.device_id, access_tokens.id as token_id"
             " FROM users"
-            " INNER JOIN access_tokens on users.id = access_tokens.user_id"
+            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
             " WHERE token = ?"
         )
 
-        cursor = txn.execute(sql, (token,))
-        rows = self.cursor_to_dict(cursor)
+        txn.execute(sql, (token,))
+        rows = self.cursor_to_dict(txn)
         if rows:
             return rows[0]
 
-        raise StoreError(404, "Token not found.")
+        return None
+
+    @defer.inlineCallbacks
+    def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
+        yield self._simple_upsert("user_threepids", {
+            "user": user_id,
+            "medium": medium,
+            "address": address,
+        }, {
+            "validated_at": validated_at,
+            "added_at": added_at,
+        })
+
+    @defer.inlineCallbacks
+    def user_get_threepids(self, user_id):
+        ret = yield self._simple_select_list(
+            "user_threepids", {
+                "user": user_id
+            },
+            ['medium', 'address', 'validated_at', 'added_at'],
+            'user_get_threepids'
+        )
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def get_user_by_threepid(self, medium, address):
+        ret = yield self._simple_select_one(
+            "user_threepids",
+            {
+                "medium": medium,
+                "address": address
+            },
+            ['user'], True, 'get_user_by_threepid'
+        )
+        if ret:
+            defer.returnValue(ret['user'])
+        defer.returnValue(None)
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index be3e28c2ea..48ebb33057 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -72,6 +72,7 @@ class RoomStore(SQLBaseStore):
             keyvalues={"room_id": room_id},
             retcols=RoomsTable.fields,
             desc="get_room",
+            allow_none=True,
         )
 
     @defer.inlineCallbacks
@@ -102,24 +103,37 @@ class RoomStore(SQLBaseStore):
                 "ON c.event_id = room_names.event_id "
             )
 
-            # We use non printing ascii character US () as a seperator
+            # We use non printing ascii character US (\x1F) as a separator
             sql = (
-                "SELECT r.room_id, n.name, t.topic, "
-                "group_concat(a.room_alias, '') "
-                "FROM rooms AS r "
-                "LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id "
-                "LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id "
-                "INNER JOIN room_aliases AS a ON a.room_id = r.room_id "
-                "WHERE r.is_public = ? "
-                "GROUP BY r.room_id "
+                "SELECT r.room_id, max(n.name), max(t.topic)"
+                " FROM rooms AS r"
+                " LEFT JOIN (%(topic)s) AS t ON t.room_id = r.room_id"
+                " LEFT JOIN (%(name)s) AS n ON n.room_id = r.room_id"
+                " WHERE r.is_public = ?"
+                " GROUP BY r.room_id"
             ) % {
                 "topic": topic_subquery,
                 "name": name_subquery,
             }
 
-            c = txn.execute(sql, (is_public,))
+            txn.execute(sql, (is_public,))
 
-            return c.fetchall()
+            rows = txn.fetchall()
+
+            for i, row in enumerate(rows):
+                room_id = row[0]
+                aliases = self._simple_select_onecol_txn(
+                    txn,
+                    table="room_aliases",
+                    keyvalues={
+                        "room_id": room_id
+                    },
+                    retcol="room_alias",
+                )
+
+                rows[i] = list(row) + [aliases]
+
+            return rows
 
         rows = yield self.runInteraction(
             "get_rooms", f
@@ -130,9 +144,10 @@ class RoomStore(SQLBaseStore):
                 "room_id": r[0],
                 "name": r[1],
                 "topic": r[2],
-                "aliases": r[3].split(""),
+                "aliases": r[3],
             }
             for r in rows
+            if r[3]  # We only return rooms that have at least one alias.
         ]
 
         defer.returnValue(ret)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 52c37c76f5..8ea5756d61 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -40,7 +40,6 @@ class RoomMemberStore(SQLBaseStore):
         """
         try:
             target_user_id = event.state_key
-            domain = UserID.from_string(target_user_id).domain
         except:
             logger.exception(
                 "Failed to parse target_user_id=%s", target_user_id
@@ -65,42 +64,8 @@ class RoomMemberStore(SQLBaseStore):
             }
         )
 
-        # Update room hosts table
-        if event.membership == Membership.JOIN:
-            sql = (
-                "INSERT OR IGNORE INTO room_hosts (room_id, host) "
-                "VALUES (?, ?)"
-            )
-            txn.execute(sql, (event.room_id, domain))
-        elif event.membership != Membership.INVITE:
-            # Check if this was the last person to have left.
-            member_events = self._get_members_query_txn(
-                txn,
-                where_clause=("c.room_id = ? AND m.membership = ?"
-                              " AND m.user_id != ?"),
-                where_values=(event.room_id, Membership.JOIN, target_user_id,)
-            )
-
-            joined_domains = set()
-            for e in member_events:
-                try:
-                    joined_domains.add(
-                        UserID.from_string(e.state_key).domain
-                    )
-                except:
-                    # FIXME: How do we deal with invalid user ids in the db?
-                    logger.exception("Invalid user_id: %s", event.state_key)
-
-            if domain not in joined_domains:
-                sql = (
-                    "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
-                )
-
-                txn.execute(sql, (event.room_id, domain))
-
         self.get_rooms_for_user.invalidate(target_user_id)
 
-    @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
 
@@ -110,41 +75,27 @@ class RoomMemberStore(SQLBaseStore):
         Returns:
             Deferred: Results in a MembershipEvent or None.
         """
-        rows = yield self._get_members_by_dict({
-            "e.room_id": room_id,
-            "m.user_id": user_id,
-        })
+        def f(txn):
+            events = self._get_members_events_txn(
+                txn,
+                room_id,
+                user_id=user_id,
+            )
 
-        defer.returnValue(rows[0] if rows else None)
+            return events[0] if events else None
 
-    def _get_room_member(self, txn, user_id, room_id):
-        sql = (
-            "SELECT e.* FROM events as e"
-            " INNER JOIN room_memberships as m"
-            " ON e.event_id = m.event_id"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id"
-            " WHERE m.user_id = ? and e.room_id = ?"
-            " LIMIT 1"
-        )
-        txn.execute(sql, (user_id, room_id))
-        rows = self.cursor_to_dict(txn)
-        if rows:
-            return self._parse_events_txn(txn, rows)[0]
-        else:
-            return None
+        return self.runInteraction("get_room_member", f)
 
     def get_users_in_room(self, room_id):
         def f(txn):
-            sql = (
-                "SELECT m.user_id FROM room_memberships as m"
-                " INNER JOIN current_state_events as c"
-                " ON m.event_id = c.event_id"
-                " WHERE m.membership = ? AND m.room_id = ?"
+
+            rows = self._get_members_rows_txn(
+                txn,
+                room_id=room_id,
+                membership=Membership.JOIN,
             )
 
-            txn.execute(sql, (Membership.JOIN, room_id))
-            return [r[0] for r in txn.fetchall()]
+            return [r["user_id"] for r in rows]
         return self.runInteraction("get_users_in_room", f)
 
     def get_room_members(self, room_id, membership=None):
@@ -159,11 +110,14 @@ class RoomMemberStore(SQLBaseStore):
             list of namedtuples representing the members in this room.
         """
 
-        where = {"m.room_id": room_id}
-        if membership:
-            where["m.membership"] = membership
+        def f(txn):
+            return self._get_members_events_txn(
+                txn,
+                room_id,
+                membership=membership,
+            )
 
-        return self._get_members_by_dict(where)
+        return self.runInteraction("get_room_members", f)
 
     def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
         """ Get all the rooms for this user where the membership for this user
@@ -209,32 +163,55 @@ class RoomMemberStore(SQLBaseStore):
         ]
 
     def get_joined_hosts_for_room(self, room_id):
-        return self._simple_select_onecol(
-            "room_hosts",
-            {"room_id": room_id},
-            "host",
-            desc="get_joined_hosts_for_room",
+        return self.runInteraction(
+            "get_joined_hosts_for_room",
+            self._get_joined_hosts_for_room_txn,
+            room_id,
+        )
+
+    def _get_joined_hosts_for_room_txn(self, txn, room_id):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership=Membership.JOIN
+        )
+
+        joined_domains = set(
+            UserID.from_string(r["user_id"]).domain
+            for r in rows
         )
 
-    def _get_members_by_dict(self, where_dict):
-        clause = " AND ".join("%s = ?" % k for k in where_dict.keys())
-        vals = where_dict.values()
-        return self._get_members_query(clause, vals)
+        return joined_domains
 
     def _get_members_query(self, where_clause, where_values):
         return self.runInteraction(
-            "get_members_query", self._get_members_query_txn,
+            "get_members_query", self._get_members_events_txn,
             where_clause, where_values
         )
 
-    def _get_members_query_txn(self, txn, where_clause, where_values):
+    def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None):
+        rows = self._get_members_rows_txn(
+            txn,
+            room_id, membership, user_id,
+        )
+        return self._get_events_txn(txn, [r["event_id"] for r in rows])
+
+    def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None):
+        where_clause = "c.room_id = ?"
+        where_values = [room_id]
+
+        if membership:
+            where_clause += " AND m.membership = ?"
+            where_values.append(membership)
+
+        if user_id:
+            where_clause += " AND m.user_id = ?"
+            where_values.append(user_id)
+
         sql = (
-            "SELECT e.* FROM events as e "
-            "INNER JOIN room_memberships as m "
-            "ON e.event_id = m.event_id "
-            "INNER JOIN current_state_events as c "
-            "ON m.event_id = c.event_id "
-            "WHERE %(where)s "
+            "SELECT m.* FROM room_memberships as m"
+            " INNER JOIN current_state_events as c"
+            " ON m.event_id = c.event_id"
+            " WHERE %(where)s"
         ) % {
             "where": where_clause,
         }
@@ -242,8 +219,7 @@ class RoomMemberStore(SQLBaseStore):
         txn.execute(sql, where_values)
         rows = self.cursor_to_dict(txn)
 
-        results = self._parse_events_txn(txn, rows)
-        return results
+        return rows
 
     @cached()
     def get_rooms_for_user(self, user_id):
diff --git a/synapse/storage/schema/delta/12/v12.sql b/synapse/storage/schema/delta/12/v12.sql
index b87ef1fe79..878c36260a 100644
--- a/synapse/storage/schema/delta/12/v12.sql
+++ b/synapse/storage/schema/delta/12/v12.sql
@@ -17,26 +17,25 @@ CREATE TABLE IF NOT EXISTS rejections(
     event_id TEXT NOT NULL,
     reason TEXT NOT NULL,
     last_check TEXT NOT NULL,
-    CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE
+    UNIQUE (event_id)
 );
 
 -- Push notification endpoints that users have configured
 CREATE TABLE IF NOT EXISTS pushers (
   id INTEGER PRIMARY KEY AUTOINCREMENT,
   user_name TEXT NOT NULL,
-  profile_tag varchar(32) NOT NULL,
-  kind varchar(8) NOT NULL,
-  app_id varchar(64) NOT NULL,
-  app_display_name varchar(64) NOT NULL,
-  device_display_name varchar(128) NOT NULL,
-  pushkey blob NOT NULL,
-  ts BIGINT NOT NULL,
-  lang varchar(8),
-  data blob,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey VARBINARY(512) NOT NULL,
+  ts BIGINT UNSIGNED NOT NULL,
+  lang VARCHAR(8),
+  data LONGBLOB,
   last_token TEXT,
-  last_success BIGINT,
-  failing_since BIGINT,
-  FOREIGN KEY(user_name) REFERENCES users(name),
+  last_success BIGINT UNSIGNED,
+  failing_since BIGINT UNSIGNED,
   UNIQUE (app_id, pushkey)
 );
 
@@ -55,13 +54,10 @@ CREATE INDEX IF NOT EXISTS push_rules_user_name on push_rules (user_name);
 
 CREATE TABLE IF NOT EXISTS user_filters(
   user_id TEXT,
-  filter_id INTEGER,
-  filter_json TEXT,
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  filter_id BIGINT UNSIGNED,
+  filter_json LONGBLOB
 );
 
 CREATE INDEX IF NOT EXISTS user_filters_by_user_id_filter_id ON user_filters(
-  user_id, filter_id
+    user_id, filter_id
 );
-
-PRAGMA user_version = 12;
diff --git a/synapse/storage/schema/delta/13/v13.sql b/synapse/storage/schema/delta/13/v13.sql
index e491ad5aec..3265924013 100644
--- a/synapse/storage/schema/delta/13/v13.sql
+++ b/synapse/storage/schema/delta/13/v13.sql
@@ -19,16 +19,13 @@ CREATE TABLE IF NOT EXISTS application_services(
     token TEXT,
     hs_token TEXT,
     sender TEXT,
-    UNIQUE(token) ON CONFLICT ROLLBACK
+    UNIQUE(token)
 );
 
 CREATE TABLE IF NOT EXISTS application_services_regex(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    as_id INTEGER NOT NULL,
+    as_id BIGINT UNSIGNED NOT NULL,
     namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
     regex TEXT,
     FOREIGN KEY(as_id) REFERENCES application_services(id)
 );
-
-
-
diff --git a/synapse/storage/schema/delta/15/appservice_txns.sql b/synapse/storage/schema/delta/15/appservice_txns.sql
index 2b27e2a429..db2e720393 100644
--- a/synapse/storage/schema/delta/15/appservice_txns.sql
+++ b/synapse/storage/schema/delta/15/appservice_txns.sql
@@ -15,16 +15,17 @@
 
 CREATE TABLE IF NOT EXISTS application_services_state(
     as_id TEXT PRIMARY KEY,
-    state TEXT,
-    last_txn TEXT
+    state VARCHAR(5),
+    last_txn INTEGER
 );
 
 CREATE TABLE IF NOT EXISTS application_services_txns(
     as_id TEXT NOT NULL,
     txn_id INTEGER NOT NULL,
     event_ids TEXT NOT NULL,
-    UNIQUE(as_id, txn_id) ON CONFLICT ROLLBACK
+    UNIQUE(as_id, txn_id)
 );
 
-
-
+CREATE INDEX IF NOT EXISTS application_services_txns_id ON application_services_txns (
+    as_id
+);
diff --git a/synapse/storage/schema/delta/15/presence_indices.sql b/synapse/storage/schema/delta/15/presence_indices.sql
new file mode 100644
index 0000000000..6b8d0f1ca7
--- /dev/null
+++ b/synapse/storage/schema/delta/15/presence_indices.sql
@@ -0,0 +1,2 @@
+
+CREATE INDEX IF NOT EXISTS presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/delta/15/v15.sql b/synapse/storage/schema/delta/15/v15.sql
new file mode 100644
index 0000000000..f5b2a08ca4
--- /dev/null
+++ b/synapse/storage/schema/delta/15/v15.sql
@@ -0,0 +1,25 @@
+-- Drop, copy & recreate pushers table to change unique key
+-- Also add access_token column at the same time
+CREATE TABLE IF NOT EXISTS pushers2 (
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  user_name TEXT NOT NULL,
+  access_token INTEGER DEFAULT NULL,
+  profile_tag varchar(32) NOT NULL,
+  kind varchar(8) NOT NULL,
+  app_id varchar(64) NOT NULL,
+  app_display_name varchar(64) NOT NULL,
+  device_display_name varchar(128) NOT NULL,
+  pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
+  lang varchar(8),
+  data blob,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  FOREIGN KEY(user_name) REFERENCES users(name),
+  UNIQUE (app_id, pushkey, user_name)
+);
+INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since)
+  SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers;
+DROP TABLE pushers;
+ALTER TABLE pushers2 RENAME TO pushers;
diff --git a/synapse/storage/schema/delta/16/events_order_index.sql b/synapse/storage/schema/delta/16/events_order_index.sql
new file mode 100644
index 0000000000..a48f215170
--- /dev/null
+++ b/synapse/storage/schema/delta/16/events_order_index.sql
@@ -0,0 +1,4 @@
+CREATE INDEX events_order ON events (topological_ordering, stream_ordering);
+CREATE INDEX events_order_room ON events (
+    room_id, topological_ordering, stream_ordering
+);
diff --git a/synapse/storage/schema/delta/16/remote_media_cache_index.sql b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
new file mode 100644
index 0000000000..7a15265cb1
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remote_media_cache_index.sql
@@ -0,0 +1,2 @@
+CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/16/remove_duplicates.sql b/synapse/storage/schema/delta/16/remove_duplicates.sql
new file mode 100644
index 0000000000..65c97b5e2f
--- /dev/null
+++ b/synapse/storage/schema/delta/16/remove_duplicates.sql
@@ -0,0 +1,9 @@
+
+
+DELETE FROM event_to_state_groups WHERE state_group not in (
+    SELECT MAX(state_group) FROM event_to_state_groups GROUP BY event_id
+);
+
+DELETE FROM event_to_state_groups WHERE rowid not in (
+    SELECT MIN(rowid) FROM event_to_state_groups GROUP BY event_id
+);
diff --git a/synapse/storage/schema/delta/16/room_alias_index.sql b/synapse/storage/schema/delta/16/room_alias_index.sql
new file mode 100644
index 0000000000..f82486132b
--- /dev/null
+++ b/synapse/storage/schema/delta/16/room_alias_index.sql
@@ -0,0 +1,3 @@
+
+CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id);
+CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias);
diff --git a/synapse/storage/schema/delta/16/unique_constraints.sql b/synapse/storage/schema/delta/16/unique_constraints.sql
new file mode 100644
index 0000000000..fecf11118c
--- /dev/null
+++ b/synapse/storage/schema/delta/16/unique_constraints.sql
@@ -0,0 +1,80 @@
+
+-- We can use SQLite features here, since other db support was only added in v16
+
+--
+DELETE FROM current_state_events WHERE rowid not in (
+    SELECT MIN(rowid) FROM current_state_events GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS current_state_events_event_id;
+CREATE UNIQUE INDEX current_state_events_event_id ON current_state_events(event_id);
+
+--
+DELETE FROM room_memberships WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_memberships GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_memberships_event_id;
+CREATE UNIQUE INDEX room_memberships_event_id ON room_memberships(event_id);
+
+--
+DELETE FROM feedback WHERE rowid not in (
+    SELECT MIN(rowid) FROM feedback GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS feedback_event_id;
+CREATE UNIQUE INDEX feedback_event_id ON feedback(event_id);
+
+--
+DELETE FROM topics WHERE rowid not in (
+    SELECT MIN(rowid) FROM topics GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS topics_event_id;
+CREATE UNIQUE INDEX topics_event_id ON topics(event_id);
+
+--
+DELETE FROM room_names WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_names GROUP BY event_id
+);
+
+DROP INDEX IF EXISTS room_names_id;
+CREATE UNIQUE INDEX room_names_id ON room_names(event_id);
+
+--
+DELETE FROM presence WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence GROUP BY user_id
+);
+
+DROP INDEX IF EXISTS presence_id;
+CREATE UNIQUE INDEX presence_id ON presence(user_id);
+
+--
+DELETE FROM presence_allow_inbound WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence_allow_inbound
+    GROUP BY observed_user_id, observer_user_id
+);
+
+DROP INDEX IF EXISTS presence_allow_inbound_observers;
+CREATE UNIQUE INDEX presence_allow_inbound_observers ON presence_allow_inbound(
+    observed_user_id, observer_user_id
+);
+
+--
+DELETE FROM presence_list WHERE rowid not in (
+    SELECT MIN(rowid) FROM presence_list
+    GROUP BY user_id, observed_user_id
+);
+
+DROP INDEX IF EXISTS presence_list_observers;
+CREATE UNIQUE INDEX presence_list_observers ON presence_list(
+    user_id, observed_user_id
+);
+
+--
+DELETE FROM room_aliases WHERE rowid not in (
+    SELECT MIN(rowid) FROM room_aliases GROUP BY room_alias
+);
+
+DROP INDEX IF EXISTS room_aliases_id;
+CREATE INDEX room_aliases_id ON room_aliases(room_id);
diff --git a/synapse/storage/schema/delta/16/users.sql b/synapse/storage/schema/delta/16/users.sql
new file mode 100644
index 0000000000..cd0709250d
--- /dev/null
+++ b/synapse/storage/schema/delta/16/users.sql
@@ -0,0 +1,56 @@
+-- Convert `access_tokens`.user from rowids to user strings.
+-- MUST BE DONE BEFORE REMOVING ID COLUMN FROM USERS TABLE BELOW
+CREATE TABLE IF NOT EXISTS new_access_tokens(
+    id BIGINT UNSIGNED PRIMARY KEY,
+    user_id TEXT NOT NULL,
+    device_id TEXT,
+    token TEXT NOT NULL,
+    last_used BIGINT UNSIGNED,
+    UNIQUE(token)
+);
+
+INSERT INTO new_access_tokens
+    SELECT a.id, u.name, a.device_id, a.token, a.last_used
+    FROM access_tokens as a
+    INNER JOIN users as u ON u.id = a.user_id;
+
+DROP TABLE access_tokens;
+
+ALTER TABLE new_access_tokens RENAME TO access_tokens;
+
+-- Remove ID column from `users` table
+CREATE TABLE IF NOT EXISTS new_users(
+    name TEXT,
+    password_hash TEXT,
+    creation_ts BIGINT UNSIGNED,
+    admin BOOL DEFAULT 0 NOT NULL,
+    UNIQUE(name)
+);
+
+INSERT INTO new_users SELECT name, password_hash, creation_ts, admin FROM users;
+
+DROP TABLE users;
+
+ALTER TABLE new_users RENAME TO users;
+
+
+-- Remove UNIQUE constraint from `user_ips` table
+CREATE TABLE IF NOT EXISTS new_user_ips (
+    user_id TEXT NOT NULL,
+    access_token TEXT NOT NULL,
+    device_id TEXT,
+    ip TEXT NOT NULL,
+    user_agent TEXT NOT NULL,
+    last_seen BIGINT UNSIGNED NOT NULL
+);
+
+INSERT INTO new_user_ips
+    SELECT user, access_token, device_id, ip, user_agent, last_seen FROM user_ips;
+
+DROP TABLE user_ips;
+
+ALTER TABLE new_user_ips RENAME TO user_ips;
+
+CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user_id);
+CREATE INDEX IF NOT EXISTS user_ips_user_ip ON user_ips(user_id, access_token, ip);
+
diff --git a/synapse/storage/schema/full_schemas/11/event_edges.sql b/synapse/storage/schema/full_schemas/11/event_edges.sql
index 1e766d6db2..f7020f7793 100644
--- a/synapse/storage/schema/full_schemas/11/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/11/event_edges.sql
@@ -16,52 +16,52 @@
 CREATE TABLE IF NOT EXISTS event_forward_extremities(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS ev_extrem_room ON event_forward_extremities(room_id);
-CREATE INDEX IF NOT EXISTS ev_extrem_id ON event_forward_extremities(event_id);
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_backward_extremities(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS ev_b_extrem_room ON event_backward_extremities(room_id);
-CREATE INDEX IF NOT EXISTS ev_b_extrem_id ON event_backward_extremities(event_id);
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edges(
     event_id TEXT NOT NULL,
     prev_event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    is_state INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, prev_event_id, room_id, is_state)
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
 );
 
-CREATE INDEX IF NOT EXISTS ev_edges_id ON event_edges(event_id);
-CREATE INDEX IF NOT EXISTS ev_edges_prev_id ON event_edges(prev_event_id);
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
 
 
 CREATE TABLE IF NOT EXISTS room_depth(
     room_id TEXT NOT NULL,
     min_depth INTEGER NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (room_id)
+    UNIQUE (room_id)
 );
 
-CREATE INDEX IF NOT EXISTS room_depth_room ON room_depth(room_id);
+CREATE INDEX room_depth_room ON room_depth(room_id);
 
 
 create TABLE IF NOT EXISTS event_destinations(
     event_id TEXT NOT NULL,
     destination TEXT NOT NULL,
-    delivered_ts INTEGER DEFAULT 0, -- or 0 if not delivered
-    CONSTRAINT uniqueness UNIQUE (event_id, destination) ON CONFLICT REPLACE
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
 );
 
-CREATE INDEX IF NOT EXISTS event_destinations_id ON event_destinations(event_id);
+CREATE INDEX event_destinations_id ON event_destinations(event_id);
 
 
 CREATE TABLE IF NOT EXISTS state_forward_extremities(
@@ -69,21 +69,21 @@ CREATE TABLE IF NOT EXISTS state_forward_extremities(
     room_id TEXT NOT NULL,
     type TEXT NOT NULL,
     state_key TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, room_id) ON CONFLICT REPLACE
+    UNIQUE (event_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS st_extrem_keys ON state_forward_extremities(
+CREATE INDEX st_extrem_keys ON state_forward_extremities(
     room_id, type, state_key
 );
-CREATE INDEX IF NOT EXISTS st_extrem_id ON state_forward_extremities(event_id);
+CREATE INDEX st_extrem_id ON state_forward_extremities(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_auth(
     event_id TEXT NOT NULL,
     auth_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    CONSTRAINT uniqueness UNIQUE (event_id, auth_id, room_id)
+    UNIQUE (event_id, auth_id, room_id)
 );
 
-CREATE INDEX IF NOT EXISTS evauth_edges_id ON event_auth(event_id);
-CREATE INDEX IF NOT EXISTS evauth_edges_auth_id ON event_auth(auth_id);
\ No newline at end of file
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/11/event_signatures.sql b/synapse/storage/schema/full_schemas/11/event_signatures.sql
index c28c39c48a..636b2d3353 100644
--- a/synapse/storage/schema/full_schemas/11/event_signatures.sql
+++ b/synapse/storage/schema/full_schemas/11/event_signatures.sql
@@ -16,50 +16,40 @@
 CREATE TABLE IF NOT EXISTS event_content_hashes (
     event_id TEXT,
     algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+    hash bytea,
+    UNIQUE (event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_content_hashes_id ON event_content_hashes(
-    event_id
-);
+CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_reference_hashes (
     event_id TEXT,
     algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, algorithm)
+    hash bytea,
+    UNIQUE (event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_reference_hashes_id ON event_reference_hashes (
-    event_id
-);
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_signatures (
     event_id TEXT,
     signature_name TEXT,
     key_id TEXT,
-    signature BLOB,
-    CONSTRAINT uniqueness UNIQUE (event_id, signature_name, key_id)
+    signature bytea,
+    UNIQUE (event_id, signature_name, key_id)
 );
 
-CREATE INDEX IF NOT EXISTS event_signatures_id ON event_signatures (
-    event_id
-);
+CREATE INDEX event_signatures_id ON event_signatures(event_id);
 
 
 CREATE TABLE IF NOT EXISTS event_edge_hashes(
     event_id TEXT,
     prev_event_id TEXT,
     algorithm TEXT,
-    hash BLOB,
-    CONSTRAINT uniqueness UNIQUE (
-        event_id, prev_event_id, algorithm
-    )
+    hash bytea,
+    UNIQUE (event_id, prev_event_id, algorithm)
 );
 
-CREATE INDEX IF NOT EXISTS event_edge_hashes_id ON event_edge_hashes(
-    event_id
-);
+CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/im.sql b/synapse/storage/schema/full_schemas/11/im.sql
index dd00c1cd2f..1901654ac2 100644
--- a/synapse/storage/schema/full_schemas/11/im.sql
+++ b/synapse/storage/schema/full_schemas/11/im.sql
@@ -15,7 +15,7 @@
 
 CREATE TABLE IF NOT EXISTS events(
     stream_ordering INTEGER PRIMARY KEY AUTOINCREMENT,
-    topological_ordering INTEGER NOT NULL,
+    topological_ordering BIGINT NOT NULL,
     event_id TEXT NOT NULL,
     type TEXT NOT NULL,
     room_id TEXT NOT NULL,
@@ -23,26 +23,24 @@ CREATE TABLE IF NOT EXISTS events(
     unrecognized_keys TEXT,
     processed BOOL NOT NULL,
     outlier BOOL NOT NULL,
-    depth INTEGER DEFAULT 0 NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS events_event_id ON events (event_id);
-CREATE INDEX IF NOT EXISTS events_stream_ordering ON events (stream_ordering);
-CREATE INDEX IF NOT EXISTS events_topological_ordering ON events (topological_ordering);
-CREATE INDEX IF NOT EXISTS events_room_id ON events (room_id);
+CREATE INDEX events_stream_ordering ON events (stream_ordering);
+CREATE INDEX events_topological_ordering ON events (topological_ordering);
+CREATE INDEX events_room_id ON events (room_id);
 
 
 CREATE TABLE IF NOT EXISTS event_json(
     event_id TEXT NOT NULL,
     room_id TEXT NOT NULL,
-    internal_metadata NOT NULL,
-    json BLOB NOT NULL,
-    CONSTRAINT ev_j_uniq UNIQUE (event_id)
+    internal_metadata TEXT NOT NULL,
+    json TEXT NOT NULL,
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS event_json_id ON event_json(event_id);
-CREATE INDEX IF NOT EXISTS event_json_room_id ON event_json(room_id);
+CREATE INDEX event_json_room_id ON event_json(room_id);
 
 
 CREATE TABLE IF NOT EXISTS state_events(
@@ -50,13 +48,13 @@ CREATE TABLE IF NOT EXISTS state_events(
     room_id TEXT NOT NULL,
     type TEXT NOT NULL,
     state_key TEXT NOT NULL,
-    prev_state TEXT
+    prev_state TEXT,
+    UNIQUE (event_id)
 );
 
-CREATE UNIQUE INDEX IF NOT EXISTS state_events_event_id ON state_events (event_id);
-CREATE INDEX IF NOT EXISTS state_events_room_id ON state_events (room_id);
-CREATE INDEX IF NOT EXISTS state_events_type ON state_events (type);
-CREATE INDEX IF NOT EXISTS state_events_state_key ON state_events (state_key);
+CREATE INDEX state_events_room_id ON state_events (room_id);
+CREATE INDEX state_events_type ON state_events (type);
+CREATE INDEX state_events_state_key ON state_events (state_key);
 
 
 CREATE TABLE IF NOT EXISTS current_state_events(
@@ -64,13 +62,13 @@ CREATE TABLE IF NOT EXISTS current_state_events(
     room_id TEXT NOT NULL,
     type TEXT NOT NULL,
     state_key TEXT NOT NULL,
-    CONSTRAINT curr_uniq UNIQUE (room_id, type, state_key) ON CONFLICT REPLACE
+    UNIQUE (room_id, type, state_key)
 );
 
-CREATE INDEX IF NOT EXISTS curr_events_event_id ON current_state_events (event_id);
-CREATE INDEX IF NOT EXISTS current_state_events_room_id ON current_state_events (room_id);
-CREATE INDEX IF NOT EXISTS current_state_events_type ON current_state_events (type);
-CREATE INDEX IF NOT EXISTS current_state_events_state_key ON current_state_events (state_key);
+CREATE INDEX curr_events_event_id ON current_state_events (event_id);
+CREATE INDEX current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX current_state_events_type ON current_state_events (type);
+CREATE INDEX current_state_events_state_key ON current_state_events (state_key);
 
 CREATE TABLE IF NOT EXISTS room_memberships(
     event_id TEXT NOT NULL,
@@ -80,9 +78,9 @@ CREATE TABLE IF NOT EXISTS room_memberships(
     membership TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS room_memberships_event_id ON room_memberships (event_id);
-CREATE INDEX IF NOT EXISTS room_memberships_room_id ON room_memberships (room_id);
-CREATE INDEX IF NOT EXISTS room_memberships_user_id ON room_memberships (user_id);
+CREATE INDEX room_memberships_event_id ON room_memberships (event_id);
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
 
 CREATE TABLE IF NOT EXISTS feedback(
     event_id TEXT NOT NULL,
@@ -98,8 +96,8 @@ CREATE TABLE IF NOT EXISTS topics(
     topic TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS topics_event_id ON topics(event_id);
-CREATE INDEX IF NOT EXISTS topics_room_id ON topics(room_id);
+CREATE INDEX topics_event_id ON topics(event_id);
+CREATE INDEX topics_room_id ON topics(room_id);
 
 CREATE TABLE IF NOT EXISTS room_names(
     event_id TEXT NOT NULL,
@@ -107,19 +105,19 @@ CREATE TABLE IF NOT EXISTS room_names(
     name TEXT NOT NULL
 );
 
-CREATE INDEX IF NOT EXISTS room_names_event_id ON room_names(event_id);
-CREATE INDEX IF NOT EXISTS room_names_room_id ON room_names(room_id);
+CREATE INDEX room_names_event_id ON room_names(event_id);
+CREATE INDEX room_names_room_id ON room_names(room_id);
 
 CREATE TABLE IF NOT EXISTS rooms(
     room_id TEXT PRIMARY KEY NOT NULL,
-    is_public INTEGER,
+    is_public BOOL,
     creator TEXT
 );
 
 CREATE TABLE IF NOT EXISTS room_hosts(
     room_id TEXT NOT NULL,
     host TEXT NOT NULL,
-    CONSTRAINT room_hosts_uniq UNIQUE (room_id, host) ON CONFLICT IGNORE
+    UNIQUE (room_id, host)
 );
 
-CREATE INDEX IF NOT EXISTS room_hosts_room_id ON room_hosts (room_id);
+CREATE INDEX room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/11/keys.sql b/synapse/storage/schema/full_schemas/11/keys.sql
index a9e0a4fe0d..afc142045e 100644
--- a/synapse/storage/schema/full_schemas/11/keys.sql
+++ b/synapse/storage/schema/full_schemas/11/keys.sql
@@ -16,16 +16,16 @@ CREATE TABLE IF NOT EXISTS server_tls_certificates(
   server_name TEXT, -- Server name.
   fingerprint TEXT, -- Certificate fingerprint.
   from_server TEXT, -- Which key server the certificate was fetched from.
-  ts_added_ms INTEGER, -- When the certifcate was added.
-  tls_certificate BLOB, -- DER encoded x509 certificate.
-  CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
+  ts_added_ms BIGINT, -- When the certifcate was added.
+  tls_certificate bytea, -- DER encoded x509 certificate.
+  UNIQUE (server_name, fingerprint)
 );
 
 CREATE TABLE IF NOT EXISTS server_signature_keys(
   server_name TEXT, -- Server name.
   key_id TEXT, -- Key version.
   from_server TEXT, -- Which key server the key was fetched form.
-  ts_added_ms INTEGER, -- When the key was added.
-  verify_key BLOB, -- NACL verification key.
-  CONSTRAINT uniqueness UNIQUE (server_name, key_id)
+  ts_added_ms BIGINT, -- When the key was added.
+  verify_key bytea, -- NACL verification key.
+  UNIQUE (server_name, key_id)
 );
diff --git a/synapse/storage/schema/full_schemas/11/media_repository.sql b/synapse/storage/schema/full_schemas/11/media_repository.sql
index afdf48cbfb..e927e581d1 100644
--- a/synapse/storage/schema/full_schemas/11/media_repository.sql
+++ b/synapse/storage/schema/full_schemas/11/media_repository.sql
@@ -17,10 +17,10 @@ CREATE TABLE IF NOT EXISTS local_media_repository (
     media_id TEXT, -- The id used to refer to the media.
     media_type TEXT, -- The MIME-type of the media.
     media_length INTEGER, -- Length of the media in bytes.
-    created_ts INTEGER, -- When the content was uploaded in ms.
+    created_ts BIGINT, -- When the content was uploaded in ms.
     upload_name TEXT, -- The name the media was uploaded with.
     user_id TEXT, -- The user who uploaded the file.
-    CONSTRAINT uniqueness UNIQUE (media_id)
+    UNIQUE (media_id)
 );
 
 CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
@@ -30,23 +30,23 @@ CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
     thumbnail_type TEXT, -- The MIME-type of the thumbnail.
     thumbnail_method TEXT, -- The method used to make the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
-    CONSTRAINT uniqueness UNIQUE (
+    UNIQUE (
         media_id, thumbnail_width, thumbnail_height, thumbnail_type
     )
 );
 
-CREATE INDEX IF NOT EXISTS local_media_repository_thumbnails_media_id
+CREATE INDEX local_media_repository_thumbnails_media_id
     ON local_media_repository_thumbnails (media_id);
 
 CREATE TABLE IF NOT EXISTS remote_media_cache (
     media_origin TEXT, -- The remote HS the media came from.
     media_id TEXT, -- The id used to refer to the media on that server.
     media_type TEXT, -- The MIME-type of the media.
-    created_ts INTEGER, -- When the content was uploaded in ms.
+    created_ts BIGINT, -- When the content was uploaded in ms.
     upload_name TEXT, -- The name the media was uploaded with.
     media_length INTEGER, -- Length of the media in bytes.
     filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (media_origin, media_id)
+    UNIQUE (media_origin, media_id)
 );
 
 CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
@@ -58,11 +58,8 @@ CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
     thumbnail_type TEXT, -- The MIME-type of the thumbnail.
     thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
     filesystem_id TEXT, -- The name used to store the media on disk.
-    CONSTRAINT uniqueness UNIQUE (
+    UNIQUE (
         media_origin, media_id, thumbnail_width, thumbnail_height,
-        thumbnail_type, thumbnail_type
-    )
+        thumbnail_type
+     )
 );
-
-CREATE INDEX IF NOT EXISTS remote_media_cache_thumbnails_media_id
-    ON local_media_repository_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/11/presence.sql b/synapse/storage/schema/full_schemas/11/presence.sql
index f9f8db9697..d8d82e9fe3 100644
--- a/synapse/storage/schema/full_schemas/11/presence.sql
+++ b/synapse/storage/schema/full_schemas/11/presence.sql
@@ -13,26 +13,23 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS presence(
-  user_id INTEGER NOT NULL,
-  state INTEGER,
+  user_id TEXT NOT NULL,
+  state VARCHAR(20),
   status_msg TEXT,
-  mtime INTEGER, -- miliseconds since last state change
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  mtime BIGINT -- miliseconds since last state change
 );
 
 -- For each of /my/ users which possibly-remote users are allowed to see their
 -- presence state
 CREATE TABLE IF NOT EXISTS presence_allow_inbound(
-  observed_user_id INTEGER NOT NULL,
-  observer_user_id TEXT, -- a UserID,
-  FOREIGN KEY(observed_user_id) REFERENCES users(id)
+  observed_user_id TEXT NOT NULL,
+  observer_user_id TEXT NOT NULL -- a UserID,
 );
 
 -- For each of /my/ users (watcher), which possibly-remote users are they
 -- watching?
 CREATE TABLE IF NOT EXISTS presence_list(
-  user_id INTEGER NOT NULL,
-  observed_user_id TEXT, -- a UserID,
-  accepted BOOLEAN,
-  FOREIGN KEY(user_id) REFERENCES users(id)
+  user_id TEXT NOT NULL,
+  observed_user_id TEXT NOT NULL, -- a UserID,
+  accepted BOOLEAN NOT NULL
 );
diff --git a/synapse/storage/schema/full_schemas/11/profiles.sql b/synapse/storage/schema/full_schemas/11/profiles.sql
index f06a528b4d..26e4204437 100644
--- a/synapse/storage/schema/full_schemas/11/profiles.sql
+++ b/synapse/storage/schema/full_schemas/11/profiles.sql
@@ -13,8 +13,7 @@
  * limitations under the License.
  */
 CREATE TABLE IF NOT EXISTS profiles(
-    user_id INTEGER NOT NULL,
+    user_id TEXT NOT NULL,
     displayname TEXT,
-    avatar_url TEXT,
-    FOREIGN KEY(user_id) REFERENCES users(id)
+    avatar_url TEXT
 );
diff --git a/synapse/storage/schema/full_schemas/11/redactions.sql b/synapse/storage/schema/full_schemas/11/redactions.sql
index 5011d95db8..69621955d4 100644
--- a/synapse/storage/schema/full_schemas/11/redactions.sql
+++ b/synapse/storage/schema/full_schemas/11/redactions.sql
@@ -15,8 +15,8 @@
 CREATE TABLE IF NOT EXISTS redactions (
     event_id TEXT NOT NULL,
     redacts TEXT NOT NULL,
-    CONSTRAINT ev_uniq UNIQUE (event_id)
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS redactions_event_id ON redactions (event_id);
-CREATE INDEX IF NOT EXISTS redactions_redacts ON redactions (redacts);
+CREATE INDEX redactions_event_id ON redactions (event_id);
+CREATE INDEX redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/11/room_aliases.sql b/synapse/storage/schema/full_schemas/11/room_aliases.sql
index 0d2df01603..5027b1e3f6 100644
--- a/synapse/storage/schema/full_schemas/11/room_aliases.sql
+++ b/synapse/storage/schema/full_schemas/11/room_aliases.sql
@@ -22,6 +22,3 @@ CREATE TABLE IF NOT EXISTS room_alias_servers(
     room_alias TEXT NOT NULL,
     server TEXT NOT NULL
 );
-
-
-
diff --git a/synapse/storage/schema/full_schemas/11/state.sql b/synapse/storage/schema/full_schemas/11/state.sql
index 1fe8f1e430..ffd164ab71 100644
--- a/synapse/storage/schema/full_schemas/11/state.sql
+++ b/synapse/storage/schema/full_schemas/11/state.sql
@@ -30,18 +30,11 @@ CREATE TABLE IF NOT EXISTS state_groups_state(
 CREATE TABLE IF NOT EXISTS event_to_state_groups(
     event_id TEXT NOT NULL,
     state_group INTEGER NOT NULL,
-    CONSTRAINT event_to_state_groups_uniq UNIQUE (event_id)
+    UNIQUE (event_id)
 );
 
-CREATE INDEX IF NOT EXISTS state_groups_id ON state_groups(id);
+CREATE INDEX state_groups_id ON state_groups(id);
 
-CREATE INDEX IF NOT EXISTS state_groups_state_id ON state_groups_state(
-    state_group
-);
-CREATE INDEX IF NOT EXISTS state_groups_state_tuple ON state_groups_state(
-    room_id, type, state_key
-);
-
-CREATE INDEX IF NOT EXISTS event_to_state_groups_id ON event_to_state_groups(
-    event_id
-);
\ No newline at end of file
+CREATE INDEX state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id);
diff --git a/synapse/storage/schema/full_schemas/11/transactions.sql b/synapse/storage/schema/full_schemas/11/transactions.sql
index 2d30f99b06..cc5b54f5aa 100644
--- a/synapse/storage/schema/full_schemas/11/transactions.sql
+++ b/synapse/storage/schema/full_schemas/11/transactions.sql
@@ -14,17 +14,16 @@
  */
 -- Stores what transaction ids we have received and what our response was
 CREATE TABLE IF NOT EXISTS received_transactions(
-    transaction_id TEXT, 
-    origin TEXT, 
-    ts INTEGER,
+    transaction_id TEXT,
+    origin TEXT,
+    ts BIGINT,
     response_code INTEGER,
-    response_json TEXT,
-    has_been_referenced BOOL default 0, -- Whether thishas been referenced by a prev_tx
-    CONSTRAINT uniquesss UNIQUE (transaction_id, origin) ON CONFLICT REPLACE
+    response_json bytea,
+    has_been_referenced SMALLINT DEFAULT 0, -- Whether thishas been referenced by a prev_tx
+    UNIQUE (transaction_id, origin)
 );
 
-CREATE UNIQUE INDEX IF NOT EXISTS transactions_txid ON received_transactions(transaction_id, origin);
-CREATE INDEX IF NOT EXISTS transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
+CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
 
 
 -- Stores what transactions we've sent, what their response was (if we got one) and whether we have
@@ -35,17 +34,14 @@ CREATE TABLE IF NOT EXISTS sent_transactions(
     destination TEXT,
     response_code INTEGER DEFAULT 0,
     response_json TEXT,
-    ts INTEGER
+    ts BIGINT
 );
 
-CREATE INDEX IF NOT EXISTS sent_transaction_dest ON sent_transactions(destination);
-CREATE INDEX IF NOT EXISTS sent_transaction_dest_referenced ON sent_transactions(
-    destination
-);
-CREATE INDEX IF NOT EXISTS sent_transaction_txn_id ON sent_transactions(transaction_id);
+CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
+CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
 -- So that we can do an efficient look up of all transactions that have yet to be successfully
 -- sent.
-CREATE INDEX IF NOT EXISTS sent_transaction_sent ON sent_transactions(response_code);
+CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
 
 
 -- For sent transactions only.
@@ -56,13 +52,12 @@ CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
     pdu_origin TEXT
 );
 
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
-CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
+CREATE INDEX transaction_id_to_pdu_tx ON transaction_id_to_pdu(transaction_id, destination);
+CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
 
 -- To track destination health
 CREATE TABLE IF NOT EXISTS destinations(
     destination TEXT PRIMARY KEY,
-    retry_last_ts INTEGER,
+    retry_last_ts BIGINT,
     retry_interval INTEGER
 );
diff --git a/synapse/storage/schema/full_schemas/11/users.sql b/synapse/storage/schema/full_schemas/11/users.sql
index 08ccfdac0a..eec3da3c35 100644
--- a/synapse/storage/schema/full_schemas/11/users.sql
+++ b/synapse/storage/schema/full_schemas/11/users.sql
@@ -16,19 +16,18 @@ CREATE TABLE IF NOT EXISTS users(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
     name TEXT,
     password_hash TEXT,
-    creation_ts INTEGER,
-    admin BOOL DEFAULT 0 NOT NULL,
-    UNIQUE(name) ON CONFLICT ROLLBACK
+    creation_ts BIGINT,
+    admin SMALLINT DEFAULT 0 NOT NULL,
+    UNIQUE(name)
 );
 
 CREATE TABLE IF NOT EXISTS access_tokens(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
-    user_id INTEGER NOT NULL,
+    user_id TEXT NOT NULL,
     device_id TEXT,
     token TEXT NOT NULL,
-    last_used INTEGER,
-    FOREIGN KEY(user_id) REFERENCES users(id),
-    UNIQUE(token) ON CONFLICT ROLLBACK
+    last_used BIGINT,
+    UNIQUE(token)
 );
 
 CREATE TABLE IF NOT EXISTS user_ips (
@@ -37,9 +36,8 @@ CREATE TABLE IF NOT EXISTS user_ips (
     device_id TEXT,
     ip TEXT NOT NULL,
     user_agent TEXT NOT NULL,
-    last_seen INTEGER NOT NULL,
-    CONSTRAINT user_ip UNIQUE (user, access_token, ip, user_agent) ON CONFLICT REPLACE
+    last_seen BIGINT NOT NULL,
+    UNIQUE (user, access_token, ip, user_agent)
 );
 
-CREATE INDEX IF NOT EXISTS user_ips_user ON user_ips(user);
-
+CREATE INDEX user_ips_user ON user_ips(user);
diff --git a/synapse/storage/schema/full_schemas/16/application_services.sql b/synapse/storage/schema/full_schemas/16/application_services.sql
new file mode 100644
index 0000000000..d382d63fbd
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/application_services.sql
@@ -0,0 +1,48 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS application_services(
+    id BIGINT PRIMARY KEY,
+    url TEXT,
+    token TEXT,
+    hs_token TEXT,
+    sender TEXT,
+    UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_regex(
+    id BIGINT PRIMARY KEY,
+    as_id BIGINT NOT NULL,
+    namespace INTEGER,  /* enum[room_id|room_alias|user_id] */
+    regex TEXT,
+    FOREIGN KEY(as_id) REFERENCES application_services(id)
+);
+
+CREATE TABLE IF NOT EXISTS application_services_state(
+    as_id TEXT PRIMARY KEY,
+    state VARCHAR(5),
+    last_txn INTEGER
+);
+
+CREATE TABLE IF NOT EXISTS application_services_txns(
+    as_id TEXT NOT NULL,
+    txn_id INTEGER NOT NULL,
+    event_ids TEXT NOT NULL,
+    UNIQUE(as_id, txn_id)
+);
+
+CREATE INDEX application_services_txns_id ON application_services_txns (
+    as_id
+);
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
new file mode 100644
index 0000000000..f7020f7793
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -0,0 +1,89 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_forward_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_extrem_room ON event_forward_extremities(room_id);
+CREATE INDEX ev_extrem_id ON event_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_backward_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX ev_b_extrem_room ON event_backward_extremities(room_id);
+CREATE INDEX ev_b_extrem_id ON event_backward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edges(
+    event_id TEXT NOT NULL,
+    prev_event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    is_state BOOL NOT NULL,
+    UNIQUE (event_id, prev_event_id, room_id, is_state)
+);
+
+CREATE INDEX ev_edges_id ON event_edges(event_id);
+CREATE INDEX ev_edges_prev_id ON event_edges(prev_event_id);
+
+
+CREATE TABLE IF NOT EXISTS room_depth(
+    room_id TEXT NOT NULL,
+    min_depth INTEGER NOT NULL,
+    UNIQUE (room_id)
+);
+
+CREATE INDEX room_depth_room ON room_depth(room_id);
+
+
+create TABLE IF NOT EXISTS event_destinations(
+    event_id TEXT NOT NULL,
+    destination TEXT NOT NULL,
+    delivered_ts BIGINT DEFAULT 0, -- or 0 if not delivered
+    UNIQUE (event_id, destination)
+);
+
+CREATE INDEX event_destinations_id ON event_destinations(event_id);
+
+
+CREATE TABLE IF NOT EXISTS state_forward_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    UNIQUE (event_id, room_id)
+);
+
+CREATE INDEX st_extrem_keys ON state_forward_extremities(
+    room_id, type, state_key
+);
+CREATE INDEX st_extrem_id ON state_forward_extremities(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_auth(
+    event_id TEXT NOT NULL,
+    auth_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (event_id, auth_id, room_id)
+);
+
+CREATE INDEX evauth_edges_id ON event_auth(event_id);
+CREATE INDEX evauth_edges_auth_id ON event_auth(auth_id);
diff --git a/synapse/storage/schema/full_schemas/16/event_signatures.sql b/synapse/storage/schema/full_schemas/16/event_signatures.sql
new file mode 100644
index 0000000000..636b2d3353
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/event_signatures.sql
@@ -0,0 +1,55 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS event_content_hashes (
+    event_id TEXT,
+    algorithm TEXT,
+    hash bytea,
+    UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_content_hashes_id ON event_content_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_reference_hashes (
+    event_id TEXT,
+    algorithm TEXT,
+    hash bytea,
+    UNIQUE (event_id, algorithm)
+);
+
+CREATE INDEX event_reference_hashes_id ON event_reference_hashes(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_signatures (
+    event_id TEXT,
+    signature_name TEXT,
+    key_id TEXT,
+    signature bytea,
+    UNIQUE (event_id, signature_name, key_id)
+);
+
+CREATE INDEX event_signatures_id ON event_signatures(event_id);
+
+
+CREATE TABLE IF NOT EXISTS event_edge_hashes(
+    event_id TEXT,
+    prev_event_id TEXT,
+    algorithm TEXT,
+    hash bytea,
+    UNIQUE (event_id, prev_event_id, algorithm)
+);
+
+CREATE INDEX event_edge_hashes_id ON event_edge_hashes(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
new file mode 100644
index 0000000000..576653a3c9
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -0,0 +1,128 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS events(
+    stream_ordering INTEGER PRIMARY KEY,
+    topological_ordering BIGINT NOT NULL,
+    event_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    content TEXT NOT NULL,
+    unrecognized_keys TEXT,
+    processed BOOL NOT NULL,
+    outlier BOOL NOT NULL,
+    depth BIGINT DEFAULT 0 NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX events_stream_ordering ON events (stream_ordering);
+CREATE INDEX events_topological_ordering ON events (topological_ordering);
+CREATE INDEX events_order ON events (topological_ordering, stream_ordering);
+CREATE INDEX events_room_id ON events (room_id);
+CREATE INDEX events_order_room ON events (
+    room_id, topological_ordering, stream_ordering
+);
+
+
+CREATE TABLE IF NOT EXISTS event_json(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    internal_metadata TEXT NOT NULL,
+    json TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX event_json_room_id ON event_json(room_id);
+
+
+CREATE TABLE IF NOT EXISTS state_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    prev_state TEXT,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX state_events_room_id ON state_events (room_id);
+CREATE INDEX state_events_type ON state_events (type);
+CREATE INDEX state_events_state_key ON state_events (state_key);
+
+
+CREATE TABLE IF NOT EXISTS current_state_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    UNIQUE (event_id),
+    UNIQUE (room_id, type, state_key)
+);
+
+CREATE INDEX current_state_events_room_id ON current_state_events (room_id);
+CREATE INDEX current_state_events_type ON current_state_events (type);
+CREATE INDEX current_state_events_state_key ON current_state_events (state_key);
+
+CREATE TABLE IF NOT EXISTS room_memberships(
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    sender TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    membership TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX room_memberships_room_id ON room_memberships (room_id);
+CREATE INDEX room_memberships_user_id ON room_memberships (user_id);
+
+CREATE TABLE IF NOT EXISTS feedback(
+    event_id TEXT NOT NULL,
+    feedback_type TEXT,
+    target_event_id TEXT,
+    sender TEXT,
+    room_id TEXT,
+    UNIQUE (event_id)
+);
+
+CREATE TABLE IF NOT EXISTS topics(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    topic TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX topics_room_id ON topics(room_id);
+
+CREATE TABLE IF NOT EXISTS room_names(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    name TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX room_names_room_id ON room_names(room_id);
+
+CREATE TABLE IF NOT EXISTS rooms(
+    room_id TEXT PRIMARY KEY NOT NULL,
+    is_public BOOL,
+    creator TEXT
+);
+
+CREATE TABLE IF NOT EXISTS room_hosts(
+    room_id TEXT NOT NULL,
+    host TEXT NOT NULL,
+    UNIQUE (room_id, host)
+);
+
+CREATE INDEX room_hosts_room_id ON room_hosts (room_id);
diff --git a/synapse/storage/schema/full_schemas/16/keys.sql b/synapse/storage/schema/full_schemas/16/keys.sql
new file mode 100644
index 0000000000..afc142045e
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/keys.sql
@@ -0,0 +1,31 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS server_tls_certificates(
+  server_name TEXT, -- Server name.
+  fingerprint TEXT, -- Certificate fingerprint.
+  from_server TEXT, -- Which key server the certificate was fetched from.
+  ts_added_ms BIGINT, -- When the certifcate was added.
+  tls_certificate bytea, -- DER encoded x509 certificate.
+  UNIQUE (server_name, fingerprint)
+);
+
+CREATE TABLE IF NOT EXISTS server_signature_keys(
+  server_name TEXT, -- Server name.
+  key_id TEXT, -- Key version.
+  from_server TEXT, -- Which key server the key was fetched form.
+  ts_added_ms BIGINT, -- When the key was added.
+  verify_key bytea, -- NACL verification key.
+  UNIQUE (server_name, key_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/media_repository.sql b/synapse/storage/schema/full_schemas/16/media_repository.sql
new file mode 100644
index 0000000000..dacbda40ca
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/media_repository.sql
@@ -0,0 +1,68 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS local_media_repository (
+    media_id TEXT, -- The id used to refer to the media.
+    media_type TEXT, -- The MIME-type of the media.
+    media_length INTEGER, -- Length of the media in bytes.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    user_id TEXT, -- The user who uploaded the file.
+    UNIQUE (media_id)
+);
+
+CREATE TABLE IF NOT EXISTS local_media_repository_thumbnails (
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_method TEXT, -- The method used to make the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    UNIQUE (
+        media_id, thumbnail_width, thumbnail_height, thumbnail_type
+    )
+);
+
+CREATE INDEX local_media_repository_thumbnails_media_id
+    ON local_media_repository_thumbnails (media_id);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media on that server.
+    media_type TEXT, -- The MIME-type of the media.
+    created_ts BIGINT, -- When the content was uploaded in ms.
+    upload_name TEXT, -- The name the media was uploaded with.
+    media_length INTEGER, -- Length of the media in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    UNIQUE (media_origin, media_id)
+);
+
+CREATE TABLE IF NOT EXISTS remote_media_cache_thumbnails (
+    media_origin TEXT, -- The remote HS the media came from.
+    media_id TEXT, -- The id used to refer to the media.
+    thumbnail_width INTEGER, -- The width of the thumbnail in pixels.
+    thumbnail_height INTEGER, -- The height of the thumbnail in pixels.
+    thumbnail_method TEXT, -- The method used to make the thumbnail
+    thumbnail_type TEXT, -- The MIME-type of the thumbnail.
+    thumbnail_length INTEGER, -- The length of the thumbnail in bytes.
+    filesystem_id TEXT, -- The name used to store the media on disk.
+    UNIQUE (
+        media_origin, media_id, thumbnail_width, thumbnail_height,
+        thumbnail_type
+     )
+);
+
+CREATE INDEX remote_media_cache_thumbnails_media_id
+    ON remote_media_cache_thumbnails (media_id);
diff --git a/synapse/storage/schema/full_schemas/16/presence.sql b/synapse/storage/schema/full_schemas/16/presence.sql
new file mode 100644
index 0000000000..80088413ba
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/presence.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS presence(
+  user_id TEXT NOT NULL,
+  state VARCHAR(20),
+  status_msg TEXT,
+  mtime BIGINT, -- miliseconds since last state change
+  UNIQUE (user_id)
+);
+
+-- For each of /my/ users which possibly-remote users are allowed to see their
+-- presence state
+CREATE TABLE IF NOT EXISTS presence_allow_inbound(
+  observed_user_id TEXT NOT NULL,
+  observer_user_id TEXT NOT NULL, -- a UserID,
+  UNIQUE (observed_user_id, observer_user_id)
+);
+
+-- For each of /my/ users (watcher), which possibly-remote users are they
+-- watching?
+CREATE TABLE IF NOT EXISTS presence_list(
+  user_id TEXT NOT NULL,
+  observed_user_id TEXT NOT NULL, -- a UserID,
+  accepted BOOLEAN NOT NULL,
+  UNIQUE (user_id, observed_user_id)
+);
+
+CREATE INDEX presence_list_user_id ON presence_list (user_id);
diff --git a/synapse/storage/schema/full_schemas/16/profiles.sql b/synapse/storage/schema/full_schemas/16/profiles.sql
new file mode 100644
index 0000000000..934be86520
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/profiles.sql
@@ -0,0 +1,20 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS profiles(
+    user_id TEXT NOT NULL,
+    displayname TEXT,
+    avatar_url TEXT,
+    UNIQUE(user_id)
+);
diff --git a/synapse/storage/schema/full_schemas/16/push.sql b/synapse/storage/schema/full_schemas/16/push.sql
new file mode 100644
index 0000000000..db6e05cbdf
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/push.sql
@@ -0,0 +1,73 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS rejections(
+    event_id TEXT NOT NULL,
+    reason TEXT NOT NULL,
+    last_check TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+-- Push notification endpoints that users have configured
+CREATE TABLE IF NOT EXISTS pushers (
+  id BIGINT PRIMARY KEY,
+  user_name TEXT NOT NULL,
+  profile_tag VARCHAR(32) NOT NULL,
+  kind VARCHAR(8) NOT NULL,
+  app_id VARCHAR(64) NOT NULL,
+  app_display_name VARCHAR(64) NOT NULL,
+  device_display_name VARCHAR(128) NOT NULL,
+  pushkey bytea NOT NULL,
+  ts BIGINT NOT NULL,
+  lang VARCHAR(8),
+  data bytea,
+  last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
+  UNIQUE (app_id, pushkey)
+);
+
+CREATE TABLE IF NOT EXISTS push_rules (
+  id BIGINT PRIMARY KEY,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  priority_class SMALLINT NOT NULL,
+  priority INTEGER NOT NULL DEFAULT 0,
+  conditions TEXT NOT NULL,
+  actions TEXT NOT NULL,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_user_name on push_rules (user_name);
+
+CREATE TABLE IF NOT EXISTS user_filters(
+  user_id TEXT,
+  filter_id BIGINT,
+  filter_json bytea
+);
+
+CREATE INDEX user_filters_by_user_id_filter_id ON user_filters(
+    user_id, filter_id
+);
+
+CREATE TABLE IF NOT EXISTS push_rules_enable (
+  id BIGINT PRIMARY KEY,
+  user_name TEXT NOT NULL,
+  rule_id TEXT NOT NULL,
+  enabled SMALLINT,
+  UNIQUE(user_name, rule_id)
+);
+
+CREATE INDEX push_rules_enable_user_name on push_rules_enable (user_name);
diff --git a/synapse/storage/schema/full_schemas/16/redactions.sql b/synapse/storage/schema/full_schemas/16/redactions.sql
new file mode 100644
index 0000000000..69621955d4
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/redactions.sql
@@ -0,0 +1,22 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS redactions (
+    event_id TEXT NOT NULL,
+    redacts TEXT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX redactions_event_id ON redactions (event_id);
+CREATE INDEX redactions_redacts ON redactions (redacts);
diff --git a/synapse/storage/schema/full_schemas/16/room_aliases.sql b/synapse/storage/schema/full_schemas/16/room_aliases.sql
new file mode 100644
index 0000000000..412bb97fad
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/room_aliases.sql
@@ -0,0 +1,29 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS room_aliases(
+    room_alias TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    UNIQUE (room_alias)
+);
+
+CREATE INDEX room_aliases_id ON room_aliases(room_id);
+
+CREATE TABLE IF NOT EXISTS room_alias_servers(
+    room_alias TEXT NOT NULL,
+    server TEXT NOT NULL
+);
+
+CREATE INDEX room_alias_servers_alias ON room_alias_servers(room_alias);
diff --git a/synapse/storage/schema/full_schemas/16/state.sql b/synapse/storage/schema/full_schemas/16/state.sql
new file mode 100644
index 0000000000..705cac6ce9
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/state.sql
@@ -0,0 +1,40 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS state_groups(
+    id BIGINT PRIMARY KEY,
+    room_id TEXT NOT NULL,
+    event_id TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS state_groups_state(
+    state_group BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    type TEXT NOT NULL,
+    state_key TEXT NOT NULL,
+    event_id TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS event_to_state_groups(
+    event_id TEXT NOT NULL,
+    state_group BIGINT NOT NULL,
+    UNIQUE (event_id)
+);
+
+CREATE INDEX state_groups_id ON state_groups(id);
+
+CREATE INDEX state_groups_state_id ON state_groups_state(state_group);
+CREATE INDEX state_groups_state_tuple ON state_groups_state(room_id, type, state_key);
+CREATE INDEX event_to_state_groups_id ON event_to_state_groups(event_id);
diff --git a/synapse/storage/schema/full_schemas/16/transactions.sql b/synapse/storage/schema/full_schemas/16/transactions.sql
new file mode 100644
index 0000000000..1ab77cdb63
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/transactions.sql
@@ -0,0 +1,63 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-- Stores what transaction ids we have received and what our response was
+CREATE TABLE IF NOT EXISTS received_transactions(
+    transaction_id TEXT,
+    origin TEXT,
+    ts BIGINT,
+    response_code INTEGER,
+    response_json bytea,
+    has_been_referenced smallint default 0, -- Whether thishas been referenced by a prev_tx
+    UNIQUE (transaction_id, origin)
+);
+
+CREATE INDEX transactions_have_ref ON received_transactions(origin, has_been_referenced);-- WHERE has_been_referenced = 0;
+
+
+-- Stores what transactions we've sent, what their response was (if we got one) and whether we have
+-- since referenced the transaction in another outgoing transaction
+CREATE TABLE IF NOT EXISTS sent_transactions(
+    id BIGINT PRIMARY KEY, -- This is used to apply insertion ordering
+    transaction_id TEXT,
+    destination TEXT,
+    response_code INTEGER DEFAULT 0,
+    response_json TEXT,
+    ts BIGINT
+);
+
+CREATE INDEX sent_transaction_dest ON sent_transactions(destination);
+CREATE INDEX sent_transaction_txn_id ON sent_transactions(transaction_id);
+-- So that we can do an efficient look up of all transactions that have yet to be successfully
+-- sent.
+CREATE INDEX sent_transaction_sent ON sent_transactions(response_code);
+
+
+-- For sent transactions only.
+CREATE TABLE IF NOT EXISTS transaction_id_to_pdu(
+    transaction_id INTEGER,
+    destination TEXT,
+    pdu_id TEXT,
+    pdu_origin TEXT,
+    UNIQUE (transaction_id, destination)
+);
+
+CREATE INDEX transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+    destination TEXT PRIMARY KEY,
+    retry_last_ts BIGINT,
+    retry_interval INTEGER
+);
diff --git a/synapse/storage/schema/full_schemas/16/users.sql b/synapse/storage/schema/full_schemas/16/users.sql
new file mode 100644
index 0000000000..d2fa3122da
--- /dev/null
+++ b/synapse/storage/schema/full_schemas/16/users.sql
@@ -0,0 +1,42 @@
+/* Copyright 2014, 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS users(
+    name TEXT,
+    password_hash TEXT,
+    creation_ts BIGINT,
+    admin SMALLINT DEFAULT 0 NOT NULL,
+    UNIQUE(name)
+);
+
+CREATE TABLE IF NOT EXISTS access_tokens(
+    id BIGINT PRIMARY KEY,
+    user_id TEXT NOT NULL,
+    device_id TEXT,
+    token TEXT NOT NULL,
+    last_used BIGINT,
+    UNIQUE(token)
+);
+
+CREATE TABLE IF NOT EXISTS user_ips (
+    user_id TEXT NOT NULL,
+    access_token TEXT NOT NULL,
+    device_id TEXT,
+    ip TEXT NOT NULL,
+    user_agent TEXT NOT NULL,
+    last_seen BIGINT NOT NULL
+);
+
+CREATE INDEX user_ips_user ON user_ips(user_id);
+CREATE INDEX user_ips_user_ip ON user_ips(user_id, access_token, ip);
diff --git a/synapse/storage/schema/schema_version.sql b/synapse/storage/schema/schema_version.sql
index 0431e2d051..d682608aa0 100644
--- a/synapse/storage/schema/schema_version.sql
+++ b/synapse/storage/schema/schema_version.sql
@@ -14,17 +14,14 @@
  */
 
 CREATE TABLE IF NOT EXISTS schema_version(
-    Lock char(1) NOT NULL DEFAULT 'X',  -- Makes sure this table only has one row.
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
     version INTEGER NOT NULL,
     upgraded BOOL NOT NULL,  -- Whether we reached this version from an upgrade or an initial schema.
-    CONSTRAINT schema_version_lock_x CHECK (Lock='X')
-    CONSTRAINT schema_version_lock_uniq UNIQUE (Lock)
+    CHECK (Lock='X')
 );
 
 CREATE TABLE IF NOT EXISTS applied_schema_deltas(
     version INTEGER NOT NULL,
     file TEXT NOT NULL,
-    CONSTRAINT schema_deltas_ver_file UNIQUE (version, file) ON CONFLICT IGNORE
+    UNIQUE(version, file)
 );
-
-CREATE INDEX IF NOT EXISTS schema_deltas_ver ON applied_schema_deltas(version);
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index d0d53770f2..f051828630 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -56,7 +56,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def get_event_reference_hashes(self, event_ids):
@@ -100,7 +99,7 @@ class SignatureStore(SQLBaseStore):
             " WHERE event_id = ?"
         )
         txn.execute(query, (event_id, ))
-        return dict(txn.fetchall())
+        return {k: v for k, v in txn.fetchall()}
 
     def _store_event_reference_hash_txn(self, txn, event_id, algorithm,
                                         hash_bytes):
@@ -119,7 +118,6 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_event_signatures_txn(self, txn, event_id):
@@ -164,7 +162,6 @@ class SignatureStore(SQLBaseStore):
                 "key_id": key_id,
                 "signature": buffer(signature_bytes),
             },
-            or_ignore=True,
         )
 
     def _get_prev_event_hashes_txn(self, txn, event_id):
@@ -198,5 +195,4 @@ class SignatureStore(SQLBaseStore):
                 "algorithm": algorithm,
                 "hash": buffer(hash_bytes),
             },
-            or_ignore=True,
         )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 58dbf2802b..553ba9dd1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore
 
 from twisted.internet import defer
 
+from synapse.util.stringutils import random_string
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -91,14 +93,15 @@ class StateStore(SQLBaseStore):
 
         state_group = context.state_group
         if not state_group:
-            state_group = self._simple_insert_txn(
+            state_group = self._state_groups_id_gen.get_next_txn(txn)
+            self._simple_insert_txn(
                 txn,
                 table="state_groups",
                 values={
+                    "id": state_group,
                     "room_id": event.room_id,
                     "event_id": event.event_id,
                 },
-                or_ignore=True,
             )
 
             for state in state_events.values():
@@ -112,7 +115,6 @@ class StateStore(SQLBaseStore):
                         "state_key": state.state_key,
                         "event_id": state.event_id,
                     },
-                    or_ignore=True,
                 )
 
         self._simple_insert_txn(
@@ -122,7 +124,6 @@ class StateStore(SQLBaseStore):
                 "state_group": state_group,
                 "event_id": event.event_id,
             },
-            or_replace=True,
         )
 
     @defer.inlineCallbacks
@@ -154,3 +155,7 @@ class StateStore(SQLBaseStore):
 
         events = yield self._parse_events(results)
         defer.returnValue(events)
+
+
+def _make_group_id(clock):
+    return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66f307e640..df6de7cbcd 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,7 +35,7 @@ what sort order was used:
 
 from twisted.internet import defer
 
-from ._base import SQLBaseStore, cached
+from ._base import SQLBaseStore
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.logutils import log_function
@@ -110,7 +110,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d < %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d < %s OR (%d == %s AND %d < %s))" % (
+            return "(%d < %s OR (%d = %s AND %d < %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -120,7 +120,7 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
         if self.topological is None:
             return "(%d >= %s)" % (self.stream, "stream_ordering")
         else:
-            return "(%d > %s OR (%d == %s AND %d >= %s))" % (
+            return "(%d > %s OR (%d = %s AND %d >= %s))" % (
                 self.topological, "topological_ordering",
                 self.topological, "topological_ordering",
                 self.stream, "stream_ordering",
@@ -240,7 +240,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT e.event_id, e.stream_ordering FROM events AS e WHERE "
-            "(e.outlier = 0 AND (room_id IN (%(current)s)) OR "
+            "(e.outlier = ? AND (room_id IN (%(current)s)) OR "
             "(event_id IN (%(invites)s))) "
             "AND e.stream_ordering > ? AND e.stream_ordering <= ? "
             "ORDER BY stream_ordering ASC LIMIT %(limit)d "
@@ -251,7 +251,7 @@ class StreamStore(SQLBaseStore):
         }
 
         def f(txn):
-            txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
+            txn.execute(sql, (False, user_id, user_id, from_id.stream, to_id.stream,))
 
             rows = self.cursor_to_dict(txn)
 
@@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
-        args = [room_id]
+        args = [False, room_id]
         if direction == 'b':
             order = "DESC"
             bounds = _StreamToken.parse(from_key).upper_bound()
@@ -307,7 +307,7 @@ class StreamStore(SQLBaseStore):
 
         sql = (
             "SELECT * FROM events"
-            " WHERE outlier = 0 AND room_id = ? AND %(bounds)s"
+            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
             " ORDER BY topological_ordering %(order)s,"
             " stream_ordering %(order)s %(limit)s"
         ) % {
@@ -358,7 +358,7 @@ class StreamStore(SQLBaseStore):
             sql = (
                 "SELECT stream_ordering, topological_ordering, event_id"
                 " FROM events"
-                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0"
+                " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
                 " ORDER BY topological_ordering DESC, stream_ordering DESC"
                 " LIMIT ?"
             )
@@ -368,17 +368,17 @@ class StreamStore(SQLBaseStore):
                 "SELECT stream_ordering, topological_ordering, event_id"
                 " FROM events"
                 " WHERE room_id = ? AND stream_ordering > ?"
-                " AND stream_ordering <= ? AND outlier = 0"
+                " AND stream_ordering <= ? AND outlier = ?"
                 " ORDER BY topological_ordering DESC, stream_ordering DESC"
                 " LIMIT ?"
             )
 
         def get_recent_events_for_room_txn(txn):
             if from_token is None:
-                txn.execute(sql, (room_id, end_token.stream, limit,))
+                txn.execute(sql, (room_id, end_token.stream, False, limit,))
             else:
                 txn.execute(sql, (
-                    room_id, from_token.stream, end_token.stream, limit
+                    room_id, from_token.stream, end_token.stream, False, limit
                 ))
 
             rows = self.cursor_to_dict(txn)
@@ -413,12 +413,10 @@ class StreamStore(SQLBaseStore):
             "get_recent_events_for_room", get_recent_events_for_room_txn
         )
 
-    @cached(num_args=0)
+    @defer.inlineCallbacks
     def get_room_events_max_id(self):
-        return self.runInteraction(
-            "get_room_events_max_id",
-            self._get_room_events_max_id_txn
-        )
+        token = yield self._stream_id_gen.get_max_token(self)
+        defer.returnValue("s%d" % (token,))
 
     @defer.inlineCallbacks
     def _get_min_token(self):
@@ -433,27 +431,6 @@ class StreamStore(SQLBaseStore):
 
         defer.returnValue(self.min_token)
 
-    def get_next_stream_id(self):
-        with self._next_stream_id_lock:
-            i = self._next_stream_id
-            self._next_stream_id += 1
-            return i
-
-    def _get_room_events_max_id_txn(self, txn):
-        txn.execute(
-            "SELECT MAX(stream_ordering) as m FROM events"
-        )
-
-        res = self.cursor_to_dict(txn)
-
-        logger.debug("get_room_events_max_id: %s", res)
-
-        if not res or not res[0] or not res[0]["m"]:
-            return "s0"
-
-        key = res[0]["m"]
-        return "s%d" % (key,)
-
     @staticmethod
     def _set_before_and_after(events, rows):
         for event, row in zip(events, rows):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b777395e06..89dd7d8947 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore, Table, cached
+from ._base import SQLBaseStore, cached
 
 from collections import namedtuple
 
@@ -76,22 +76,18 @@ class TransactionStore(SQLBaseStore):
             response_json (str)
         """
 
-        return self.runInteraction(
-            "set_received_txn_response",
-            self._set_received_txn_response,
-            transaction_id, origin, code, response_dict
+        return self._simple_insert(
+            table=ReceivedTransactionsTable.table_name,
+            values={
+                "transaction_id": transaction_id,
+                "origin": origin,
+                "response_code": code,
+                "response_json": response_dict,
+            },
+            or_ignore=True,
+            desc="set_received_txn_response",
         )
 
-    def _set_received_txn_response(self, txn, transaction_id, origin, code,
-                                   response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND origin = ?"
-        ) % ReceivedTransactionsTable.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, origin))
-
     def prep_send_transaction(self, transaction_id, destination,
                               origin_server_ts):
         """Persists an outgoing transaction and calculates the values for the
@@ -118,41 +114,38 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
+        next_id = self._transaction_id_gen.get_next_txn(txn)
+
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
         # we can simply take the last one.
-        query = "%s ORDER BY id DESC LIMIT 1" % (
-                SentTransactions.select_statement("destination = ?"),
-            )
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ?"
+            " ORDER BY id DESC LIMIT 1"
+        )
 
-        results = txn.execute(query, (destination,))
-        results = SentTransactions.decode_results(results)
+        txn.execute(query, (destination,))
+        results = self.cursor_to_dict(txn)
 
-        prev_txns = [r.transaction_id for r in results]
+        prev_txns = [r["transaction_id"] for r in results]
 
         # Actually add the new transaction to the sent_transactions table.
 
-        query = SentTransactions.insert_statement()
-        txn.execute(query, SentTransactions.EntryType(
-            None,
-            transaction_id=transaction_id,
-            destination=destination,
-            ts=origin_server_ts,
-            response_code=0,
-            response_json=None
-        ))
-
-        # Update the tx id -> pdu id mapping
-
-        # values = [
-        #     (transaction_id, destination, pdu[0], pdu[1])
-        #     for pdu in pdu_list
-        # ]
-        #
-        # logger.debug("Inserting: %s", repr(values))
-        #
-        # query = TransactionsToPduTable.insert_statement()
-        # txn.executemany(query, values)
+        self._simple_insert_txn(
+            txn,
+            table=SentTransactions.table_name,
+            values={
+                "id": next_id,
+                "transaction_id": transaction_id,
+                "destination": destination,
+                "ts": origin_server_ts,
+                "response_code": 0,
+                "response_json": None,
+            }
+        )
+
+        # TODO Update the tx id -> pdu id mapping
 
         return prev_txns
 
@@ -171,15 +164,20 @@ class TransactionStore(SQLBaseStore):
             transaction_id, destination, code, response_dict
         )
 
-    def _delivered_txn(cls, txn, transaction_id, destination,
+    def _delivered_txn(self, txn, transaction_id, destination,
                        code, response_json):
-        query = (
-            "UPDATE %s "
-            "SET response_code = ?, response_json = ? "
-            "WHERE transaction_id = ? AND destination = ?"
-        ) % SentTransactions.table_name
-
-        txn.execute(query, (code, response_json, transaction_id, destination))
+        self._simple_update_one_txn(
+            txn,
+            table=SentTransactions.table_name,
+            keyvalues={
+                "transaction_id": transaction_id,
+                "destination": destination,
+            },
+            updatevalues={
+                "response_code": code,
+                "response_json": None,  # For now, don't persist response_json
+            }
+        )
 
     def get_transactions_after(self, transaction_id, destination):
         """Get all transactions after a given local transaction_id.
@@ -189,25 +187,26 @@ class TransactionStore(SQLBaseStore):
             destination (str)
 
         Returns:
-            list: A list of `ReceivedTransactionsTable.EntryType`
+            list: A list of dicts
         """
         return self.runInteraction(
             "get_transactions_after",
             self._get_transactions_after, transaction_id, destination
         )
 
-    def _get_transactions_after(cls, txn, transaction_id, destination):
-        where = (
-            "destination = ? AND id > (select id FROM %s WHERE "
-            "transaction_id = ? AND destination = ?)"
-        ) % (
-            SentTransactions.table_name
+    def _get_transactions_after(self, txn, transaction_id, destination):
+        query = (
+            "SELECT * FROM sent_transactions"
+            " WHERE destination = ? AND id >"
+            " ("
+            " SELECT id FROM sent_transactions"
+            " WHERE transaction_id = ? AND destination = ?"
+            " )"
         )
-        query = SentTransactions.select_statement(where)
 
         txn.execute(query, (destination, transaction_id, destination))
 
-        return ReceivedTransactionsTable.decode_results(txn.fetchall())
+        return self.cursor_to_dict(txn)
 
     @cached()
     def get_destination_retry_timings(self, destination):
@@ -218,22 +217,27 @@ class TransactionStore(SQLBaseStore):
 
         Returns:
             None if not retrying
-            Otherwise a DestinationsTable.EntryType for the retry scheme
+            Otherwise a dict for the retry scheme
         """
         return self.runInteraction(
             "get_destination_retry_timings",
             self._get_destination_retry_timings, destination)
 
-    def _get_destination_retry_timings(cls, txn, destination):
-        query = DestinationsTable.select_statement("destination = ?")
-        txn.execute(query, (destination,))
-        result = txn.fetchall()
-        if result:
-            result = DestinationsTable.decode_single_result(result)
-            if result.retry_last_ts > 0:
-                return result
-            else:
-                return None
+    def _get_destination_retry_timings(self, txn, destination):
+        result = self._simple_select_one_txn(
+            txn,
+            table=DestinationsTable.table_name,
+            keyvalues={
+                "destination": destination,
+            },
+            retcols=DestinationsTable.fields,
+            allow_none=True,
+        )
+
+        if result and result["retry_last_ts"] > 0:
+            return result
+        else:
+            return None
 
     def set_destination_retry_timings(self, destination,
                                       retry_last_ts, retry_interval):
@@ -249,11 +253,11 @@ class TransactionStore(SQLBaseStore):
         # As this is the new value, we might as well prefill the cache
         self.get_destination_retry_timings.prefill(
             destination,
-            DestinationsTable.EntryType(
-                destination,
-                retry_last_ts,
-                retry_interval
-            )
+            {
+                "destination": destination,
+                "retry_last_ts": retry_last_ts,
+                "retry_interval": retry_interval
+            },
         )
 
         # XXX: we could chose to not bother persisting this if our cache thinks
@@ -266,22 +270,38 @@ class TransactionStore(SQLBaseStore):
             retry_interval,
         )
 
-    def _set_destination_retry_timings(cls, txn, destination,
+    def _set_destination_retry_timings(self, txn, destination,
                                        retry_last_ts, retry_interval):
-
         query = (
-            "INSERT OR REPLACE INTO %s "
-            "(destination, retry_last_ts, retry_interval) "
-            "VALUES (?, ?, ?) "
-        ) % DestinationsTable.table_name
+            "UPDATE destinations"
+            " SET retry_last_ts = ?, retry_interval = ?"
+            " WHERE destination = ?"
+        )
+
+        txn.execute(
+            query,
+            (
+                retry_last_ts, retry_interval, destination,
+            )
+        )
 
-        txn.execute(query, (destination, retry_last_ts, retry_interval))
+        if txn.rowcount == 0:
+            # destination wasn't already in table. Insert it.
+            self._simple_insert_txn(
+                txn,
+                table="destinations",
+                values={
+                    "destination": destination,
+                    "retry_last_ts": retry_last_ts,
+                    "retry_interval": retry_interval,
+                }
+            )
 
     def get_destinations_needing_retry(self):
         """Get all destinations which are due a retry for sending a transaction.
 
         Returns:
-            list: A list of `DestinationsTable.EntryType`
+            list: A list of dicts
         """
 
         return self.runInteraction(
@@ -289,14 +309,17 @@ class TransactionStore(SQLBaseStore):
             self._get_destinations_needing_retry
         )
 
-    def _get_destinations_needing_retry(cls, txn):
-        where = "retry_last_ts > 0 and retry_next_ts < now()"
-        query = DestinationsTable.select_statement(where)
-        txn.execute(query)
-        return DestinationsTable.decode_results(txn.fetchall())
+    def _get_destinations_needing_retry(self, txn):
+        query = (
+            "SELECT * FROM destinations"
+            " WHERE retry_last_ts > 0 and retry_next_ts < ?"
+        )
+
+        txn.execute(query, (self._clock.time_msec(),))
+        return self.cursor_to_dict(txn)
 
 
-class ReceivedTransactionsTable(Table):
+class ReceivedTransactionsTable(object):
     table_name = "received_transactions"
 
     fields = [
@@ -308,10 +331,8 @@ class ReceivedTransactionsTable(Table):
         "has_been_referenced",
     ]
 
-    EntryType = namedtuple("ReceivedTransactionsEntry", fields)
 
-
-class SentTransactions(Table):
+class SentTransactions(object):
     table_name = "sent_transactions"
 
     fields = [
@@ -326,7 +347,7 @@ class SentTransactions(Table):
     EntryType = namedtuple("SentTransactionsEntry", fields)
 
 
-class TransactionsToPduTable(Table):
+class TransactionsToPduTable(object):
     table_name = "transaction_id_to_pdu"
 
     fields = [
@@ -336,10 +357,8 @@ class TransactionsToPduTable(Table):
         "pdu_origin",
     ]
 
-    EntryType = namedtuple("TransactionsToPduEntry", fields)
-
 
-class DestinationsTable(Table):
+class DestinationsTable(object):
     table_name = "destinations"
 
     fields = [
@@ -347,5 +366,3 @@ class DestinationsTable(Table):
         "retry_last_ts",
         "retry_interval",
     ]
-
-    EntryType = namedtuple("DestinationsEntry", fields)
diff --git a/synapse/storage/util/__init__.py b/synapse/storage/util/__init__.py
new file mode 100644
index 0000000000..c488b10d3c
--- /dev/null
+++ b/synapse/storage/util/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
new file mode 100644
index 0000000000..9d461d5e96
--- /dev/null
+++ b/synapse/storage/util/id_generators.py
@@ -0,0 +1,131 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from collections import deque
+import contextlib
+import threading
+
+
+class IdGenerator(object):
+    def __init__(self, table, column, store):
+        self.table = table
+        self.column = column
+        self.store = store
+        self._lock = threading.Lock()
+        self._next_id = None
+
+    @defer.inlineCallbacks
+    def get_next(self):
+        with self._lock:
+            if not self._next_id:
+                res = yield self.store._execute_and_decode(
+                    "IdGenerator_%s" % (self.table,),
+                    "SELECT MAX(%s) as mx FROM %s" % (self.column, self.table,)
+                )
+
+                self._next_id = (res and res[0] and res[0]["mx"]) or 1
+
+            i = self._next_id
+            self._next_id += 1
+            defer.returnValue(i)
+
+    def get_next_txn(self, txn):
+        with self._lock:
+            if self._next_id:
+                i = self._next_id
+                self._next_id += 1
+                return i
+            else:
+                txn.execute(
+                    "SELECT MAX(%s) FROM %s" % (self.column, self.table,)
+                )
+
+                val, = txn.fetchone()
+                cur = val or 0
+                cur += 1
+                self._next_id = cur + 1
+
+                return cur
+
+
+class StreamIdGenerator(object):
+    """Used to generate new stream ids when persisting events while keeping
+    track of which transactions have been completed.
+
+    This allows us to get the "current" stream id, i.e. the stream id such that
+    all ids less than or equal to it have completed. This handles the fact that
+    persistence of events can complete out of order.
+
+    Usage:
+        with stream_id_gen.get_next_txn(txn) as stream_id:
+            # ... persist event ...
+    """
+    def __init__(self):
+        self._lock = threading.Lock()
+
+        self._current_max = None
+        self._unfinished_ids = deque()
+
+    def get_next_txn(self, txn):
+        """
+        Usage:
+            with stream_id_gen.get_next_txn(txn) as stream_id:
+                # ... persist event ...
+        """
+        with self._lock:
+            if not self._current_max:
+                self._compute_current_max(txn)
+
+            self._current_max += 1
+            next_id = self._current_max
+
+            self._unfinished_ids.append(next_id)
+
+        @contextlib.contextmanager
+        def manager():
+            try:
+                yield next_id
+            finally:
+                with self._lock:
+                    self._unfinished_ids.remove(next_id)
+
+        return manager()
+
+    @defer.inlineCallbacks
+    def get_max_token(self, store):
+        """Returns the maximum stream id such that all stream ids less than or
+        equal to it have been successfully persisted.
+        """
+        with self._lock:
+            if self._unfinished_ids:
+                defer.returnValue(self._unfinished_ids[0] - 1)
+
+            if not self._current_max:
+                yield store.runInteraction(
+                    "_compute_current_max",
+                    self._compute_current_max,
+                )
+
+            defer.returnValue(self._current_max)
+
+    def _compute_current_max(self, txn):
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        val, = txn.fetchone()
+
+        self._current_max = int(val) if val else 1
+
+        return self._current_max
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index 2f7b615f78..96163c90f1 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -14,6 +14,10 @@
 # limitations under the License.
 
 
+from functools import wraps
+import threading
+
+
 class LruCache(object):
     """Least-recently-used cache."""
     # TODO(mjark) Add mutex for linked list for thread safety.
@@ -24,6 +28,16 @@ class LruCache(object):
 
         PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
 
+        lock = threading.Lock()
+
+        def synchronized(f):
+            @wraps(f)
+            def inner(*args, **kwargs):
+                with lock:
+                    return f(*args, **kwargs)
+
+            return inner
+
         def add_node(key, value):
             prev_node = list_root
             next_node = prev_node[NEXT]
@@ -51,6 +65,7 @@ class LruCache(object):
             next_node[PREV] = prev_node
             cache.pop(node[KEY], None)
 
+        @synchronized
         def cache_get(key, default=None):
             node = cache.get(key, None)
             if node is not None:
@@ -59,6 +74,7 @@ class LruCache(object):
             else:
                 return default
 
+        @synchronized
         def cache_set(key, value):
             node = cache.get(key, None)
             if node is not None:
@@ -69,6 +85,7 @@ class LruCache(object):
                 if len(cache) > max_size:
                     delete_node(list_root[PREV])
 
+        @synchronized
         def cache_set_default(key, value):
             node = cache.get(key, None)
             if node is not None:
@@ -79,6 +96,7 @@ class LruCache(object):
                     delete_node(list_root[PREV])
                 return value
 
+        @synchronized
         def cache_pop(key, default=None):
             node = cache.get(key, None)
             if node:
@@ -87,9 +105,11 @@ class LruCache(object):
             else:
                 return default
 
+        @synchronized
         def cache_len():
             return len(cache)
 
+        @synchronized
         def cache_contains(key):
             return key in cache
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4e82232796..a42138f556 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -60,7 +60,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
 
     if retry_timings:
         retry_last_ts, retry_interval = (
-            retry_timings.retry_last_ts, retry_timings.retry_interval
+            retry_timings["retry_last_ts"], retry_timings["retry_interval"]
         )
 
         now = int(clock.time_msec())