summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py113
-rw-r--r--synapse/api/constants.py9
-rw-r--r--synapse/api/errors.py15
-rw-r--r--synapse/api/events/__init__.py21
-rw-r--r--synapse/api/events/factory.py13
-rw-r--r--synapse/api/events/room.py7
-rwxr-xr-xsynapse/app/homeserver.py92
-rw-r--r--synapse/config/captcha.py46
-rw-r--r--synapse/config/email.py39
-rw-r--r--synapse/config/homeserver.py9
-rw-r--r--synapse/federation/replication.py31
-rw-r--r--synapse/federation/transport.py17
-rw-r--r--synapse/federation/units.py1
-rw-r--r--synapse/handlers/_base.py3
-rw-r--r--synapse/handlers/directory.py38
-rw-r--r--synapse/handlers/events.py10
-rw-r--r--synapse/handlers/federation.py14
-rw-r--r--synapse/handlers/login.py43
-rw-r--r--synapse/handlers/message.py15
-rw-r--r--synapse/handlers/presence.py5
-rw-r--r--synapse/handlers/profile.py46
-rw-r--r--synapse/handlers/register.py124
-rw-r--r--synapse/handlers/room.py62
-rw-r--r--synapse/http/client.py41
-rw-r--r--synapse/rest/directory.py5
-rw-r--r--synapse/rest/events.py2
-rw-r--r--synapse/rest/login.py24
-rw-r--r--synapse/rest/profile.py4
-rw-r--r--synapse/rest/register.py215
-rw-r--r--synapse/rest/room.py10
-rw-r--r--synapse/server.py5
-rw-r--r--synapse/state.py157
-rw-r--r--synapse/storage/__init__.py122
-rw-r--r--synapse/storage/_base.py95
-rw-r--r--synapse/storage/directory.py7
-rw-r--r--synapse/storage/pdu.py114
-rw-r--r--synapse/storage/registration.py4
-rw-r--r--synapse/storage/room.py4
-rw-r--r--synapse/storage/roommember.py72
-rw-r--r--synapse/storage/schema/delta/v3.sql27
-rw-r--r--synapse/storage/stream.py21
-rw-r--r--synapse/storage/transactions.py12
-rw-r--r--synapse/util/emailutils.py71
44 files changed, 1359 insertions, 428 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 440e633966..bba551b2c4 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a synapse home server.
 """
 
-__version__ = "0.2.1"
+__version__ = "0.3.3"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index b4eda3df01..8f32191b57 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -18,8 +18,8 @@
 from twisted.internet import defer
 
 from synapse.api.constants import Membership, JoinRules
-from synapse.api.errors import AuthError, StoreError, Codes
-from synapse.api.events.room import RoomMemberEvent
+from synapse.api.errors import AuthError, StoreError, Codes, SynapseError
+from synapse.api.events.room import RoomMemberEvent, RoomPowerLevelsEvent
 from synapse.util.logutils import log_function
 
 import logging
@@ -67,6 +67,9 @@ class Auth(object):
                 else:
                     yield self._can_send_event(event)
 
+                if event.type == RoomPowerLevelsEvent.TYPE:
+                    yield self._check_power_levels(event)
+
                 defer.returnValue(True)
             else:
                 raise AuthError(500, "Unknown event: %s" % event)
@@ -172,7 +175,7 @@ class Auth(object):
                 if kick_level:
                     kick_level = int(kick_level)
                 else:
-                    kick_level = 5
+                    kick_level = 50
 
                 if user_level < kick_level:
                     raise AuthError(
@@ -189,7 +192,7 @@ class Auth(object):
             if ban_level:
                 ban_level = int(ban_level)
             else:
-                ban_level = 5  # FIXME (erikj): What should we do here?
+                ban_level = 50  # FIXME (erikj): What should we do here?
 
             if user_level < ban_level:
                 raise AuthError(403, "You don't have permission to ban")
@@ -305,7 +308,9 @@ class Auth(object):
         else:
             user_level = 0
 
-        logger.debug("Checking power level for %s, %s", event.user_id, user_level)
+        logger.debug(
+            "Checking power level for %s, %s", event.user_id, user_level
+        )
         if current_state and hasattr(current_state, "required_power_level"):
             req = current_state.required_power_level
 
@@ -315,3 +320,101 @@ class Auth(object):
                     403,
                     "You don't have permission to change that state"
                 )
+
+    @defer.inlineCallbacks
+    def _check_power_levels(self, event):
+        for k, v in event.content.items():
+            if k == "default":
+                continue
+
+            # FIXME (erikj): We don't want hsob_Ts in content.
+            if k == "hsob_ts":
+                continue
+
+            try:
+                self.hs.parse_userid(k)
+            except:
+                raise SynapseError(400, "Not a valid user_id: %s" % (k,))
+
+            try:
+                int(v)
+            except:
+                raise SynapseError(400, "Not a valid power level: %s" % (v,))
+
+        current_state = yield self.store.get_current_state(
+            event.room_id,
+            event.type,
+            event.state_key,
+        )
+
+        if not current_state:
+            return
+        else:
+            current_state = current_state[0]
+
+        user_level = yield self.store.get_power_level(
+            event.room_id,
+            event.user_id,
+        )
+
+        if user_level:
+            user_level = int(user_level)
+        else:
+            user_level = 0
+
+        old_list = current_state.content
+
+        # FIXME (erikj)
+        old_people = {k: v for k, v in old_list.items() if k.startswith("@")}
+        new_people = {
+            k: v for k, v in event.content.items()
+            if k.startswith("@")
+        }
+
+        removed = set(old_people.keys()) - set(new_people.keys())
+        added = set(old_people.keys()) - set(new_people.keys())
+        same = set(old_people.keys()) & set(new_people.keys())
+
+        for r in removed:
+            if int(old_list.content[r]) > user_level:
+                raise AuthError(
+                    403,
+                    "You don't have permission to remove user: %s" % (r, )
+                )
+
+        for n in added:
+            if int(event.content[n]) > user_level:
+                raise AuthError(
+                    403,
+                    "You don't have permission to add ops level greater "
+                    "than your own"
+                )
+
+        for s in same:
+            if int(event.content[s]) != int(old_list[s]):
+                if int(event.content[s]) > user_level:
+                    raise AuthError(
+                        403,
+                        "You don't have permission to add ops level greater "
+                        "than your own"
+                    )
+
+        if "default" in old_list:
+            old_default = int(old_list["default"])
+
+            if old_default > user_level:
+                raise AuthError(
+                    403,
+                    "You don't have permission to add ops level greater than "
+                    "your own"
+                )
+
+            if "default" in event.content:
+                new_default = int(event.content["default"])
+
+                if new_default > user_level:
+                    raise AuthError(
+                        403,
+                        "You don't have permission to add ops level greater "
+                        "than your own"
+                    )
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index fcef062fc9..618d3d7577 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -50,3 +50,12 @@ class JoinRules(object):
     KNOCK = u"knock"
     INVITE = u"invite"
     PRIVATE = u"private"
+
+
+class LoginType(object):
+    PASSWORD = u"m.login.password"
+    OAUTH = u"m.login.oauth2"
+    EMAIL_CODE = u"m.login.email.code"
+    EMAIL_URL = u"m.login.email.url"
+    EMAIL_IDENTITY = u"m.login.email.identity"
+    RECAPTCHA = u"m.login.recaptcha"
\ No newline at end of file
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 84afe4fa37..88175602c4 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -29,6 +29,8 @@ class Codes(object):
     NOT_FOUND = "M_NOT_FOUND"
     UNKNOWN_TOKEN = "M_UNKNOWN_TOKEN"
     LIMIT_EXCEEDED = "M_LIMIT_EXCEEDED"
+    CAPTCHA_NEEDED = "M_CAPTCHA_NEEDED"
+    CAPTCHA_INVALID = "M_CAPTCHA_INVALID"
 
 
 class CodeMessageException(Exception):
@@ -101,6 +103,19 @@ class StoreError(SynapseError):
     pass
 
 
+class InvalidCaptchaError(SynapseError):
+    def __init__(self, code=400, msg="Invalid captcha.", error_url=None,
+                 errcode=Codes.CAPTCHA_INVALID):
+        super(InvalidCaptchaError, self).__init__(code, msg, errcode)
+        self.error_url = error_url
+
+    def error_dict(self):
+        return cs_error(
+            self.msg,
+            self.errcode,
+            error_url=self.error_url,
+        )
+
 class LimitExceededError(SynapseError):
     """A client has sent too many requests and is being throttled.
     """
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
index f95468fc65..0cee196851 100644
--- a/synapse/api/events/__init__.py
+++ b/synapse/api/events/__init__.py
@@ -17,6 +17,19 @@ from synapse.api.errors import SynapseError, Codes
 from synapse.util.jsonobject import JsonEncodedObject
 
 
+def serialize_event(hs, e):
+    # FIXME(erikj): To handle the case of presence events and the like
+    if not isinstance(e, SynapseEvent):
+        return e
+
+    d = e.get_dict()
+    if "age_ts" in d:
+        d["age"] = int(hs.get_clock().time_msec()) - d["age_ts"]
+        del d["age_ts"]
+
+    return d
+
+
 class SynapseEvent(JsonEncodedObject):
 
     """Base class for Synapse events. These are JSON objects which must abide
@@ -43,6 +56,8 @@ class SynapseEvent(JsonEncodedObject):
         "content",  # HTTP body, JSON
         "state_key",
         "required_power_level",
+        "age_ts",
+        "prev_content",
     ]
 
     internal_keys = [
@@ -141,7 +156,8 @@ class SynapseEvent(JsonEncodedObject):
                     return "Missing %s key" % key
 
                 if type(content[key]) != type(template[key]):
-                    return "Key %s is of the wrong type." % key
+                    return "Key %s is of the wrong type (got %s, want %s)" % (
+                        key, type(content[key]), type(template[key]))
 
                 if type(content[key]) == dict:
                     # we must go deeper
@@ -157,7 +173,8 @@ class SynapseEvent(JsonEncodedObject):
 
 
 class SynapseStateEvent(SynapseEvent):
-     def __init__(self, **kwargs):
+
+    def __init__(self, **kwargs):
         if "state_key" not in kwargs:
             kwargs["state_key"] = ""
         super(SynapseStateEvent, self).__init__(**kwargs)
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index a3b293e024..d3d96d73eb 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -47,15 +47,26 @@ class EventFactory(object):
             self._event_list[event_class.TYPE] = event_class
 
         self.clock = hs.get_clock()
+        self.hs = hs
 
     def create_event(self, etype=None, **kwargs):
         kwargs["type"] = etype
         if "event_id" not in kwargs:
-            kwargs["event_id"] = random_string(10)
+            kwargs["event_id"] = "%s@%s" % (
+                random_string(10), self.hs.hostname
+            )
 
         if "ts" not in kwargs:
             kwargs["ts"] = int(self.clock.time_msec())
 
+        # The "age" key is a delta timestamp that should be converted into an
+        # absolute timestamp the minute we see it.
+        if "age" in kwargs:
+            kwargs["age_ts"] = int(self.clock.time_msec()) - int(kwargs["age"])
+            del kwargs["age"]
+        elif "age_ts" not in kwargs:
+            kwargs["age_ts"] = int(self.clock.time_msec())
+
         if etype in self._event_list:
             handler = self._event_list[etype]
         else:
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index 33f0f0cb99..3a4dbc58ce 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -173,3 +173,10 @@ class RoomOpsPowerLevelsEvent(SynapseStateEvent):
 
     def get_content_template(self):
         return {}
+
+
+class RoomAliasesEvent(SynapseStateEvent):
+    TYPE = "m.room.aliases"
+
+    def get_content_template(self):
+        return {}
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 49cf928cc1..2f1b954902 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import read_schema
+from synapse.storage import prepare_database
 
 from synapse.server import HomeServer
 
@@ -36,30 +36,14 @@ from daemonize import Daemonize
 import twisted.manhole.telnet
 
 import logging
-import sqlite3
 import os
 import re
 import sys
+import sqlite3
 
 logger = logging.getLogger(__name__)
 
 
-SCHEMAS = [
-    "transactions",
-    "pdu",
-    "users",
-    "profiles",
-    "presence",
-    "im",
-    "room_aliases",
-]
-
-
-# Remember to update this number every time an incompatible change is made to
-# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 2
-
-
 class SynapseHomeServer(HomeServer):
 
     def build_http_client(self):
@@ -80,52 +64,12 @@ class SynapseHomeServer(HomeServer):
         )
 
     def build_db_pool(self):
-        """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
-        don't have to worry about overwriting existing content.
-        """
-        logging.info("Preparing database: %s...", self.db_name)
-
-        with sqlite3.connect(self.db_name) as db_conn:
-            c = db_conn.cursor()
-            c.execute("PRAGMA user_version")
-            row = c.fetchone()
-
-            if row and row[0]:
-                user_version = row[0]
-
-                if user_version > SCHEMA_VERSION:
-                    raise ValueError("Cannot use this database as it is too " +
-                        "new for the server to understand"
-                    )
-                elif user_version < SCHEMA_VERSION:
-                    logging.info("Upgrading database from version %d",
-                        user_version
-                    )
-
-                    # Run every version since after the current version.
-                    for v in range(user_version + 1, SCHEMA_VERSION + 1):
-                        sql_script = read_schema("delta/v%d" % (v))
-                        c.executescript(sql_script)
-
-                    db_conn.commit()
-
-            else:
-                for sql_loc in SCHEMAS:
-                    sql_script = read_schema(sql_loc)
-
-                    c.executescript(sql_script)
-                db_conn.commit()
-                c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
-
-            c.close()
-
-        logging.info("Database prepared in %s.", self.db_name)
-
-        pool = adbapi.ConnectionPool(
-            'sqlite3', self.db_name, check_same_thread=False,
-            cp_min=1, cp_max=1)
-
-        return pool
+        return adbapi.ConnectionPool(
+            "sqlite3", self.get_db_name(),
+            check_same_thread=False,
+            cp_min=1,
+            cp_max=1
+        )
 
     def create_resource_tree(self, web_client, redirect_root_to_web_client):
         """Create the resource tree for this Home Server.
@@ -230,10 +174,6 @@ class SynapseHomeServer(HomeServer):
             logger.info("Synapse now listening on port %d", unsecure_port)
 
 
-def run():
-    reactor.run()
-
-
 def setup():
     config = HomeServerConfig.load_config(
         "Synapse Homeserver",
@@ -268,7 +208,15 @@ def setup():
         web_client=config.webclient,
         redirect_root_to_web_client=True,
     )
-    hs.start_listening(config.bind_port, config.unsecure_port)
+
+    db_name = hs.get_db_name()
+
+    logging.info("Preparing database: %s...", db_name)
+
+    with sqlite3.connect(db_name) as db_conn:
+        prepare_database(db_conn)
+
+    logging.info("Database prepared in %s.", db_name)
 
     hs.get_db_pool()
 
@@ -279,12 +227,14 @@ def setup():
         f.namespace['hs'] = hs
         reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
 
+    hs.start_listening(config.bind_port, config.unsecure_port)
+
     if config.daemonize:
         print config.pid_file
         daemon = Daemonize(
             app="synapse-homeserver",
             pid=config.pid_file,
-            action=run,
+            action=reactor.run,
             auto_close_fds=False,
             verbose=True,
             logger=logger,
@@ -292,7 +242,7 @@ def setup():
 
         daemon.start()
     else:
-        run()
+        reactor.run()
 
 
 if __name__ == '__main__':
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
new file mode 100644
index 0000000000..8ebcfc3623
--- /dev/null
+++ b/synapse/config/captcha.py
@@ -0,0 +1,46 @@
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class CaptchaConfig(Config):
+
+    def __init__(self, args):
+        super(CaptchaConfig, self).__init__(args)
+        self.recaptcha_private_key = args.recaptcha_private_key
+        self.enable_registration_captcha = args.enable_registration_captcha
+        self.captcha_ip_origin_is_x_forwarded = (
+            args.captcha_ip_origin_is_x_forwarded
+        )
+
+    @classmethod
+    def add_arguments(cls, parser):
+        super(CaptchaConfig, cls).add_arguments(parser)
+        group = parser.add_argument_group("recaptcha")
+        group.add_argument(
+            "--recaptcha-private-key", type=str, default="YOUR_PRIVATE_KEY",
+            help="The matching private key for the web client's public key."
+        )
+        group.add_argument(
+            "--enable-registration-captcha", type=bool, default=False,
+            help="Enables ReCaptcha checks when registering, preventing signup"
+            + " unless a captcha is answered. Requires a valid ReCaptcha "
+            + "public/private key."
+        )
+        group.add_argument(
+            "--captcha_ip_origin_is_x_forwarded", type=bool, default=False,
+            help="When checking captchas, use the X-Forwarded-For (XFF) header"
+            + " as the client IP and not the actual client IP."
+        )
\ No newline at end of file
diff --git a/synapse/config/email.py b/synapse/config/email.py
new file mode 100644
index 0000000000..9bcc5a8fea
--- /dev/null
+++ b/synapse/config/email.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class EmailConfig(Config):
+
+    def __init__(self, args):
+        super(EmailConfig, self).__init__(args)
+        self.email_from_address = args.email_from_address
+        self.email_smtp_server = args.email_smtp_server
+
+    @classmethod
+    def add_arguments(cls, parser):
+        super(EmailConfig, cls).add_arguments(parser)
+        email_group = parser.add_argument_group("email")
+        email_group.add_argument(
+            "--email-from-address",
+            default="FROM@EXAMPLE.COM",
+            help="The address to send emails from (e.g. for password resets)."
+        )
+        email_group.add_argument(
+            "--email-smtp-server",
+            default="",
+            help="The SMTP server to send emails from (e.g. for password resets)."
+        )
\ No newline at end of file
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 76e2cdeddd..4b810a2302 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -19,11 +19,16 @@ from .logger import LoggingConfig
 from .database import DatabaseConfig
 from .ratelimiting import RatelimitConfig
 from .repository import ContentRepositoryConfig
+from .captcha import CaptchaConfig
+from .email import EmailConfig
+
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
-                       RatelimitConfig, ContentRepositoryConfig):
+                       RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
+                       EmailConfig):
     pass
 
-if __name__=='__main__':
+
+if __name__ == '__main__':
     import sys
     HomeServerConfig.load_config("Generate config", sys.argv[1:], "HomeServer")
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index e12510017f..96b82f00cb 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -291,6 +291,13 @@ class ReplicationLayer(object):
     def on_incoming_transaction(self, transaction_data):
         transaction = Transaction(**transaction_data)
 
+        for p in transaction.pdus:
+            if "age" in p:
+                p["age_ts"] = int(self._clock.time_msec()) - int(p["age"])
+                del p["age"]
+
+        pdu_list = [Pdu(**p) for p in transaction.pdus]
+
         logger.debug("[%s] Got transaction", transaction.transaction_id)
 
         response = yield self.transaction_actions.have_responded(transaction)
@@ -303,8 +310,6 @@ class ReplicationLayer(object):
 
         logger.debug("[%s] Transacition is new", transaction.transaction_id)
 
-        pdu_list = [Pdu(**p) for p in transaction.pdus]
-
         dl = []
         for pdu in pdu_list:
             dl.append(self._handle_new_pdu(pdu))
@@ -405,9 +410,14 @@ class ReplicationLayer(object):
         """Returns a new Transaction containing the given PDUs suitable for
         transmission.
         """
+        pdus = [p.get_dict() for p in pdu_list]
+        for p in pdus:
+            if "age_ts" in pdus:
+                p["age"] = int(self.clock.time_msec()) - p["age_ts"]
+
         return Transaction(
-            pdus=[p.get_dict() for p in pdu_list],
             origin=self.server_name,
+            pdus=pdus,
             ts=int(self._clock.time_msec()),
             destination=None,
         )
@@ -593,8 +603,21 @@ class _TransactionQueue(object):
             logger.debug("TX [%s] Sending transaction...", destination)
 
             # Actually send the transaction
+
+            # FIXME (erikj): This is a bit of a hack to make the Pdu age
+            # keys work
+            def cb(transaction):
+                now = int(self._clock.time_msec())
+                if "pdus" in transaction:
+                    for p in transaction["pdus"]:
+                        if "age_ts" in p:
+                            p["age"] = now - int(p["age_ts"])
+
+                return transaction
+
             code, response = yield self.transport_layer.send_transaction(
-                transaction
+                transaction,
+                on_send_callback=cb,
             )
 
             logger.debug("TX [%s] Sent transaction", destination)
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index 6e62ae7c74..afc777ec9e 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -144,7 +144,7 @@ class TransportLayer(object):
 
     @defer.inlineCallbacks
     @log_function
-    def send_transaction(self, transaction):
+    def send_transaction(self, transaction, on_send_callback=None):
         """ Sends the given Transaction to it's destination
 
         Args:
@@ -165,10 +165,23 @@ class TransportLayer(object):
 
         data = transaction.get_dict()
 
+        # FIXME (erikj): This is a bit of a hack to make the Pdu age
+        # keys work
+        def cb(destination, method, path_bytes, producer):
+            if not on_send_callback:
+                return
+
+            transaction = json.loads(producer.body)
+
+            new_transaction = on_send_callback(transaction)
+
+            producer.reset(new_transaction)
+
         code, response = yield self.client.put_json(
             transaction.destination,
             path=PREFIX + "/send/%s/" % transaction.transaction_id,
-            data=data
+            data=data,
+            on_send_callback=cb,
         )
 
         logger.debug(
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 9740431279..622fe66a8f 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -69,6 +69,7 @@ class Pdu(JsonEncodedObject):
         "prev_state_id",
         "prev_state_origin",
         "required_power_level",
+        "user_id",
     ]
 
     internal_keys = [
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 9989fe8670..de4d23bbb3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -42,9 +42,6 @@ class BaseHandler(object):
                 retry_after_ms=int(1000*(time_allowed - time_now)),
             )
 
-
-class BaseRoomHandler(BaseHandler):
-
     @defer.inlineCallbacks
     def _on_new_room_event(self, event, snapshot, extra_destinations=[],
                            extra_users=[]):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 1b9e831fc0..4ab00a761a 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -19,8 +19,10 @@ from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError
 from synapse.http.client import HttpClient
+from synapse.api.events.room import RoomAliasesEvent
 
 import logging
+import sqlite3
 
 
 logger = logging.getLogger(__name__)
@@ -37,7 +39,8 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def create_association(self, room_alias, room_id, servers=None):
+    def create_association(self, user_id, room_alias, room_id, servers=None):
+
         # TODO(erikj): Do auth.
 
         if not room_alias.is_mine:
@@ -54,12 +57,37 @@ class DirectoryHandler(BaseHandler):
         if not servers:
             raise SynapseError(400, "Failed to get server list")
 
-        yield self.store.create_room_alias_association(
-            room_alias,
-            room_id,
-            servers
+
+        try:
+            yield self.store.create_room_alias_association(
+                room_alias,
+                room_id,
+                servers
+            )
+        except sqlite3.IntegrityError:
+            defer.returnValue("Already exists")
+
+        # TODO: Send the room event.
+
+        aliases = yield self.store.get_aliases_for_room(room_id)
+
+        event = self.event_factory.create_event(
+            etype=RoomAliasesEvent.TYPE,
+            state_key=self.hs.hostname,
+            room_id=room_id,
+            user_id=user_id,
+            content={"aliases": aliases},
+        )
+
+        snapshot = yield self.store.snapshot_room(
+            room_id=room_id,
+            user_id=user_id,
         )
 
+        yield self.state_handler.handle_new_event(event, snapshot)
+        yield self._on_new_room_event(event, snapshot, extra_users=[user_id])
+
+
     @defer.inlineCallbacks
     def get_association(self, room_alias):
         room_id = None
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index fd24a11fb8..93dcd40324 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -15,7 +15,6 @@
 
 from twisted.internet import defer
 
-from synapse.api.events import SynapseEvent
 from synapse.util.logutils import log_function
 
 from ._base import BaseHandler
@@ -71,10 +70,7 @@ class EventStreamHandler(BaseHandler):
                 auth_user, room_ids, pagin_config, timeout
             )
 
-            chunks = [
-                e.get_dict() if isinstance(e, SynapseEvent) else e
-                for e in events
-            ]
+            chunks = [self.hs.serialize_event(e) for e in events]
 
             chunk = {
                 "chunk": chunks,
@@ -92,7 +88,9 @@ class EventStreamHandler(BaseHandler):
                 # 10 seconds of grace to allow the client to reconnect again
                 #   before we think they're gone
                 def _later():
-                    logger.debug("_later stopped_user_eventstream %s", auth_user)
+                    logger.debug(
+                        "_later stopped_user_eventstream %s", auth_user
+                    )
                     self.distributor.fire(
                         "stopped_user_eventstream", auth_user
                     )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 59cbf71d78..001c6c110c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -93,22 +93,18 @@ class FederationHandler(BaseHandler):
         """
         event = self.pdu_codec.event_from_pdu(pdu)
 
+        logger.debug("Got event: %s", event.event_id)
+
         with (yield self.lock_manager.lock(pdu.context)):
             if event.is_state and not backfilled:
                 is_new_state = yield self.state_handler.handle_new_state(
                     pdu
                 )
-                if not is_new_state:
-                    return
             else:
                 is_new_state = False
         # TODO: Implement something in federation that allows us to
         # respond to PDU.
 
-        if hasattr(event, "state_key") and not is_new_state:
-            logger.debug("Ignoring old state.")
-            return
-
         target_is_mine = False
         if hasattr(event, "target_host"):
             target_is_mine = event.target_host == self.hs.hostname
@@ -139,7 +135,11 @@ class FederationHandler(BaseHandler):
 
         else:
             with (yield self.room_lock.lock(event.room_id)):
-                yield self.store.persist_event(event, backfilled)
+                yield self.store.persist_event(
+                    event,
+                    backfilled,
+                    is_new_state=is_new_state
+                )
 
             room = yield self.store.get_room(event.room_id)
 
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index 6ee7ce5a2d..80ffdd2726 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -17,9 +17,13 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.api.errors import LoginError, Codes
+from synapse.http.client import PlainHttpClient
+from synapse.util.emailutils import EmailException
+import synapse.util.emailutils as emailutils
 
 import bcrypt
 import logging
+import urllib
 
 logger = logging.getLogger(__name__)
 
@@ -62,4 +66,41 @@ class LoginHandler(BaseHandler):
             defer.returnValue(token)
         else:
             logger.warn("Failed password login for user %s", user)
-            raise LoginError(403, "", errcode=Codes.FORBIDDEN)
\ No newline at end of file
+            raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+
+    @defer.inlineCallbacks
+    def reset_password(self, user_id, email):
+        is_valid = yield self._check_valid_association(user_id, email)
+        logger.info("reset_password user=%s email=%s valid=%s", user_id, email,
+                    is_valid)
+        if is_valid:
+            try:
+                # send an email out
+                emailutils.send_email(
+                    smtp_server=self.hs.config.email_smtp_server,
+                    from_addr=self.hs.config.email_from_address,
+                    to_addr=email,
+                    subject="Password Reset",
+                    body="TODO."
+                )
+            except EmailException as e:
+                logger.exception(e)
+
+    @defer.inlineCallbacks
+    def _check_valid_association(self, user_id, email):
+        identity = yield self._query_email(email)
+        if identity and "mxid" in identity:
+            if identity["mxid"] == user_id:
+                defer.returnValue(True)
+                return
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def _query_email(self, email):
+        httpCli = PlainHttpClient(self.hs)
+        data = yield httpCli.get_json(
+            'matrix.org:8090',  # TODO FIXME This should be configurable.
+            "/_matrix/identity/api/v1/lookup?medium=email&address=" +
+            "%s" % urllib.quote(email)
+        )
+        defer.returnValue(data)
\ No newline at end of file
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dad2bbd1a4..14fae689f2 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -19,7 +19,7 @@ from synapse.api.constants import Membership
 from synapse.api.events.room import RoomTopicEvent
 from synapse.api.errors import RoomError
 from synapse.streams.config import PaginationConfig
-from ._base import BaseRoomHandler
+from ._base import BaseHandler
 
 import logging
 
@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
 
 
 
-class MessageHandler(BaseRoomHandler):
+class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
         super(MessageHandler, self).__init__(hs)
@@ -124,7 +124,7 @@ class MessageHandler(BaseRoomHandler):
         )
 
         chunk = {
-            "chunk": [e.get_dict() for e in events],
+            "chunk": [self.hs.serialize_event(e) for e in events],
             "start": pagin_config.from_token.to_string(),
             "end": next_token.to_string(),
         }
@@ -268,6 +268,9 @@ class MessageHandler(BaseRoomHandler):
             user, pagination_config, None
         )
 
+        public_rooms = yield self.store.get_rooms(is_public=True)
+        public_room_ids = [r["room_id"] for r in public_rooms]
+
         limit = pagin_config.limit
         if not limit:
             limit = 10
@@ -276,6 +279,8 @@ class MessageHandler(BaseRoomHandler):
             d = {
                 "room_id": event.room_id,
                 "membership": event.membership,
+                "visibility": ("public" if event.room_id in
+                              public_room_ids else "private"),
             }
 
             if event.membership == Membership.INVITE:
@@ -296,7 +301,7 @@ class MessageHandler(BaseRoomHandler):
                 end_token = now_token.copy_and_replace("room_key", token[1])
 
                 d["messages"] = {
-                    "chunk": [m.get_dict() for m in messages],
+                    "chunk": [self.hs.serialize_event(m) for m in messages],
                     "start": start_token.to_string(),
                     "end": end_token.to_string(),
                 }
@@ -304,7 +309,7 @@ class MessageHandler(BaseRoomHandler):
                 current_state = yield self.store.get_current_state(
                     event.room_id
                 )
-                d["state"] = [c.get_dict() for c in current_state]
+                d["state"] = [self.hs.serialize_event(c) for c in current_state]
             except:
                 logger.exception("Failed to get snapshot")
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index c79bb6ff76..b2af09f090 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -796,11 +796,12 @@ class PresenceEventSource(object):
         updates = []
         # TODO(paul): use a DeferredList ? How to limit concurrency.
         for observed_user in cachemap.keys():
-            if not (from_key < cachemap[observed_user].serial):
+            cached = cachemap[observed_user]
+            if not (from_key < cached.serial):
                 continue
 
             if (yield self.is_visible(observer_user, observed_user)):
-                updates.append((observed_user, cachemap[observed_user]))
+                updates.append((observed_user, cached))
 
         # TODO(paul): limit
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 023d8c0cf2..dab9b03f04 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,9 +15,9 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, AuthError
-
-from synapse.api.errors import CodeMessageException
+from synapse.api.errors import SynapseError, AuthError, CodeMessageException
+from synapse.api.constants import Membership
+from synapse.api.events.room import RoomMemberEvent
 
 from ._base import BaseHandler
 
@@ -97,6 +97,8 @@ class ProfileHandler(BaseHandler):
             }
         )
 
+        yield self._update_join_states(target_user)
+
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
         if target_user.is_mine:
@@ -144,6 +146,8 @@ class ProfileHandler(BaseHandler):
             }
         )
 
+        yield self._update_join_states(target_user)
+
     @defer.inlineCallbacks
     def collect_presencelike_data(self, user, state):
         if not user.is_mine:
@@ -180,3 +184,39 @@ class ProfileHandler(BaseHandler):
             )
 
         defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    def _update_join_states(self, user):
+        if not user.is_mine:
+            return
+
+        joins = yield self.store.get_rooms_for_user_where_membership_is(
+            user.to_string(),
+            [Membership.JOIN],
+        )
+
+        for j in joins:
+            snapshot = yield self.store.snapshot_room(
+                j.room_id, j.state_key, RoomMemberEvent.TYPE,
+                j.state_key
+            )
+
+            content = {
+                "membership": j.content["membership"],
+                "prev": j.content["membership"],
+            }
+
+            yield self.distributor.fire(
+                "collect_presencelike_data", user, content
+            )
+
+            new_event = self.event_factory.create_event(
+                etype=j.type,
+                room_id=j.room_id,
+                state_key=j.state_key,
+                content=content,
+                user_id=j.state_key,
+            )
+
+            yield self.state_handler.handle_new_event(new_event, snapshot)
+            yield self._on_new_room_event(new_event, snapshot)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index bee052274f..a019d770d4 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -17,7 +17,9 @@
 from twisted.internet import defer
 
 from synapse.types import UserID
-from synapse.api.errors import SynapseError, RegistrationError
+from synapse.api.errors import (
+    SynapseError, RegistrationError, InvalidCaptchaError
+)
 from ._base import BaseHandler
 import synapse.util.stringutils as stringutils
 from synapse.http.client import PlainHttpClient
@@ -38,7 +40,7 @@ class RegistrationHandler(BaseHandler):
         self.distributor.declare("registered_user")
 
     @defer.inlineCallbacks
-    def register(self, localpart=None, password=None, threepidCreds=None):
+    def register(self, localpart=None, password=None):
         """Registers a new client on the server.
 
         Args:
@@ -51,20 +53,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-
-        if threepidCreds:
-            for c in threepidCreds:
-                logger.info("validating theeepidcred sid %s on id server %s", c['sid'], c['idServer'])
-                try:
-                    threepid = yield self._threepid_from_creds(c)
-                except:
-                    logger.err()
-                    raise RegistrationError(400, "Couldn't validate 3pid")
-                    
-                if not threepid:
-                    raise RegistrationError(400, "Couldn't validate 3pid")
-                logger.info("got threepid medium %s address %s", threepid['medium'], threepid['address'])
-
         password_hash = None
         if password:
             password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
@@ -106,15 +94,54 @@ class RegistrationHandler(BaseHandler):
                         raise RegistrationError(
                             500, "Cannot generate user ID.")
 
-        # Now we have a matrix ID, bind it to the threepids we were given
-        if threepidCreds:
-            for c in threepidCreds:
-                # XXX: This should be a deferred list, shouldn't it?
-                yield self._bind_threepid(c, user_id)
-                
-
         defer.returnValue((user_id, token))
 
+    @defer.inlineCallbacks
+    def check_recaptcha(self, ip, private_key, challenge, response):
+        """Checks a recaptcha is correct."""
+
+        captcha_response = yield self._validate_captcha(
+            ip,
+            private_key,
+            challenge,
+            response
+        )
+        if not captcha_response["valid"]:
+            logger.info("Invalid captcha entered from %s. Error: %s",
+                        ip, captcha_response["error_url"])
+            raise InvalidCaptchaError(
+                error_url=captcha_response["error_url"]
+            )
+        else:
+            logger.info("Valid captcha entered from %s", ip)
+
+    @defer.inlineCallbacks
+    def register_email(self, threepidCreds):
+        """Registers emails with an identity server."""
+
+        for c in threepidCreds:
+            logger.info("validating theeepidcred sid %s on id server %s",
+                        c['sid'], c['idServer'])
+            try:
+                threepid = yield self._threepid_from_creds(c)
+            except:
+                logger.err()
+                raise RegistrationError(400, "Couldn't validate 3pid")
+
+            if not threepid:
+                raise RegistrationError(400, "Couldn't validate 3pid")
+            logger.info("got threepid medium %s address %s",
+                        threepid['medium'], threepid['address'])
+
+    @defer.inlineCallbacks
+    def bind_emails(self, user_id, threepidCreds):
+        """Links emails with a user ID and informs an identity server."""
+
+        # Now we have a matrix ID, bind it to the threepids we were given
+        for c in threepidCreds:
+            # XXX: This should be a deferred list, shouldn't it?
+            yield self._bind_threepid(c, user_id)
+
     def _generate_token(self, user_id):
         # urlsafe variant uses _ and - so use . as the separator and replace
         # all =s with .s so http clients don't quote =s when it is used as
@@ -129,16 +156,17 @@ class RegistrationHandler(BaseHandler):
     def _threepid_from_creds(self, creds):
         httpCli = PlainHttpClient(self.hs)
         # XXX: make this configurable!
-        trustedIdServers = [ 'matrix.org:8090' ]
+        trustedIdServers = ['matrix.org:8090']
         if not creds['idServer'] in trustedIdServers:
-            logger.warn('%s is not a trusted ID server: rejecting 3pid credentials', creds['idServer'])
+            logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
+                        'credentials', creds['idServer'])
             defer.returnValue(None)
         data = yield httpCli.get_json(
             creds['idServer'],
             "/_matrix/identity/api/v1/3pid/getValidated3pid",
-            { 'sid': creds['sid'], 'clientSecret': creds['clientSecret'] }
+            {'sid': creds['sid'], 'clientSecret': creds['clientSecret']}
         )
-        
+
         if 'medium' in data:
             defer.returnValue(data)
         defer.returnValue(None)
@@ -149,9 +177,45 @@ class RegistrationHandler(BaseHandler):
         data = yield httpCli.post_urlencoded_get_json(
             creds['idServer'],
             "/_matrix/identity/api/v1/3pid/bind",
-            { 'sid': creds['sid'], 'clientSecret': creds['clientSecret'], 'mxid':mxid }
+            {'sid': creds['sid'], 'clientSecret': creds['clientSecret'],
+            'mxid': mxid}
         )
         defer.returnValue(data)
-        
-        
+
+    @defer.inlineCallbacks
+    def _validate_captcha(self, ip_addr, private_key, challenge, response):
+        """Validates the captcha provided.
+
+        Returns:
+            dict: Containing 'valid'(bool) and 'error_url'(str) if invalid.
+
+        """
+        response = yield self._submit_captcha(ip_addr, private_key, challenge,
+                                              response)
+        # parse Google's response. Lovely format..
+        lines = response.split('\n')
+        json = {
+            "valid": lines[0] == 'true',
+            "error_url": "http://www.google.com/recaptcha/api/challenge?" +
+                         "error=%s" % lines[1]
+        }
+        defer.returnValue(json)
+
+    @defer.inlineCallbacks
+    def _submit_captcha(self, ip_addr, private_key, challenge, response):
+        client = PlainHttpClient(self.hs)
+        data = yield client.post_urlencoded_get_raw(
+            "www.google.com:80",
+            "/recaptcha/api/verify",
+            # twisted dislikes google's response, no content length.
+            accept_partial=True,
+            args={
+                'privatekey': private_key,
+                'remoteip': ip_addr,
+                'challenge': challenge,
+                'response': response
+            }
+        )
+        defer.returnValue(data)
+
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 8171e9eb45..5bc1280432 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,14 +25,14 @@ from synapse.api.events.room import (
     RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent,
 )
 from synapse.util import stringutils
-from ._base import BaseRoomHandler
+from ._base import BaseHandler
 
 import logging
 
 logger = logging.getLogger(__name__)
 
 
-class RoomCreationHandler(BaseRoomHandler):
+class RoomCreationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def create_room(self, user_id, room_id, config):
@@ -65,6 +65,13 @@ class RoomCreationHandler(BaseRoomHandler):
         else:
             room_alias = None
 
+        invite_list = config.get("invite", [])
+        for i in invite_list:
+            try:
+                self.hs.parse_userid(i)
+            except:
+                raise SynapseError(400, "Invalid user_id: %s" % (i,))
+
         is_public = config.get("visibility", None) == "public"
 
         if room_id:
@@ -105,7 +112,9 @@ class RoomCreationHandler(BaseRoomHandler):
         )
 
         if room_alias:
-            yield self.store.create_room_alias_association(
+            directory_handler = self.hs.get_handlers().directory_handler
+            yield directory_handler.create_association(
+                user_id=user_id,
                 room_id=room_id,
                 room_alias=room_alias,
                 servers=[self.hs.hostname],
@@ -132,7 +141,7 @@ class RoomCreationHandler(BaseRoomHandler):
                 etype=RoomNameEvent.TYPE,
                 room_id=room_id,
                 user_id=user_id,
-                required_power_level=5,
+                required_power_level=50,
                 content={"name": name},
             )
 
@@ -143,7 +152,7 @@ class RoomCreationHandler(BaseRoomHandler):
                 etype=RoomNameEvent.TYPE,
                 room_id=room_id,
                 user_id=user_id,
-                required_power_level=5,
+                required_power_level=50,
                 content={"name": name},
             )
 
@@ -155,7 +164,7 @@ class RoomCreationHandler(BaseRoomHandler):
                 etype=RoomTopicEvent.TYPE,
                 room_id=room_id,
                 user_id=user_id,
-                required_power_level=5,
+                required_power_level=50,
                 content={"topic": topic},
             )
 
@@ -176,6 +185,25 @@ class RoomCreationHandler(BaseRoomHandler):
             do_auth=False
         )
 
+        content = {"membership": Membership.INVITE}
+        for invitee in invite_list:
+            invite_event = self.event_factory.create_event(
+                etype=RoomMemberEvent.TYPE,
+                state_key=invitee,
+                room_id=room_id,
+                user_id=user_id,
+                content=content
+            )
+
+            yield self.hs.get_handlers().room_member_handler.change_membership(
+                invite_event,
+                do_auth=False
+            )
+
+        yield self.hs.get_handlers().room_member_handler.change_membership(
+            join_event,
+            do_auth=False
+        )
         result = {"room_id": room_id}
         if room_alias:
             result["room_alias"] = room_alias.to_string()
@@ -186,7 +214,7 @@ class RoomCreationHandler(BaseRoomHandler):
         event_keys = {
             "room_id": room_id,
             "user_id": creator.to_string(),
-            "required_power_level": 10,
+            "required_power_level": 100,
         }
 
         def create(etype, **content):
@@ -203,7 +231,7 @@ class RoomCreationHandler(BaseRoomHandler):
 
         power_levels_event = self.event_factory.create_event(
             etype=RoomPowerLevelsEvent.TYPE,
-            content={creator.to_string(): 10, "default": 0},
+            content={creator.to_string(): 100, "default": 0},
             **event_keys
         )
 
@@ -215,7 +243,7 @@ class RoomCreationHandler(BaseRoomHandler):
 
         add_state_event = create(
             etype=RoomAddStateLevelEvent.TYPE,
-            level=10,
+            level=100,
         )
 
         send_event = create(
@@ -225,8 +253,8 @@ class RoomCreationHandler(BaseRoomHandler):
 
         ops = create(
             etype=RoomOpsPowerLevelsEvent.TYPE,
-            ban_level=5,
-            kick_level=5,
+            ban_level=50,
+            kick_level=50,
         )
 
         return [
@@ -239,7 +267,7 @@ class RoomCreationHandler(BaseRoomHandler):
         ]
 
 
-class RoomMemberHandler(BaseRoomHandler):
+class RoomMemberHandler(BaseHandler):
     # TODO(paul): This handler currently contains a messy conflation of
     #   low-level API that works on UserID objects and so on, and REST-level
     #   API that takes ID strings and returns pagination chunks. These concerns
@@ -307,7 +335,7 @@ class RoomMemberHandler(BaseRoomHandler):
 
         member_list = yield self.store.get_room_members(room_id=room_id)
         event_list = [
-            entry.get_dict()
+            self.hs.serialize_event(entry)
             for entry in member_list
         ]
         chunk_data = {
@@ -560,11 +588,17 @@ class RoomMemberHandler(BaseRoomHandler):
             extra_users=[target_user]
         )
 
-class RoomListHandler(BaseRoomHandler):
+class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_public_room_list(self):
         chunk = yield self.store.get_rooms(is_public=True)
+        for room in chunk:
+            joined_members = yield self.store.get_room_members(
+                room_id=room["room_id"],
+                membership=Membership.JOIN
+            )
+            room["num_joined_members"] = len(joined_members)
         # FIXME (erikj): START is no longer a valid value
         defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
 
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ebf1aa47c4..eb11bfd4d5 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -16,7 +16,7 @@
 
 from twisted.internet import defer, reactor
 from twisted.internet.error import DNSLookupError
-from twisted.web.client import _AgentBase, _URI, readBody, FileBodyProducer
+from twisted.web.client import _AgentBase, _URI, readBody, FileBodyProducer, PartialDownloadError
 from twisted.web.http_headers import Headers
 
 from synapse.http.endpoint import matrix_endpoint
@@ -122,7 +122,7 @@ class TwistedHttpClient(HttpClient):
         self.hs = hs
 
     @defer.inlineCallbacks
-    def put_json(self, destination, path, data):
+    def put_json(self, destination, path, data, on_send_callback=None):
         if destination in _destination_mappings:
             destination = _destination_mappings[destination]
 
@@ -131,7 +131,8 @@ class TwistedHttpClient(HttpClient):
             "PUT",
             path.encode("ascii"),
             producer=_JsonProducer(data),
-            headers_dict={"Content-Type": ["application/json"]}
+            headers_dict={"Content-Type": ["application/json"]},
+            on_send_callback=on_send_callback,
         )
 
         logger.debug("Getting resp body")
@@ -188,11 +189,37 @@ class TwistedHttpClient(HttpClient):
         body = yield readBody(response)
 
         defer.returnValue(json.loads(body))
+        
+    # XXX FIXME : I'm so sorry.
+    @defer.inlineCallbacks
+    def post_urlencoded_get_raw(self, destination, path, accept_partial=False, args={}):
+        if destination in _destination_mappings:
+            destination = _destination_mappings[destination]
+
+        query_bytes = urllib.urlencode(args, True)
+
+        response = yield self._create_request(
+            destination.encode("ascii"),
+            "POST",
+            path.encode("ascii"),
+            producer=FileBodyProducer(StringIO(urllib.urlencode(args))),
+            headers_dict={"Content-Type": ["application/x-www-form-urlencoded"]}
+        )
+
+        try:
+            body = yield readBody(response)
+            defer.returnValue(body)
+        except PartialDownloadError as e:
+            if accept_partial:
+                defer.returnValue(e.response)
+            else:
+                raise e
+        
 
     @defer.inlineCallbacks
     def _create_request(self, destination, method, path_bytes, param_bytes=b"",
                         query_bytes=b"", producer=None, headers_dict={},
-                        retry_on_dns_fail=True):
+                        retry_on_dns_fail=True, on_send_callback=None):
         """ Creates and sends a request to the given url
         """
         headers_dict[b"User-Agent"] = [b"Synapse"]
@@ -216,6 +243,9 @@ class TwistedHttpClient(HttpClient):
         endpoint = self._getEndpoint(reactor, destination);
 
         while True:
+            if on_send_callback:
+                on_send_callback(destination, method, path_bytes, producer)
+
             try:
                 response = yield self.agent.request(
                     destination,
@@ -284,6 +314,9 @@ class _JsonProducer(object):
     """ Used by the twisted http client to create the HTTP body from json
     """
     def __init__(self, jsn):
+        self.reset(jsn)
+
+    def reset(self, jsn):
         self.body = encode_canonical_json(jsn)
         self.length = len(self.body)
 
diff --git a/synapse/rest/directory.py b/synapse/rest/directory.py
index 18df7c8d8b..31849246a1 100644
--- a/synapse/rest/directory.py
+++ b/synapse/rest/directory.py
@@ -45,6 +45,8 @@ class ClientDirectoryServer(RestServlet):
 
     @defer.inlineCallbacks
     def on_PUT(self, request, room_alias):
+        user = yield self.auth.get_user_by_req(request)
+
         content = _parse_json(request)
         if not "room_id" in content:
             raise SynapseError(400, "Missing room_id key",
@@ -69,12 +71,13 @@ class ClientDirectoryServer(RestServlet):
 
         try:
             yield dir_handler.create_association(
-                room_alias, room_id, servers
+                user.to_string(), room_alias, room_id, servers
             )
         except SynapseError as e:
             raise e
         except:
             logger.exception("Failed to create association")
+            raise
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/events.py b/synapse/rest/events.py
index 7fde143200..097195d7cc 100644
--- a/synapse/rest/events.py
+++ b/synapse/rest/events.py
@@ -59,7 +59,7 @@ class EventRestServlet(RestServlet):
         event = yield handler.get_event(auth_user, event_id)
 
         if event:
-            defer.returnValue((200, event.get_dict()))
+            defer.returnValue((200, self.hs.serialize_event(event)))
         else:
             defer.returnValue((404, "Event not found."))
 
diff --git a/synapse/rest/login.py b/synapse/rest/login.py
index c7bf901c8e..ad71f6c61d 100644
--- a/synapse/rest/login.py
+++ b/synapse/rest/login.py
@@ -70,7 +70,28 @@ class LoginFallbackRestServlet(RestServlet):
     def on_GET(self, request):
         # TODO(kegan): This should be returning some HTML which is capable of
         # hitting LoginRestServlet
-        return (200, "")
+        return (200, {})
+
+
+class PasswordResetRestServlet(RestServlet):
+    PATTERN = client_path_pattern("/login/reset")
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        reset_info = _parse_json(request)
+        try:
+            email = reset_info["email"]
+            user_id = reset_info["user_id"]
+            handler = self.handlers.login_handler
+            yield handler.reset_password(user_id, email)
+            # purposefully give no feedback to avoid people hammering different
+            # combinations.
+            defer.returnValue((200, {}))
+        except KeyError:
+            raise SynapseError(
+                400,
+                "Missing keys. Requires 'email' and 'user_id'."
+            )
 
 
 def _parse_json(request):
@@ -85,3 +106,4 @@ def _parse_json(request):
 
 def register_servlets(hs, http_server):
     LoginRestServlet(hs).register(http_server)
+    # TODO PasswordResetRestServlet(hs).register(http_server)
diff --git a/synapse/rest/profile.py b/synapse/rest/profile.py
index 2e17f87fa1..dad5a208c7 100644
--- a/synapse/rest/profile.py
+++ b/synapse/rest/profile.py
@@ -51,7 +51,7 @@ class ProfileDisplaynameRestServlet(RestServlet):
         yield self.handlers.profile_handler.set_displayname(
             user, auth_user, new_name)
 
-        defer.returnValue((200, ""))
+        defer.returnValue((200, {}))
 
     def on_OPTIONS(self, request, user_id):
         return (200, {})
@@ -86,7 +86,7 @@ class ProfileAvatarURLRestServlet(RestServlet):
         yield self.handlers.profile_handler.set_avatar_url(
             user, auth_user, new_name)
 
-        defer.returnValue((200, ""))
+        defer.returnValue((200, {}))
 
     def on_OPTIONS(self, request, user_id):
         return (200, {})
diff --git a/synapse/rest/register.py b/synapse/rest/register.py
index b8de3b250d..af528a44f6 100644
--- a/synapse/rest/register.py
+++ b/synapse/rest/register.py
@@ -16,58 +16,219 @@
 """This module contains REST servlets to do with registration: /register"""
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import SynapseError, Codes
+from synapse.api.constants import LoginType
 from base import RestServlet, client_path_pattern
+import synapse.util.stringutils as stringutils
 
 import json
+import logging
 import urllib
 
+logger = logging.getLogger(__name__)
+
 
 class RegisterRestServlet(RestServlet):
+    """Handles registration with the home server.
+
+    This servlet is in control of the registration flow; the registration
+    handler doesn't have a concept of multi-stages or sessions.
+    """
+
     PATTERN = client_path_pattern("/register$")
 
+    def __init__(self, hs):
+        super(RegisterRestServlet, self).__init__(hs)
+        # sessions are stored as:
+        # self.sessions = {
+        #   "session_id" : { __session_dict__ }
+        # }
+        # TODO: persistent storage
+        self.sessions = {}
+
+    def on_GET(self, request):
+        if self.hs.config.enable_registration_captcha:
+            return (200, {
+                "flows": [
+                    {
+                        "type": LoginType.RECAPTCHA,
+                        "stages": ([LoginType.RECAPTCHA,
+                                    LoginType.EMAIL_IDENTITY,
+                                    LoginType.PASSWORD])
+                    },
+                    {
+                        "type": LoginType.RECAPTCHA,
+                        "stages": [LoginType.RECAPTCHA, LoginType.PASSWORD]
+                    }
+                ]
+            })
+        else:
+            return (200, {
+                "flows": [
+                    {
+                        "type": LoginType.EMAIL_IDENTITY,
+                        "stages": ([LoginType.EMAIL_IDENTITY,
+                                    LoginType.PASSWORD])
+                    },
+                    {
+                        "type": LoginType.PASSWORD
+                    }
+                ]
+            })
+
     @defer.inlineCallbacks
     def on_POST(self, request):
-        desired_user_id = None
-        password = None
+        register_json = _parse_json(request)
+
+        session = (register_json["session"] if "session" in register_json
+                  else None)
+        login_type = None
+        if "type" not in register_json:
+            raise SynapseError(400, "Missing 'type' key.")
+
+        try:
+            login_type = register_json["type"]
+            stages = {
+                LoginType.RECAPTCHA: self._do_recaptcha,
+                LoginType.PASSWORD: self._do_password,
+                LoginType.EMAIL_IDENTITY: self._do_email_identity
+            }
+
+            session_info = self._get_session_info(request, session)
+            logger.debug("%s : session info %s   request info %s",
+                         login_type, session_info, register_json)
+            response = yield stages[login_type](
+                request,
+                register_json,
+                session_info
+            )
+
+            if "access_token" not in response:
+                # isn't a final response
+                response["session"] = session_info["id"]
+
+            defer.returnValue((200, response))
+        except KeyError as e:
+            logger.exception(e)
+            raise SynapseError(400, "Missing JSON keys for login type %s." % login_type)
+
+    def on_OPTIONS(self, request):
+        return (200, {})
+
+    def _get_session_info(self, request, session_id):
+        if not session_id:
+            # create a new session
+            while session_id is None or session_id in self.sessions:
+                session_id = stringutils.random_string(24)
+            self.sessions[session_id] = {
+                "id": session_id,
+                LoginType.EMAIL_IDENTITY: False,
+                LoginType.RECAPTCHA: False
+            }
+
+        return self.sessions[session_id]
+
+    def _save_session(self, session):
+        # TODO: Persistent storage
+        logger.debug("Saving session %s", session)
+        self.sessions[session["id"]] = session
+
+    def _remove_session(self, session):
+        logger.debug("Removing session %s", session)
+        self.sessions.pop(session["id"])
+
+    @defer.inlineCallbacks
+    def _do_recaptcha(self, request, register_json, session):
+        if not self.hs.config.enable_registration_captcha:
+            raise SynapseError(400, "Captcha not required.")
+
+        challenge = None
+        user_response = None
         try:
-            register_json = json.loads(request.content.read())
-            if "password" in register_json:
-                password = register_json["password"].encode("utf-8")
-
-            if type(register_json["user_id"]) == unicode:
-                desired_user_id = register_json["user_id"].encode("utf-8")
-                if urllib.quote(desired_user_id) != desired_user_id:
-                    raise SynapseError(
-                        400,
-                        "User ID must only contain characters which do not " +
-                        "require URL encoding.")
-        except ValueError:
-            defer.returnValue((400, "No JSON object."))
+            challenge = register_json["challenge"]
+            user_response = register_json["response"]
         except KeyError:
-            pass  # user_id is optional
+            raise SynapseError(400, "Captcha response is required",
+                               errcode=Codes.CAPTCHA_NEEDED)
+
+        # May be an X-Forwarding-For header depending on config
+        ip_addr = request.getClientIP()
+        if self.hs.config.captcha_ip_origin_is_x_forwarded:
+            # use the header
+            if request.requestHeaders.hasHeader("X-Forwarded-For"):
+                ip_addr = request.requestHeaders.getRawHeaders(
+                    "X-Forwarded-For")[0]
+
+        handler = self.handlers.registration_handler
+        yield handler.check_recaptcha(
+            ip_addr,
+            self.hs.config.recaptcha_private_key,
+            challenge,
+            user_response
+        )
+        session[LoginType.RECAPTCHA] = True  # mark captcha as done
+        self._save_session(session)
+        defer.returnValue({
+            "next": [LoginType.PASSWORD, LoginType.EMAIL_IDENTITY]
+        })
+
+    @defer.inlineCallbacks
+    def _do_email_identity(self, request, register_json, session):
+        if (self.hs.config.enable_registration_captcha and
+                not session[LoginType.RECAPTCHA]):
+            raise SynapseError(400, "Captcha is required.")
 
-        threepidCreds = None
-        if 'threepidCreds' in register_json:
-            threepidCreds = register_json['threepidCreds']
+        threepidCreds = register_json['threepidCreds']
+        handler = self.handlers.registration_handler
+        yield handler.register_email(threepidCreds)
+        session["threepidCreds"] = threepidCreds  # store creds for next stage
+        session[LoginType.EMAIL_IDENTITY] = True  # mark email as done
+        self._save_session(session)
+        defer.returnValue({
+            "next": LoginType.PASSWORD
+        })
+
+    @defer.inlineCallbacks
+    def _do_password(self, request, register_json, session):
+        if (self.hs.config.enable_registration_captcha and
+                not session[LoginType.RECAPTCHA]):
+            # captcha should've been done by this stage!
+            raise SynapseError(400, "Captcha is required.")
 
+        password = register_json["password"].encode("utf-8")
+        desired_user_id = (register_json["user"].encode("utf-8") if "user"
+                          in register_json else None)
+        if desired_user_id and urllib.quote(desired_user_id) != desired_user_id:
+            raise SynapseError(
+                400,
+                "User ID must only contain characters which do not " +
+                "require URL encoding.")
         handler = self.handlers.registration_handler
         (user_id, token) = yield handler.register(
             localpart=desired_user_id,
-            password=password,
-            threepidCreds=threepidCreds)
+            password=password
+        )
+
+        if session[LoginType.EMAIL_IDENTITY]:
+            yield handler.bind_emails(user_id, session["threepidCreds"])
 
         result = {
             "user_id": user_id,
             "access_token": token,
             "home_server": self.hs.hostname,
         }
-        defer.returnValue(
-            (200, result)
-        )
+        self._remove_session(session)
+        defer.returnValue(result)
 
-    def on_OPTIONS(self, request):
-        return (200, {})
+
+def _parse_json(request):
+    try:
+        content = json.loads(request.content.read())
+        if type(content) != dict:
+            raise SynapseError(400, "Content must be a JSON object.")
+        return content
+    except ValueError:
+        raise SynapseError(400, "Content not JSON.")
 
 
 def register_servlets(hs, http_server):
diff --git a/synapse/rest/room.py b/synapse/rest/room.py
index 308b447090..ecb1e346d9 100644
--- a/synapse/rest/room.py
+++ b/synapse/rest/room.py
@@ -154,14 +154,14 @@ class RoomStateEventRestServlet(RestServlet):
             # membership events are special
             handler = self.handlers.room_member_handler
             yield handler.change_membership(event)
-            defer.returnValue((200, ""))
+            defer.returnValue((200, {}))
         else:
             # store random bits of state
             msg_handler = self.handlers.message_handler
             yield msg_handler.store_room_data(
                 event=event
             )
-            defer.returnValue((200, ""))
+            defer.returnValue((200, {}))
 
 
 # TODO: Needs unit testing for generic events + feedback
@@ -249,7 +249,7 @@ class JoinRoomAliasServlet(RestServlet):
             )
             handler = self.handlers.room_member_handler
             yield handler.change_membership(event)
-            defer.returnValue((200, ""))
+            defer.returnValue((200, {}))
 
     @defer.inlineCallbacks
     def on_PUT(self, request, room_identifier, txn_id):
@@ -378,7 +378,7 @@ class RoomTriggerBackfill(RestServlet):
         handler = self.handlers.federation_handler
         events = yield handler.backfill(remote_server, room_id, limit)
 
-        res = [event.get_dict() for event in events]
+        res = [self.hs.serialize_event(event) for event in events]
         defer.returnValue((200, res))
 
 
@@ -416,7 +416,7 @@ class RoomMembershipRestServlet(RestServlet):
         )
         handler = self.handlers.room_member_handler
         yield handler.change_membership(event)
-        defer.returnValue((200, ""))
+        defer.returnValue((200, {}))
 
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id, membership_action, txn_id):
diff --git a/synapse/server.py b/synapse/server.py
index 83368ea5a7..cdea49e6ab 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -20,6 +20,7 @@
 
 # Imports required for the default HomeServer() implementation
 from synapse.federation import initialize_http_replication
+from synapse.api.events import serialize_event
 from synapse.api.events.factory import EventFactory
 from synapse.notifier import Notifier
 from synapse.api.auth import Auth
@@ -57,6 +58,7 @@ class BaseHomeServer(object):
     DEPENDENCIES = [
         'clock',
         'http_client',
+        'db_name',
         'db_pool',
         'persistence_service',
         'replication_layer',
@@ -138,6 +140,9 @@ class BaseHomeServer(object):
         object."""
         return RoomID.from_string(s, hs=self)
 
+    def serialize_event(self, e):
+        return serialize_event(self, e)
+
 # Build magic accessors for every dependency
 for depname in BaseHomeServer.DEPENDENCIES:
     BaseHomeServer._make_dependency_method(depname)
diff --git a/synapse/state.py b/synapse/state.py
index 36d8210eb5..9db84c9b5c 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -16,7 +16,7 @@
 
 from twisted.internet import defer
 
-from synapse.federation.pdu_codec import encode_event_id
+from synapse.federation.pdu_codec import encode_event_id, decode_event_id
 from synapse.util.logutils import log_function
 
 from collections import namedtuple
@@ -87,9 +87,11 @@ class StateHandler(object):
         # than the power level of the user
         # power_level = self._get_power_level_for_event(event)
 
+        pdu_id, origin = decode_event_id(event.event_id, self.server_name)
+
         yield self.store.update_current_state(
-            pdu_id=event.event_id,
-            origin=self.server_name,
+            pdu_id=pdu_id,
+            origin=origin,
             context=key.context,
             pdu_type=key.type,
             state_key=key.state_key
@@ -113,6 +115,8 @@ class StateHandler(object):
 
         is_new = yield self._handle_new_state(new_pdu)
 
+        logger.debug("is_new: %s %s %s", is_new, new_pdu.pdu_id, new_pdu.origin)
+
         if is_new:
             yield self.store.update_current_state(
                 pdu_id=new_pdu.pdu_id,
@@ -132,7 +136,9 @@ class StateHandler(object):
     @defer.inlineCallbacks
     @log_function
     def _handle_new_state(self, new_pdu):
-        tree = yield self.store.get_unresolved_state_tree(new_pdu)
+        tree, missing_branch = yield self.store.get_unresolved_state_tree(
+            new_pdu
+        )
         new_branch, current_branch = tree
 
         logger.debug(
@@ -140,6 +146,28 @@ class StateHandler(object):
             new_branch, current_branch
         )
 
+        if missing_branch is not None:
+            # We're missing some PDUs. Fetch them.
+            # TODO (erikj): Limit this.
+            missing_prev = tree[missing_branch][-1]
+
+            pdu_id = missing_prev.prev_state_id
+            origin = missing_prev.prev_state_origin
+
+            is_missing = yield self.store.get_pdu(pdu_id, origin) is None
+            if not is_missing:
+                raise Exception("Conflict resolution failed")
+
+            yield self._replication.get_pdu(
+                destination=missing_prev.origin,
+                pdu_origin=origin,
+                pdu_id=pdu_id,
+                outlier=True
+            )
+
+            updated_current = yield self._handle_new_state(new_pdu)
+            defer.returnValue(updated_current)
+
         if not current_branch:
             # There is no current state
             defer.returnValue(True)
@@ -148,84 +176,85 @@ class StateHandler(object):
         n = new_branch[-1]
         c = current_branch[-1]
 
-        if n.pdu_id == c.pdu_id and n.origin == c.origin:
-            # We have all the PDUs we need, so we can just do the conflict
-            # resolution.
+        common_ancestor = n.pdu_id == c.pdu_id and n.origin == c.origin
+
+        if common_ancestor:
+            # We found a common ancestor!
 
             if len(current_branch) == 1:
                 # This is a direct clobber so we can just...
                 defer.returnValue(True)
 
-            conflict_res = [
-                self._do_power_level_conflict_res,
-                self._do_chain_length_conflict_res,
-                self._do_hash_conflict_res,
-            ]
-
-            for algo in conflict_res:
-                new_res, curr_res = algo(new_branch, current_branch)
-
-                if new_res < curr_res:
-                    defer.returnValue(False)
-                elif new_res > curr_res:
-                    defer.returnValue(True)
-
-            raise Exception("Conflict resolution failed.")
-
         else:
-            # We need to ask for PDUs.
-            missing_prev = max(
-                new_branch[-1], current_branch[-1],
-                key=lambda x: x.depth
-            )
-
-            if not hasattr(missing_prev, "prev_state_id"):
-                # FIXME Hmm
-                # temporary fallback
-                for algo in conflict_res:
-                    new_res, curr_res = algo(new_branch, current_branch)
+            # We didn't find a common ancestor. This is probably fine.
+            pass
 
-                    if new_res < curr_res:
-                        defer.returnValue(False)
-                    elif new_res > curr_res:
-                        defer.returnValue(True)
-                return
+        result = yield self._do_conflict_res(
+            new_branch, current_branch, common_ancestor
+        )
+        defer.returnValue(result)
 
-            pdu_id = missing_prev.prev_state_id
-            origin = missing_prev.prev_state_origin
+    @defer.inlineCallbacks
+    def _do_conflict_res(self, new_branch, current_branch, common_ancestor):
+        conflict_res = [
+            self._do_power_level_conflict_res,
+            self._do_chain_length_conflict_res,
+            self._do_hash_conflict_res,
+        ]
 
-            is_missing = yield self.store.get_pdu(pdu_id, origin) is None
+        for algo in conflict_res:
+            new_res, curr_res = yield defer.maybeDeferred(
+                algo,
+                new_branch, current_branch, common_ancestor
+            )
 
-            if not is_missing:
-                raise Exception("Conflict resolution failed.")
+            if new_res < curr_res:
+                defer.returnValue(False)
+            elif new_res > curr_res:
+                defer.returnValue(True)
 
-            yield self._replication.get_pdu(
-                destination=missing_prev.origin,
-                pdu_origin=origin,
-                pdu_id=pdu_id,
-                outlier=True
-            )
+        raise Exception("Conflict resolution failed.")
 
-            updated_current = yield self._handle_new_state(new_pdu)
-            defer.returnValue(updated_current)
+    @defer.inlineCallbacks
+    def _do_power_level_conflict_res(self, new_branch, current_branch,
+                                     common_ancestor):
+        new_powers_deferreds = []
+        for e in new_branch[:-1] if common_ancestor else new_branch:
+            if hasattr(e, "user_id"):
+                new_powers_deferreds.append(
+                    self.store.get_power_level(e.context, e.user_id)
+                )
+
+        current_powers_deferreds = []
+        for e in current_branch[:-1] if common_ancestor else current_branch:
+            if hasattr(e, "user_id"):
+                current_powers_deferreds.append(
+                    self.store.get_power_level(e.context, e.user_id)
+                )
+
+        new_powers = yield defer.gatherResults(
+            new_powers_deferreds,
+            consumeErrors=True
+        )
 
-    def _do_power_level_conflict_res(self, new_branch, current_branch):
-        max_power_new = max(
-            new_branch[:-1],
-            key=lambda t: t.power_level
-        ).power_level
+        current_powers = yield defer.gatherResults(
+            current_powers_deferreds,
+            consumeErrors=True
+        )
 
-        max_power_current = max(
-            current_branch[:-1],
-            key=lambda t: t.power_level
-        ).power_level
+        max_power_new = max(new_powers)
+        max_power_current = max(current_powers)
 
-        return (max_power_new, max_power_current)
+        defer.returnValue(
+            (max_power_new, max_power_current)
+        )
 
-    def _do_chain_length_conflict_res(self, new_branch, current_branch):
+    def _do_chain_length_conflict_res(self, new_branch, current_branch,
+                                      common_ancestor):
         return (len(new_branch), len(current_branch))
 
-    def _do_hash_conflict_res(self, new_branch, current_branch):
+    def _do_hash_conflict_res(self, new_branch, current_branch,
+                              common_ancestor):
         new_str = "".join([p.pdu_id + p.origin for p in new_branch])
         c_str = "".join([p.pdu_id + p.origin for p in current_branch])
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d97014f4da..66658f6721 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -36,7 +36,7 @@ from .registration import RegistrationStore
 from .room import RoomStore
 from .roommember import RoomMemberStore
 from .stream import StreamStore
-from .pdu import StatePduStore, PduStore
+from .pdu import StatePduStore, PduStore, PdusTable
 from .transactions import TransactionStore
 from .keys import KeyStore
 
@@ -48,6 +48,28 @@ import os
 logger = logging.getLogger(__name__)
 
 
+SCHEMAS = [
+    "transactions",
+    "pdu",
+    "users",
+    "profiles",
+    "presence",
+    "im",
+    "room_aliases",
+]
+
+
+# Remember to update this number every time an incompatible change is made to
+# database schema files, so the users will be informed on server restarts.
+SCHEMA_VERSION = 3
+
+
+class _RollbackButIsFineException(Exception):
+    """ This exception is used to rollback a transaction without implying
+    something went wrong.
+    """
+    pass
+
 class DataStore(RoomMemberStore, RoomStore,
                 RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
                 PresenceStore, PduStore, StatePduStore, TransactionStore,
@@ -63,7 +85,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
     @defer.inlineCallbacks
     @log_function
-    def persist_event(self, event=None, backfilled=False, pdu=None):
+    def persist_event(self, event=None, backfilled=False, pdu=None,
+                      is_new_state=True):
         stream_ordering = None
         if backfilled:
             if not self.min_token_deferred.called:
@@ -71,17 +94,20 @@ class DataStore(RoomMemberStore, RoomStore,
             self.min_token -= 1
             stream_ordering = self.min_token
 
-        latest = yield self._db_pool.runInteraction(
-            self._persist_pdu_event_txn,
-            pdu=pdu,
-            event=event,
-            backfilled=backfilled,
-            stream_ordering=stream_ordering,
-        )
-        defer.returnValue(latest)
+        try:
+            yield self.runInteraction(
+                self._persist_pdu_event_txn,
+                pdu=pdu,
+                event=event,
+                backfilled=backfilled,
+                stream_ordering=stream_ordering,
+                is_new_state=is_new_state,
+            )
+        except _RollbackButIsFineException as e:
+            pass
 
     @defer.inlineCallbacks
-    def get_event(self, event_id):
+    def get_event(self, event_id, allow_none=False):
         events_dict = yield self._simple_select_one(
             "events",
             {"event_id": event_id},
@@ -92,18 +118,24 @@ class DataStore(RoomMemberStore, RoomStore,
                 "content",
                 "unrecognized_keys"
             ],
+            allow_none=allow_none,
         )
 
+        if not events_dict:
+            defer.returnValue(None)
+
         event = self._parse_event_from_row(events_dict)
         defer.returnValue(event)
 
     def _persist_pdu_event_txn(self, txn, pdu=None, event=None,
-                               backfilled=False, stream_ordering=None):
+                               backfilled=False, stream_ordering=None,
+                               is_new_state=True):
         if pdu is not None:
             self._persist_event_pdu_txn(txn, pdu)
         if event is not None:
             return self._persist_event_txn(
-                txn, event, backfilled, stream_ordering
+                txn, event, backfilled, stream_ordering,
+                is_new_state=is_new_state,
             )
 
     def _persist_event_pdu_txn(self, txn, pdu):
@@ -112,6 +144,12 @@ class DataStore(RoomMemberStore, RoomStore,
         del cols["content"]
         del cols["prev_pdus"]
         cols["content_json"] = json.dumps(pdu.content)
+
+        unrec_keys.update({
+            k: v for k, v in cols.items()
+            if k not in PdusTable.fields
+        })
+
         cols["unrecognized_keys"] = json.dumps(unrec_keys)
 
         logger.debug("Persisting: %s", repr(cols))
@@ -124,7 +162,8 @@ class DataStore(RoomMemberStore, RoomStore,
         self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth)
 
     @log_function
-    def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None):
+    def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
+                           is_new_state=True):
         if event.type == RoomMemberEvent.TYPE:
             self._store_room_member_txn(txn, event)
         elif event.type == FeedbackEvent.TYPE:
@@ -171,13 +210,14 @@ class DataStore(RoomMemberStore, RoomStore,
         try:
             self._simple_insert_txn(txn, "events", vals)
         except:
-            logger.exception(
+            logger.warn(
                 "Failed to persist, probably duplicate: %s",
-                event.event_id
+                event.event_id,
+                exc_info=True,
             )
-            return
+            raise _RollbackButIsFineException("_persist_event")
 
-        if not backfilled and hasattr(event, "state_key"):
+        if is_new_state and hasattr(event, "state_key"):
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -201,8 +241,6 @@ class DataStore(RoomMemberStore, RoomStore,
                 }
             )
 
-        return self._get_room_events_max_id_txn(txn)
-
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
         sql = (
@@ -220,7 +258,8 @@ class DataStore(RoomMemberStore, RoomStore,
 
         results = yield self._execute_and_decode(sql, *args)
 
-        defer.returnValue([self._parse_event_from_row(r) for r in results])
+        events = yield self._parse_events(results)
+        defer.returnValue(events)
 
     @defer.inlineCallbacks
     def _get_min_token(self):
@@ -269,7 +308,7 @@ class DataStore(RoomMemberStore, RoomStore,
                 prev_state_pdu=prev_state_pdu,
             )
 
-        return self._db_pool.runInteraction(_snapshot)
+        return self.runInteraction(_snapshot)
 
 
 class Snapshot(object):
@@ -339,3 +378,42 @@ def read_schema(schema):
     """
     with open(schema_path(schema)) as schema_file:
         return schema_file.read()
+
+
+def prepare_database(db_conn):
+    """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we
+    don't have to worry about overwriting existing content.
+    """
+    c = db_conn.cursor()
+    c.execute("PRAGMA user_version")
+    row = c.fetchone()
+
+    if row and row[0]:
+        user_version = row[0]
+
+        if user_version > SCHEMA_VERSION:
+            raise ValueError("Cannot use this database as it is too " +
+                "new for the server to understand"
+            )
+        elif user_version < SCHEMA_VERSION:
+            logging.info("Upgrading database from version %d",
+                user_version
+            )
+
+            # Run every version since after the current version.
+            for v in range(user_version + 1, SCHEMA_VERSION + 1):
+                sql_script = read_schema("delta/v%d" % (v))
+                c.executescript(sql_script)
+
+            db_conn.commit()
+
+    else:
+        for sql_loc in SCHEMAS:
+            sql_script = read_schema(sql_loc)
+
+            c.executescript(sql_script)
+        db_conn.commit()
+        c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
+
+    c.close()
+
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bae50e7d1f..76ed7d06fb 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -17,6 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.util.logutils import log_function
 
 import collections
 import copy
@@ -25,6 +26,44 @@ import json
 
 logger = logging.getLogger(__name__)
 
+sql_logger = logging.getLogger("synapse.storage.SQL")
+
+
+class LoggingTransaction(object):
+    """An object that almost-transparently proxies for the 'txn' object
+    passed to the constructor. Adds logging to the .execute() method."""
+    __slots__ = ["txn"]
+
+    def __init__(self, txn):
+        object.__setattr__(self, "txn", txn)
+
+    def __getattribute__(self, name):
+        if name == "execute":
+            return object.__getattribute__(self, "execute")
+
+        return getattr(object.__getattribute__(self, "txn"), name)
+
+    def __setattr__(self, name, value):
+        setattr(object.__getattribute__(self, "txn"), name, value)
+
+    def execute(self, sql, *args, **kwargs):
+        # TODO(paul): Maybe use 'info' and 'debug' for values?
+        sql_logger.debug("[SQL] %s", sql)
+        try:
+            if args and args[0]:
+                values = args[0]
+                sql_logger.debug("[SQL values] " +
+                    ", ".join(("<%s>",) * len(values)), *values)
+        except:
+            # Don't let logging failures stop SQL from working
+            pass
+
+        # TODO(paul): Here would be an excellent place to put some timing
+        #   measurements, and log (warning?) slow queries.
+        return object.__getattribute__(self, "txn").execute(
+            sql, *args, **kwargs
+        )
+
 
 class SQLBaseStore(object):
 
@@ -34,6 +73,13 @@ class SQLBaseStore(object):
         self.event_factory = hs.get_event_factory()
         self._clock = hs.get_clock()
 
+    def runInteraction(self, func, *args, **kwargs):
+        """Wraps the .runInteraction() method on the underlying db_pool."""
+        def inner_func(txn, *args, **kwargs):
+            return func(LoggingTransaction(txn), *args, **kwargs)
+
+        return self._db_pool.runInteraction(inner_func, *args, **kwargs)
+
     def cursor_to_dict(self, cursor):
         """Converts a SQL cursor into an list of dicts.
 
@@ -59,11 +105,6 @@ class SQLBaseStore(object):
         Returns:
             The result of decoder(results)
         """
-        logger.debug(
-            "[SQL] %s  Args=%s Func=%s",
-            query, args, decoder.__name__ if decoder else None
-        )
-
         def interaction(txn):
             cursor = txn.execute(query, args)
             if decoder:
@@ -71,7 +112,7 @@ class SQLBaseStore(object):
             else:
                 return cursor.fetchall()
 
-        return self._db_pool.runInteraction(interaction)
+        return self.runInteraction(interaction)
 
     def _execute_and_decode(self, query, *args):
         return self._execute(self.cursor_to_dict, query, *args)
@@ -87,10 +128,11 @@ class SQLBaseStore(object):
             values : dict of new column names and values for them
             or_replace : bool; if True performs an INSERT OR REPLACE
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._simple_insert_txn, table, values, or_replace=or_replace
         )
 
+    @log_function
     def _simple_insert_txn(self, txn, table, values, or_replace=False):
         sql = "%s INTO %s (%s) VALUES(%s)" % (
             ("INSERT OR REPLACE" if or_replace else "INSERT"),
@@ -98,6 +140,12 @@ class SQLBaseStore(object):
             ", ".join(k for k in values),
             ", ".join("?" for k in values)
         )
+
+        logger.debug(
+            "[SQL] %s  Args=%s Func=%s",
+            sql, values.values(),
+        )
+
         txn.execute(sql, values.values())
         return txn.lastrowid
 
@@ -164,7 +212,7 @@ class SQLBaseStore(object):
             txn.execute(sql, keyvalues.values())
             return txn.fetchall()
 
-        res = yield self._db_pool.runInteraction(func)
+        res = yield self.runInteraction(func)
 
         defer.returnValue([r[0] for r in res])
 
@@ -187,7 +235,7 @@ class SQLBaseStore(object):
             txn.execute(sql, keyvalues.values())
             return self.cursor_to_dict(txn)
 
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _simple_update_one(self, table, keyvalues, updatevalues,
                            retcols=None):
@@ -255,7 +303,7 @@ class SQLBaseStore(object):
                     raise StoreError(500, "More than one row matched")
 
             return ret
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _simple_delete_one(self, table, keyvalues):
         """Executes a DELETE query on the named table, expecting to delete a
@@ -276,7 +324,7 @@ class SQLBaseStore(object):
                 raise StoreError(404, "No row found")
             if txn.rowcount > 1:
                 raise StoreError(500, "more than one row matched")
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _simple_max_id(self, table):
         """Executes a SELECT query on the named table, expecting to return the
@@ -294,7 +342,7 @@ class SQLBaseStore(object):
                 return 0
             return max_id
 
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _parse_event_from_row(self, row_dict):
         d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
@@ -307,11 +355,34 @@ class SQLBaseStore(object):
         d["content"] = json.loads(d["content"])
         del d["unrecognized_keys"]
 
+        if "age_ts" not in d:
+            # For compatibility
+            d["age_ts"] = d["ts"] if "ts" in d else 0
+
         return self.event_factory.create_event(
             etype=d["type"],
             **d
         )
 
+    def _parse_events(self, rows):
+        return self.runInteraction(self._parse_events_txn, rows)
+
+    def _parse_events_txn(self, txn, rows):
+        events = [self._parse_event_from_row(r) for r in rows]
+
+        sql = "SELECT * FROM events WHERE event_id = ?"
+
+        for ev in events:
+           if hasattr(ev, "prev_state"):
+                # Load previous state_content. 
+                # TODO: Should we be pulling this out above?
+                cursor = txn.execute(sql, (ev.prev_state,))
+                prevs = self.cursor_to_dict(cursor)
+                if prevs:
+                    prev = self._parse_event_from_row(prevs[0])
+                    ev.prev_content = prev.content
+
+        return events
 
 class Table(object):
     """ A base class used to store information about a particular table.
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index bf55449253..540eb4c2c4 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -92,3 +92,10 @@ class DirectoryStore(SQLBaseStore):
                     "server": server,
                 }
             )
+
+    def get_aliases_for_room(self, room_id):
+        return self._simple_select_onecol(
+            "room_aliases",
+            {"room_id": room_id},
+            "room_alias",
+        )
diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py
index 0bf97e37ee..d70467dcd6 100644
--- a/synapse/storage/pdu.py
+++ b/synapse/storage/pdu.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 
 from ._base import SQLBaseStore, Table, JoinHelper
 
+from synapse.federation.units import Pdu
 from synapse.util.logutils import log_function
 
 from collections import namedtuple
@@ -42,7 +43,7 @@ class PduStore(SQLBaseStore):
             PduTuple: If the pdu does not exist in the database, returns None
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_pdu_tuple, pdu_id, origin
         )
 
@@ -94,7 +95,7 @@ class PduStore(SQLBaseStore):
             list: A list of PduTuples
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_current_state_for_context,
             context
         )
@@ -142,7 +143,7 @@ class PduStore(SQLBaseStore):
             pdu_origin (str)
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._mark_as_processed, pdu_id, pdu_origin
         )
 
@@ -151,7 +152,7 @@ class PduStore(SQLBaseStore):
 
     def get_all_pdus_from_context(self, context):
         """Get a list of all PDUs for a given context."""
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_all_pdus_from_context, context,
         )
 
@@ -178,7 +179,7 @@ class PduStore(SQLBaseStore):
         Return:
             list: A list of PduTuples
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_backfill, context, pdu_list, limit
         )
 
@@ -239,7 +240,7 @@ class PduStore(SQLBaseStore):
             txn
             context (str)
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_min_depth_for_context, context
         )
 
@@ -308,8 +309,8 @@ class PduStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_oldest_pdus_in_context(self, context):
-        """Get a list of Pdus that we haven't backfilled beyond yet (and haven't    
-        seen). This list is used when we want to backfill backwards and is the 
+        """Get a list of Pdus that we haven't backfilled beyond yet (and havent
+        seen). This list is used when we want to backfill backwards and is the
         list we send to the remote server.
 
         Args:
@@ -345,7 +346,7 @@ class PduStore(SQLBaseStore):
             bool
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._is_pdu_new,
             pdu_id=pdu_id,
             origin=origin,
@@ -498,7 +499,7 @@ class StatePduStore(SQLBaseStore):
         )
 
     def get_unresolved_state_tree(self, new_state_pdu):
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_unresolved_state_tree, new_state_pdu
         )
 
@@ -516,7 +517,7 @@ class StatePduStore(SQLBaseStore):
 
         if not current:
             logger.debug("get_unresolved_state_tree No current state.")
-            return return_value
+            return (return_value, None)
 
         return_value.current_branch.append(current)
 
@@ -524,17 +525,20 @@ class StatePduStore(SQLBaseStore):
             txn, new_pdu, current
         )
 
+        missing_branch = None
         for branch, prev_state, state in enum_branches:
             if state:
                 return_value[branch].append(state)
             else:
+                # We don't have prev_state :(
+                missing_branch = branch
                 break
 
-        return return_value
+        return (return_value, missing_branch)
 
     def update_current_state(self, pdu_id, origin, context, pdu_type,
                              state_key):
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._update_current_state,
             pdu_id, origin, context, pdu_type, state_key
         )
@@ -573,7 +577,7 @@ class StatePduStore(SQLBaseStore):
             PduEntry
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_current_state_pdu, context, pdu_type, state_key
         )
 
@@ -622,53 +626,6 @@ class StatePduStore(SQLBaseStore):
 
         return result
 
-    def get_next_missing_pdu(self, new_pdu):
-        """When we get a new state pdu we need to check whether we need to do
-        any conflict resolution, if we do then we need to check if we need
-        to go back and request some more state pdus that we haven't seen yet.
-
-        Args:
-            txn
-            new_pdu
-
-        Returns:
-            PduIdTuple: A pdu that we are missing, or None if we have all the
-                pdus required to do the conflict resolution.
-        """
-        return self._db_pool.runInteraction(
-            self._get_next_missing_pdu, new_pdu
-        )
-
-    def _get_next_missing_pdu(self, txn, new_pdu):
-        logger.debug(
-            "get_next_missing_pdu %s %s",
-            new_pdu.pdu_id, new_pdu.origin
-        )
-
-        current = self._get_current_interaction(
-            txn,
-            new_pdu.context, new_pdu.pdu_type, new_pdu.state_key
-        )
-
-        if (not current or not current.prev_state_id
-                or not current.prev_state_origin):
-            return None
-
-        # Oh look, it's a straight clobber, so wooooo almost no-op.
-        if (new_pdu.prev_state_id == current.pdu_id
-                and new_pdu.prev_state_origin == current.origin):
-            return None
-
-        enum_branches = self._enumerate_state_branches(txn, new_pdu, current)
-        for branch, prev_state, state in enum_branches:
-            if not state:
-                return PduIdTuple(
-                    prev_state.prev_state_id,
-                    prev_state.prev_state_origin
-                )
-
-        return None
-
     def handle_new_state(self, new_pdu):
         """Actually perform conflict resolution on the new_pdu on the
         assumption we have all the pdus required to perform it.
@@ -679,7 +636,7 @@ class StatePduStore(SQLBaseStore):
         Returns:
             bool: True if the new_pdu clobbered the current state, False if not
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._handle_new_state, new_pdu
         )
 
@@ -752,24 +709,11 @@ class StatePduStore(SQLBaseStore):
 
         return is_current
 
-    @classmethod
     @log_function
-    def _enumerate_state_branches(cls, txn, pdu_a, pdu_b):
+    def _enumerate_state_branches(self, txn, pdu_a, pdu_b):
         branch_a = pdu_a
         branch_b = pdu_b
 
-        get_query = (
-            "SELECT %(fields)s FROM %(pdus)s as p "
-            "LEFT JOIN %(state)s as s "
-            "ON p.pdu_id = s.pdu_id AND p.origin = s.origin "
-            "WHERE p.pdu_id = ? AND p.origin = ? "
-        ) % {
-            "fields": _pdu_state_joiner.get_fields(
-                PdusTable="p", StatePdusTable="s"),
-            "pdus": PdusTable.table_name,
-            "state": StatePdusTable.table_name,
-        }
-
         while True:
             if (branch_a.pdu_id == branch_b.pdu_id
                     and branch_a.origin == branch_b.origin):
@@ -801,13 +745,12 @@ class StatePduStore(SQLBaseStore):
                     branch_a.prev_state_origin
                 )
 
-                logger.debug("getting branch_a prev %s", pdu_tuple)
-                txn.execute(get_query, pdu_tuple)
-
                 prev_branch = branch_a
 
-                res = txn.fetchone()
-                branch_a = PduEntry(*res) if res else None
+                logger.debug("getting branch_a prev %s", pdu_tuple)
+                branch_a = self._get_pdu_tuple(txn, *pdu_tuple)
+                if branch_a:
+                    branch_a = Pdu.from_pdu_tuple(branch_a)
 
                 logger.debug("branch_a=%s", branch_a)
 
@@ -820,14 +763,13 @@ class StatePduStore(SQLBaseStore):
                     branch_b.prev_state_id,
                     branch_b.prev_state_origin
                 )
-                txn.execute(get_query, pdu_tuple)
-
-                logger.debug("getting branch_b prev %s", pdu_tuple)
 
                 prev_branch = branch_b
 
-                res = txn.fetchone()
-                branch_b = PduEntry(*res) if res else None
+                logger.debug("getting branch_b prev %s", pdu_tuple)
+                branch_b = self._get_pdu_tuple(txn, *pdu_tuple)
+                if branch_b:
+                    branch_b = Pdu.from_pdu_tuple(branch_b)
 
                 logger.debug("branch_b=%s", branch_b)
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index fd762bc643..db20b1daa0 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -62,7 +62,7 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if the user_id could not be registered.
         """
-        yield self._db_pool.runInteraction(self._register, user_id, token,
+        yield self.runInteraction(self._register, user_id, token,
                                            password_hash)
 
     def _register(self, txn, user_id, token, password_hash):
@@ -99,7 +99,7 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if no user was found.
         """
-        user_id = yield self._db_pool.runInteraction(self._query_for_auth,
+        user_id = yield self.runInteraction(self._query_for_auth,
                                                      token)
         defer.returnValue(user_id)
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 017169ce00..5adf8cdf1b 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -149,7 +149,7 @@ class RoomStore(SQLBaseStore):
             defer.returnValue(None)
 
     def get_power_level(self, room_id, user_id):
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_power_level,
             room_id, user_id,
         )
@@ -182,7 +182,7 @@ class RoomStore(SQLBaseStore):
             return None
 
     def get_ops_levels(self, room_id):
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_ops_levels,
             room_id,
         )
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 75c9a60101..04b4067d03 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from ._base import SQLBaseStore
 
 from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
 
 import logging
 
@@ -29,8 +30,18 @@ class RoomMemberStore(SQLBaseStore):
     def _store_room_member_txn(self, txn, event):
         """Store a room member in the database.
         """
-        target_user_id = event.state_key
-        domain = self.hs.parse_userid(target_user_id).domain
+        try:
+            target_user_id = event.state_key
+            domain = self.hs.parse_userid(target_user_id).domain
+        except:
+            logger.exception("Failed to parse target_user_id=%s", target_user_id)
+            raise
+
+        logger.debug(
+            "_store_room_member_txn: target_user_id=%s, membership=%s",
+            target_user_id,
+            event.membership,
+        )
 
         self._simple_insert_txn(
             txn,
@@ -51,12 +62,30 @@ class RoomMemberStore(SQLBaseStore):
                 "VALUES (?, ?)"
             )
             txn.execute(sql, (event.room_id, domain))
-        else:
-            sql = (
-                "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+        elif event.membership != Membership.INVITE:
+            # Check if this was the last person to have left.
+            member_events = self._get_members_query_txn(
+                txn,
+                where_clause="c.room_id = ? AND m.membership = ? AND m.user_id != ?",
+                where_values=(event.room_id, Membership.JOIN, target_user_id,)
             )
 
-            txn.execute(sql, (event.room_id, domain))
+            joined_domains = set()
+            for e in member_events:
+                try:
+                    joined_domains.add(
+                        self.hs.parse_userid(e.state_key).domain
+                    )
+                except:
+                    # FIXME: How do we deal with invalid user ids in the db?
+                    logger.exception("Invalid user_id: %s", event.state_key)
+
+            if domain not in joined_domains:
+                sql = (
+                    "DELETE FROM room_hosts WHERE room_id = ? AND host = ?"
+                )
+
+                txn.execute(sql, (event.room_id, domain))
 
     @defer.inlineCallbacks
     def get_room_member(self, user_id, room_id):
@@ -88,7 +117,7 @@ class RoomMemberStore(SQLBaseStore):
         txn.execute(sql, (user_id, room_id))
         rows = self.cursor_to_dict(txn)
         if rows:
-            return self._parse_event_from_row(rows[0])
+            return self._parse_events_txn(txn, rows)[0]
         else:
             return None
 
@@ -120,7 +149,7 @@ class RoomMemberStore(SQLBaseStore):
             membership_list (list): A list of synapse.api.constants.Membership
             values which the user must be in.
         Returns:
-            A list of dicts with "room_id" and "membership" keys.
+            A list of RoomMemberEvent objects
         """
         if not membership_list:
             return defer.succeed(None)
@@ -146,8 +175,13 @@ class RoomMemberStore(SQLBaseStore):
         vals = where_dict.values()
         return self._get_members_query(clause, vals)
 
-    @defer.inlineCallbacks
     def _get_members_query(self, where_clause, where_values):
+        return self._db_pool.runInteraction(
+            self._get_members_query_txn,
+            where_clause, where_values
+        )
+
+    def _get_members_query_txn(self, txn, where_clause, where_values):
         sql = (
             "SELECT e.* FROM events as e "
             "INNER JOIN room_memberships as m "
@@ -157,18 +191,18 @@ class RoomMemberStore(SQLBaseStore):
             "WHERE %s "
         ) % (where_clause,)
 
-        rows = yield self._execute_and_decode(sql, *where_values)
-
-        # logger.debug("_get_members_query Got rows %s", rows)
+        txn.execute(sql, where_values)
+        rows = self.cursor_to_dict(txn)
 
-        results = [self._parse_event_from_row(r) for r in rows]
-        defer.returnValue(results)
+        results = self._parse_events_txn(txn, rows)
+        return results
 
     @defer.inlineCallbacks
-    def user_rooms_intersect(self, user_list):
-        """ Checks whether a list of users share a room.
+    def user_rooms_intersect(self, user_id_list):
+        """ Checks whether all the users whose IDs are given in a list share a
+        room.
         """
-        user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_list))
+        user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list))
         sql = (
             "SELECT m.room_id FROM room_memberships as m "
             "INNER JOIN current_state_events as c "
@@ -178,8 +212,8 @@ class RoomMemberStore(SQLBaseStore):
             "GROUP BY m.room_id HAVING COUNT(m.room_id) = ?"
         ) % {"clause": user_list_clause}
 
-        args = user_list
-        args.append(len(user_list))
+        args = list(user_id_list)
+        args.append(len(user_id_list))
 
         rows = yield self._execute(None, sql, *args)
 
diff --git a/synapse/storage/schema/delta/v3.sql b/synapse/storage/schema/delta/v3.sql
new file mode 100644
index 0000000000..cade295989
--- /dev/null
+++ b/synapse/storage/schema/delta/v3.sql
@@ -0,0 +1,27 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE INDEX IF NOT EXISTS room_aliases_alias ON room_aliases(room_alias);
+CREATE INDEX IF NOT EXISTS room_aliases_id ON room_aliases(room_id);
+
+
+CREATE INDEX IF NOT EXISTS room_alias_servers_alias ON room_alias_servers(room_alias);
+
+DELETE FROM room_aliases WHERE rowid NOT IN (SELECT max(rowid) FROM room_aliases GROUP BY room_alias, room_id);
+
+CREATE UNIQUE INDEX IF NOT EXISTS room_aliases_uniq ON room_aliases(room_alias, room_id);
+
+PRAGMA user_version = 3;
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2cb0067a67..a76fecf24f 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -146,7 +146,7 @@ class StreamStore(SQLBaseStore):
         current_room_membership_sql = (
             "SELECT m.room_id FROM room_memberships as m "
             "INNER JOIN current_state_events as c ON m.event_id = c.event_id "
-            "WHERE m.user_id = ?"
+            "WHERE m.user_id = ? AND m.membership = 'join'"
         )
 
         # We also want to get any membership events about that user, e.g.
@@ -188,7 +188,7 @@ class StreamStore(SQLBaseStore):
             user_id, user_id, from_id, to_id
         )
 
-        ret = [self._parse_event_from_row(r) for r in rows]
+        ret = yield self._parse_events(rows)
 
         if rows:
             key = "s%d" % max([r["stream_ordering"] for r in rows])
@@ -243,9 +243,11 @@ class StreamStore(SQLBaseStore):
             # TODO (erikj): We should work out what to do here instead.
             next_token = to_key if to_key else from_key
 
+        events = yield self._parse_events(rows)
+
         defer.returnValue(
             (
-                [self._parse_event_from_row(r) for r in rows],
+                events,
                 next_token
             )
         )
@@ -277,15 +279,14 @@ class StreamStore(SQLBaseStore):
         else:
             token = (end_token, end_token)
 
-        defer.returnValue(
-            (
-                [self._parse_event_from_row(r) for r in rows],
-                token
-            )
-        )
+        events = yield self._parse_events(rows)
+
+        ret = (events, token)
+
+        defer.returnValue(ret)
 
     def get_room_events_max_id(self):
-        return self._db_pool.runInteraction(self._get_room_events_max_id_txn)
+        return self.runInteraction(self._get_room_events_max_id_txn)
 
     def _get_room_events_max_id_txn(self, txn):
         txn.execute(
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 7467e1035b..ab4599b468 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -41,7 +41,7 @@ class TransactionStore(SQLBaseStore):
             this transaction or a 2-tuple of (int, dict)
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_received_txn_response, transaction_id, origin
         )
 
@@ -72,7 +72,7 @@ class TransactionStore(SQLBaseStore):
             response_json (str)
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._set_received_txn_response,
             transaction_id, origin, code, response_dict
         )
@@ -104,7 +104,7 @@ class TransactionStore(SQLBaseStore):
             list: A list of previous transaction ids.
         """
 
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._prep_send_transaction,
             transaction_id, destination, ts, pdu_list
         )
@@ -159,7 +159,7 @@ class TransactionStore(SQLBaseStore):
             code (int)
             response_json (str)
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._delivered_txn,
             transaction_id, destination, code, response_dict
         )
@@ -184,7 +184,7 @@ class TransactionStore(SQLBaseStore):
         Returns:
             list: A list of `ReceivedTransactionsTable.EntryType`
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_transactions_after, transaction_id, destination
         )
 
@@ -214,7 +214,7 @@ class TransactionStore(SQLBaseStore):
         Returns
             list: A list of PduTuple
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._get_pdus_after_transaction,
             transaction_id, destination
         )
diff --git a/synapse/util/emailutils.py b/synapse/util/emailutils.py
new file mode 100644
index 0000000000..cdb0abd7ea
--- /dev/null
+++ b/synapse/util/emailutils.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" This module allows you to send out emails.
+"""
+import email.utils
+import smtplib
+import twisted.python.log
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class EmailException(Exception):
+    pass
+
+
+def send_email(smtp_server, from_addr, to_addr, subject, body):
+    """Sends an email.
+
+    Args:
+        smtp_server(str): The SMTP server to use.
+        from_addr(str): The address to send from.
+        to_addr(str): The address to send to.
+        subject(str): The subject of the email.
+        body(str): The plain text body of the email.
+    Raises:
+        EmailException if there was a problem sending the mail.
+    """
+    if not smtp_server or not from_addr or not to_addr:
+        raise EmailException("Need SMTP server, from and to addresses. Check " +
+                             "the config to set these.")
+
+    msg = MIMEMultipart('alternative')
+    msg['Subject'] = subject
+    msg['From'] = from_addr
+    msg['To'] = to_addr
+    plain_part = MIMEText(body)
+    msg.attach(plain_part)
+
+    raw_from = email.utils.parseaddr(from_addr)[1]
+    raw_to = email.utils.parseaddr(to_addr)[1]
+    if not raw_from or not raw_to:
+        raise EmailException("Couldn't parse from/to address.")
+
+    logger.info("Sending email to %s on server %s with subject %s",
+                to_addr, smtp_server, subject)
+
+    try:
+        smtp = smtplib.SMTP(smtp_server)
+        smtp.sendmail(raw_from, raw_to, msg.as_string())
+        smtp.quit()
+    except Exception as origException:
+        twisted.python.log.err()
+        ese = EmailException()
+        ese.cause = origException
+        raise ese
\ No newline at end of file