summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-29 15:02:09 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-29 15:02:09 +0100
commit60481031f2f1e43aaedc96a33441bfcf15262330 (patch)
treeb7f1e9e461e3034b1ca0fd75e6ea4ea185986cfb /synapse
parentCount total_events and total_event_bytes within the loop. (diff)
parentMerge branch 'develop' into rei/rss_target (diff)
downloadsynapse-60481031f2f1e43aaedc96a33441bfcf15262330.tar.xz
Merge branch 'rei/rss_target' into rei/rss_inc5 github/rei/rss_inc5 rei/rss_inc5
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/app/_base.py12
-rw-r--r--synapse/app/admin_cmd.py4
-rw-r--r--synapse/app/appservice.py4
-rw-r--r--synapse/app/client_reader.py4
-rw-r--r--synapse/app/event_creator.py4
-rw-r--r--synapse/app/federation_reader.py4
-rw-r--r--synapse/app/federation_sender.py4
-rw-r--r--synapse/app/frontend_proxy.py4
-rw-r--r--synapse/app/homeserver.py4
-rw-r--r--synapse/app/media_repository.py4
-rw-r--r--synapse/app/pusher.py4
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/app/user_dir.py4
-rw-r--r--synapse/config/__init__.py7
-rw-r--r--synapse/config/_base.py37
-rw-r--r--synapse/config/database.py27
-rw-r--r--synapse/config/logger.py106
-rw-r--r--synapse/config/server.py84
-rw-r--r--synapse/config/tls.py50
-rw-r--r--synapse/federation/transport/client.py46
-rw-r--r--synapse/federation/transport/server.py36
-rw-r--r--synapse/handlers/admin.py9
-rw-r--r--synapse/handlers/federation.py5
-rw-r--r--synapse/handlers/identity.py13
-rw-r--r--synapse/handlers/message.py5
-rw-r--r--synapse/handlers/room_list.py29
-rw-r--r--synapse/handlers/room_member.py128
-rw-r--r--synapse/http/federation/matrix_federation_agent.py364
-rw-r--r--synapse/http/federation/srv_resolver.py61
-rw-r--r--synapse/logging/_structured.py374
-rw-r--r--synapse/logging/_terse_json.py278
-rw-r--r--synapse/logging/context.py14
-rw-r--r--synapse/python_dependencies.py6
-rw-r--r--synapse/rest/admin/__init__.py2
-rw-r--r--synapse/rest/admin/users.py40
-rw-r--r--synapse/storage/_base.py24
-rw-r--r--synapse/storage/registration.py1
-rw-r--r--synapse/util/hash.py33
39 files changed, 1472 insertions, 370 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 3ffde0d7fc..f29bce560c 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -122,7 +122,8 @@ class UserTypes(object):
     """
 
     SUPPORT = "support"
-    ALL_USER_TYPES = (SUPPORT,)
+    BOT = "bot"
+    ALL_USER_TYPES = (SUPPORT, BOT)
 
 
 class RelationTypes(object):
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 69dcf3523f..c30fdeee9a 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -36,18 +36,20 @@ from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
 
+# list of tuples of function, args list, kwargs dict
 _sighup_callbacks = []
 
 
-def register_sighup(func):
+def register_sighup(func, *args, **kwargs):
     """
     Register a function to be called when a SIGHUP occurs.
 
     Args:
         func (function): Function to be called when sent a SIGHUP signal.
-            Will be called with a single argument, the homeserver.
+            Will be called with a single default argument, the homeserver.
+        *args, **kwargs: args and kwargs to be passed to the target function.
     """
-    _sighup_callbacks.append(func)
+    _sighup_callbacks.append((func, args, kwargs))
 
 
 def start_worker_reactor(appname, config, run_command=reactor.run):
@@ -248,8 +250,8 @@ def start(hs, listeners=None):
                 # we're not using systemd.
                 sdnotify(b"RELOADING=1")
 
-                for i in _sighup_callbacks:
-                    i(hs)
+                for i, args, kwargs in _sighup_callbacks:
+                    i(hs, *args, **kwargs)
 
                 sdnotify(b"READY=1")
 
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 1fd52a5526..04751a6a5e 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -227,8 +227,6 @@ def start(config_options):
     config.start_pushers = False
     config.send_federation = False
 
-    setup_logging(config, use_worker_options=True)
-
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -241,6 +239,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
 
     # We use task.react as the basic run command as it correctly handles tearing
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 54bb114dec..767b87d2db 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -141,8 +141,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.appservice"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -167,6 +165,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ps, config, use_worker_options=True)
+
     ps.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ps, config.worker_listeners
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 721bb5b119..86193d35a8 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -179,8 +179,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.client_reader"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -193,6 +191,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 473c8895d0..c67fe69a50 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -175,8 +175,6 @@ def start(config_options):
 
     assert config.worker_replication_http_port is not None
 
-    setup_logging(config, use_worker_options=True)
-
     # This should only be done on the user directory worker or the master
     config.update_user_directory = False
 
@@ -192,6 +190,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 5255d9e8cc..1ef027a88c 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -160,8 +160,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.federation_reader"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -174,6 +172,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index c5a2880e69..04fbb407af 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -171,8 +171,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.federation_sender"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -197,6 +195,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index e2822ca848..611d285421 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -232,8 +232,6 @@ def start(config_options):
 
     assert config.worker_main_http_uri is not None
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -246,6 +244,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 8233905844..04f1ed14f3 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -341,8 +341,6 @@ def setup(config_options):
         # generating config files and shouldn't try to continue.
         sys.exit(0)
 
-    synapse.config.logger.setup_logging(config, use_worker_options=False)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -356,6 +354,8 @@ def setup(config_options):
         database_engine=database_engine,
     )
 
+    synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
+
     logger.info("Preparing database: %s...", config.database_config["name"])
 
     try:
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 3a168577c7..2ac783ffa3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -155,8 +155,6 @@ def start(config_options):
             "Please add ``enable_media_repo: false`` to the main config\n"
         )
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -169,6 +167,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 692ffa2f04..d84732ee3c 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -184,8 +184,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.pusher"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     if config.start_pushers:
@@ -210,6 +208,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ps, config, use_worker_options=True)
+
     ps.setup()
 
     def start():
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index a1c3b162f7..473026fce5 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -435,8 +435,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.synchrotron"
 
-    setup_logging(config, use_worker_options=True)
-
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -450,6 +448,8 @@ def start(config_options):
         application_service_handler=SynchrotronApplicationService(),
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index cb29a1afab..e01afb39f2 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -197,8 +197,6 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.user_dir"
 
-    setup_logging(config, use_worker_options=True)
-
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -223,6 +221,8 @@ def start(config_options):
         database_engine=database_engine,
     )
 
+    setup_logging(ss, config, use_worker_options=True)
+
     ss.setup()
     reactor.addSystemEventTrigger(
         "before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/config/__init__.py b/synapse/config/__init__.py
index f2a5a41e92..1e76e9559d 100644
--- a/synapse/config/__init__.py
+++ b/synapse/config/__init__.py
@@ -13,8 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import ConfigError
+from ._base import ConfigError, find_config_files
 
-# export ConfigError if somebody does import *
+# export ConfigError and find_config_files if somebody does
+# import *
 # this is largely a fudge to stop PEP8 moaning about the import
-__all__ = ["ConfigError"]
+__all__ = ["ConfigError", "find_config_files"]
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 6ce5cd07fb..31f6530978 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -181,6 +181,11 @@ class Config(object):
         generate_secrets=False,
         report_stats=None,
         open_private_ports=False,
+        listeners=None,
+        database_conf=None,
+        tls_certificate_path=None,
+        tls_private_key_path=None,
+        acme_domain=None,
     ):
         """Build a default configuration file
 
@@ -207,6 +212,33 @@ class Config(object):
             open_private_ports (bool): True to leave private ports (such as the non-TLS
                 HTTP listener) open to the internet.
 
+            listeners (list(dict)|None): A list of descriptions of the listeners
+                synapse should start with each of which specifies a port (str), a list of
+                resources (list(str)), tls (bool) and type (str). For example:
+                [{
+                    "port": 8448,
+                    "resources": [{"names": ["federation"]}],
+                    "tls": True,
+                    "type": "http",
+                },
+                {
+                    "port": 443,
+                    "resources": [{"names": ["client"]}],
+                    "tls": False,
+                    "type": "http",
+                }],
+
+
+            database (str|None): The database type to configure, either `psycog2`
+                or `sqlite3`.
+
+            tls_certificate_path (str|None): The path to the tls certificate.
+
+            tls_private_key_path (str|None): The path to the tls private key.
+
+            acme_domain (str|None): The domain acme will try to validate. If
+                specified acme will be enabled.
+
         Returns:
             str: the yaml config file
         """
@@ -220,6 +252,11 @@ class Config(object):
                 generate_secrets=generate_secrets,
                 report_stats=report_stats,
                 open_private_ports=open_private_ports,
+                listeners=listeners,
+                database_conf=database_conf,
+                tls_certificate_path=tls_certificate_path,
+                tls_private_key_path=tls_private_key_path,
+                acme_domain=acme_domain,
             )
         )
 
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 746a6cd1f4..118aafbd4a 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -13,6 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
+from textwrap import indent
+
+import yaml
 
 from ._base import Config
 
@@ -38,20 +41,28 @@ class DatabaseConfig(Config):
 
         self.set_databasepath(config.get("database_path"))
 
-    def generate_config_section(self, data_dir_path, **kwargs):
-        database_path = os.path.join(data_dir_path, "homeserver.db")
-        return (
-            """\
-        ## Database ##
-
-        database:
-          # The database engine name
+    def generate_config_section(self, data_dir_path, database_conf, **kwargs):
+        if not database_conf:
+            database_path = os.path.join(data_dir_path, "homeserver.db")
+            database_conf = (
+                """# The database engine name
           name: "sqlite3"
           # Arguments to pass to the engine
           args:
             # Path to the database
             database: "%(database_path)s"
+            """
+                % locals()
+            )
+        else:
+            database_conf = indent(yaml.dump(database_conf), " " * 10).lstrip()
+
+        return (
+            """\
+        ## Database ##
 
+        database:
+          %(database_conf)s
         # Number of events to cache in memory.
         #
         #event_cache_size: 10K
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index d321d00b80..2704c18720 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -25,6 +25,10 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
 
 import synapse
 from synapse.app import _base as appbase
+from synapse.logging._structured import (
+    reload_structured_logging,
+    setup_structured_logging,
+)
 from synapse.logging.context import LoggingContextFilter
 from synapse.util.versionstring import get_version_string
 
@@ -85,7 +89,8 @@ class LoggingConfig(Config):
             """\
         ## Logging ##
 
-        # A yaml python logging config file
+        # A yaml python logging config file as described by
+        # https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
         #
         log_config: "%(log_config)s"
         """
@@ -119,21 +124,10 @@ class LoggingConfig(Config):
                 log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
 
 
-def setup_logging(config, use_worker_options=False):
-    """ Set up python logging
-
-    Args:
-        config (LoggingConfig | synapse.config.workers.WorkerConfig):
-            configuration data
-
-        use_worker_options (bool): True to use the 'worker_log_config' option
-            instead of 'log_config'.
-
-        register_sighup (func | None): Function to call to register a
-            sighup handler.
+def _setup_stdlib_logging(config, log_config):
+    """
+    Set up Python stdlib logging.
     """
-    log_config = config.worker_log_config if use_worker_options else config.log_config
-
     if log_config is None:
         log_format = (
             "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
@@ -151,35 +145,10 @@ def setup_logging(config, use_worker_options=False):
         handler.addFilter(LoggingContextFilter(request=""))
         logger.addHandler(handler)
     else:
+        logging.config.dictConfig(log_config)
 
-        def load_log_config():
-            with open(log_config, "r") as f:
-                logging.config.dictConfig(yaml.safe_load(f))
-
-        def sighup(*args):
-            # it might be better to use a file watcher or something for this.
-            load_log_config()
-            logging.info("Reloaded log config from %s due to SIGHUP", log_config)
-
-        load_log_config()
-        appbase.register_sighup(sighup)
-
-    # make sure that the first thing we log is a thing we can grep backwards
-    # for
-    logging.warn("***** STARTING SERVER *****")
-    logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
-    logging.info("Server hostname: %s", config.server_name)
-
-    # It's critical to point twisted's internal logging somewhere, otherwise it
-    # stacks up and leaks kup to 64K object;
-    # see: https://twistedmatrix.com/trac/ticket/8164
-    #
-    # Routing to the python logging framework could be a performance problem if
-    # the handlers blocked for a long time as python.logging is a blocking API
-    # see https://twistedmatrix.com/documents/current/core/howto/logger.html
-    # filed as https://github.com/matrix-org/synapse/issues/1727
-    #
-    # However this may not be too much of a problem if we are just writing to a file.
+    # Route Twisted's native logging through to the standard library logging
+    # system.
     observer = STDLibLogObserver()
 
     def _log(event):
@@ -201,3 +170,54 @@ def setup_logging(config, use_worker_options=False):
     )
     if not config.no_redirect_stdio:
         print("Redirected stdout/stderr to logs")
+
+
+def _reload_stdlib_logging(*args, log_config=None):
+    logger = logging.getLogger("")
+
+    if not log_config:
+        logger.warn("Reloaded a blank config?")
+
+    logging.config.dictConfig(log_config)
+
+
+def setup_logging(hs, config, use_worker_options=False):
+    """
+    Set up the logging subsystem.
+
+    Args:
+        config (LoggingConfig | synapse.config.workers.WorkerConfig):
+            configuration data
+
+        use_worker_options (bool): True to use the 'worker_log_config' option
+            instead of 'log_config'.
+    """
+    log_config = config.worker_log_config if use_worker_options else config.log_config
+
+    def read_config(*args, callback=None):
+        if log_config is None:
+            return None
+
+        with open(log_config, "rb") as f:
+            log_config_body = yaml.safe_load(f.read())
+
+        if callback:
+            callback(log_config=log_config_body)
+            logging.info("Reloaded log config from %s due to SIGHUP", log_config)
+
+        return log_config_body
+
+    log_config_body = read_config()
+
+    if log_config_body and log_config_body.get("structured") is True:
+        setup_structured_logging(hs, config, log_config_body)
+        appbase.register_sighup(read_config, callback=reload_structured_logging)
+    else:
+        _setup_stdlib_logging(config, log_config_body)
+        appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
+
+    # make sure that the first thing we log is a thing we can grep backwards
+    # for
+    logging.warn("***** STARTING SERVER *****")
+    logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
+    logging.info("Server hostname: %s", config.server_name)
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 15449695d1..2abdef0971 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -17,8 +17,11 @@
 
 import logging
 import os.path
+import re
+from textwrap import indent
 
 import attr
+import yaml
 from netaddr import IPSet
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -352,7 +355,7 @@ class ServerConfig(Config):
         return any(l["tls"] for l in self.listeners)
 
     def generate_config_section(
-        self, server_name, data_dir_path, open_private_ports, **kwargs
+        self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
     ):
         _, bind_port = parse_and_validate_server_name(server_name)
         if bind_port is not None:
@@ -366,11 +369,68 @@ class ServerConfig(Config):
         # Bring DEFAULT_ROOM_VERSION into the local-scope for use in the
         # default config string
         default_room_version = DEFAULT_ROOM_VERSION
+        secure_listeners = []
+        unsecure_listeners = []
+        private_addresses = ["::1", "127.0.0.1"]
+        if listeners:
+            for listener in listeners:
+                if listener["tls"]:
+                    secure_listeners.append(listener)
+                else:
+                    # If we don't want open ports we need to bind the listeners
+                    # to some address other than 0.0.0.0. Here we chose to use
+                    # localhost.
+                    # If the addresses are already bound we won't overwrite them
+                    # however.
+                    if not open_private_ports:
+                        listener.setdefault("bind_addresses", private_addresses)
+
+                    unsecure_listeners.append(listener)
+
+            secure_http_bindings = indent(
+                yaml.dump(secure_listeners), " " * 10
+            ).lstrip()
+
+            unsecure_http_bindings = indent(
+                yaml.dump(unsecure_listeners), " " * 10
+            ).lstrip()
+
+        if not unsecure_listeners:
+            unsecure_http_bindings = (
+                """- port: %(unsecure_port)s
+            tls: false
+            type: http
+            x_forwarded: true"""
+                % locals()
+            )
+
+            if not open_private_ports:
+                unsecure_http_bindings += (
+                    "\n            bind_addresses: ['::1', '127.0.0.1']"
+                )
+
+            unsecure_http_bindings += """
+
+            resources:
+              - names: [client, federation]
+                compress: false"""
+
+            if listeners:
+                # comment out this block
+                unsecure_http_bindings = "#" + re.sub(
+                    "\n {10}",
+                    lambda match: match.group(0) + "#",
+                    unsecure_http_bindings,
+                )
 
-        unsecure_http_binding = "port: %i\n            tls: false" % (unsecure_port,)
-        if not open_private_ports:
-            unsecure_http_binding += (
-                "\n            bind_addresses: ['::1', '127.0.0.1']"
+        if not secure_listeners:
+            secure_http_bindings = (
+                """#- port: %(bind_port)s
+          #  type: http
+          #  tls: true
+          #  resources:
+          #    - names: [client, federation]"""
+                % locals()
             )
 
         return (
@@ -556,11 +616,7 @@ class ServerConfig(Config):
           # will also need to give Synapse a TLS key and certificate: see the TLS section
           # below.)
           #
-          #- port: %(bind_port)s
-          #  type: http
-          #  tls: true
-          #  resources:
-          #    - names: [client, federation]
+          %(secure_http_bindings)s
 
           # Unsecure HTTP listener: for when matrix traffic passes through a reverse proxy
           # that unwraps TLS.
@@ -568,13 +624,7 @@ class ServerConfig(Config):
           # If you plan to use a reverse proxy, please see
           # https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst.
           #
-          - %(unsecure_http_binding)s
-            type: http
-            x_forwarded: true
-
-            resources:
-              - names: [client, federation]
-                compress: false
+          %(unsecure_http_bindings)s
 
             # example additional_resources:
             #
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index ca508a224f..c0148aa95c 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -239,12 +239,38 @@ class TlsConfig(Config):
                 self.tls_fingerprints.append({"sha256": sha256_fingerprint})
 
     def generate_config_section(
-        self, config_dir_path, server_name, data_dir_path, **kwargs
+        self,
+        config_dir_path,
+        server_name,
+        data_dir_path,
+        tls_certificate_path,
+        tls_private_key_path,
+        acme_domain,
+        **kwargs
     ):
+        """If the acme_domain is specified acme will be enabled.
+        If the TLS paths are not specified the default will be certs in the
+        config directory"""
+
         base_key_name = os.path.join(config_dir_path, server_name)
 
-        tls_certificate_path = base_key_name + ".tls.crt"
-        tls_private_key_path = base_key_name + ".tls.key"
+        if bool(tls_certificate_path) != bool(tls_private_key_path):
+            raise ConfigError(
+                "Please specify both a cert path and a key path or neither."
+            )
+
+        tls_enabled = (
+            "" if tls_certificate_path and tls_private_key_path or acme_domain else "#"
+        )
+
+        if not tls_certificate_path:
+            tls_certificate_path = base_key_name + ".tls.crt"
+        if not tls_private_key_path:
+            tls_private_key_path = base_key_name + ".tls.key"
+
+        acme_enabled = bool(acme_domain)
+        acme_domain = "matrix.example.com"
+
         default_acme_account_file = os.path.join(data_dir_path, "acme_account.key")
 
         # this is to avoid the max line length. Sorrynotsorry
@@ -269,11 +295,11 @@ class TlsConfig(Config):
         # instance, if using certbot, use `fullchain.pem` as your certificate,
         # not `cert.pem`).
         #
-        #tls_certificate_path: "%(tls_certificate_path)s"
+        %(tls_enabled)stls_certificate_path: "%(tls_certificate_path)s"
 
         # PEM-encoded private key for TLS
         #
-        #tls_private_key_path: "%(tls_private_key_path)s"
+        %(tls_enabled)stls_private_key_path: "%(tls_private_key_path)s"
 
         # Whether to verify TLS server certificates for outbound federation requests.
         #
@@ -340,10 +366,10 @@ class TlsConfig(Config):
         #    permission to listen on port 80.
         #
         acme:
-            # ACME support is disabled by default. Uncomment the following line
-            # (and tls_certificate_path and tls_private_key_path above) to enable it.
+            # ACME support is disabled by default. Set this to `true` and uncomment
+            # tls_certificate_path and tls_private_key_path above to enable it.
             #
-            #enabled: true
+            enabled: %(acme_enabled)s
 
             # Endpoint to use to request certificates. If you only want to test,
             # use Let's Encrypt's staging url:
@@ -354,17 +380,17 @@ class TlsConfig(Config):
             # Port number to listen on for the HTTP-01 challenge. Change this if
             # you are forwarding connections through Apache/Nginx/etc.
             #
-            #port: 80
+            port: 80
 
             # Local addresses to listen on for incoming connections.
             # Again, you may want to change this if you are forwarding connections
             # through Apache/Nginx/etc.
             #
-            #bind_addresses: ['::', '0.0.0.0']
+            bind_addresses: ['::', '0.0.0.0']
 
             # How many days remaining on a certificate before it is renewed.
             #
-            #reprovision_threshold: 30
+            reprovision_threshold: 30
 
             # The domain that the certificate should be for. Normally this
             # should be the same as your Matrix domain (i.e., 'server_name'), but,
@@ -378,7 +404,7 @@ class TlsConfig(Config):
             #
             # If not set, defaults to your 'server_name'.
             #
-            #domain: matrix.example.com
+            domain: %(acme_domain)s
 
             # file to use for the account key. This will be generated if it doesn't
             # exist.
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 0cea0d2a10..482a101c09 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -327,21 +327,37 @@ class TransportLayerClient(object):
         include_all_networks=False,
         third_party_instance_id=None,
     ):
-        path = _create_v1_path("/publicRooms")
-
-        args = {"include_all_networks": "true" if include_all_networks else "false"}
-        if third_party_instance_id:
-            args["third_party_instance_id"] = (third_party_instance_id,)
-        if limit:
-            args["limit"] = [str(limit)]
-        if since_token:
-            args["since"] = [since_token]
-
-        # TODO(erikj): Actually send the search_filter across federation.
-
-        response = yield self.client.get_json(
-            destination=remote_server, path=path, args=args, ignore_backoff=True
-        )
+        if search_filter:
+            # this uses MSC2197 (Search Filtering over Federation)
+            path = _create_v1_path("/publicRooms")
+
+            data = {"include_all_networks": "true" if include_all_networks else "false"}
+            if third_party_instance_id:
+                data["third_party_instance_id"] = third_party_instance_id
+            if limit:
+                data["limit"] = str(limit)
+            if since_token:
+                data["since"] = since_token
+
+            data["filter"] = search_filter
+
+            response = yield self.client.post_json(
+                destination=remote_server, path=path, data=data, ignore_backoff=True
+            )
+        else:
+            path = _create_v1_path("/publicRooms")
+
+            args = {"include_all_networks": "true" if include_all_networks else "false"}
+            if third_party_instance_id:
+                args["third_party_instance_id"] = (third_party_instance_id,)
+            if limit:
+                args["limit"] = [str(limit)]
+            if since_token:
+                args["since"] = [since_token]
+
+            response = yield self.client.get_json(
+                destination=remote_server, path=path, args=args, ignore_backoff=True
+            )
 
         return response
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index dc53b4b170..f9930b6460 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -770,6 +770,42 @@ class PublicRoomList(BaseFederationServlet):
         )
         return 200, data
 
+    async def on_POST(self, origin, content, query):
+        # This implements MSC2197 (Search Filtering over Federation)
+        if not self.allow_access:
+            raise FederationDeniedError(origin)
+
+        limit = int(content.get("limit", 100))
+        since_token = content.get("since", None)
+        search_filter = content.get("filter", None)
+
+        include_all_networks = content.get("include_all_networks", False)
+        third_party_instance_id = content.get("third_party_instance_id", None)
+
+        if include_all_networks:
+            network_tuple = None
+            if third_party_instance_id is not None:
+                raise SynapseError(
+                    400, "Can't use include_all_networks with an explicit network"
+                )
+        elif third_party_instance_id is None:
+            network_tuple = ThirdPartyInstanceID(None, None)
+        else:
+            network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
+
+        if search_filter is None:
+            logger.warning("Nonefilter")
+
+        data = await self.handler.get_local_public_room_list(
+            limit=limit,
+            since_token=since_token,
+            search_filter=search_filter,
+            network_tuple=network_tuple,
+            from_federation=True,
+        )
+
+        return 200, data
+
 
 class FederationVersionServlet(BaseFederationServlet):
     PATH = "/version"
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d30a68b650..1a87b58838 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -94,6 +94,15 @@ class AdminHandler(BaseHandler):
 
         return ret
 
+    def get_user_server_admin(self, user):
+        """
+        Get the admin bit on a user.
+
+        Args:
+            user_id (UserID): the (necessarily local) user to manipulate
+        """
+        return self.store.is_server_admin(user)
+
     def set_user_server_admin(self, user, admin):
         """
         Set the admin bit on a user.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c86903b98b..94306c94a9 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -326,8 +326,9 @@ class FederationHandler(BaseHandler):
                     ours = yield self.store.get_state_groups_ids(room_id, seen)
 
                     # state_maps is a list of mappings from (type, state_key) to event_id
-                    # type: list[dict[tuple[str, str], str]]
-                    state_maps = list(ours.values())
+                    state_maps = list(
+                        ours.values()
+                    )  # type: list[dict[tuple[str, str], str]]
 
                     # we don't need this any more, let's delete it.
                     del ours
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index d199521b58..97daca5fee 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -282,3 +282,16 @@ class IdentityHandler(BaseHandler):
         except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e.to_synapse_error()
+
+
+class LookupAlgorithm:
+    """
+    Supported hashing algorithms when performing a 3PID lookup.
+
+    SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
+        encoding
+    NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
+    """
+
+    SHA256 = "sha256"
+    NONE = "none"
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a5e23c4caf..111f7c7e2f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -24,7 +24,7 @@ from twisted.internet import defer
 from twisted.internet.defer import succeed
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes
+from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -469,6 +469,9 @@ class EventCreationHandler(object):
 
         u = yield self.store.get_user_by_id(user_id)
         assert u is not None
+        if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
+            # support and bot users are not required to consent
+            return
         if u["appservice_id"] is not None:
             # users registered by an appservice are exempt
             return
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index e9094ad02b..a7e55f00e5 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -25,6 +25,7 @@ from unpaddedbase64 import decode_base64, encode_base64
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
+from synapse.api.errors import Codes, HttpResponseException
 from synapse.types import ThirdPartyInstanceID
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -485,7 +486,33 @@ class RoomListHandler(BaseHandler):
             return {"chunk": [], "total_room_count_estimate": 0}
 
         if search_filter:
-            # We currently don't support searching across federation, so we have
+            # Searching across federation is defined in MSC2197.
+            # However, the remote homeserver may or may not actually support it.
+            # So we first try an MSC2197 remote-filtered search, then fall back
+            # to a locally-filtered search if we must.
+
+            try:
+                res = yield self._get_remote_list_cached(
+                    server_name,
+                    limit=limit,
+                    since_token=since_token,
+                    include_all_networks=include_all_networks,
+                    third_party_instance_id=third_party_instance_id,
+                    search_filter=search_filter,
+                )
+                return res
+            except HttpResponseException as hre:
+                syn_err = hre.to_synapse_error()
+                if hre.code in (404, 405) or syn_err.errcode in (
+                    Codes.UNRECOGNIZED,
+                    Codes.NOT_FOUND,
+                ):
+                    logger.debug("Falling back to locally-filtered /publicRooms")
+                else:
+                    raise  # Not an error that should trigger a fallback.
+
+            # if we reach this point, then we fall back to the situation where
+            # we currently don't support searching across federation, so we have
             # to do it manually without pagination
             limit = None
             since_token = None
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 249a6d9c5d..4605cb9c0b 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -29,9 +29,11 @@ from twisted.internet import defer
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
+from synapse.handlers.identity import LookupAlgorithm
 from synapse.types import RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
+from synapse.util.hash import sha256_and_url_safe_base64
 
 from ._base import BaseHandler
 
@@ -523,7 +525,7 @@ class RoomMemberHandler(object):
             event (SynapseEvent): The membership event.
             context: The context of the event.
             is_guest (bool): Whether the sender is a guest.
-            room_hosts ([str]): Homeservers which are likely to already be in
+            remote_room_hosts (list[str]|None): Homeservers which are likely to already be in
                 the room, and could be danced with in order to join this
                 homeserver for the first time.
             ratelimit (bool): Whether to rate limit this request.
@@ -634,7 +636,7 @@ class RoomMemberHandler(object):
             servers.remove(room_alias.domain)
         servers.insert(0, room_alias.domain)
 
-        return (RoomID.from_string(room_id), servers)
+        return RoomID.from_string(room_id), servers
 
     @defer.inlineCallbacks
     def _get_inviter(self, user_id, room_id):
@@ -697,6 +699,44 @@ class RoomMemberHandler(object):
             raise SynapseError(
                 403, "Looking up third-party identifiers is denied from this server"
             )
+
+        # Check what hashing details are supported by this identity server
+        use_v1 = False
+        hash_details = None
+        try:
+            hash_details = yield self.simple_http_client.get_json(
+                "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server)
+            )
+        except (HttpResponseException, ValueError) as e:
+            # Catch HttpResponseExcept for a non-200 response code
+            # Catch ValueError for non-JSON response body
+
+            # Check if this identity server does not know about v2 lookups
+            if e.code == 404:
+                # This is an old identity server that does not yet support v2 lookups
+                use_v1 = True
+            else:
+                logger.warn("Error when looking up hashing details: %s" % (e,))
+                return None
+
+        if use_v1:
+            return (yield self._lookup_3pid_v1(id_server, medium, address))
+
+        return (yield self._lookup_3pid_v2(id_server, medium, address, hash_details))
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v1(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server using v1 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            str: the matrix ID of the 3pid, or None if it is not recognized.
+        """
         try:
             data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
@@ -711,8 +751,83 @@ class RoomMemberHandler(object):
 
         except IOError as e:
             logger.warn("Error from identity server lookup: %s" % (e,))
+
+        return None
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v2(self, id_server, medium, address, hash_details):
+        """Looks up a 3pid in the passed identity server using v2 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+            hash_details (dict[str, str|list]): A dictionary containing hashing information
+                provided by an identity server.
+
+        Returns:
+            Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
+        """
+        # Extract information from hash_details
+        supported_lookup_algorithms = hash_details["algorithms"]
+        lookup_pepper = hash_details["lookup_pepper"]
+
+        # Check if any of the supported lookup algorithms are present
+        if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
+            # Perform a hashed lookup
+            lookup_algorithm = LookupAlgorithm.SHA256
+
+            # Hash address, medium and the pepper with sha256
+            to_hash = "%s %s %s" % (address, medium, lookup_pepper)
+            lookup_value = sha256_and_url_safe_base64(to_hash)
+
+        elif LookupAlgorithm.NONE in supported_lookup_algorithms:
+            # Perform a non-hashed lookup
+            lookup_algorithm = LookupAlgorithm.NONE
+
+            # Combine together plaintext address and medium
+            lookup_value = "%s %s" % (address, medium)
+
+        else:
+            logger.warn(
+                "None of the provided lookup algorithms of %s%s are supported: %s",
+                id_server_scheme,
+                id_server,
+                hash_details["algorithms"],
+            )
+            raise SynapseError(
+                400,
+                "Provided identity server does not support any v2 lookup "
+                "algorithms that this homeserver supports.",
+            )
+
+        try:
+            lookup_results = yield self.simple_http_client.post_json_get_json(
+                "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
+                {
+                    "addresses": [lookup_value],
+                    "algorithm": lookup_algorithm,
+                    "pepper": lookup_pepper,
+                },
+            )
+        except (HttpResponseException, ValueError) as e:
+            # Catch HttpResponseExcept for a non-200 response code
+            # Catch ValueError for non-JSON response body
+            logger.warn("Error when performing a 3pid lookup: %s" % (e,))
+            return None
+
+        # Check for a mapping from what we looked up to an MXID
+        if "mappings" not in lookup_results or not isinstance(
+            lookup_results["mappings"], dict
+        ):
+            logger.debug("No results from 3pid lookup")
             return None
 
+        # Return the MXID if it's available, or None otherwise
+        mxid = lookup_results["mappings"].get(lookup_value)
+        return mxid
+
     @defer.inlineCallbacks
     def _verify_any_signature(self, data, server_hostname):
         if server_hostname not in data["signatures"]:
@@ -962,9 +1077,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         )
 
         if complexity:
-            if complexity["v1"] > max_complexity:
-                return True
-            return False
+            return complexity["v1"] > max_complexity
         return None
 
     @defer.inlineCallbacks
@@ -980,10 +1093,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         max_complexity = self.hs.config.limit_remote_rooms.complexity
         complexity = yield self.store.get_room_complexity(room_id)
 
-        if complexity["v1"] > max_complexity:
-            return True
-
-        return False
+        return complexity["v1"] > max_complexity
 
     @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 64f62aaeec..feae7de5be 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -14,21 +14,21 @@
 # limitations under the License.
 
 import logging
+import urllib
 
-import attr
-from netaddr import IPAddress
+from netaddr import AddrFormatError, IPAddress
 from zope.interface import implementer
 
 from twisted.internet import defer
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.internet.interfaces import IStreamClientEndpoint
-from twisted.web.client import URI, Agent, HTTPConnectionPool
+from twisted.web.client import Agent, HTTPConnectionPool
 from twisted.web.http_headers import Headers
-from twisted.web.iweb import IAgent
+from twisted.web.iweb import IAgent, IAgentEndpointFactory
 
-from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.http.federation.srv_resolver import Server, SrvResolver
 from synapse.http.federation.well_known_resolver import WellKnownResolver
-from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.util import Clock
 
 logger = logging.getLogger(__name__)
@@ -36,8 +36,9 @@ logger = logging.getLogger(__name__)
 
 @implementer(IAgent)
 class MatrixFederationAgent(object):
-    """An Agent-like thing which provides a `request` method which will look up a matrix
-    server and send an HTTP request to it.
+    """An Agent-like thing which provides a `request` method which correctly
+    handles resolving matrix server names when using matrix://. Handles standard
+    https URIs as normal.
 
     Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
 
@@ -65,17 +66,19 @@ class MatrixFederationAgent(object):
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
-
-        self._tls_client_options_factory = tls_client_options_factory
-        if _srv_resolver is None:
-            _srv_resolver = SrvResolver()
-        self._srv_resolver = _srv_resolver
-
         self._pool = HTTPConnectionPool(reactor)
         self._pool.retryAutomatically = False
         self._pool.maxPersistentPerHost = 5
         self._pool.cachedConnectionTimeout = 2 * 60
 
+        self._agent = Agent.usingEndpointFactory(
+            self._reactor,
+            MatrixHostnameEndpointFactory(
+                reactor, tls_client_options_factory, _srv_resolver
+            ),
+            pool=self._pool,
+        )
+
         if _well_known_resolver is None:
             _well_known_resolver = WellKnownResolver(
                 self._reactor,
@@ -93,19 +96,15 @@ class MatrixFederationAgent(object):
         """
         Args:
             method (bytes): HTTP method: GET/POST/etc
-
             uri (bytes): Absolute URI to be retrieved
-
             headers (twisted.web.http_headers.Headers|None):
                 HTTP headers to send with the request, or None to
                 send no extra headers.
-
             bodyProducer (twisted.web.iweb.IBodyProducer|None):
                 An object which can generate bytes to make up the
                 body of this request (for example, the properly encoded contents of
                 a file for a file upload).  Or None if the request is to have
                 no body.
-
         Returns:
             Deferred[twisted.web.iweb.IResponse]:
                 fires when the header of the response has been received (regardless of the
@@ -113,210 +112,207 @@ class MatrixFederationAgent(object):
                 response from being received (including problems that prevent the request
                 from being sent).
         """
-        parsed_uri = URI.fromBytes(uri, defaultPort=-1)
-        res = yield self._route_matrix_uri(parsed_uri)
+        # We use urlparse as that will set `port` to None if there is no
+        # explicit port.
+        parsed_uri = urllib.parse.urlparse(uri)
 
-        # set up the TLS connection params
+        # If this is a matrix:// URI check if the server has delegated matrix
+        # traffic using well-known delegation.
         #
-        # XXX disabling TLS is really only supported here for the benefit of the
-        # unit tests. We should make the UTs cope with TLS rather than having to make
-        # the code support the unit tests.
-        if self._tls_client_options_factory is None:
-            tls_options = None
-        else:
-            tls_options = self._tls_client_options_factory.get_options(
-                res.tls_server_name.decode("ascii")
+        # We have to do this here and not in the endpoint as we need to rewrite
+        # the host header with the delegated server name.
+        delegated_server = None
+        if (
+            parsed_uri.scheme == b"matrix"
+            and not _is_ip_literal(parsed_uri.hostname)
+            and not parsed_uri.port
+        ):
+            well_known_result = yield self._well_known_resolver.get_well_known(
+                parsed_uri.hostname
             )
+            delegated_server = well_known_result.delegated_server
+
+        if delegated_server:
+            # Ok, the server has delegated matrix traffic to somewhere else, so
+            # lets rewrite the URL to replace the server with the delegated
+            # server name.
+            uri = urllib.parse.urlunparse(
+                (
+                    parsed_uri.scheme,
+                    delegated_server,
+                    parsed_uri.path,
+                    parsed_uri.params,
+                    parsed_uri.query,
+                    parsed_uri.fragment,
+                )
+            )
+            parsed_uri = urllib.parse.urlparse(uri)
 
-        # make sure that the Host header is set correctly
+        # We need to make sure the host header is set to the netloc of the
+        # server.
         if headers is None:
             headers = Headers()
         else:
             headers = headers.copy()
 
         if not headers.hasHeader(b"host"):
-            headers.addRawHeader(b"host", res.host_header)
+            headers.addRawHeader(b"host", parsed_uri.netloc)
 
-        class EndpointFactory(object):
-            @staticmethod
-            def endpointForURI(_uri):
-                ep = LoggingHostnameEndpoint(
-                    self._reactor, res.target_host, res.target_port
-                )
-                if tls_options is not None:
-                    ep = wrapClientTLS(tls_options, ep)
-                return ep
-
-        agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
         res = yield make_deferred_yieldable(
-            agent.request(method, uri, headers, bodyProducer)
+            self._agent.request(method, uri, headers, bodyProducer)
         )
+
         return res
 
-    @defer.inlineCallbacks
-    def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
-        """Helper for `request`: determine the routing for a Matrix URI
 
-        Args:
-            parsed_uri (twisted.web.client.URI): uri to route. Note that it should be
-                parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
-                if there is no explicit port given.
+@implementer(IAgentEndpointFactory)
+class MatrixHostnameEndpointFactory(object):
+    """Factory for MatrixHostnameEndpoint for parsing to an Agent.
+    """
 
-            lookup_well_known (bool): True if we should look up the .well-known file if
-                there is no SRV record.
+    def __init__(self, reactor, tls_client_options_factory, srv_resolver):
+        self._reactor = reactor
+        self._tls_client_options_factory = tls_client_options_factory
 
-        Returns:
-            Deferred[_RoutingResult]
-        """
-        # check for an IP literal
-        try:
-            ip_address = IPAddress(parsed_uri.host.decode("ascii"))
-        except Exception:
-            # not an IP address
-            ip_address = None
-
-        if ip_address:
-            port = parsed_uri.port
-            if port == -1:
-                port = 8448
-            return _RoutingResult(
-                host_header=parsed_uri.netloc,
-                tls_server_name=parsed_uri.host,
-                target_host=parsed_uri.host,
-                target_port=port,
-            )
+        if srv_resolver is None:
+            srv_resolver = SrvResolver()
 
-        if parsed_uri.port != -1:
-            # there is an explicit port
-            return _RoutingResult(
-                host_header=parsed_uri.netloc,
-                tls_server_name=parsed_uri.host,
-                target_host=parsed_uri.host,
-                target_port=parsed_uri.port,
-            )
+        self._srv_resolver = srv_resolver
 
-        if lookup_well_known:
-            # try a .well-known lookup
-            well_known_result = yield self._well_known_resolver.get_well_known(
-                parsed_uri.host
-            )
-            well_known_server = well_known_result.delegated_server
-
-            if well_known_server:
-                # if we found a .well-known, start again, but don't do another
-                # .well-known lookup.
-
-                # parse the server name in the .well-known response into host/port.
-                # (This code is lifted from twisted.web.client.URI.fromBytes).
-                if b":" in well_known_server:
-                    well_known_host, well_known_port = well_known_server.rsplit(b":", 1)
-                    try:
-                        well_known_port = int(well_known_port)
-                    except ValueError:
-                        # the part after the colon could not be parsed as an int
-                        # - we assume it is an IPv6 literal with no port (the closing
-                        # ']' stops it being parsed as an int)
-                        well_known_host, well_known_port = well_known_server, -1
-                else:
-                    well_known_host, well_known_port = well_known_server, -1
-
-                new_uri = URI(
-                    scheme=parsed_uri.scheme,
-                    netloc=well_known_server,
-                    host=well_known_host,
-                    port=well_known_port,
-                    path=parsed_uri.path,
-                    params=parsed_uri.params,
-                    query=parsed_uri.query,
-                    fragment=parsed_uri.fragment,
-                )
+    def endpointForURI(self, parsed_uri):
+        return MatrixHostnameEndpoint(
+            self._reactor,
+            self._tls_client_options_factory,
+            self._srv_resolver,
+            parsed_uri,
+        )
 
-                res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
-                return res
-
-        # try a SRV lookup
-        service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
-        server_list = yield self._srv_resolver.resolve_service(service_name)
-
-        if not server_list:
-            target_host = parsed_uri.host
-            port = 8448
-            logger.debug(
-                "No SRV record for %s, using %s:%i",
-                parsed_uri.host.decode("ascii"),
-                target_host.decode("ascii"),
-                port,
-            )
+
+@implementer(IStreamClientEndpoint)
+class MatrixHostnameEndpoint(object):
+    """An endpoint that resolves matrix:// URLs using Matrix server name
+    resolution (i.e. via SRV). Does not check for well-known delegation.
+
+    Args:
+        reactor (IReactor)
+        tls_client_options_factory (ClientTLSOptionsFactory|None):
+            factory to use for fetching client tls options, or none to disable TLS.
+        srv_resolver (SrvResolver): The SRV resolver to use
+        parsed_uri (twisted.web.client.URI): The parsed URI that we're wanting
+            to connect to.
+    """
+
+    def __init__(self, reactor, tls_client_options_factory, srv_resolver, parsed_uri):
+        self._reactor = reactor
+
+        self._parsed_uri = parsed_uri
+
+        # set up the TLS connection params
+        #
+        # XXX disabling TLS is really only supported here for the benefit of the
+        # unit tests. We should make the UTs cope with TLS rather than having to make
+        # the code support the unit tests.
+
+        if tls_client_options_factory is None:
+            self._tls_options = None
         else:
-            target_host, port = pick_server_from_list(server_list)
-            logger.debug(
-                "Picked %s:%i from SRV records for %s",
-                target_host.decode("ascii"),
-                port,
-                parsed_uri.host.decode("ascii"),
+            self._tls_options = tls_client_options_factory.get_options(
+                self._parsed_uri.host.decode("ascii")
             )
 
-        return _RoutingResult(
-            host_header=parsed_uri.netloc,
-            tls_server_name=parsed_uri.host,
-            target_host=target_host,
-            target_port=port,
-        )
+        self._srv_resolver = srv_resolver
 
+    def connect(self, protocol_factory):
+        """Implements IStreamClientEndpoint interface
+        """
 
-@implementer(IStreamClientEndpoint)
-class LoggingHostnameEndpoint(object):
-    """A wrapper for HostnameEndpint which logs when it connects"""
+        return run_in_background(self._do_connect, protocol_factory)
 
-    def __init__(self, reactor, host, port, *args, **kwargs):
-        self.host = host
-        self.port = port
-        self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
+    @defer.inlineCallbacks
+    def _do_connect(self, protocol_factory):
+        first_exception = None
+
+        server_list = yield self._resolve_server()
+
+        for server in server_list:
+            host = server.host
+            port = server.port
+
+            try:
+                logger.info("Connecting to %s:%i", host.decode("ascii"), port)
+                endpoint = HostnameEndpoint(self._reactor, host, port)
+                if self._tls_options:
+                    endpoint = wrapClientTLS(self._tls_options, endpoint)
+                result = yield make_deferred_yieldable(
+                    endpoint.connect(protocol_factory)
+                )
 
-    def connect(self, protocol_factory):
-        logger.info("Connecting to %s:%i", self.host.decode("ascii"), self.port)
-        return self.ep.connect(protocol_factory)
+                return result
+            except Exception as e:
+                logger.info(
+                    "Failed to connect to %s:%i: %s", host.decode("ascii"), port, e
+                )
+                if not first_exception:
+                    first_exception = e
 
+        # We return the first failure because that's probably the most interesting.
+        if first_exception:
+            raise first_exception
 
-@attr.s
-class _RoutingResult(object):
-    """The result returned by `_route_matrix_uri`.
+        # This shouldn't happen as we should always have at least one host/port
+        # to try and if that doesn't work then we'll have an exception.
+        raise Exception("Failed to resolve server %r" % (self._parsed_uri.netloc,))
 
-    Contains the parameters needed to direct a federation connection to a particular
-    server.
+    @defer.inlineCallbacks
+    def _resolve_server(self):
+        """Resolves the server name to a list of hosts and ports to attempt to
+        connect to.
 
-    Where a SRV record points to several servers, this object contains a single server
-    chosen from the list.
-    """
+        Returns:
+            Deferred[list[Server]]
+        """
 
-    host_header = attr.ib()
-    """
-    The value we should assign to the Host header (host:port from the matrix
-    URI, or .well-known).
+        if self._parsed_uri.scheme != b"matrix":
+            return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]
 
-    :type: bytes
-    """
+        # Note: We don't do well-known lookup as that needs to have happened
+        # before now, due to needing to rewrite the Host header of the HTTP
+        # request.
 
-    tls_server_name = attr.ib()
-    """
-    The server name we should set in the SNI (typically host, without port, from the
-    matrix URI or .well-known)
+        # We reparse the URI so that defaultPort is -1 rather than 80
+        parsed_uri = urllib.parse.urlparse(self._parsed_uri.toBytes())
 
-    :type: bytes
-    """
+        host = parsed_uri.hostname
+        port = parsed_uri.port
 
-    target_host = attr.ib()
-    """
-    The hostname (or IP literal) we should route the TCP connection to (the target of the
-    SRV record, or the hostname from the URL/.well-known)
+        # If there is an explicit port or the host is an IP address we bypass
+        # SRV lookups and just use the given host/port.
+        if port or _is_ip_literal(host):
+            return [Server(host, port or 8448)]
 
-    :type: bytes
-    """
+        server_list = yield self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
 
-    target_port = attr.ib()
-    """
-    The port we should route the TCP connection to (the target of the SRV record, or
-    the port from the URL/.well-known, or 8448)
+        if server_list:
+            return server_list
+
+        # No SRV records, so we fallback to host and 8448
+        return [Server(host, 8448)]
+
+
+def _is_ip_literal(host):
+    """Test if the given host name is either an IPv4 or IPv6 literal.
 
-    :type: int
+    Args:
+        host (bytes)
+
+    Returns:
+        bool
     """
+
+    host = host.decode("ascii")
+
+    try:
+        IPAddress(host)
+        return True
+    except AddrFormatError:
+        return False
diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py
index b32188766d..3fe4ffb9e5 100644
--- a/synapse/http/federation/srv_resolver.py
+++ b/synapse/http/federation/srv_resolver.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
 SERVER_CACHE = {}
 
 
-@attr.s
+@attr.s(slots=True, frozen=True)
 class Server(object):
     """
     Our record of an individual server which can be tried to reach a destination.
@@ -53,34 +53,47 @@ class Server(object):
     expires = attr.ib(default=0)
 
 
-def pick_server_from_list(server_list):
-    """Randomly choose a server from the server list
+def _sort_server_list(server_list):
+    """Given a list of SRV records sort them into priority order and shuffle
+    each priority with the given weight.
+    """
+    priority_map = {}
 
-    Args:
-        server_list (list[Server]): list of candidate servers
+    for server in server_list:
+        priority_map.setdefault(server.priority, []).append(server)
 
-    Returns:
-        Tuple[bytes, int]: (host, port) pair for the chosen server
-    """
-    if not server_list:
-        raise RuntimeError("pick_server_from_list called with empty list")
+    results = []
+    for priority in sorted(priority_map):
+        servers = priority_map[priority]
+
+        # This algorithms roughly follows the algorithm described in RFC2782,
+        # changed to remove an off-by-one error.
+        #
+        # N.B. Weights can be zero, which means that they should be picked
+        # rarely.
+
+        total_weight = sum(s.weight for s in servers)
+
+        # Total weight can become zero if there are only zero weight servers
+        # left, which we handle by just shuffling and appending to the results.
+        while servers and total_weight:
+            target_weight = random.randint(1, total_weight)
 
-    # TODO: currently we only use the lowest-priority servers. We should maintain a
-    # cache of servers known to be "down" and filter them out
+            for s in servers:
+                target_weight -= s.weight
 
-    min_priority = min(s.priority for s in server_list)
-    eligible_servers = list(s for s in server_list if s.priority == min_priority)
-    total_weight = sum(s.weight for s in eligible_servers)
-    target_weight = random.randint(0, total_weight)
+                if target_weight <= 0:
+                    break
 
-    for s in eligible_servers:
-        target_weight -= s.weight
+            results.append(s)
+            servers.remove(s)
+            total_weight -= s.weight
 
-        if target_weight <= 0:
-            return s.host, s.port
+        if servers:
+            random.shuffle(servers)
+            results.extend(servers)
 
-    # this should be impossible.
-    raise RuntimeError("pick_server_from_list got to end of eligible server list.")
+    return results
 
 
 class SrvResolver(object):
@@ -120,7 +133,7 @@ class SrvResolver(object):
         if cache_entry:
             if all(s.expires > now for s in cache_entry):
                 servers = list(cache_entry)
-                return servers
+                return _sort_server_list(servers)
 
         try:
             answers, _, _ = yield make_deferred_yieldable(
@@ -169,4 +182,4 @@ class SrvResolver(object):
             )
 
         self._cache[service_name] = list(servers)
-        return servers
+        return _sort_server_list(servers)
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
new file mode 100644
index 0000000000..0367d6dfc4
--- /dev/null
+++ b/synapse/logging/_structured.py
@@ -0,0 +1,374 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os.path
+import sys
+import typing
+import warnings
+
+import attr
+from constantly import NamedConstant, Names, ValueConstant, Values
+from zope.interface import implementer
+
+from twisted.logger import (
+    FileLogObserver,
+    FilteringLogObserver,
+    ILogObserver,
+    LogBeginner,
+    Logger,
+    LogLevel,
+    LogLevelFilterPredicate,
+    LogPublisher,
+    eventAsText,
+    globalLogBeginner,
+    jsonFileLogObserver,
+)
+
+from synapse.config._base import ConfigError
+from synapse.logging._terse_json import (
+    TerseJSONToConsoleLogObserver,
+    TerseJSONToTCPLogObserver,
+)
+from synapse.logging.context import LoggingContext
+
+
+def stdlib_log_level_to_twisted(level: str) -> LogLevel:
+    """
+    Convert a stdlib log level to Twisted's log level.
+    """
+    lvl = level.lower().replace("warning", "warn")
+    return LogLevel.levelWithName(lvl)
+
+
+@attr.s
+@implementer(ILogObserver)
+class LogContextObserver(object):
+    """
+    An ILogObserver which adds Synapse-specific log context information.
+
+    Attributes:
+        observer (ILogObserver): The target parent observer.
+    """
+
+    observer = attr.ib()
+
+    def __call__(self, event: dict) -> None:
+        """
+        Consume a log event and emit it to the parent observer after filtering
+        and adding log context information.
+
+        Args:
+            event (dict)
+        """
+        # Filter out some useless events that Twisted outputs
+        if "log_text" in event:
+            if event["log_text"].startswith("DNSDatagramProtocol starting on "):
+                return
+
+            if event["log_text"].startswith("(UDP Port "):
+                return
+
+            if event["log_text"].startswith("Timing out client") or event[
+                "log_format"
+            ].startswith("Timing out client"):
+                return
+
+        context = LoggingContext.current_context()
+
+        # Copy the context information to the log event.
+        if context is not None:
+            context.copy_to_twisted_log_entry(event)
+        else:
+            # If there's no logging context, not even the root one, we might be
+            # starting up or it might be from non-Synapse code. Log it as if it
+            # came from the root logger.
+            event["request"] = None
+            event["scope"] = None
+
+        self.observer(event)
+
+
+class PythonStdlibToTwistedLogger(logging.Handler):
+    """
+    Transform a Python stdlib log message into a Twisted one.
+    """
+
+    def __init__(self, observer, *args, **kwargs):
+        """
+        Args:
+            observer (ILogObserver): A Twisted logging observer.
+            *args, **kwargs: Args/kwargs to be passed to logging.Handler.
+        """
+        self.observer = observer
+        super().__init__(*args, **kwargs)
+
+    def emit(self, record: logging.LogRecord) -> None:
+        """
+        Emit a record to Twisted's observer.
+
+        Args:
+            record (logging.LogRecord)
+        """
+
+        self.observer(
+            {
+                "log_time": record.created,
+                "log_text": record.getMessage(),
+                "log_format": "{log_text}",
+                "log_namespace": record.name,
+                "log_level": stdlib_log_level_to_twisted(record.levelname),
+            }
+        )
+
+
+def SynapseFileLogObserver(outFile: typing.io.TextIO) -> FileLogObserver:
+    """
+    A log observer that formats events like the traditional log formatter and
+    sends them to `outFile`.
+
+    Args:
+        outFile (file object): The file object to write to.
+    """
+
+    def formatEvent(_event: dict) -> str:
+        event = dict(_event)
+        event["log_level"] = event["log_level"].name.upper()
+        event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
+            event.get("log_format", "{log_text}") or "{log_text}"
+        )
+        return eventAsText(event, includeSystem=False) + "\n"
+
+    return FileLogObserver(outFile, formatEvent)
+
+
+class DrainType(Names):
+    CONSOLE = NamedConstant()
+    CONSOLE_JSON = NamedConstant()
+    CONSOLE_JSON_TERSE = NamedConstant()
+    FILE = NamedConstant()
+    FILE_JSON = NamedConstant()
+    NETWORK_JSON_TERSE = NamedConstant()
+
+
+class OutputPipeType(Values):
+    stdout = ValueConstant(sys.__stdout__)
+    stderr = ValueConstant(sys.__stderr__)
+
+
+@attr.s
+class DrainConfiguration(object):
+    name = attr.ib()
+    type = attr.ib()
+    location = attr.ib()
+    options = attr.ib(default=None)
+
+
+@attr.s
+class NetworkJSONTerseOptions(object):
+    maximum_buffer = attr.ib(type=int)
+
+
+DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
+
+
+def parse_drain_configs(
+    drains: dict
+) -> typing.Generator[DrainConfiguration, None, None]:
+    """
+    Parse the drain configurations.
+
+    Args:
+        drains (dict): A list of drain configurations.
+
+    Yields:
+        DrainConfiguration instances.
+
+    Raises:
+        ConfigError: If any of the drain configuration items are invalid.
+    """
+    for name, config in drains.items():
+        if "type" not in config:
+            raise ConfigError("Logging drains require a 'type' key.")
+
+        try:
+            logging_type = DrainType.lookupByName(config["type"].upper())
+        except ValueError:
+            raise ConfigError(
+                "%s is not a known logging drain type." % (config["type"],)
+            )
+
+        if logging_type in [
+            DrainType.CONSOLE,
+            DrainType.CONSOLE_JSON,
+            DrainType.CONSOLE_JSON_TERSE,
+        ]:
+            location = config.get("location")
+            if location is None or location not in ["stdout", "stderr"]:
+                raise ConfigError(
+                    (
+                        "The %s drain needs the 'location' key set to "
+                        "either 'stdout' or 'stderr'."
+                    )
+                    % (logging_type,)
+                )
+
+            pipe = OutputPipeType.lookupByName(location).value
+
+            yield DrainConfiguration(name=name, type=logging_type, location=pipe)
+
+        elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
+            if "location" not in config:
+                raise ConfigError(
+                    "The %s drain needs the 'location' key set." % (logging_type,)
+                )
+
+            location = config.get("location")
+            if os.path.abspath(location) != location:
+                raise ConfigError(
+                    "File paths need to be absolute, '%s' is a relative path"
+                    % (location,)
+                )
+            yield DrainConfiguration(name=name, type=logging_type, location=location)
+
+        elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
+            host = config.get("host")
+            port = config.get("port")
+            maximum_buffer = config.get("maximum_buffer", 1000)
+            yield DrainConfiguration(
+                name=name,
+                type=logging_type,
+                location=(host, port),
+                options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
+            )
+
+        else:
+            raise ConfigError(
+                "The %s drain type is currently not implemented."
+                % (config["type"].upper(),)
+            )
+
+
+def setup_structured_logging(
+    hs,
+    config,
+    log_config: dict,
+    logBeginner: LogBeginner = globalLogBeginner,
+    redirect_stdlib_logging: bool = True,
+) -> LogPublisher:
+    """
+    Set up Twisted's structured logging system.
+
+    Args:
+        hs: The homeserver to use.
+        config (HomeserverConfig): The configuration of the Synapse homeserver.
+        log_config (dict): The log configuration to use.
+    """
+    if config.no_redirect_stdio:
+        raise ConfigError(
+            "no_redirect_stdio cannot be defined using structured logging."
+        )
+
+    logger = Logger()
+
+    if "drains" not in log_config:
+        raise ConfigError("The logging configuration requires a list of drains.")
+
+    observers = []
+
+    for observer in parse_drain_configs(log_config["drains"]):
+        # Pipe drains
+        if observer.type == DrainType.CONSOLE:
+            logger.debug(
+                "Starting up the {name} console logger drain", name=observer.name
+            )
+            observers.append(SynapseFileLogObserver(observer.location))
+        elif observer.type == DrainType.CONSOLE_JSON:
+            logger.debug(
+                "Starting up the {name} JSON console logger drain", name=observer.name
+            )
+            observers.append(jsonFileLogObserver(observer.location))
+        elif observer.type == DrainType.CONSOLE_JSON_TERSE:
+            logger.debug(
+                "Starting up the {name} terse JSON console logger drain",
+                name=observer.name,
+            )
+            observers.append(
+                TerseJSONToConsoleLogObserver(observer.location, metadata={})
+            )
+
+        # File drains
+        elif observer.type == DrainType.FILE:
+            logger.debug("Starting up the {name} file logger drain", name=observer.name)
+            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
+            observers.append(SynapseFileLogObserver(log_file))
+        elif observer.type == DrainType.FILE_JSON:
+            logger.debug(
+                "Starting up the {name} JSON file logger drain", name=observer.name
+            )
+            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
+            observers.append(jsonFileLogObserver(log_file))
+
+        elif observer.type == DrainType.NETWORK_JSON_TERSE:
+            metadata = {"server_name": hs.config.server_name}
+            log_observer = TerseJSONToTCPLogObserver(
+                hs=hs,
+                host=observer.location[0],
+                port=observer.location[1],
+                metadata=metadata,
+                maximum_buffer=observer.options.maximum_buffer,
+            )
+            log_observer.start()
+            observers.append(log_observer)
+        else:
+            # We should never get here, but, just in case, throw an error.
+            raise ConfigError("%s drain type cannot be configured" % (observer.type,))
+
+    publisher = LogPublisher(*observers)
+    log_filter = LogLevelFilterPredicate()
+
+    for namespace, namespace_config in log_config.get(
+        "loggers", DEFAULT_LOGGERS
+    ).items():
+        # Set the log level for twisted.logger.Logger namespaces
+        log_filter.setLogLevelForNamespace(
+            namespace,
+            stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
+        )
+
+        # Also set the log levels for the stdlib logger namespaces, to prevent
+        # them getting to PythonStdlibToTwistedLogger and having to be formatted
+        if "level" in namespace_config:
+            logging.getLogger(namespace).setLevel(namespace_config.get("level"))
+
+    f = FilteringLogObserver(publisher, [log_filter])
+    lco = LogContextObserver(f)
+
+    if redirect_stdlib_logging:
+        stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
+        stdliblogger = logging.getLogger()
+        stdliblogger.addHandler(stuff_into_twisted)
+
+    # Always redirect standard I/O, otherwise other logging outputs might miss
+    # it.
+    logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
+
+    return publisher
+
+
+def reload_structured_logging(*args, log_config=None) -> None:
+    warnings.warn(
+        "Currently the structured logging system can not be reloaded, doing nothing"
+    )
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
new file mode 100644
index 0000000000..7f1e8f23fe
--- /dev/null
+++ b/synapse/logging/_terse_json.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Log formatters that output terse JSON.
+"""
+
+import sys
+from collections import deque
+from ipaddress import IPv4Address, IPv6Address, ip_address
+from math import floor
+from typing.io import TextIO
+
+import attr
+from simplejson import dumps
+
+from twisted.application.internet import ClientService
+from twisted.internet.endpoints import (
+    HostnameEndpoint,
+    TCP4ClientEndpoint,
+    TCP6ClientEndpoint,
+)
+from twisted.internet.protocol import Factory, Protocol
+from twisted.logger import FileLogObserver, Logger
+from twisted.python.failure import Failure
+
+
+def flatten_event(event: dict, metadata: dict, include_time: bool = False):
+    """
+    Flatten a Twisted logging event to an dictionary capable of being sent
+    as a log event to a logging aggregation system.
+
+    The format is vastly simplified and is not designed to be a "human readable
+    string" in the sense that traditional logs are. Instead, the structure is
+    optimised for searchability and filtering, with human-understandable log
+    keys.
+
+    Args:
+        event (dict): The Twisted logging event we are flattening.
+        metadata (dict): Additional data to include with each log message. This
+            can be information like the server name. Since the target log
+            consumer does not know who we are other than by host IP, this
+            allows us to forward through static information.
+        include_time (bool): Should we include the `time` key? If False, the
+            event time is stripped from the event.
+    """
+    new_event = {}
+
+    # If it's a failure, make the new event's log_failure be the traceback text.
+    if "log_failure" in event:
+        new_event["log_failure"] = event["log_failure"].getTraceback()
+
+    # If it's a warning, copy over a string representation of the warning.
+    if "warning" in event:
+        new_event["warning"] = str(event["warning"])
+
+    # Stdlib logging events have "log_text" as their human-readable portion,
+    # Twisted ones have "log_format". For now, include the log_format, so that
+    # context only given in the log format (e.g. what is being logged) is
+    # available.
+    if "log_text" in event:
+        new_event["log"] = event["log_text"]
+    else:
+        new_event["log"] = event["log_format"]
+
+    # We want to include the timestamp when forwarding over the network, but
+    # exclude it when we are writing to stdout. This is because the log ingester
+    # (e.g. logstash, fluentd) can add its own timestamp.
+    if include_time:
+        new_event["time"] = round(event["log_time"], 2)
+
+    # Convert the log level to a textual representation.
+    new_event["level"] = event["log_level"].name.upper()
+
+    # Ignore these keys, and do not transfer them over to the new log object.
+    # They are either useless (isError), transferred manually above (log_time,
+    # log_level, etc), or contain Python objects which are not useful for output
+    # (log_logger, log_source).
+    keys_to_delete = [
+        "isError",
+        "log_failure",
+        "log_format",
+        "log_level",
+        "log_logger",
+        "log_source",
+        "log_system",
+        "log_time",
+        "log_text",
+        "observer",
+        "warning",
+    ]
+
+    # If it's from the Twisted legacy logger (twisted.python.log), it adds some
+    # more keys we want to purge.
+    if event.get("log_namespace") == "log_legacy":
+        keys_to_delete.extend(["message", "system", "time"])
+
+    # Rather than modify the dictionary in place, construct a new one with only
+    # the content we want. The original event should be considered 'frozen'.
+    for key in event.keys():
+
+        if key in keys_to_delete:
+            continue
+
+        if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
+            # If it's a plain type, include it as is.
+            new_event[key] = event[key]
+        else:
+            # If it's not one of those basic types, write out a string
+            # representation. This should probably be a warning in development,
+            # so that we are sure we are only outputting useful data.
+            new_event[key] = str(event[key])
+
+    # Add the metadata information to the event (e.g. the server_name).
+    new_event.update(metadata)
+
+    return new_event
+
+
+def TerseJSONToConsoleLogObserver(outFile: TextIO, metadata: dict) -> FileLogObserver:
+    """
+    A log observer that formats events to a flattened JSON representation.
+
+    Args:
+        outFile: The file object to write to.
+        metadata: Metadata to be added to each log object.
+    """
+
+    def formatEvent(_event: dict) -> str:
+        flattened = flatten_event(_event, metadata)
+        return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
+
+    return FileLogObserver(outFile, formatEvent)
+
+
+@attr.s
+class TerseJSONToTCPLogObserver(object):
+    """
+    An IObserver that writes JSON logs to a TCP target.
+
+    Args:
+        hs (HomeServer): The Homeserver that is being logged for.
+        host: The host of the logging target.
+        port: The logging target's port.
+        metadata: Metadata to be added to each log entry.
+    """
+
+    hs = attr.ib()
+    host = attr.ib(type=str)
+    port = attr.ib(type=int)
+    metadata = attr.ib(type=dict)
+    maximum_buffer = attr.ib(type=int)
+    _buffer = attr.ib(default=attr.Factory(deque), type=deque)
+    _writer = attr.ib(default=None)
+    _logger = attr.ib(default=attr.Factory(Logger))
+
+    def start(self) -> None:
+
+        # Connect without DNS lookups if it's a direct IP.
+        try:
+            ip = ip_address(self.host)
+            if isinstance(ip, IPv4Address):
+                endpoint = TCP4ClientEndpoint(
+                    self.hs.get_reactor(), self.host, self.port
+                )
+            elif isinstance(ip, IPv6Address):
+                endpoint = TCP6ClientEndpoint(
+                    self.hs.get_reactor(), self.host, self.port
+                )
+        except ValueError:
+            endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+
+        factory = Factory.forProtocol(Protocol)
+        self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+        self._service.startService()
+
+    def _write_loop(self) -> None:
+        """
+        Implement the write loop.
+        """
+        if self._writer:
+            return
+
+        self._writer = self._service.whenConnected()
+
+        @self._writer.addBoth
+        def writer(r):
+            if isinstance(r, Failure):
+                r.printTraceback(file=sys.__stderr__)
+                self._writer = None
+                self.hs.get_reactor().callLater(1, self._write_loop)
+                return
+
+            try:
+                for event in self._buffer:
+                    r.transport.write(
+                        dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
+                            "utf8"
+                        )
+                    )
+                    r.transport.write(b"\n")
+                self._buffer.clear()
+            except Exception as e:
+                sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
+
+            self._writer = False
+            self.hs.get_reactor().callLater(1, self._write_loop)
+
+    def _handle_pressure(self) -> None:
+        """
+        Handle backpressure by shedding events.
+
+        The buffer will, in this order, until the buffer is below the maximum:
+            - Shed DEBUG events
+            - Shed INFO events
+            - Shed the middle 50% of the events.
+        """
+        if len(self._buffer) <= self.maximum_buffer:
+            return
+
+        # Strip out DEBUGs
+        self._buffer = deque(
+            filter(lambda event: event["level"] != "DEBUG", self._buffer)
+        )
+
+        if len(self._buffer) <= self.maximum_buffer:
+            return
+
+        # Strip out INFOs
+        self._buffer = deque(
+            filter(lambda event: event["level"] != "INFO", self._buffer)
+        )
+
+        if len(self._buffer) <= self.maximum_buffer:
+            return
+
+        # Cut the middle entries out
+        buffer_split = floor(self.maximum_buffer / 2)
+
+        old_buffer = self._buffer
+        self._buffer = deque()
+
+        for i in range(buffer_split):
+            self._buffer.append(old_buffer.popleft())
+
+        end_buffer = []
+        for i in range(buffer_split):
+            end_buffer.append(old_buffer.pop())
+
+        self._buffer.extend(reversed(end_buffer))
+
+    def __call__(self, event: dict) -> None:
+        flattened = flatten_event(event, self.metadata, include_time=True)
+        self._buffer.append(flattened)
+
+        # Handle backpressure, if it exists.
+        try:
+            self._handle_pressure()
+        except Exception:
+            # If handling backpressure fails,clear the buffer and log the
+            # exception.
+            self._buffer.clear()
+            self._logger.failure("Failed clearing backpressure")
+
+        # Try and write immediately.
+        self._write_loop()
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index b456c31f70..63379bfb93 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -25,6 +25,7 @@ See doc/log_contexts.rst for details on how this works.
 import logging
 import threading
 import types
+from typing import Any, List
 
 from twisted.internet import defer, threads
 
@@ -194,7 +195,7 @@ class LoggingContext(object):
     class Sentinel(object):
         """Sentinel to represent the root context"""
 
-        __slots__ = []
+        __slots__ = []  # type: List[Any]
 
         def __str__(self):
             return "sentinel"
@@ -202,6 +203,10 @@ class LoggingContext(object):
         def copy_to(self, record):
             pass
 
+        def copy_to_twisted_log_entry(self, record):
+            record["request"] = None
+            record["scope"] = None
+
         def start(self):
             pass
 
@@ -330,6 +335,13 @@ class LoggingContext(object):
         # we also track the current scope:
         record.scope = self.scope
 
+    def copy_to_twisted_log_entry(self, record):
+        """
+        Copy logging fields from this context to a Twisted log record.
+        """
+        record["request"] = self.request
+        record["scope"] = self.scope
+
     def start(self):
         if get_thread_id() != self.main_thread:
             logger.warning("Started logcontext %s on different thread", self)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index c6465c0386..ec0ac547c1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -47,9 +47,9 @@ REQUIREMENTS = [
     "idna>=2.5",
     # validating SSL certs for IP addresses requires service_identity 18.1.
     "service_identity>=18.1.0",
-    # our logcontext handling relies on the ability to cancel inlineCallbacks
-    # (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7.
-    "Twisted>=18.7.0",
+    # Twisted 18.9 introduces some logger improvements that the structured
+    # logger utilises
+    "Twisted>=18.9.0",
     "treq>=15.1",
     # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
     "pyopenssl>=16.0.0",
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 9ab1c2c9e0..fa91cc8dee 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -52,7 +52,7 @@ logger = logging.getLogger(__name__)
 
 
 class UsersRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)")
+    PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
 
     def __init__(self, hs):
         self.hs = hs
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index b0fddb6898..5364117420 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -22,24 +22,34 @@ from synapse.http.servlet import (
     assert_params_in_dict,
     parse_json_object_from_request,
 )
-from synapse.rest.admin import assert_requester_is_admin
+from synapse.rest.admin import assert_requester_is_admin, assert_user_is_admin
 from synapse.types import UserID
 
 
 class UserAdminServlet(RestServlet):
     """
-    Set whether or not a user is a server administrator.
+    Get or set whether or not a user is a server administrator.
 
     Note that only local users can be server administrators, and that an
     administrator may not demote themselves.
 
     Only server administrators can use this API.
 
-    Example:
-        PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
-        {
-            "admin": true
-        }
+    Examples:
+        * Get
+            GET /_synapse/admin/v1/users/@nonadmin:example.com/admin
+            response on success:
+                {
+                    "admin": false
+                }
+        * Set
+            PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
+            request body:
+                {
+                    "admin": true
+                }
+            response on success:
+                {}
     """
 
     PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)
@@ -50,9 +60,23 @@ class UserAdminServlet(RestServlet):
         self.handlers = hs.get_handlers()
 
     @defer.inlineCallbacks
-    def on_PUT(self, request, user_id):
+    def on_GET(self, request, user_id):
         yield assert_requester_is_admin(self.auth, request)
+
+        target_user = UserID.from_string(user_id)
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Only local users can be admins of this homeserver")
+
+        is_admin = yield self.handlers.admin_handler.get_user_server_admin(target_user)
+        is_admin = bool(is_admin)
+
+        return (200, {"admin": is_admin})
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, user_id):
         requester = yield self.auth.get_user_by_req(request)
+        yield assert_user_is_admin(self.auth, requester.user)
         auth_user = requester.user
 
         target_user = UserID.from_string(user_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 489ce82fae..abe16334ec 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1395,14 +1395,22 @@ class SQLBaseStore(object):
         """
         txn.call_after(self._invalidate_state_caches, room_id, members_changed)
 
-        # We need to be careful that the size of the `members_changed` list
-        # isn't so large that it causes problems sending over replication, so we
-        # send them in chunks.
-        # Max line length is 16K, and max user ID length is 255, so 50 should
-        # be safe.
-        for chunk in batch_iter(members_changed, 50):
-            keys = itertools.chain([room_id], chunk)
-            self._send_invalidation_to_replication(txn, _CURRENT_STATE_CACHE_NAME, keys)
+        if members_changed:
+            # We need to be careful that the size of the `members_changed` list
+            # isn't so large that it causes problems sending over replication, so we
+            # send them in chunks.
+            # Max line length is 16K, and max user ID length is 255, so 50 should
+            # be safe.
+            for chunk in batch_iter(members_changed, 50):
+                keys = itertools.chain([room_id], chunk)
+                self._send_invalidation_to_replication(
+                    txn, _CURRENT_STATE_CACHE_NAME, keys
+                )
+        else:
+            # if no members changed, we still need to invalidate the other caches.
+            self._send_invalidation_to_replication(
+                txn, _CURRENT_STATE_CACHE_NAME, [room_id]
+            )
 
     def _invalidate_state_caches(self, room_id, members_changed):
         """Invalidates caches that are based on the current state, but does
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9027b917c1..3f50324253 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -56,6 +56,7 @@ class RegistrationWorkerStore(SQLBaseStore):
                 "consent_server_notice_sent",
                 "appservice_id",
                 "creation_ts",
+                "user_type",
             ],
             allow_none=True,
             desc="get_user_by_id",
diff --git a/synapse/util/hash.py b/synapse/util/hash.py
new file mode 100644
index 0000000000..359168704e
--- /dev/null
+++ b/synapse/util/hash.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+
+import unpaddedbase64
+
+
+def sha256_and_url_safe_base64(input_text):
+    """SHA256 hash an input string, encode the digest as url-safe base64, and
+    return
+
+    :param input_text: string to hash
+    :type input_text: str
+
+    :returns a sha256 hashed and url-safe base64 encoded digest
+    :rtype: str
+    """
+    digest = hashlib.sha256(input_text.encode()).digest()
+    return unpaddedbase64.encode_base64(digest, urlsafe=True)