author     Erik Johnston <erik@matrix.org>  2016-06-20 14:18:04 +0100
committer  Erik Johnston <erik@matrix.org>  2016-06-20 14:18:04 +0100
commit     bc72d381b2d5322982204bb214453e2af56f70d5 (patch)
tree       4f6932f9a32e5e25b80e8202be75d04e50a9686e /synapse
parent     point to the CAPTCHA docs (diff)
parent     Bump version and changelog (diff)
download   synapse-bc72d381b2d5322982204bb214453e2af56f70d5.tar.xz

Merge branch 'release-v0.16.1' of github.com:matrix-org/synapse (v0.16.1)
Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py                            |   2
-rwxr-xr-x  synapse/app/homeserver.py                      |   3
-rw-r--r--  synapse/app/pusher.py                          | 172
-rw-r--r--  synapse/app/synchrotron.py                     | 136
-rw-r--r--  synapse/config/_base.py                        | 147
-rw-r--r--  synapse/config/homeserver.py                   |   4
-rw-r--r--  synapse/config/logger.py                       | 102
-rw-r--r--  synapse/config/server.py                       |  33
-rw-r--r--  synapse/config/workers.py                      |  31
-rw-r--r--  synapse/federation/federation_base.py          |   3
-rw-r--r--  synapse/federation/federation_client.py        |   2
-rw-r--r--  synapse/federation/federation_server.py        | 112
-rw-r--r--  synapse/federation/replication.py              |   2
-rw-r--r--  synapse/federation/transport/server.py         |   9
-rw-r--r--  synapse/handlers/auth.py                       |   2
-rw-r--r--  synapse/handlers/federation.py                 |  45
-rw-r--r--  synapse/handlers/profile.py                    |   7
-rw-r--r--  synapse/handlers/register.py                   |  28
-rw-r--r--  synapse/handlers/room.py                       |  32
-rw-r--r--  synapse/handlers/typing.py                     |   3
-rw-r--r--  synapse/http/client.py                         |  28
-rw-r--r--  synapse/push/mailer.py                         |  29
-rw-r--r--  synapse/rest/client/v1/events.py               |  45
-rw-r--r--  synapse/rest/client/v1/room.py                 |   9
-rw-r--r--  synapse/rest/media/v1/media_repository.py     |  14
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py  |  11
-rw-r--r--  synapse/storage/account_data.py                |   3
-rw-r--r--  synapse/storage/presence.py                    |   3
-rw-r--r--  synapse/storage/push_rule.py                   |   3
-rw-r--r--  synapse/storage/registration.py                |  17
-rw-r--r--  synapse/storage/tags.py                        |   3
-rw-r--r--  synapse/types.py                               |   5
-rw-r--r--  synapse/util/distributor.py                    |   4

33 files changed, 545 insertions(+), 504 deletions(-)
diff --git a/synapse/__init__.py b/synapse/__init__.py
index dc211e9637..3cd79b1247 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.16.0"
+__version__ = "0.16.1"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 22e1721fc4..40ffd9bf0d 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,10 +266,9 @@ def setup(config_options):
         HomeServer
     """
     try:
-        config = HomeServerConfig.load_config(
+        config = HomeServerConfig.load_or_generate_config(
             "Synapse Homeserver",
             config_options,
-            generate_section="Homeserver"
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
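The split matters because the new worker apps reuse the full HomeServerConfig but must never regenerate config files or keys. A minimal sketch (not from this commit; argument values illustrative) contrasting the two entry points:

    from synapse.config.homeserver import HomeServerConfig

    # Main process only: may write a fresh config file and signing keys when
    # --generate-config / --generate-keys is passed.
    main_config = HomeServerConfig.load_or_generate_config(
        "Synapse Homeserver", ["-c", "homeserver.yaml"],
    )

    # Read-only path used by the workers below: it only understands
    # -c/--config-path and --keys-directory.
    worker_config = HomeServerConfig.load_config(
        "Synapse pusher", ["-c", "homeserver.yaml", "-c", "pusher.yaml"],
    )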
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 4ec23d84c1..4f1d18ab5f 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -18,10 +18,8 @@ import synapse
 
 from synapse.server import HomeServer
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.emailconfig import EmailConfig
-from synapse.config.key import KeyConfig
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.storage.roommember import RoomMemberStore
@@ -43,98 +41,13 @@ from twisted.web.resource import Resource
 
 from daemonize import Daemonize
 
-import gc
 import sys
 import logging
+import gc
 
 logger = logging.getLogger("synapse.app.pusher")
 
 
-class SlaveConfig(DatabaseConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.start_pushers = True
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.public_baseurl = config["public_baseurl"]
-
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
-
-        # some things used by the auth handler but not actually used in the
-        # pusher codebase
-        self.bcrypt_rounds = None
-        self.ldap_enabled = None
-        self.ldap_server = None
-        self.ldap_port = None
-        self.ldap_tls = None
-        self.ldap_search_base = None
-        self.ldap_search_property = None
-        self.ldap_email_property = None
-        self.ldap_full_name_property = None
-
-        # We would otherwise try to use the registration shared secret as the
-        # macaroon shared secret if there was no macaroon_shared_secret, but
-        # that means pulling in RegistrationConfig too. We don't need to be
-        # backwards compaitible in the pusher codebase so just make people set
-        # macaroon_shared_secret. We set this to None to prevent it referencing
-        # an undefined key.
-        self.registration_shared_secret = None
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("pusher.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners: []
-        # Enable a ssh manhole listener on the pusher.
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the pusher.
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-
-        """ % locals()
-
-
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
-    pass
-
-
 class PusherSlaveStore(
     SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
     SlavedAccountDataStore
@@ -199,7 +112,7 @@ class PusherServer(HomeServer):
 
     def remove_pusher(self, app_id, push_key, user_id):
         http_client = self.get_simple_http_client()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         url = replication_url + "/remove_pushers"
         return http_client.post_json_get_json(url, {
             "remove": [{
@@ -232,8 +145,8 @@ class PusherServer(HomeServer):
         )
         logger.info("Synapse pusher now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -253,7 +166,7 @@ class PusherServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         pusher_pool = self.get_pusherpool()
         clock = self.get_clock()
 
@@ -329,19 +242,30 @@ class PusherServer(HomeServer):
                 yield sleep(30)
 
 
-def setup(config_options):
+def start(config_options):
     try:
-        config = PusherSlaveConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse pusher", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    assert config.worker_app == "synapse.app.pusher"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
 
-    config.setup_logging()
+    if config.start_pushers:
+        sys.stderr.write(
+            "\nThe pushers must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``start_pushers: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.start_pushers = True
 
     database_engine = create_engine(config.database_config)
 
@@ -354,11 +278,15 @@ def setup(config_options):
     )
 
     ps.setup()
-    ps.start_listening()
-
-    change_resource_limit(ps.config.soft_file_limit)
-    if ps.config.gc_thresholds:
-        gc.set_threshold(*ps.config.gc_thresholds)
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
 
     def start():
         ps.replicate()
@@ -367,30 +295,20 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ps
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-pusher",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ps = setup(sys.argv[1:])
-
-        if ps.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ps.config.soft_file_limit)
-                    if ps.config.gc_thresholds:
-                        gc.set_threshold(*ps.config.gc_thresholds)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-pusher",
-                pid=ps.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 297e199453..8cf5bbbb6d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -18,9 +18,8 @@ import synapse
 
 from synapse.api.constants import EventTypes, PresenceState
 from synapse.config._base import ConfigError
-from synapse.config.database import DatabaseConfig
-from synapse.config.logger import LoggingConfig
-from synapse.config.appservice import AppServiceConfig
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.events import FrozenEvent
 from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
@@ -63,70 +62,6 @@ import ujson as json
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
-class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig):
-    def read_config(self, config):
-        self.replication_url = config["replication_url"]
-        self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
-            "use_insecure_ssl_client_just_for_testing_do_not_use", False
-        )
-        self.user_agent_suffix = None
-        self.listeners = config["listeners"]
-        self.soft_file_limit = config.get("soft_file_limit")
-        self.daemonize = config.get("daemonize")
-        self.pid_file = self.abspath(config.get("pid_file"))
-        self.macaroon_secret_key = config["macaroon_secret_key"]
-        self.expire_access_token = config.get("expire_access_token", False)
-
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
-
-    def default_config(self, server_name, **kwargs):
-        pid_file = self.abspath("synchroton.pid")
-        return """\
-        # Slave configuration
-
-        # The replication listener on the synapse to talk to.
-        #replication_url: https://localhost:{replication_port}/_synapse/replication
-
-        server_name: "%(server_name)s"
-
-        listeners:
-        # Enable a /sync listener on the synchrontron
-        #- type: http
-        #    port: {http_port}
-        #    bind_address: ""
-        # Enable a ssh manhole listener on the synchrotron
-        # - type: manhole
-        #   port: {manhole_port}
-        #   bind_address: 127.0.0.1
-        # Enable a metric listener on the synchrotron
-        # - type: http
-        #   port: {metrics_port}
-        #   bind_address: 127.0.0.1
-        #   resources:
-        #    - names: ["metrics"]
-        #      compress: False
-
-        report_stats: False
-
-        daemonize: False
-
-        pid_file: %(pid_file)s
-        """ % locals()
-
-
 class SynchrotronSlavedStore(
     SlavedPushRuleStore,
     SlavedEventStore,
@@ -163,7 +98,7 @@ class SynchrotronPresence(object):
         self.http_client = hs.get_simple_http_client()
         self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
-        self.syncing_users_url = hs.config.replication_url + "/syncing_users"
+        self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
         self.clock = hs.get_clock()
 
         active_presence = self.store.take_presence_startup_info()
@@ -350,8 +285,8 @@ class SynchrotronServer(HomeServer):
         )
         logger.info("Synapse synchrotron now listening on port %d", port)
 
-    def start_listening(self):
-        for listener in self.config.listeners:
+    def start_listening(self, listeners):
+        for listener in listeners:
             if listener["type"] == "http":
                 self._listen_http(listener)
             elif listener["type"] == "manhole":
@@ -371,7 +306,7 @@ class SynchrotronServer(HomeServer):
     def replicate(self):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
-        replication_url = self.config.replication_url
+        replication_url = self.config.worker_replication_url
         clock = self.get_clock()
         notifier = self.get_notifier()
         presence_handler = self.get_presence_handler()
@@ -470,19 +405,18 @@ class SynchrotronServer(HomeServer):
         return SynchrotronTyping(self)
 
 
-def setup(config_options):
+def start(config_options):
     try:
-        config = SynchrotronConfig.load_config(
+        config = HomeServerConfig.load_config(
             "Synapse synchrotron", config_options
         )
     except ConfigError as e:
         sys.stderr.write("\n" + e.message + "\n")
         sys.exit(1)
 
-    if not config:
-        sys.exit(0)
+    assert config.worker_app == "synapse.app.synchrotron"
 
-    config.setup_logging()
+    setup_logging(config.worker_log_config, config.worker_log_file)
 
     database_engine = create_engine(config.database_config)
 
@@ -496,11 +430,15 @@ def setup(config_options):
     )
 
     ss.setup()
-    ss.start_listening()
-
-    change_resource_limit(ss.config.soft_file_limit)
-    if ss.config.gc_thresholds:
-        ss.set_threshold(*ss.config.gc_thresholds)
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
 
     def start():
         ss.get_datastore().start_profiling()
@@ -508,30 +446,20 @@ def setup(config_options):
 
     reactor.callWhenRunning(start)
 
-    return ss
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-synchrotron",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
 
 
 if __name__ == '__main__':
     with LoggingContext("main"):
-        ss = setup(sys.argv[1:])
-
-        if ss.config.daemonize:
-            def run():
-                with LoggingContext("run"):
-                    change_resource_limit(ss.config.soft_file_limit)
-                    if ss.config.gc_thresholds:
-                        gc.set_threshold(*ss.config.gc_thresholds)
-                    reactor.run()
-
-            daemon = Daemonize(
-                app="synapse-synchrotron",
-                pid=ss.config.pid_file,
-                action=run,
-                auto_close_fds=False,
-                verbose=True,
-                logger=logger,
-            )
-
-            daemon.start()
-        else:
-            reactor.run()
+        start(sys.argv[1:])
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 7449f36491..af9f17bf7b 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -157,9 +157,40 @@ class Config(object):
         return default_config, config
 
     @classmethod
-    def load_config(cls, description, argv, generate_section=None):
+    def load_config(cls, description, argv):
+        config_parser = argparse.ArgumentParser(
+            description=description,
+        )
+        config_parser.add_argument(
+            "-c", "--config-path",
+            action="append",
+            metavar="CONFIG_FILE",
+            help="Specify config file. Can be given multiple times and"
+                 " may specify directories containing *.yaml files."
+        )
+
+        config_parser.add_argument(
+            "--keys-directory",
+            metavar="DIRECTORY",
+            help="Where files such as certs and signing keys are stored when"
+                 " their location is given explicitly in the config."
+                 " Defaults to the directory containing the last config file",
+        )
+
+        config_args = config_parser.parse_args(argv)
+
+        config_files = find_config_files(search_paths=config_args.config_path)
+
         obj = cls()
+        obj.read_config_files(
+            config_files,
+            keys_directory=config_args.keys_directory,
+            generate_keys=False,
+        )
+        return obj
 
+    @classmethod
+    def load_or_generate_config(cls, description, argv):
         config_parser = argparse.ArgumentParser(add_help=False)
         config_parser.add_argument(
             "-c", "--config-path",
@@ -176,7 +207,7 @@ class Config(object):
         config_parser.add_argument(
             "--report-stats",
             action="store",
-            help="Stuff",
+            help="Whether the generated config reports anonymized usage statistics",
             choices=["yes", "no"]
         )
         config_parser.add_argument(
@@ -197,36 +228,11 @@ class Config(object):
         )
         config_args, remaining_args = config_parser.parse_known_args(argv)
 
+        config_files = find_config_files(search_paths=config_args.config_path)
+
         generate_keys = config_args.generate_keys
 
-        config_files = []
-        if config_args.config_path:
-            for config_path in config_args.config_path:
-                if os.path.isdir(config_path):
-                    # We accept specifying directories as config paths, we search
-                    # inside that directory for all files matching *.yaml, and then
-                    # we apply them in *sorted* order.
-                    files = []
-                    for entry in os.listdir(config_path):
-                        entry_path = os.path.join(config_path, entry)
-                        if not os.path.isfile(entry_path):
-                            print (
-                                "Found subdirectory in config directory: %r. IGNORING."
-                            ) % (entry_path, )
-                            continue
-
-                        if not entry.endswith(".yaml"):
-                            print (
-                                "Found file in config directory that does not"
-                                " end in '.yaml': %r. IGNORING."
-                            ) % (entry_path, )
-                            continue
-
-                        files.append(entry_path)
-
-                    config_files.extend(sorted(files))
-                else:
-                    config_files.append(config_path)
+        obj = cls()
 
         if config_args.generate_config:
             if config_args.report_stats is None:
@@ -299,28 +305,43 @@ class Config(object):
                 " -c CONFIG-FILE\""
             )
 
-        if config_args.keys_directory:
-            config_dir_path = config_args.keys_directory
-        else:
-            config_dir_path = os.path.dirname(config_args.config_path[-1])
-        config_dir_path = os.path.abspath(config_dir_path)
+        obj.read_config_files(
+            config_files,
+            keys_directory=config_args.keys_directory,
+            generate_keys=generate_keys,
+        )
+
+        if generate_keys:
+            return None
+
+        obj.invoke_all("read_arguments", args)
+
+        return obj
+
+    def read_config_files(self, config_files, keys_directory=None,
+                          generate_keys=False):
+        if not keys_directory:
+            keys_directory = os.path.dirname(config_files[-1])
+
+        config_dir_path = os.path.abspath(keys_directory)
 
         specified_config = {}
         for config_file in config_files:
-            yaml_config = cls.read_config_file(config_file)
+            yaml_config = self.read_config_file(config_file)
             specified_config.update(yaml_config)
 
         if "server_name" not in specified_config:
             raise ConfigError(MISSING_SERVER_NAME)
 
         server_name = specified_config["server_name"]
-        _, config = obj.generate_config(
+        _, config = self.generate_config(
             config_dir_path=config_dir_path,
             server_name=server_name,
             is_generating_file=False,
         )
         config.pop("log_config")
         config.update(specified_config)
+
         if "report_stats" not in config:
             raise ConfigError(
                 MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" +
@@ -328,11 +349,51 @@ class Config(object):
             )
 
         if generate_keys:
-            obj.invoke_all("generate_files", config)
+            self.invoke_all("generate_files", config)
             return
 
-        obj.invoke_all("read_config", config)
-
-        obj.invoke_all("read_arguments", args)
-
-        return obj
+        self.invoke_all("read_config", config)
+
+
+def find_config_files(search_paths):
+    """Finds config files using a list of search paths. If a path is a file
+    then that file path is added to the list. If a search path is a directory
+    then all the "*.yaml" files in that directory are added to the list in
+    sorted order.
+
+    Args:
+        search_paths(list(str)): A list of paths to search.
+
+    Returns:
+        list(str): A list of file paths.
+    """
+
+    config_files = []
+    if search_paths:
+        for config_path in search_paths:
+            if os.path.isdir(config_path):
+                # We accept specifying directories as config paths, we search
+                # inside that directory for all files matching *.yaml, and then
+                # we apply them in *sorted* order.
+                files = []
+                for entry in os.listdir(config_path):
+                    entry_path = os.path.join(config_path, entry)
+                    if not os.path.isfile(entry_path):
+                        print (
+                            "Found subdirectory in config directory: %r. IGNORING."
+                        ) % (entry_path, )
+                        continue
+
+                    if not entry.endswith(".yaml"):
+                        print (
+                            "Found file in config directory that does not"
+                            " end in '.yaml': %r. IGNORING."
+                        ) % (entry_path, )
+                        continue
+
+                    files.append(entry_path)
+
+                config_files.extend(sorted(files))
+            else:
+                config_files.append(config_path)
+    return config_files
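As a quick illustration of the extracted helper (paths hypothetical): directories are expanded to their *.yaml files in sorted order, so a conf.d-style layout gives deterministic precedence, and read_config_files() then applies later files over earlier ones.

    from synapse.config._base import find_config_files

    files = find_config_files(
        search_paths=["/etc/synapse/homeserver.yaml", "/etc/synapse/conf.d"],
    )
    # e.g. ["/etc/synapse/homeserver.yaml",
    #       "/etc/synapse/conf.d/10-db.yaml",
    #       "/etc/synapse/conf.d/99-overrides.yaml"]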
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index fc2445484c..79b0534b3b 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -32,13 +32,15 @@ from .password import PasswordConfig
 from .jwt import JWTConfig
 from .ldap import LDAPConfig
 from .emailconfig import EmailConfig
+from .workers import WorkerConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
                        VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
                        AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
-                       JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,):
+                       JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,
+                       WorkerConfig,):
     pass
 
 
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 5047db898f..dc68683fbc 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -126,54 +126,58 @@ class LoggingConfig(Config):
                 )
 
     def setup_logging(self):
-        log_format = (
-            "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
-            " - %(message)s"
-        )
-        if self.log_config is None:
-
-            level = logging.INFO
-            level_for_storage = logging.INFO
-            if self.verbosity:
-                level = logging.DEBUG
-                if self.verbosity > 1:
-                    level_for_storage = logging.DEBUG
-
-            # FIXME: we need a logging.WARN for a -q quiet option
-            logger = logging.getLogger('')
-            logger.setLevel(level)
-
-            logging.getLogger('synapse.storage').setLevel(level_for_storage)
-
-            formatter = logging.Formatter(log_format)
-            if self.log_file:
-                # TODO: Customisable file size / backup count
-                handler = logging.handlers.RotatingFileHandler(
-                    self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
-                )
-
-                def sighup(signum, stack):
-                    logger.info("Closing log file due to SIGHUP")
-                    handler.doRollover()
-                    logger.info("Opened new log file due to SIGHUP")
-
-                # TODO(paul): obviously this is a terrible mechanism for
-                #   stealing SIGHUP, because it means no other part of synapse
-                #   can use it instead. If we want to catch SIGHUP anywhere
-                #   else as well, I'd suggest we find a nicer way to broadcast
-                #   it around.
-                if getattr(signal, "SIGHUP"):
-                    signal.signal(signal.SIGHUP, sighup)
-            else:
-                handler = logging.StreamHandler()
-            handler.setFormatter(formatter)
-
-            handler.addFilter(LoggingContextFilter(request=""))
-
-            logger.addHandler(handler)
+        setup_logging(self.log_config, self.log_file, self.verbosity)
+
+
+def setup_logging(log_config=None, log_file=None, verbosity=None):
+    log_format = (
+        "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
+        " - %(message)s"
+    )
+    if log_config is None:
+
+        level = logging.INFO
+        level_for_storage = logging.INFO
+        if verbosity:
+            level = logging.DEBUG
+            if verbosity > 1:
+                level_for_storage = logging.DEBUG
+
+        # FIXME: we need a logging.WARN for a -q quiet option
+        logger = logging.getLogger('')
+        logger.setLevel(level)
+
+        logging.getLogger('synapse.storage').setLevel(level_for_storage)
+
+        formatter = logging.Formatter(log_format)
+        if log_file:
+            # TODO: Customisable file size / backup count
+            handler = logging.handlers.RotatingFileHandler(
+                log_file, maxBytes=(1000 * 1000 * 100), backupCount=3
+            )
+
+            def sighup(signum, stack):
+                logger.info("Closing log file due to SIGHUP")
+                handler.doRollover()
+                logger.info("Opened new log file due to SIGHUP")
+
+            # TODO(paul): obviously this is a terrible mechanism for
+            #   stealing SIGHUP, because it means no other part of synapse
+            #   can use it instead. If we want to catch SIGHUP anywhere
+            #   else as well, I'd suggest we find a nicer way to broadcast
+            #   it around.
+            if getattr(signal, "SIGHUP"):
+                signal.signal(signal.SIGHUP, sighup)
         else:
-            with open(self.log_config, 'r') as f:
-                logging.config.dictConfig(yaml.load(f))
+            handler = logging.StreamHandler()
+        handler.setFormatter(formatter)
+
+        handler.addFilter(LoggingContextFilter(request=""))
+
+        logger.addHandler(handler)
+    else:
+        with open(log_config, 'r') as f:
+            logging.config.dictConfig(yaml.load(f))
 
-        observer = PythonLoggingObserver()
-        observer.start()
+    observer = PythonLoggingObserver()
+    observer.start()
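With the logic hoisted to module level, a worker can configure logging without instantiating LoggingConfig, e.g. (file path illustrative):

    from synapse.config.logger import setup_logging

    # Either hand over a YAML dictConfig file via log_config, or fall back to
    # the built-in rotating-file / stderr handler setup.
    setup_logging(log_config=None, log_file="/var/log/synapse/pusher.log")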
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 44b8d422e0..7840dc3ad6 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -27,7 +27,7 @@ class ServerConfig(Config):
         self.daemonize = config.get("daemonize")
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
-        self.use_frozen_dicts = config.get("use_frozen_dicts", True)
+        self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
         self.secondary_directory_servers = config.get("secondary_directory_servers", [])
 
@@ -38,19 +38,7 @@ class ServerConfig(Config):
 
         self.listeners = config.get("listeners", [])
 
-        thresholds = config.get("gc_thresholds", None)
-        if thresholds is not None:
-            try:
-                assert len(thresholds) == 3
-                self.gc_thresholds = (
-                    int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
-                )
-            except:
-                raise ConfigError(
-                    "Value of `gc_threshold` must be a list of three integers if set"
-                )
-        else:
-            self.gc_thresholds = None
+        self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
 
         bind_port = config.get("bind_port")
         if bind_port:
@@ -264,3 +252,20 @@ class ServerConfig(Config):
                                   type=int,
                                   help="Turn on the twisted telnet manhole"
                                   " service on the given port.")
+
+
+def read_gc_thresholds(thresholds):
+    """Reads the three integer thresholds for garbage collection. Ensures that
+    the thresholds are integers if thresholds are supplied.
+    """
+    if thresholds is None:
+        return None
+    try:
+        assert len(thresholds) == 3
+        return (
+            int(thresholds[0]), int(thresholds[1]), int(thresholds[2]),
+        )
+    except:
+        raise ConfigError(
+            "Value of `gc_threshold` must be a list of three integers if set"
+        )
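The helper behaves the same as the inlined version it replaces; a short sketch of its contract:

    from synapse.config.server import read_gc_thresholds

    read_gc_thresholds(None)              # -> None; leave gc defaults alone
    read_gc_thresholds(["700", 10, 10])   # -> (700, 10, 10); values coerced to int
    read_gc_thresholds([700, 10])         # raises ConfigError: need three integers

    # Callers then do gc.set_threshold(*thresholds) when the result is not None.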
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
new file mode 100644
index 0000000000..904789d155
--- /dev/null
+++ b/synapse/config/workers.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class WorkerConfig(Config):
+    """The workers are processes run separately to the main synapse process.
+    They have their own pid_file and listener configuration. They use the
+    replication_url to talk to the main synapse process."""
+
+    def read_config(self, config):
+        self.worker_app = config.get("worker_app")
+        self.worker_listeners = config.get("worker_listeners")
+        self.worker_daemonize = config.get("worker_daemonize")
+        self.worker_pid_file = config.get("worker_pid_file")
+        self.worker_log_file = config.get("worker_log_file")
+        self.worker_log_config = config.get("worker_log_config")
+        self.worker_replication_url = config.get("worker_replication_url")
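Since every key is read with .get(), all of them are optional; a worker file only needs the ones its app actually uses. A hedged example of the keys for a pusher worker, exercised through read_config (all values illustrative):

    from synapse.config.workers import WorkerConfig

    worker = WorkerConfig()
    worker.read_config({
        "worker_app": "synapse.app.pusher",  # checked by the worker's start()
        "worker_replication_url":
            "http://127.0.0.1:9092/_synapse/replication",
        "worker_listeners": [
            {"type": "manhole", "port": 9089, "bind_address": "127.0.0.1"},
        ],
        "worker_daemonize": False,
        "worker_pid_file": "/var/run/synapse-pusher.pid",
        "worker_log_file": "/var/log/synapse/pusher.log",
    })
    assert worker.worker_app == "synapse.app.pusher"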
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index a0b7cb7963..da2f5e8cfd 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -31,6 +31,9 @@ logger = logging.getLogger(__name__)
 
 
 class FederationBase(object):
+    def __init__(self, hs):
+        pass
+
     @defer.inlineCallbacks
     def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
                                        include_none=False):
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index d835c1b038..b06387051c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -52,6 +52,8 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
 
 
 class FederationClient(FederationBase):
+    def __init__(self, hs):
+        super(FederationClient, self).__init__(hs)
 
     def start_get_pdu_cache(self):
         self._get_pdu_cache = ExpiringCache(
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f1d231b9d8..2a589524a4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Transaction, Edu
 
+from synapse.util.async import Linearizer
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 import synapse.metrics
@@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[
 
 
 class FederationServer(FederationBase):
+    def __init__(self, hs):
+        super(FederationServer, self).__init__(hs)
+
+        self._room_pdu_linearizer = Linearizer()
+
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
         receipt of new PDUs from other home servers. The required methods are
@@ -187,13 +193,16 @@ class FederationServer(FederationBase):
             )
 
             for event in auth_chain:
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
+                # We sign these again because there was a bug where we
+                # incorrectly signed things the first time round
+                if self.hs.is_mine_id(event.event_id):
+                    event.signatures.update(
+                        compute_event_signature(
+                            event,
+                            self.hs.hostname,
+                            self.hs.config.signing_key[0]
+                        )
                     )
-                )
         else:
             raise NotImplementedError("Specify an event")
 
@@ -377,10 +386,20 @@ class FederationServer(FederationBase):
     @log_function
     def on_get_missing_events(self, origin, room_id, earliest_events,
                               latest_events, limit, min_depth):
+        logger.info(
+            "on_get_missing_events: earliest_events: %r, latest_events: %r,"
+            " limit: %d, min_depth: %d",
+            earliest_events, latest_events, limit, min_depth
+        )
         missing_events = yield self.handler.on_get_missing_events(
             origin, room_id, earliest_events, latest_events, limit, min_depth
         )
 
+        if len(missing_events) < 5:
+            logger.info("Returning %d events: %r", len(missing_events), missing_events)
+        else:
+            logger.info("Returning %d events", len(missing_events))
+
         time_now = self._clock.time_msec()
 
         defer.returnValue({
@@ -481,42 +500,59 @@ class FederationServer(FederationBase):
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
                 if get_missing and prevs - seen:
-                    latest = yield self.store.get_latest_event_ids_in_room(
-                        pdu.room_id
-                    )
-
-                    # We add the prev events that we have seen to the latest
-                    # list to ensure the remote server doesn't give them to us
-                    latest = set(latest)
-                    latest |= seen
-
-                    missing_events = yield self.get_missing_events(
-                        origin,
-                        pdu.room_id,
-                        earliest_events_ids=list(latest),
-                        latest_events=[pdu],
-                        limit=10,
-                        min_depth=min_depth,
-                    )
-
-                    # We want to sort these by depth so we process them and
-                    # tell clients about them in order.
-                    missing_events.sort(key=lambda x: x.depth)
-
-                    for e in missing_events:
-                        yield self._handle_new_pdu(
-                            origin,
-                            e,
-                            get_missing=False
-                        )
-
-                    have_seen = yield self.store.have_events(
-                        [ev for ev, _ in pdu.prev_events]
-                    )
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                        # We recalculate seen, since it may have changed.
+                        have_seen = yield self.store.have_events(prevs)
+                        seen = set(have_seen.keys())
+
+                        if prevs - seen:
+                            latest = yield self.store.get_latest_event_ids_in_room(
+                                pdu.room_id
+                            )
+
+                            # We add the prev events that we have seen to the latest
+                            # list to ensure the remote server doesn't give them to us
+                            latest = set(latest)
+                            latest |= seen
+
+                            logger.info(
+                                "Missing %d events for room %r: %r...",
+                                len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                            )
+
+                            missing_events = yield self.get_missing_events(
+                                origin,
+                                pdu.room_id,
+                                earliest_events_ids=list(latest),
+                                latest_events=[pdu],
+                                limit=10,
+                                min_depth=min_depth,
+                            )
+
+                            # We want to sort these by depth so we process them and
+                            # tell clients about them in order.
+                            missing_events.sort(key=lambda x: x.depth)
+
+                            for e in missing_events:
+                                yield self._handle_new_pdu(
+                                    origin,
+                                    e,
+                                    get_missing=False
+                                )
+
+                            have_seen = yield self.store.have_events(
+                                [ev for ev, _ in pdu.prev_events]
+                            )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
             seen = set(have_seen.keys())
             if prevs - seen:
+                logger.info(
+                    "Still missing %d events for room %r: %r...",
+                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                )
                 fetch_state = True
 
         if fetch_state:
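The Linearizer serialises the missing-event fetches per room: concurrent PDUs for the same room_id queue up, and each holder re-checks have_events inside the lock because an earlier holder may already have filled the gap. A sketch of the pattern, assuming the era's Linearizer.queue() API (a Deferred that yields a context manager); check_missing and fetch_missing are hypothetical helpers:

    from twisted.internet import defer
    from synapse.util.async import Linearizer

    linearizer = Linearizer()

    @defer.inlineCallbacks
    def handle_pdu(room_id, pdu):
        with (yield linearizer.queue(room_id)):
            # Re-check state here: it may have changed while we were queued.
            missing = yield check_missing(room_id, pdu)
            if missing:
                yield fetch_missing(room_id, missing)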
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 3e062a5eab..ea66a5dcbc 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer):
 
         self.hs = hs
 
+        super(ReplicationLayer, self).__init__(hs)
+
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a1a334955f..8a1965f45a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -37,7 +37,7 @@ class TransportLayerServer(JsonResource):
         self.hs = hs
         self.clock = hs.get_clock()
 
-        super(TransportLayerServer, self).__init__(hs)
+        super(TransportLayerServer, self).__init__(hs, canonical_json=False)
 
         self.authenticator = Authenticator(hs)
         self.ratelimiter = FederationRateLimiter(
@@ -528,15 +528,10 @@ class PublicRoomList(BaseFederationServlet):
     PATH = "/publicRooms"
 
     @defer.inlineCallbacks
-    def on_GET(self, request):
+    def on_GET(self, origin, content, query):
         data = yield self.room_list_handler.get_local_public_room_list()
         defer.returnValue((200, data))
 
-    # Avoid doing remote HS authorization checks which are done by default by
-    # BaseFederationServlet.
-    def _wrap(self, code):
-        return code
-
 
 SERVLET_CLASSES = (
     FederationSendServlet,
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 200793b5ed..b38f81e999 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -626,6 +626,6 @@ class AuthHandler(BaseHandler):
             Whether self.hash(password) == stored_hash (bool).
         """
         if stored_hash:
-            return bcrypt.hashpw(password, stored_hash) == stored_hash
+            return bcrypt.hashpw(password, stored_hash.encode('utf-8')) == stored_hash
         else:
             return False
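The encode('utf-8') matters because the stored hash comes back from the database as a unicode string, while bcrypt.hashpw() expects its salt argument as bytes. A minimal Python 2 era round-trip, under that assumption:

    import bcrypt

    pw = "s3cret"                                 # str, i.e. bytes on Python 2
    stored = bcrypt.hashpw(pw, bcrypt.gensalt())  # bytes
    stored = stored.decode('utf-8')               # what the DB layer hands back
    assert bcrypt.hashpw(pw, stored.encode('utf-8')) == stored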
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ff83c608e7..6c0bc7eafa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -345,19 +345,21 @@ class FederationHandler(BaseHandler):
         )
 
         missing_auth = required_auth - set(auth_events)
-        results = yield defer.gatherResults(
-            [
-                self.replication_layer.get_pdu(
-                    [dest],
-                    event_id,
-                    outlier=True,
-                    timeout=10000,
-                )
-                for event_id in missing_auth
-            ],
-            consumeErrors=True
-        ).addErrback(unwrapFirstError)
-        auth_events.update({a.event_id: a for a in results})
+        if missing_auth:
+            logger.info("Missing auth for backfill: %r", missing_auth)
+            results = yield defer.gatherResults(
+                [
+                    self.replication_layer.get_pdu(
+                        [dest],
+                        event_id,
+                        outlier=True,
+                        timeout=10000,
+                    )
+                    for event_id in missing_auth
+                ],
+                consumeErrors=True
+            ).addErrback(unwrapFirstError)
+            auth_events.update({a.event_id: a for a in results})
 
         ev_infos = []
         for a in auth_events.values():
@@ -399,7 +401,7 @@ class FederationHandler(BaseHandler):
             # previous to work out the state.
             # TODO: We can probably do something more clever here.
             yield self._handle_new_event(
-                dest, event
+                dest, event, backfilled=True,
             )
 
         defer.returnValue(events)
@@ -1016,13 +1018,16 @@ class FederationHandler(BaseHandler):
 
             res = results.values()
             for event in res:
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
+                # We sign these again because there was a bug where we
+                # incorrectly signed things the first time round
+                if self.hs.is_mine_id(event.event_id):
+                    event.signatures.update(
+                        compute_event_signature(
+                            event,
+                            self.hs.hostname,
+                            self.hs.config.signing_key[0]
+                        )
                     )
-                )
 
             defer.returnValue(res)
         else:
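The new guard avoids spinning up a gatherResults round-trip when nothing is missing. The concurrency pattern itself, sketched in isolation (the fetch_one callable is hypothetical):

    from twisted.internet import defer
    from synapse.util import unwrapFirstError

    @defer.inlineCallbacks
    def fetch_all(event_ids, fetch_one):
        # Fire all fetches at once; consumeErrors plus unwrapFirstError turns
        # a wrapped FirstError back into the underlying failure.
        results = yield defer.gatherResults(
            [fetch_one(event_id) for event_id in event_ids],
            consumeErrors=True,
        ).addErrback(unwrapFirstError)
        defer.returnValue({r.event_id: r for r in results})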
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index e37409170d..711a6a567f 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -36,13 +36,6 @@ class ProfileHandler(BaseHandler):
             "profile", self.on_profile_query
         )
 
-        distributor = hs.get_distributor()
-
-        distributor.observe("registered_user", self.registered_user)
-
-    def registered_user(self, user):
-        return self.store.create_profile(user.localpart)
-
     @defer.inlineCallbacks
     def get_displayname(self, target_user):
         if self.hs.is_mine(target_user):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index bbc07b045e..0b7517221d 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,7 +23,6 @@ from synapse.api.errors import (
 from ._base import BaseHandler
 from synapse.util.async import run_on_reactor
 from synapse.http.client import CaptchaServerHttpClient
-from synapse.util.distributor import registered_user
 
 import logging
 import urllib
@@ -37,8 +36,6 @@ class RegistrationHandler(BaseHandler):
         super(RegistrationHandler, self).__init__(hs)
 
         self.auth = hs.get_auth()
-        self.distributor = hs.get_distributor()
-        self.distributor.declare("registered_user")
         self.captcha_client = CaptchaServerHttpClient(hs)
 
         self._next_generated_user_id = None
@@ -140,9 +137,11 @@ class RegistrationHandler(BaseHandler):
                 password_hash=password_hash,
                 was_guest=was_guest,
                 make_guest=make_guest,
+                create_profile_with_localpart=(
+                    # If the user was a guest then they already have a profile
+                    None if was_guest else user.localpart
+                ),
             )
-
-            yield registered_user(self.distributor, user)
         else:
             # autogen a sequential user ID
             attempts = 0
@@ -160,7 +159,8 @@ class RegistrationHandler(BaseHandler):
                         user_id=user_id,
                         token=token,
                         password_hash=password_hash,
-                        make_guest=make_guest
+                        make_guest=make_guest,
+                        create_profile_with_localpart=user.localpart,
                     )
                 except SynapseError:
                     # if user id is taken, just generate another
@@ -168,7 +168,6 @@ class RegistrationHandler(BaseHandler):
                     user_id = None
                     token = None
                     attempts += 1
-            yield registered_user(self.distributor, user)
 
         # We used to generate default identicons here, but nowadays
         # we want clients to generate their own as part of their branding
@@ -201,8 +200,8 @@ class RegistrationHandler(BaseHandler):
             token=token,
             password_hash="",
             appservice_id=service_id,
+            create_profile_with_localpart=user.localpart,
         )
-        yield registered_user(self.distributor, user)
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
@@ -248,9 +247,9 @@ class RegistrationHandler(BaseHandler):
             yield self.store.register(
                 user_id=user_id,
                 token=token,
-                password_hash=None
+                password_hash=None,
+                create_profile_with_localpart=user.localpart,
             )
-            yield registered_user(self.distributor, user)
         except Exception as e:
             yield self.store.add_access_token_to_user(user_id, token)
             # Ignore Registration errors
@@ -388,17 +387,16 @@ class RegistrationHandler(BaseHandler):
 
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
-        auth_handler = self.hs.get_handlers().auth_handler
-        token = auth_handler.generate_short_term_login_token(user_id, duration_seconds)
+        token = self.auth_handler().generate_short_term_login_token(
+            user_id, duration_seconds)
 
         if need_register:
             yield self.store.register(
                 user_id=user_id,
                 token=token,
-                password_hash=None
+                password_hash=None,
+                create_profile_with_localpart=user.localpart,
             )
-
-            yield registered_user(self.distributor, user)
         else:
             yield self.store.user_delete_access_tokens(user_id=user_id)
             yield self.store.add_access_token_to_user(user_id=user_id, token=token)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 9fd34588dd..ae44c7a556 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -20,7 +20,7 @@ from ._base import BaseHandler
 
 from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
 from synapse.api.constants import (
-    EventTypes, JoinRules, RoomCreationPreset,
+    EventTypes, JoinRules, RoomCreationPreset, Membership,
 )
 from synapse.api.errors import AuthError, StoreError, SynapseError
 from synapse.util import stringutils
@@ -367,14 +367,10 @@ class RoomListHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def handle_room(room_id):
-            # We pull each bit of state out indvidually to avoid pulling the
-            # full state into memory. Due to how the caching works this should
-            # be fairly quick, even if not originally in the cache.
-            def get_state(etype, state_key):
-                return self.state_handler.get_current_state(room_id, etype, state_key)
+            current_state = yield self.state_handler.get_current_state(room_id)
 
             # Double check that this is actually a public room.
-            join_rules_event = yield get_state(EventTypes.JoinRules, "")
+            join_rules_event = current_state.get((EventTypes.JoinRules, ""))
             if join_rules_event:
                 join_rule = join_rules_event.content.get("join_rule", None)
                 if join_rule and join_rule != JoinRules.PUBLIC:
@@ -382,47 +378,51 @@ class RoomListHandler(BaseHandler):
 
             result = {"room_id": room_id}
 
-            joined_users = yield self.store.get_users_in_room(room_id)
-            if len(joined_users) == 0:
+            num_joined_users = len([
+                1 for _, event in current_state.items()
+                if event.type == EventTypes.Member
+                and event.membership == Membership.JOIN
+            ])
+            if num_joined_users == 0:
                 return
 
-            result["num_joined_members"] = len(joined_users)
+            result["num_joined_members"] = num_joined_users
 
             aliases = yield self.store.get_aliases_for_room(room_id)
             if aliases:
                 result["aliases"] = aliases
 
-            name_event = yield get_state(EventTypes.Name, "")
+            name_event = yield current_state.get((EventTypes.Name, ""))
             if name_event:
                 name = name_event.content.get("name", None)
                 if name:
                     result["name"] = name
 
-            topic_event = yield get_state(EventTypes.Topic, "")
+            topic_event = current_state.get((EventTypes.Topic, ""))
             if topic_event:
                 topic = topic_event.content.get("topic", None)
                 if topic:
                     result["topic"] = topic
 
-            canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
+            canonical_event = current_state.get((EventTypes.CanonicalAlias, ""))
             if canonical_event:
                 canonical_alias = canonical_event.content.get("alias", None)
                 if canonical_alias:
                     result["canonical_alias"] = canonical_alias
 
-            visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
+            visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, ""))
             visibility = None
             if visibility_event:
                 visibility = visibility_event.content.get("history_visibility", None)
             result["world_readable"] = visibility == "world_readable"
 
-            guest_event = yield get_state(EventTypes.GuestAccess, "")
+            guest_event = current_state.get((EventTypes.GuestAccess, ""))
             guest = None
             if guest_event:
                 guest = guest_event.content.get("guest_access", None)
             result["guest_can_join"] = guest == "can_join"
 
-            avatar_event = yield get_state("m.room.avatar", "")
+            avatar_event = current_state.get(("m.room.avatar", ""))
             if avatar_event:
                 avatar_url = avatar_event.content.get("url", None)
                 if avatar_url:
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 861b8f7989..5589296c09 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -221,6 +221,9 @@ class TypingHandler(object):
 
     def get_all_typing_updates(self, last_id, current_id):
         # TODO: Work out a way to do this without scanning the entire state.
+        if last_id == current_id:
+            return []
+
         rows = []
         for room_id, serial in self._room_serials.items():
             if last_id < serial and serial <= current_id:
diff --git a/synapse/http/client.py b/synapse/http/client.py
index c7fa692435..3ec9bc7faf 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -24,12 +24,13 @@ from synapse.http.endpoint import SpiderEndpoint
 
 from canonicaljson import encode_canonical_json
 
-from twisted.internet import defer, reactor, ssl, protocol
+from twisted.internet import defer, reactor, ssl, protocol, task
 from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
 from twisted.web.client import (
     BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent,
-    readBody, FileBodyProducer, PartialDownloadError,
+    readBody, PartialDownloadError,
 )
+from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
 from twisted.web.http import PotentialDataLoss
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
@@ -468,3 +469,26 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
 
     def creatorForNetloc(self, hostname, port):
         return self
+
+
+class FileBodyProducer(TwistedFileBodyProducer):
+    """Workaround for https://twistedmatrix.com/trac/ticket/8473
+
+    We override the pauseProducing and resumeProducing methods in twisted's
+    FileBodyProducer so that they do not raise exceptions if the task has
+    already completed.
+    """
+
+    def pauseProducing(self):
+        try:
+            super(FileBodyProducer, self).pauseProducing()
+        except task.TaskDone:
+            # task has already completed
+            pass
+
+    def resumeProducing(self):
+        try:
+            super(FileBodyProducer, self).resumeProducing()
+        except task.NotPaused:
+            # task was not paused (probably because it had already completed)
+            pass
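
The subclass is a drop-in replacement wherever twisted's producer was used, so a request whose cooperative task has already finished no longer raises when the transport pauses or resumes it. A usage sketch (the endpoint is illustrative; StringIO matches the Python 2 codebase of this era):

    from StringIO import StringIO

    from twisted.internet import reactor
    from twisted.web.client import Agent
    from twisted.web.http_headers import Headers

    agent = Agent(reactor)
    d = agent.request(
        "POST",
        "https://example.com/upload",  # hypothetical endpoint
        Headers({"Content-Type": ["application/octet-stream"]}),
        FileBodyProducer(StringIO("some request body")),
    )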
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index e5c3929cd7..1028731bc9 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -273,16 +273,16 @@ class Mailer(object):
 
         sender_state_event = room_state[("m.room.member", event.sender)]
         sender_name = name_from_member_event(sender_state_event)
-        sender_avatar_url = None
-        if "avatar_url" in sender_state_event.content:
-            sender_avatar_url = sender_state_event.content["avatar_url"]
+        sender_avatar_url = sender_state_event.content.get("avatar_url")
 
         # 'hash' for deterministically picking default images: use
         # sender_hash % the number of default images to choose from
         sender_hash = string_ordinal_total(event.sender)
 
+        msgtype = event.content.get("msgtype")
+
         ret = {
-            "msgtype": event.content["msgtype"],
+            "msgtype": msgtype,
             "is_historical": event.event_id != notif['event_id'],
             "id": event.event_id,
             "ts": event.origin_server_ts,
@@ -291,9 +291,9 @@ class Mailer(object):
             "sender_hash": sender_hash,
         }
 
-        if event.content["msgtype"] == "m.text":
+        if msgtype == "m.text":
             self.add_text_message_vars(ret, event)
-        elif event.content["msgtype"] == "m.image":
+        elif msgtype == "m.image":
             self.add_image_message_vars(ret, event)
 
         if "body" in event.content:
@@ -302,16 +302,17 @@ class Mailer(object):
         return ret
 
     def add_text_message_vars(self, messagevars, event):
-        if "format" in event.content:
-            msgformat = event.content["format"]
-        else:
-            msgformat = None
+        msgformat = event.content.get("format")
+
         messagevars["format"] = msgformat
 
-        if msgformat == "org.matrix.custom.html":
-            messagevars["body_text_html"] = safe_markup(event.content["formatted_body"])
-        else:
-            messagevars["body_text_html"] = safe_text(event.content["body"])
+        formatted_body = event.content.get("formatted_body")
+        body = event.content.get("body")
+
+        if msgformat == "org.matrix.custom.html" and formatted_body:
+            messagevars["body_text_html"] = safe_markup(formatted_body)
+        elif body:
+            messagevars["body_text_html"] = safe_text(body)
 
         return messagevars
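
Event content is client-supplied JSON, so none of these keys is guaranteed to be present; switching from indexing to .get() turns a KeyError (and hence a notification email that fails to render) into a graceful fallback:

    content = {"body": "hello"}      # no msgtype, format or formatted_body

    content["msgtype"]               # KeyError -- the old behaviour
    content.get("msgtype")           # None -- the new behaviour falls through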
 
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index d1afa0f0d5..498bb9e18a 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -45,30 +45,27 @@ class EventStreamRestServlet(ClientV1RestServlet):
                 raise SynapseError(400, "Guest users must specify room_id param")
         if "room_id" in request.args:
             room_id = request.args["room_id"][0]
-        try:
-            handler = self.handlers.event_stream_handler
-            pagin_config = PaginationConfig.from_request(request)
-            timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
-            if "timeout" in request.args:
-                try:
-                    timeout = int(request.args["timeout"][0])
-                except ValueError:
-                    raise SynapseError(400, "timeout must be in milliseconds.")
-
-            as_client_event = "raw" not in request.args
-
-            chunk = yield handler.get_stream(
-                requester.user.to_string(),
-                pagin_config,
-                timeout=timeout,
-                as_client_event=as_client_event,
-                affect_presence=(not is_guest),
-                room_id=room_id,
-                is_guest=is_guest,
-            )
-        except:
-            logger.exception("Event stream failed")
-            raise
+
+        handler = self.handlers.event_stream_handler
+        pagin_config = PaginationConfig.from_request(request)
+        timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
+        if "timeout" in request.args:
+            try:
+                timeout = int(request.args["timeout"][0])
+            except ValueError:
+                raise SynapseError(400, "timeout must be in milliseconds.")
+
+        as_client_event = "raw" not in request.args
+
+        chunk = yield handler.get_stream(
+            requester.user.to_string(),
+            pagin_config,
+            timeout=timeout,
+            as_client_event=as_client_event,
+            affect_presence=(not is_guest),
+            room_id=room_id,
+            is_guest=is_guest,
+        )
 
         defer.returnValue((200, chunk))
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index db52a1fc39..86fbe2747d 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -72,8 +72,6 @@ class RoomCreateRestServlet(ClientV1RestServlet):
 
     def get_room_config(self, request):
         user_supplied_config = parse_json_object_from_request(request)
-        # default visibility
-        user_supplied_config.setdefault("visibility", "public")
         return user_supplied_config
 
     def on_OPTIONS(self, request):
@@ -279,6 +277,13 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
+        try:
+            yield self.auth.get_user_by_req(request)
+        except AuthError:
+            # This endpoint isn't authed, but it's useful to know who's hitting
+            # it if they *do* supply an access token
+            pass
+
         handler = self.hs.get_room_list_handler()
         data = yield handler.get_aggregated_public_room_list()
 
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index d96bf9afe2..2468c3ac42 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -26,6 +26,7 @@ from .thumbnailer import Thumbnailer
 
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.util.stringutils import random_string
+from synapse.api.errors import SynapseError
 
 from twisted.internet import defer, threads
 
@@ -134,10 +135,15 @@ class MediaRepository(object):
                 request_path = "/".join((
                     "/_matrix/media/v1/download", server_name, media_id,
                 ))
-                length, headers = yield self.client.get_file(
-                    server_name, request_path, output_stream=f,
-                    max_size=self.max_upload_size,
-                )
+                try:
+                    length, headers = yield self.client.get_file(
+                        server_name, request_path, output_stream=f,
+                        max_size=self.max_upload_size,
+                    )
+                except Exception as e:
+                    logger.warn("Failed to fetch remoted media %r", e)
+                    raise SynapseError(502, "Failed to fetch remoted media")
+
             media_type = headers["Content-Type"][0]
             time_now_ms = self.clock.time_msec()
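
With the fetch wrapped, a dead or misbehaving remote homeserver now surfaces to the requesting client as a well-formed Matrix error rather than an unhandled exception. Roughly, assuming SynapseError's default errcode of M_UNKNOWN (an assumption worth checking against synapse.api.errors):

    # What the media client now sees on a failed federation fetch:
    status = 502
    body = {
        "errcode": "M_UNKNOWN",   # SynapseError's default errcode (assumed)
        "error": "Failed to fetch remote media",
    }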
 
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 37dd1de899..74c64f1371 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -252,7 +252,8 @@ class PreviewUrlResource(Resource):
 
         og = {}
         for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"):
-            og[tag.attrib['property']] = tag.attrib['content']
+            if 'content' in tag.attrib:
+                og[tag.attrib['property']] = tag.attrib['content']
 
         # TODO: grab article: meta tags too, e.g.:
 
@@ -279,7 +280,7 @@ class PreviewUrlResource(Resource):
                 # TODO: consider inlined CSS styles as well as width & height attribs
                 images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
                 images = sorted(images, key=lambda i: (
-                    -1 * int(i.attrib['width']) * int(i.attrib['height'])
+                    -1 * float(i.attrib['width']) * float(i.attrib['height'])
                 ))
                 if not images:
                     images = tree.xpath("//img[@src]")
@@ -287,9 +288,9 @@ class PreviewUrlResource(Resource):
                     og['og:image'] = images[0].attrib['src']
 
         # pre-cache the image for posterity
-        # FIXME: it might be cleaner to use the same flow as the main /preview_url request
-        # itself and benefit from the same caching etc.  But for now we just rely on the
-        # caching on the master request to speed things up.
+        # FIXME: it might be cleaner to use the same flow as the main /preview_url
+        # request itself and benefit from the same caching etc.  But for now we
+        # just rely on the caching on the master request to speed things up.
         if 'og:image' in og and og['og:image']:
             image_info = yield self._download_url(
                 self._rebase_url(og['og:image'], media_info['uri']), requester.user
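
The int() to float() switch matters because width and height attributes scraped from arbitrary HTML need not be integral; int() would raise and abort the whole preview:

    float("543.5")   # 543.5 -- sorting by pixel area still works
    int("543.5")     # ValueError: invalid literal for int() with base 10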
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index ec7e8d40d2..3fa226e92d 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -138,6 +138,9 @@ class AccountDataStore(SQLBaseStore):
             A deferred pair of lists of tuples of stream_id int, user_id string,
             room_id string, type string, and content string.
         """
+        if last_room_id == current_id and last_global_id == current_id:
+            return defer.succeed(([], []))
+
         def get_updated_account_data_txn(txn):
             sql = (
                 "SELECT stream_id, user_id, account_data_type, content"
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 3fab57a7e8..d03f7c541e 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -118,6 +118,9 @@ class PresenceStore(SQLBaseStore):
             )
 
     def get_all_presence_updates(self, last_id, current_id):
+        if last_id == current_id:
+            return defer.succeed([])
+
         def get_all_presence_updates_txn(txn):
             sql = (
                 "SELECT stream_id, user_id, state, last_active_ts,"
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 786d6f6d67..8183b7f1b0 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -421,6 +421,9 @@ class PushRuleStore(SQLBaseStore):
 
     def get_all_push_rule_updates(self, last_id, current_id, limit):
         """Get all the push rules changes that have happend on the server"""
+        if last_id == current_id:
+            return defer.succeed([])
+
         def get_all_push_rule_updates_txn(txn):
             sql = (
                 "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index bda84a744a..3de9e0f709 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -76,7 +76,8 @@ class RegistrationStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def register(self, user_id, token, password_hash,
-                 was_guest=False, make_guest=False, appservice_id=None):
+                 was_guest=False, make_guest=False, appservice_id=None,
+                 create_profile_with_localpart=None):
         """Attempts to register an account.
 
         Args:
@@ -88,6 +89,8 @@ class RegistrationStore(SQLBaseStore):
             make_guest (boolean): True if the new user should be a guest,
                 false to add a regular user account.
             appservice_id (str): The ID of the appservice registering the user.
+            create_profile_with_localpart (str): Optionally create a profile for
+                the given localpart.
         Raises:
             StoreError if the user_id could not be registered.
         """
@@ -99,7 +102,8 @@ class RegistrationStore(SQLBaseStore):
             password_hash,
             was_guest,
             make_guest,
-            appservice_id
+            appservice_id,
+            create_profile_with_localpart,
         )
         self.get_user_by_id.invalidate((user_id,))
         self.is_guest.invalidate((user_id,))
@@ -112,7 +116,8 @@ class RegistrationStore(SQLBaseStore):
         password_hash,
         was_guest,
         make_guest,
-        appservice_id
+        appservice_id,
+        create_profile_with_localpart,
     ):
         now = int(self.clock.time())
 
@@ -157,6 +162,12 @@ class RegistrationStore(SQLBaseStore):
                 (next_id, user_id, token,)
             )
 
+        if create_profile_with_localpart:
+            txn.execute(
+                "INSERT INTO profiles(user_id) VALUES (?)",
+                (create_profile_with_localpart,)
+            )
+
     @cached()
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
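
From the caller's side, the new keyword lets the account row and the profiles row be created in the same transaction; a sketch of a call site, with illustrative values:

    yield self.store.register(
        user_id="@alice:example.com",
        token="random_access_token",
        password_hash=hashed_password,
        create_profile_with_localpart="alice",  # also inserts the profiles row
    )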
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 9da23f34cb..5a2c1aa59b 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -68,6 +68,9 @@ class TagsStore(SQLBaseStore):
             A deferred list of tuples of stream_id int, user_id string,
             room_id string, tag string and content string.
         """
+        if last_id == current_id:
+            defer.returnValue([])
+
         def get_all_updated_tags_txn(txn):
             sql = (
                 "SELECT stream_id, user_id, room_id"
diff --git a/synapse/types.py b/synapse/types.py
index 7b6ae44bdd..f639651a73 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -22,7 +22,10 @@ Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"])
 
 
 def get_domain_from_id(string):
-    return string.split(":", 1)[1]
+    try:
+        return string.split(":", 1)[1]
+    except IndexError:
+        raise SynapseError(400, "Invalid ID: %r", string)
 
 
 class DomainSpecificString(
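
A malformed ID now produces a 400 the caller can relay, instead of an IndexError bubbling up as a 500:

    get_domain_from_id("@alice:example.com")   # returns "example.com"
    get_domain_from_id("no-colon-here")        # raises SynapseError(400, ...)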
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index d7cccc06b1..e68f94ce77 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -27,10 +27,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def registered_user(distributor, user):
-    return distributor.fire("registered_user", user)
-
-
 def user_left_room(distributor, user, room_id):
     return preserve_context_over_fn(
         distributor.fire,