Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py                                  2
-rw-r--r--  synapse/api/filtering.py                             2
-rw-r--r--  synapse/app/__init__.py                              2
-rw-r--r--  synapse/app/appservice.py                            3
-rw-r--r--  synapse/app/client_reader.py                         3
-rw-r--r--  synapse/app/event_creator.py                         3
-rw-r--r--  synapse/app/federation_reader.py                     3
-rw-r--r--  synapse/app/federation_sender.py                     3
-rw-r--r--  synapse/app/frontend_proxy.py                        3
-rwxr-xr-x  synapse/app/homeserver.py                            5
-rw-r--r--  synapse/app/media_repository.py                      3
-rw-r--r--  synapse/app/pusher.py                                3
-rw-r--r--  synapse/app/synchrotron.py                           3
-rwxr-xr-x  synapse/app/synctl.py                              284
-rw-r--r--  synapse/app/user_dir.py                              3
-rw-r--r--  synapse/config/__main__.py                           2
-rw-r--r--  synapse/federation/federation_client.py             38
-rw-r--r--  synapse/federation/federation_server.py             34
-rw-r--r--  synapse/handlers/e2e_keys.py                         2
-rw-r--r--  synapse/handlers/federation.py                     286
-rw-r--r--  synapse/handlers/profile.py                          2
-rw-r--r--  synapse/handlers/sync.py                             9
-rw-r--r--  synapse/http/server.py                              49
-rw-r--r--  synapse/http/site.py                                 6
-rw-r--r--  synapse/metrics/background_process_metrics.py        6
-rw-r--r--  synapse/notifier.py                                  8
-rw-r--r--  synapse/push/mailer.py                               2
-rw-r--r--  synapse/python_dependencies.py                      31
-rw-r--r--  synapse/rest/media/v1/download_resource.py           1
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py        1
-rw-r--r--  synapse/state/__init__.py                            9
-rw-r--r--  synapse/state/v1.py                                 14
-rw-r--r--  synapse/storage/client_ips.py                       34
-rw-r--r--  synapse/storage/monthly_active_users.py              5
-rw-r--r--  synapse/storage/state.py                            30
-rw-r--r--  synapse/storage/transactions.py                      8
-rw-r--r--  synapse/util/caches/__init__.py                     27
-rw-r--r--  synapse/util/caches/expiringcache.py                18
-rw-r--r--  synapse/util/logcontext.py                          41
-rw-r--r--  synapse/util/retryutils.py                           2
40 files changed, 442 insertions, 548 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 9dbe0b9f10..b1f7a89fba 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "0.33.4"
+__version__ = "0.33.5.1"
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index a31a9a17e0..eed8c67e6a 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -226,7 +226,7 @@ class Filtering(object):
             jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
                                 format_checker=FormatChecker())
         except jsonschema.ValidationError as e:
-            raise SynapseError(400, e.message)
+            raise SynapseError(400, str(e))
 
 
 class FilterCollection(object):
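The e.message -> str(e) substitution above, repeated across the files below, is a Python 3 compatibility fix: the "message" attribute was removed from BaseException in Python 3, while str(e) works on both major versions. A minimal sketch of the difference, using a stock exception:

    try:
        raise ValueError("invalid filter definition")
    except ValueError as e:
        print(str(e))   # "invalid filter definition" on both Python 2 and 3
        # e.message     # AttributeError on Python 3 (deprecated since 2.6)
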
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 3b6b9368b8..c3afcc573b 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -24,7 +24,7 @@ try:
     python_dependencies.check_requirements()
 except python_dependencies.MissingRequirementError as e:
     message = "\n".join([
-        "Missing Requirement: %s" % (e.message,),
+        "Missing Requirement: %s" % (str(e),),
         "To install run:",
         "    pip install --upgrade --force \"%s\"" % (e.dependency,),
         "",
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 86b5067400..8559e141af 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -136,7 +136,7 @@ def start(config_options):
             "Synapse appservice", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.appservice"
@@ -172,7 +172,6 @@ def start(config_options):
 
     def start():
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ce2b113dbb..76aed8c60a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -153,7 +153,7 @@ def start(config_options):
             "Synapse client reader", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.client_reader"
@@ -181,7 +181,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index f98e456ea0..9060ab14f6 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -169,7 +169,7 @@ def start(config_options):
             "Synapse event creator", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.event_creator"
@@ -199,7 +199,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 60f5973505..228a297fb8 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -140,7 +140,7 @@ def start(config_options):
             "Synapse federation reader", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.federation_reader"
@@ -168,7 +168,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 60dd09aac3..e9a99d76e1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -160,7 +160,7 @@ def start(config_options):
             "Synapse federation sender", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.federation_sender"
@@ -201,7 +201,6 @@ def start(config_options):
 
     def start():
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
     _base.start_worker_reactor("synapse-federation-sender", config)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 8c0b9c67b0..fc4b25de1c 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -228,7 +228,7 @@ def start(config_options):
             "Synapse frontend proxy", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.frontend_proxy"
@@ -258,7 +258,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 3241ded188..a98fdbd210 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
         try:
             database_engine.check_database(db_conn.cursor())
         except IncorrectDatabaseSetup as e:
-            quit_with_error(e.message)
+            quit_with_error(str(e))
 
 
 # Gauges to expose monthly active user control metrics
@@ -328,7 +328,7 @@ def setup(config_options):
             config_options,
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     if not config:
@@ -384,7 +384,6 @@ def setup(config_options):
 
     def start():
         hs.get_pusherpool().start()
-        hs.get_state_handler().start_caching()
         hs.get_datastore().start_profiling()
         hs.get_datastore().start_doing_background_updates()
         hs.get_federation_client().start_get_pdu_cache()
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index e3dbb3b4e6..acc0487adc 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -133,7 +133,7 @@ def start(config_options):
             "Synapse media repository", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.media_repository"
@@ -168,7 +168,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 244c604de9..630dcda478 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -191,7 +191,7 @@ def start(config_options):
             "Synapse pusher", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.pusher"
@@ -228,7 +228,6 @@ def start(config_options):
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6662340797..9a7fc6ee9d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -410,7 +410,7 @@ def start(config_options):
             "Synapse synchrotron", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.synchrotron"
@@ -435,7 +435,6 @@ def start(config_options):
 
     def start():
         ss.get_datastore().start_profiling()
-        ss.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
deleted file mode 100755
index d658f967ba..0000000000
--- a/synapse/app/synctl.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import argparse
-import collections
-import errno
-import glob
-import os
-import os.path
-import signal
-import subprocess
-import sys
-import time
-
-from six import iteritems
-
-import yaml
-
-SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
-
-GREEN = "\x1b[1;32m"
-YELLOW = "\x1b[1;33m"
-RED = "\x1b[1;31m"
-NORMAL = "\x1b[m"
-
-
-def pid_running(pid):
-    try:
-        os.kill(pid, 0)
-        return True
-    except OSError as err:
-        if err.errno == errno.EPERM:
-            return True
-        return False
-
-
-def write(message, colour=NORMAL, stream=sys.stdout):
-    if colour == NORMAL:
-        stream.write(message + "\n")
-    else:
-        stream.write(colour + message + NORMAL + "\n")
-
-
-def abort(message, colour=RED, stream=sys.stderr):
-    write(message, colour, stream)
-    sys.exit(1)
-
-
-def start(configfile):
-    write("Starting ...")
-    args = SYNAPSE
-    args.extend(["--daemonize", "-c", configfile])
-
-    try:
-        subprocess.check_call(args)
-        write("started synapse.app.homeserver(%r)" %
-              (configfile,), colour=GREEN)
-    except subprocess.CalledProcessError as e:
-        write(
-            "error starting (exit code: %d); see above for logs" % e.returncode,
-            colour=RED,
-        )
-
-
-def start_worker(app, configfile, worker_configfile):
-    args = [
-        "python", "-B",
-        "-m", app,
-        "-c", configfile,
-        "-c", worker_configfile
-    ]
-
-    try:
-        subprocess.check_call(args)
-        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
-    except subprocess.CalledProcessError as e:
-        write(
-            "error starting %s(%r) (exit code: %d); see above for logs" % (
-                app, worker_configfile, e.returncode,
-            ),
-            colour=RED,
-        )
-
-
-def stop(pidfile, app):
-    if os.path.exists(pidfile):
-        pid = int(open(pidfile).read())
-        try:
-            os.kill(pid, signal.SIGTERM)
-            write("stopped %s" % (app,), colour=GREEN)
-        except OSError as err:
-            if err.errno == errno.ESRCH:
-                write("%s not running" % (app,), colour=YELLOW)
-            elif err.errno == errno.EPERM:
-                abort("Cannot stop %s: Operation not permitted" % (app,))
-            else:
-                abort("Cannot stop %s: Unknown error" % (app,))
-
-
-Worker = collections.namedtuple("Worker", [
-    "app", "configfile", "pidfile", "cache_factor"
-])
-
-
-def main():
-
-    parser = argparse.ArgumentParser()
-
-    parser.add_argument(
-        "action",
-        choices=["start", "stop", "restart"],
-        help="whether to start, stop or restart the synapse",
-    )
-    parser.add_argument(
-        "configfile",
-        nargs="?",
-        default="homeserver.yaml",
-        help="the homeserver config file, defaults to homeserver.yaml",
-    )
-    parser.add_argument(
-        "-w", "--worker",
-        metavar="WORKERCONFIG",
-        help="start or stop a single worker",
-    )
-    parser.add_argument(
-        "-a", "--all-processes",
-        metavar="WORKERCONFIGDIR",
-        help="start or stop all the workers in the given directory"
-             " and the main synapse process",
-    )
-
-    options = parser.parse_args()
-
-    if options.worker and options.all_processes:
-        write(
-            'Cannot use "--worker" with "--all-processes"',
-            stream=sys.stderr
-        )
-        sys.exit(1)
-
-    configfile = options.configfile
-
-    if not os.path.exists(configfile):
-        write(
-            "No config file found\n"
-            "To generate a config file, run '%s -c %s --generate-config"
-            " --server-name=<server name>'\n" % (
-                " ".join(SYNAPSE), options.configfile
-            ),
-            stream=sys.stderr,
-        )
-        sys.exit(1)
-
-    with open(configfile) as stream:
-        config = yaml.load(stream)
-
-    pidfile = config["pid_file"]
-    cache_factor = config.get("synctl_cache_factor")
-    start_stop_synapse = True
-
-    if cache_factor:
-        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-
-    cache_factors = config.get("synctl_cache_factors", {})
-    for cache_name, factor in iteritems(cache_factors):
-        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
-
-    worker_configfiles = []
-    if options.worker:
-        start_stop_synapse = False
-        worker_configfile = options.worker
-        if not os.path.exists(worker_configfile):
-            write(
-                "No worker config found at %r" % (worker_configfile,),
-                stream=sys.stderr,
-            )
-            sys.exit(1)
-        worker_configfiles.append(worker_configfile)
-
-    if options.all_processes:
-        # To start the main synapse with -a you need to add a worker file
-        # with worker_app == "synapse.app.homeserver"
-        start_stop_synapse = False
-        worker_configdir = options.all_processes
-        if not os.path.isdir(worker_configdir):
-            write(
-                "No worker config directory found at %r" % (worker_configdir,),
-                stream=sys.stderr,
-            )
-            sys.exit(1)
-        worker_configfiles.extend(sorted(glob.glob(
-            os.path.join(worker_configdir, "*.yaml")
-        )))
-
-    workers = []
-    for worker_configfile in worker_configfiles:
-        with open(worker_configfile) as stream:
-            worker_config = yaml.load(stream)
-        worker_app = worker_config["worker_app"]
-        if worker_app == "synapse.app.homeserver":
-            # We need to special case all of this to pick up options that may
-            # be set in the main config file or in this worker config file.
-            worker_pidfile = (
-                worker_config.get("pid_file")
-                or pidfile
-            )
-            worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
-            daemonize = worker_config.get("daemonize") or config.get("daemonize")
-            assert daemonize, "Main process must have daemonize set to true"
-
-            # The master process doesn't support using worker_* config.
-            for key in worker_config:
-                if key == "worker_app":  # But we allow worker_app
-                    continue
-                assert not key.startswith("worker_"), \
-                    "Main process cannot use worker_* config"
-        else:
-            worker_pidfile = worker_config["worker_pid_file"]
-            worker_daemonize = worker_config["worker_daemonize"]
-            assert worker_daemonize, "In config %r: expected '%s' to be True" % (
-                worker_configfile, "worker_daemonize")
-            worker_cache_factor = worker_config.get("synctl_cache_factor")
-        workers.append(Worker(
-            worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
-        ))
-
-    action = options.action
-
-    if action == "stop" or action == "restart":
-        for worker in workers:
-            stop(worker.pidfile, worker.app)
-
-        if start_stop_synapse:
-            stop(pidfile, "synapse.app.homeserver")
-
-    # Wait for synapse to actually shutdown before starting it again
-    if action == "restart":
-        running_pids = []
-        if start_stop_synapse and os.path.exists(pidfile):
-            running_pids.append(int(open(pidfile).read()))
-        for worker in workers:
-            if os.path.exists(worker.pidfile):
-                running_pids.append(int(open(worker.pidfile).read()))
-        if len(running_pids) > 0:
-            write("Waiting for process to exit before restarting...")
-            for running_pid in running_pids:
-                while pid_running(running_pid):
-                    time.sleep(0.2)
-            write("All processes exited; now restarting...")
-
-    if action == "start" or action == "restart":
-        if start_stop_synapse:
-            # Check if synapse is already running
-            if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
-                abort("synapse.app.homeserver already running")
-            start(configfile)
-
-        for worker in workers:
-            if worker.cache_factor:
-                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
-
-            start_worker(worker.app, configfile, worker.configfile)
-
-            if cache_factor:
-                os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-            else:
-                os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 96ffcaf073..0a5f62b509 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -188,7 +188,7 @@ def start(config_options):
             "Synapse user directory", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.user_dir"
@@ -229,7 +229,6 @@ def start(config_options):
 
     def start():
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 58c97a70af..8fccf573ee 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -25,7 +25,7 @@ if __name__ == "__main__":
         try:
             config = HomeServerConfig.load_config("", sys.argv[3:])
         except ConfigError as e:
-            sys.stderr.write("\n" + e.message + "\n")
+            sys.stderr.write("\n" + str(e) + "\n")
             sys.exit(1)
 
         print (getattr(config, key))
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index fe67b2ff42..d05ed91d64 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -66,6 +66,14 @@ class FederationClient(FederationBase):
         self.state = hs.get_state_handler()
         self.transport_layer = hs.get_federation_transport_client()
 
+        self._get_pdu_cache = ExpiringCache(
+            cache_name="get_pdu_cache",
+            clock=self._clock,
+            max_len=1000,
+            expiry_ms=120 * 1000,
+            reset_expiry_on_get=False,
+        )
+
     def _clear_tried_cache(self):
         """Clear pdu_destination_tried cache"""
         now = self._clock.time_msec()
@@ -82,17 +90,6 @@ class FederationClient(FederationBase):
             if destination_dict:
                 self.pdu_destination_tried[event_id] = destination_dict
 
-    def start_get_pdu_cache(self):
-        self._get_pdu_cache = ExpiringCache(
-            cache_name="get_pdu_cache",
-            clock=self._clock,
-            max_len=1000,
-            expiry_ms=120 * 1000,
-            reset_expiry_on_get=False,
-        )
-
-        self._get_pdu_cache.start()
-
     @log_function
     def make_query(self, destination, query_type, args,
                    retry_on_dns_fail=False, ignore_backoff=False):
@@ -212,8 +209,6 @@ class FederationClient(FederationBase):
         Will attempt to get the PDU from each destination in the list until
         one succeeds.
 
-        This will persist the PDU locally upon receipt.
-
         Args:
             destinations (list): Which home servers to query
             event_id (str): event to fetch
@@ -229,10 +224,9 @@ class FederationClient(FederationBase):
 
         # TODO: Rate limit the number of times we try and get the same event.
 
-        if self._get_pdu_cache:
-            ev = self._get_pdu_cache.get(event_id)
-            if ev:
-                defer.returnValue(ev)
+        ev = self._get_pdu_cache.get(event_id)
+        if ev:
+            defer.returnValue(ev)
 
         pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
 
@@ -285,7 +279,7 @@ class FederationClient(FederationBase):
                 )
                 continue
 
-        if self._get_pdu_cache is not None and signed_pdu:
+        if signed_pdu:
             self._get_pdu_cache[event_id] = signed_pdu
 
         defer.returnValue(signed_pdu)
@@ -293,8 +287,7 @@ class FederationClient(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def get_state_for_room(self, destination, room_id, event_id):
-        """Requests all of the `current` state PDUs for a given room from
-        a remote home server.
+        """Requests all of the room state at a given event from a remote home server.
 
         Args:
             destination (str): The remote homeserver to query for the state.
@@ -302,9 +295,10 @@ class FederationClient(FederationBase):
             event_id (str): The id of the event we want the state at.
 
         Returns:
-            Deferred: Results in a list of PDUs.
+            Deferred[Tuple[List[EventBase], List[EventBase]]]:
+                A list of events in the state, and a list of events in the auth chain
+                for the given event.
         """
-
         try:
             # First we try and ask for just the IDs, as thats far quicker if
             # we have most of the state and auth_chain already.
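Building the ExpiringCache in FederationClient.__init__ means the cache always exists, so the start_get_pdu_cache() hook and the "if self._get_pdu_cache" guards further down this file can go. A rough sketch of the cache's read/write pattern as used in this diff (hs.get_clock() is an assumption about where the clock comes from):

    pdu_cache = ExpiringCache(
        cache_name="get_pdu_cache",
        clock=hs.get_clock(),          # assumed source of the Clock instance
        max_len=1000,                  # keep at most 1000 entries
        expiry_ms=120 * 1000,          # entries expire after two minutes
        reset_expiry_on_get=False,     # reads do not refresh the expiry
    )

    pdu_cache[event_id] = signed_pdu   # written after a successful fetch
    cached = pdu_cache.get(event_id)   # returns None once the entry expires
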
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index dbee404ea7..819e8f7331 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
 from synapse.util.logutils import log_function
 
 # when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
 
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
-                try:
-                    yield self._handle_received_pdu(
-                        origin, pdu
-                    )
-                    pdu_results[event_id] = {}
-                except FederationError as e:
-                    logger.warn("Error handling PDU %s: %s", event_id, e)
-                    pdu_results[event_id] = {"error": str(e)}
-                except Exception as e:
-                    f = failure.Failure()
-                    pdu_results[event_id] = {"error": str(e)}
-                    logger.error(
-                        "Failed to handle PDU %s: %s",
-                        event_id, f.getTraceback().rstrip(),
-                    )
+                with nested_logging_context(event_id):
+                    try:
+                        yield self._handle_received_pdu(
+                            origin, pdu
+                        )
+                        pdu_results[event_id] = {}
+                    except FederationError as e:
+                        logger.warn("Error handling PDU %s: %s", event_id, e)
+                        pdu_results[event_id] = {"error": str(e)}
+                    except Exception as e:
+                        f = failure.Failure()
+                        pdu_results[event_id] = {"error": str(e)}
+                        logger.error(
+                            "Failed to handle PDU %s: %s",
+                            event_id, f.getTraceback().rstrip(),
+                        )
 
         yield concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
@@ -618,7 +620,7 @@ class FederationServer(FederationBase):
             )
 
         yield self.handler.on_receive_pdu(
-            origin, pdu, get_missing=True, sent_to_us_directly=True,
+            origin, pdu, sent_to_us_directly=True,
         )
 
     def __str__(self):
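The nested_logging_context(event_id) wrapper gives each PDU in a transaction its own child log context, so its log lines can be told apart from those of sibling PDUs handled in the same request. A minimal sketch of the pattern, with a hypothetical handle_pdu function standing in for _handle_received_pdu:

    from synapse.util.logcontext import nested_logging_context

    def process_pdus(origin, pdus):
        for pdu in pdus:
            with nested_logging_context(pdu.event_id):
                # log lines emitted here carry a context name derived from
                # the parent request plus this event's id
                handle_pdu(origin, pdu)
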
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 578e9250fb..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -341,7 +341,7 @@ class E2eKeysHandler(object):
 def _exception_to_failure(e):
     if isinstance(e, CodeMessageException):
         return {
-            "status": e.code, "message": e.message,
+            "status": e.code, "message": str(e),
         }
 
     if isinstance(e, NotRetryingDestination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f10b46414b..38bebbf598 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,6 +69,27 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
+def shortstr(iterable, maxitems=5):
+    """If iterable has maxitems or fewer, return the stringification of a list
+    containing those items.
+
+    Otherwise, return the stringification of a list with the first maxitems items,
+    followed by "...".
+
+    Args:
+        iterable (Iterable): iterable to truncate
+        maxitems (int): number of items to return before truncating
+
+    Returns:
+        unicode
+    """
+
+    items = list(itertools.islice(iterable, maxitems + 1))
+    if len(items) <= maxitems:
+        return str(items)
+    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -114,9 +135,8 @@ class FederationHandler(BaseHandler):
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
     @defer.inlineCallbacks
-    @log_function
     def on_receive_pdu(
-            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+            self, origin, pdu, sent_to_us_directly=False,
     ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
@@ -125,14 +145,23 @@ class FederationHandler(BaseHandler):
             origin (str): server which initiated the /send/ transaction. Will
                 be used to fetch missing events or state.
             pdu (FrozenEvent): received PDU
-            get_missing (bool): True if we should fetch missing prev_events
+            sent_to_us_directly (bool): True if this event was pushed to us; False if
+                we pulled it as the result of a missing prev_event.
 
         Returns (Deferred): completes with None
         """
 
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        logger.info(
+            "[%s %s] handling received PDU: %s",
+            room_id, event_id, pdu,
+        )
+
         # We reprocess pdus when we have seen them only as outliers
         existing = yield self.store.get_event(
-            pdu.event_id,
+            event_id,
             allow_none=True,
             allow_rejected=True,
         )
@@ -147,7 +176,7 @@ class FederationHandler(BaseHandler):
             )
         )
         if already_seen:
-            logger.debug("Already seen pdu %s", pdu.event_id)
+            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
             return
 
         # do some initial sanity-checking of the event. In particular, make
@@ -156,6 +185,7 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
+            logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
             raise FederationError(
                 "ERROR",
                 err.code,
@@ -165,10 +195,12 @@ class FederationHandler(BaseHandler):
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if pdu.room_id in self.room_queues:
-            logger.info("Ignoring PDU %s for room %s from %s for now; join "
-                        "in progress", pdu.event_id, pdu.room_id, origin)
-            self.room_queues[pdu.room_id].append((pdu, origin))
+        if room_id in self.room_queues:
+            logger.info(
+                "[%s %s] Queuing PDU from %s for now: join in progress",
+                room_id, event_id, origin,
+            )
+            self.room_queues[room_id].append((pdu, origin))
             return
 
         # If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +211,7 @@ class FederationHandler(BaseHandler):
         # we should check if we *are* in fact in the room. If we are then we
         # can magically rejoin the room.
         is_in_room = yield self.auth.check_host_in_room(
-            pdu.room_id,
+            room_id,
             self.server_name
         )
         if not is_in_room:
@@ -188,8 +220,8 @@ class FederationHandler(BaseHandler):
             )
             if was_in_room:
                 logger.info(
-                    "Ignoring PDU %s for room %s from %s as we've left the room!",
-                    pdu.event_id, pdu.room_id, origin,
+                    "[%s %s] Ignoring PDU from %s as we've left the room",
+                    room_id, event_id, origin,
                 )
                 defer.returnValue(None)
 
@@ -204,8 +236,8 @@ class FederationHandler(BaseHandler):
             )
 
             logger.debug(
-                "_handle_new_pdu min_depth for %s: %d",
-                pdu.room_id, min_depth
+                "[%s %s] min_depth: %d",
+                room_id, event_id, min_depth,
             )
 
             prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +250,18 @@ class FederationHandler(BaseHandler):
                 # send to the clients.
                 pdu.internal_metadata.outlier = True
             elif min_depth and pdu.depth > min_depth:
-                if get_missing and prevs - seen:
+                missing_prevs = prevs - seen
+                if sent_to_us_directly and missing_prevs:
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
-                        "Acquiring lock for room %r to fetch %d missing events: %r...",
-                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
                     )
                     with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
                         logger.info(
-                            "Acquired lock for room %r to fetch %d missing events",
-                            pdu.room_id, len(prevs - seen),
+                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
+                            room_id, event_id, len(missing_prevs),
                         )
 
                         yield self._get_missing_events_for_pdu(
@@ -241,49 +274,91 @@ class FederationHandler(BaseHandler):
 
                         if not prevs - seen:
                             logger.info(
-                                "Found all missing prev events for %s", pdu.event_id
+                                "[%s %s] Found all missing prev_events",
+                                room_id, event_id,
                             )
-                elif prevs - seen:
+                elif missing_prevs:
                     logger.info(
-                        "Not fetching %d missing events for room %r,event %s: %r...",
-                        len(prevs - seen), pdu.room_id, pdu.event_id,
-                        list(prevs - seen)[:5],
+                        "[%s %s] Not recursively fetching %d missing prev_events: %s",
+                        room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
+                    )
+
+            if prevs - seen:
+                # We've still not been able to get all of the prev_events for this event.
+                #
+                # In this case, we need to fall back to asking another server in the
+                # federation for the state at this event. That's ok provided we then
+                # resolve the state against other bits of the DAG before using it (which
+                # will ensure that you can't just take over a room by sending an event,
+                # withholding its prev_events, and declaring yourself to be an admin in
+                # the subsequent state request).
+                #
+                # Now, if we're pulling this event as a missing prev_event, then clearly
+                # this event is not going to become the only forward-extremity and we are
+                # guaranteed to resolve its state against our existing forward
+                # extremities, so that should be fine.
+                #
+                # On the other hand, if this event was pushed to us, it is possible for
+                # it to become the only forward-extremity in the room, and we would then
+                # trust its state to be the state for the whole room. This is very bad.
+                # Further, if the event was pushed to us, there is no excuse for us not to
+                # have all the prev_events. We therefore reject any such events.
+                #
+                # XXX this really feels like it could/should be merged with the above,
+                # but there is an interaction with min_depth that I'm not really
+                # following.
+
+                if sent_to_us_directly:
+                    logger.warn(
+                        "[%s %s] Failed to fetch %d prev events: rejecting",
+                        room_id, event_id, len(prevs - seen),
+                    )
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        (
+                            "Your server isn't divulging details about prev_events "
+                            "referenced in this event."
+                        ),
+                        affected=pdu.event_id,
                     )
 
-            if sent_to_us_directly and prevs - seen:
-                # If they have sent it to us directly, and the server
-                # isn't telling us about the auth events that it's
-                # made a message referencing, we explode
-                raise FederationError(
-                    "ERROR",
-                    403,
-                    (
-                        "Your server isn't divulging details about prev_events "
-                        "referenced in this event."
-                    ),
-                    affected=pdu.event_id,
-                )
-            elif prevs - seen:
                 # Calculate the state of the previous events, and
                 # de-conflict them to find the current state.
                 state_groups = []
                 auth_chains = set()
                 try:
                     # Get the state of the events we know about
-                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+                    ours = yield self.store.get_state_groups(room_id, list(seen))
                     state_groups.append(ours)
 
                     # Ask the remote server for the states we don't
                     # know about
                     for p in prevs - seen:
-                        state, got_auth_chain = (
-                            yield self.federation_client.get_state_for_room(
-                                origin, pdu.room_id, p
-                            )
+                        logger.info(
+                            "[%s %s] Requesting state at missing prev_event %s",
+                            room_id, event_id, p,
                         )
-                        auth_chains.update(got_auth_chain)
-                        state_group = {(x.type, x.state_key): x.event_id for x in state}
-                        state_groups.append(state_group)
+
+                        with logcontext.nested_logging_context(p):
+                            # note that if any of the missing prevs share missing state or
+                            # auth events, the requests to fetch those events are deduped
+                            # by the get_pdu_cache in federation_client.
+                            remote_state, got_auth_chain = (
+                                yield self.federation_client.get_state_for_room(
+                                    origin, room_id, p,
+                                )
+                            )
+
+                            # XXX hrm I'm not convinced that duplicate events will compare
+                            # for equality, so I'm not sure this does what the author
+                            # hoped.
+                            auth_chains.update(got_auth_chain)
+
+                            state_group = {
+                                (x.type, x.state_key): x.event_id for x in remote_state
+                            }
+                            state_groups.append(state_group)
 
                     # Resolve any conflicting state
                     def fetch(ev_ids):
@@ -291,19 +366,24 @@ class FederationHandler(BaseHandler):
                             ev_ids, get_prev_content=False, check_redacted=False
                         )
 
-                    room_version = yield self.store.get_room_version(pdu.room_id)
+                    room_version = yield self.store.get_room_version(room_id)
                     state_map = yield resolve_events_with_factory(
-                        room_version, state_groups, {pdu.event_id: pdu}, fetch
+                        room_version, state_groups, {event_id: pdu}, fetch
                     )
 
                     state = (yield self.store.get_events(state_map.values())).values()
                     auth_chain = list(auth_chains)
                 except Exception:
+                    logger.warn(
+                        "[%s %s] Error attempting to resolve state at missing "
+                        "prev_events",
+                        room_id, event_id, exc_info=True,
+                    )
                     raise FederationError(
                         "ERROR",
                         403,
                         "We can't get valid state history.",
-                        affected=pdu.event_id,
+                        affected=event_id,
                     )
 
         yield self._process_received_pdu(
@@ -322,15 +402,16 @@ class FederationHandler(BaseHandler):
             prevs (set(str)): List of event ids which we are missing
             min_depth (int): Minimum depth of events to return.
         """
-        # We recalculate seen, since it may have changed.
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
         seen = yield self.store.have_seen_events(prevs)
 
         if not prevs - seen:
             return
 
-        latest = yield self.store.get_latest_event_ids_in_room(
-            pdu.room_id
-        )
+        latest = yield self.store.get_latest_event_ids_in_room(room_id)
 
         # We add the prev events that we have seen to the latest
         # list to ensure the remote server doesn't give them to us
@@ -338,8 +419,8 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "Missing %d events for room %r pdu %s: %r...",
-            len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+            "[%s %s]: Requesting %d prev_events: %s",
+            room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
         )
 
         # XXX: we set timeout to 10s to help workaround
@@ -392,7 +473,7 @@ class FederationHandler(BaseHandler):
 
         missing_events = yield self.federation_client.get_missing_events(
             origin,
-            pdu.room_id,
+            room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
             limit=10,
@@ -401,37 +482,47 @@ class FederationHandler(BaseHandler):
         )
 
         logger.info(
-            "Got %d events: %r...",
-            len(missing_events), [e.event_id for e in missing_events[:5]]
+            "[%s %s]: Got %d prev_events: %s",
+            room_id, event_id, len(missing_events), shortstr(missing_events),
         )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         missing_events.sort(key=lambda x: x.depth)
 
-        for e in missing_events:
-            logger.info("Handling found event %s", e.event_id)
-            try:
-                yield self.on_receive_pdu(
-                    origin,
-                    e,
-                    get_missing=False
-                )
-            except FederationError as e:
-                if e.code == 403:
-                    logger.warn("Event %s failed history check.")
-                else:
-                    raise
+        for ev in missing_events:
+            logger.info(
+                "[%s %s] Handling received prev_event %s",
+                room_id, event_id, ev.event_id,
+            )
+            with logcontext.nested_logging_context(ev.event_id):
+                try:
+                    yield self.on_receive_pdu(
+                        origin,
+                        ev,
+                        sent_to_us_directly=False,
+                    )
+                except FederationError as e:
+                    if e.code == 403:
+                        logger.warn(
+                            "[%s %s] Received prev_event %s failed history check.",
+                            room_id, event_id, ev.event_id,
+                        )
+                    else:
+                        raise
 
-    @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+    def _process_received_pdu(self, origin, event, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
         """
-        event = pdu
+        room_id = event.room_id
+        event_id = event.event_id
 
-        logger.debug("Processing event: %s", event)
+        logger.debug(
+            "[%s %s] Processing event: %s",
+            room_id, event_id, event,
+        )
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -440,15 +531,16 @@ class FederationHandler(BaseHandler):
         # event.
         if state and auth_chain and not event.internal_metadata.is_outlier():
             is_in_room = yield self.auth.check_host_in_room(
-                event.room_id,
+                room_id,
                 self.server_name
             )
         else:
             is_in_room = True
+
         if not is_in_room:
             logger.info(
-                "Got event for room we're not in: %r %r",
-                event.room_id, event.event_id
+                "[%s %s] Got event for room we're not in",
+                room_id, event_id,
             )
 
             try:
@@ -460,7 +552,7 @@ class FederationHandler(BaseHandler):
                     "ERROR",
                     e.code,
                     e.msg,
-                    affected=event.event_id,
+                    affected=event_id,
                 )
 
         else:
@@ -493,6 +585,10 @@ class FederationHandler(BaseHandler):
                     })
                     seen_ids.add(e.event_id)
 
+                logger.info(
+                    "[%s %s] persisting newly-received auth/state events %s",
+                    room_id, event_id, [e["event"].event_id for e in event_infos]
+                )
                 yield self._handle_new_events(origin, event_infos)
 
             try:
@@ -509,12 +605,12 @@ class FederationHandler(BaseHandler):
                     affected=event.event_id,
                 )
 
-        room = yield self.store.get_room(event.room_id)
+        room = yield self.store.get_room(room_id)
 
         if not room:
             try:
                 yield self.store.store_room(
-                    room_id=event.room_id,
+                    room_id=room_id,
                     room_creator_user_id="",
                     is_public=False,
                 )
@@ -542,7 +638,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield self.user_joined_room(user, event.room_id)
+                    yield self.user_joined_room(user, room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -1056,7 +1152,8 @@ class FederationHandler(BaseHandler):
             try:
                 logger.info("Processing queued PDU %s which was received "
                             "while we were joining %s", p.event_id, p.room_id)
-                yield self.on_receive_pdu(origin, p)
+                with logcontext.nested_logging_context(p.event_id):
+                    yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
             except Exception as e:
                 logger.warn(
                     "Error handling queued PDU %s from %s: %s",
@@ -1459,12 +1556,10 @@ class FederationHandler(BaseHandler):
         else:
             defer.returnValue(None)
 
-    @log_function
     def get_min_depth_for_context(self, context):
         return self.store.get_min_depth(context)
 
     @defer.inlineCallbacks
-    @log_function
     def _handle_new_event(self, origin, event, state=None, auth_events=None,
                           backfilled=False):
         context = yield self._prep_event(
@@ -1504,15 +1599,22 @@ class FederationHandler(BaseHandler):
 
         Notifies about the events where appropriate.
         """
-        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.run_in_background(
-                    self._prep_event,
+
+        @defer.inlineCallbacks
+        def prep(ev_info):
+            event = ev_info["event"]
+            with logcontext.nested_logging_context(suffix=event.event_id):
+                res = yield self._prep_event(
                     origin,
-                    ev_info["event"],
+                    event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
                 )
+            defer.returnValue(res)
+
+        contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+            [
+                logcontext.run_in_background(prep, ev_info)
                 for ev_info in event_infos
             ], consumeErrors=True,
         ))
@@ -1664,8 +1766,8 @@ class FederationHandler(BaseHandler):
             )
         except AuthError as e:
             logger.warn(
-                "Rejecting %s because %s",
-                event.event_id, e.msg
+                "[%s %s] Rejecting: %s",
+                event.room_id, event.event_id, e.msg
             )
 
             context.rejected = RejectedReason.AUTH_ERROR
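The new shortstr helper keeps the [room_id event_id]-prefixed log lines above readable when an event references many prev_events: short iterables are printed in full, longer ones are truncated with an ellipsis. A quick illustration (the event ids are made up for the example):

    shortstr(["$a", "$b", "$c"])                     # "['$a', '$b', '$c']"
    shortstr(["$ev%d" % i for i in range(10)], 3)    # "['$ev0', '$ev1', '$ev2', ...]"
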
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..f284d5a385 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    room_id, str(e.message)
+                    room_id, str(e)
                 )
 
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9bca4e7067..c7d69d9d80 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -722,6 +722,13 @@ class SyncHandler(object):
             }
 
             if full_state:
+                if lazy_load_members:
+                    # always make sure we LL ourselves so we know we're in the room
+                    # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
+                    # We only need apply this on full state syncs given we disabled
+                    # LL for incr syncs in #3840.
+                    types.append((EventTypes.Member, sync_config.user.to_string()))
+
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
                         batch.events[-1].event_id, types=types,
@@ -790,7 +797,7 @@ class SyncHandler(object):
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    if types:
+                    if types and batch.events:
                         # We're returning an incremental sync, with no
                         # "gap" since the previous sync, so normally there would be
                         # no state to return.
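With lazy-loading of members enabled, a full-state sync now always asks for the requesting user's own m.room.member event, so the response shows whether the user is in the room at all. The filter entries are (event type, state key) pairs; in sketch form (the user id is an example value):

    # types is a list of (event type, state key) pairs used to filter state
    types.append((EventTypes.Member, sync_config.user.to_string()))
    # e.g. ("m.room.member", "@alice:example.com")
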
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2d5c23e673..b4b25cab19 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -84,10 +84,21 @@ def wrap_json_request_handler(h):
             logger.info(
                 "%s SynapseError: %s - %s", request, code, e.msg
             )
-            respond_with_json(
-                request, code, e.error_dict(), send_cors=True,
-                pretty_print=_request_user_agent_is_curl(request),
-            )
+
+            # Only respond with an error response if we haven't already started
+            # writing, otherwise lets just kill the connection
+            if request.startedWriting:
+                if request.transport:
+                    try:
+                        request.transport.abortConnection()
+                    except Exception:
+                        # abortConnection throws if the connection is already closed
+                        pass
+            else:
+                respond_with_json(
+                    request, code, e.error_dict(), send_cors=True,
+                    pretty_print=_request_user_agent_is_curl(request),
+                )
 
         except Exception:
             # failure.Failure() fishes the original Failure out
@@ -100,16 +111,26 @@ def wrap_json_request_handler(h):
                 request,
                 f.getTraceback().rstrip(),
             )
-            respond_with_json(
-                request,
-                500,
-                {
-                    "error": "Internal server error",
-                    "errcode": Codes.UNKNOWN,
-                },
-                send_cors=True,
-                pretty_print=_request_user_agent_is_curl(request),
-            )
+            # Only respond with an error response if we haven't already started
+            # writing, otherwise lets just kill the connection
+            if request.startedWriting:
+                if request.transport:
+                    try:
+                        request.transport.abortConnection()
+                    except Exception:
+                        # abortConnection throws if the connection is already closed
+                        pass
+            else:
+                respond_with_json(
+                    request,
+                    500,
+                    {
+                        "error": "Internal server error",
+                        "errcode": Codes.UNKNOWN,
+                    },
+                    send_cors=True,
+                    pretty_print=_request_user_agent_is_curl(request),
+                )
 
     return wrap_async_request_handler(wrapped_request_handler)
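Both handlers above now apply the same guard: once the response has started streaming, a JSON error body can no longer be sent safely, so the connection is aborted instead. A hedged sketch of that guard pulled out on its own (the helper name is invented, and pretty_print is omitted for brevity):

    def respond_error_or_abort(request, code, error_dict):
        # Send a JSON error if nothing has been written yet, otherwise just
        # kill the connection.
        if request.startedWriting:
            if request.transport:
                try:
                    request.transport.abortConnection()
                except Exception:
                    # abortConnection raises if the connection is already closed
                    pass
        else:
            respond_with_json(request, code, error_dict, send_cors=True)
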
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 9579e8cd0d..50be2de3bb 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,9 +75,9 @@ class SynapseRequest(Request):
         return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
             self.__class__.__name__,
             id(self),
-            self.method,
+            self.method.decode('ascii', errors='replace'),
             self.get_redacted_uri(),
-            self.clientproto,
+            self.clientproto.decode('ascii', errors='replace'),
             self.site.site_tag,
         )
 
@@ -308,7 +308,7 @@ class XForwardedForRequest(SynapseRequest):
             C{b"-"}.
         """
         return self.requestHeaders.getRawHeaders(
-            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
 
 
 class SynapseRequestFactory(object):
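The .decode('ascii') calls matter on Python 3, where the request method, protocol, and header values are bytes: without decoding, string interpolation renders them as b'...'. For the X-Forwarded-For case (the addresses are example values):

    raw = b"10.1.2.3, 10.0.0.1"                    # header as received
    raw.split(b",")[0].strip().decode('ascii')     # -> '10.1.2.3'
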
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 167167be0a..173908299c 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import threading
 
 import six
@@ -23,6 +24,9 @@ from twisted.internet import defer
 
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 
+logger = logging.getLogger(__name__)
+
+
 _background_process_start_count = Counter(
     "synapse_background_process_start_count",
     "Number of background processes started",
@@ -191,6 +195,8 @@ def run_as_background_process(desc, func, *args, **kwargs):
 
             try:
                 yield func(*args, **kwargs)
+            except Exception:
+                logger.exception("Background process '%s' threw an exception", desc)
             finally:
                 proc.update_metrics()
 
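A minimal sketch (the names are illustrative, not from this changeset) of how a caller drives run_as_background_process; with the except clause above, an exception raised by the wrapped coroutine is logged against the process description rather than disappearing silently:

    from twisted.internet import defer

    from synapse.metrics.background_process_metrics import run_as_background_process


    @defer.inlineCallbacks
    def _prune_old_entries(store):
        # hypothetical store method; if it raises, the wrapper now logs
        # "Background process 'prune_old_entries' threw an exception"
        yield store.delete_entries_older_than(24 * 60 * 60 * 1000)


    def start_pruning(hs):
        run_as_background_process(
            "prune_old_entries", _prune_old_entries, hs.get_datastore(),
        )
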
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f1d92c1395..340b16ce25 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import StreamToken
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
-from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -248,7 +249,10 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        run_in_background(self._notify_app_services, room_stream_id)
+        run_as_background_process(
+            "notify_app_services",
+            self._notify_app_services, room_stream_id,
+        )
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(room_stream_id)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b78ce10396..1a5a10d974 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -441,7 +441,7 @@ class Mailer(object):
 
     def make_room_link(self, room_id):
         if self.hs.config.email_riot_base_url:
-            base_url = self.hs.config.email_riot_base_url
+            base_url = "%s/#/room" % (self.hs.config.email_riot_base_url,)
         elif self.app_name == "Vector":
             # need /beta for Universal Links to work on iOS
             base_url = "https://vector.im/beta/#/room"
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 0d8de600cf..0f339a0320 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -33,32 +33,35 @@ logger = logging.getLogger(__name__)
 # [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
 REQUIREMENTS = {
     "jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
-    "frozendict>=0.4": ["frozendict"],
+    "frozendict>=1": ["frozendict"],
     "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
     "canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
     "signedjson>=1.0.0": ["signedjson>=1.0.0"],
     "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
-    "service_identity>=1.0.0": ["service_identity>=1.0.0"],
+    "service_identity>=16.0.0": ["service_identity>=16.0.0"],
     "Twisted>=17.1.0": ["twisted>=17.1.0"],
     "treq>=15.1": ["treq>=15.1"],
 
     # Twisted has required pyopenssl 16.0 since about Twisted 16.6.
     "pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
 
-    "pyyaml": ["yaml"],
-    "pyasn1": ["pyasn1"],
-    "daemonize": ["daemonize"],
-    "bcrypt": ["bcrypt>=3.1.0"],
-    "pillow": ["PIL"],
-    "pydenticon": ["pydenticon"],
-    "sortedcontainers": ["sortedcontainers"],
-    "pysaml2>=3.0.0": ["saml2>=3.0.0"],
-    "pymacaroons-pynacl": ["pymacaroons"],
+    "pyyaml>=3.11": ["yaml"],
+    "pyasn1>=0.1.9": ["pyasn1"],
+    "pyasn1-modules>=0.0.7": ["pyasn1_modules"],
+    "daemonize>=2.3.1": ["daemonize"],
+    "bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
+    "pillow>=3.1.2": ["PIL"],
+    "pydenticon>=0.2": ["pydenticon"],
+    "sortedcontainers>=1.4.4": ["sortedcontainers"],
+    "pysaml2>=3.0.0": ["saml2"],
+    "pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
-    "six": ["six"],
-    "prometheus_client": ["prometheus_client"],
-    "attrs": ["attr"],
+    "six>=1.10": ["six"],
+    "prometheus_client>=0.0.18": ["prometheus_client"],
+
+    # we use attr.s(slots), which arrived in 16.0.0
+    "attrs>=16.0.0": ["attr>=16.0.0"],
     "netaddr>=0.7.18": ["netaddr"],
 }
 
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index ca90964d1d..f911b120b1 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -52,6 +52,7 @@ class DownloadResource(Resource):
             b" script-src 'none';"
             b" plugin-types application/pdf;"
             b" style-src 'unsafe-inline';"
+            b" media-src 'self';"
             b" object-src 'self';"
         )
         server_name, media_id, name = parse_media_id(request)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index cad2dec33a..af01040a38 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -79,7 +79,6 @@ class PreviewUrlResource(Resource):
             # don't spider URLs more often than once an hour
             expiry_ms=60 * 60 * 1000,
         )
-        self._cache.start()
 
         self._cleaner_loop = self.clock.looping_call(
             self._start_expire_url_cache_data, 10 * 1000,
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index d7ae22a661..b22495c1f9 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -95,10 +95,6 @@ class StateHandler(object):
         self.hs = hs
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
-    def start_caching(self):
-        # TODO: remove this shim
-        self._state_resolution_handler.start_caching()
-
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key="",
                           latest_event_ids=None):
@@ -428,9 +424,6 @@ class StateResolutionHandler(object):
         self._state_cache = None
         self.resolve_linearizer = Linearizer(name="state_resolve_lock")
 
-    def start_caching(self):
-        logger.debug("start_caching")
-
         self._state_cache = ExpiringCache(
             cache_name="state_cache",
             clock=self.clock,
@@ -440,8 +433,6 @@ class StateResolutionHandler(object):
             reset_expiry_on_get=True,
         )
 
-        self._state_cache.start()
-
     @defer.inlineCallbacks
     @log_function
     def resolve_state_groups(
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index c95477d318..7a7157b352 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
         for event_ids in itervalues(conflicted_state)
         for event_id in event_ids
     )
+    needed_event_count = len(needed_events)
     if event_map is not None:
         needed_events -= set(iterkeys(event_map))
 
-    logger.info("Asking for %d conflicted events", len(needed_events))
+    logger.info(
+        "Asking for %d/%d conflicted events",
+        len(needed_events),
+        needed_event_count,
+    )
 
     # dict[str, FrozenEvent]: a map from state event id to event. Only includes
     # the state events which are in conflict (and those in event_map)
@@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
     )
 
     new_needed_events = set(itervalues(auth_events))
+    new_needed_event_count = len(new_needed_events)
     new_needed_events -= needed_events
     if event_map is not None:
         new_needed_events -= set(iterkeys(event_map))
 
-    logger.info("Asking for %d auth events", len(new_needed_events))
+    logger.info(
+        "Asking for %d/%d auth events",
+        len(new_needed_events),
+        new_needed_event_count,
+    )
 
     state_map_new = yield state_map_factory(new_needed_events)
     state_map.update(state_map_new)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 8fc678fa67..9ad17b7c25 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -119,21 +119,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
-            self._simple_upsert_txn(
-                txn,
-                table="user_ips",
-                keyvalues={
-                    "user_id": user_id,
-                    "access_token": access_token,
-                    "ip": ip,
-                    "user_agent": user_agent,
-                    "device_id": device_id,
-                },
-                values={
-                    "last_seen": last_seen,
-                },
-                lock=False,
-            )
+            try:
+                self._simple_upsert_txn(
+                    txn,
+                    table="user_ips",
+                    keyvalues={
+                        "user_id": user_id,
+                        "access_token": access_token,
+                        "ip": ip,
+                        "user_agent": user_agent,
+                        "device_id": device_id,
+                    },
+                    values={
+                        "last_seen": last_seen,
+                    },
+                    lock=False,
+                )
+            except Exception as e:
+                # Failed to upsert, log and continue
+                logger.error("Failed to insert client IP %r: %r", entry, e)
 
     @defer.inlineCallbacks
     def get_last_client_ip_by_device(self, user_id, device_id):
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 59580949f1..0fe8c8e24c 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             Deferred[bool]: True if a new entry was created, False if an
                 existing one was updated.
         """
+        # We are consciously deciding to lock the table on the basis that it
+        # ought never be a big table, and alternative approaches (batching
+        # multiple upserts into a single txn) introduced a lot of extra
+        # complexity. See https://github.com/matrix-org/synapse/issues/3854
         is_insert = yield self._simple_upsert(
             desc="upsert_monthly_active_user",
             table="monthly_active_users",
@@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
             values={
                 "timestamp": int(self._clock.time_msec()),
             },
-            lock=False,
         )
         if is_insert:
             self.user_last_seen_monthly_active.invalidate((user_id,))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 4b971efdba..3f4cbd61c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_state_groups_ids(self, room_id, event_ids):
+    def get_state_groups_ids(self, _room_id, event_ids):
+        """Get the event IDs of all the state for the state groups for the given events
+
+        Args:
+            _room_id (str): id of the room for these events
+            event_ids (iterable[str]): ids of the events
+
+        Returns:
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
+        """
         if not event_ids:
             defer.returnValue({})
 
@@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_state_ids_for_group(self, state_group):
-        """Get the state IDs for the given state group
+        """Get the event IDs of all the state in the given state group
 
         Args:
             state_group (int)
@@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     def get_state_groups(self, room_id, event_ids):
         """ Get the state groups for the given list of event_ids
 
-        The return value is a dict mapping group names to lists of events.
+        Returns:
+            Deferred[dict[int, list[EventBase]]]:
+                dict of state_group_id -> list of state events.
         """
         if not event_ids:
             defer.returnValue({})
@@ -324,7 +336,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 member events (if True), or to exclude member events (if False)
 
         Returns:
-            dictionary state_group -> (dict of (type, state_key) -> event id)
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         results = {}
 
@@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            Deferred[dict[int, dict[(type, state_key), EventBase]]]
-                a dictionary mapping from state group to state dictionary.
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         if types is not None:
             non_member_types = [t for t in types if t[0] != EventTypes.Member]
@@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            Deferred[dict[int, dict[(type, state_key), EventBase]]]
-                a dictionary mapping from state group to state dictionary.
+            Deferred[dict[int, dict[tuple[str, str], str]]]:
+                dict of state_group_id -> (dict of (type, state_key) -> event id)
         """
         if types:
             types = frozenset(types)
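The docstrings above all describe the same nested shape: state_group_id -> ((type, state_key) -> event_id). A hedged sketch of a consumer (the room and event IDs are made up, and `store` is assumed to be a StateGroupWorkerStore):

    from twisted.internet import defer


    @defer.inlineCallbacks
    def get_room_name_ids_by_group(store, room_id, event_ids):
        # groups: dict[state_group_id, dict[(type, state_key), event_id]]
        groups = yield store.get_state_groups_ids(room_id, event_ids)
        defer.returnValue({
            group_id: state.get(("m.room.name", ""))
            for group_id, state in groups.items()
        })
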
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 0c42bd3322..baf0379a68 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
@@ -156,7 +155,6 @@ class TransactionStore(SQLBaseStore):
         """
         pass
 
-    @cached(max_entries=10000)
     def get_destination_retry_timings(self, destination):
         """Gets the current retry timings (if any) for a given destination.
 
@@ -198,8 +196,6 @@ class TransactionStore(SQLBaseStore):
             retry_interval (int) - how long until next retry in ms
         """
 
-        # XXX: we could chose to not bother persisting this if our cache thinks
-        # this is a NOOP
         return self.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
@@ -212,10 +208,6 @@ class TransactionStore(SQLBaseStore):
                                        retry_last_ts, retry_interval):
         self.database_engine.lock_table(txn, "destinations")
 
-        self._invalidate_cache_and_stream(
-            txn, self.get_destination_retry_timings, (destination,)
-        )
-
         # We need to be careful here as the data may have changed from under us
         # due to a worker setting the timings.
 
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 7b065b195e..f37d5bec08 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import os
 
 import six
@@ -20,6 +21,8 @@ from six.moves import intern
 
 from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
 
+logger = logging.getLogger(__name__)
+
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
 
@@ -76,16 +79,20 @@ def register_cache(cache_type, cache_name, cache):
             return []
 
         def collect(self):
-            if cache_type == "response_cache":
-                response_cache_size.labels(cache_name).set(len(cache))
-                response_cache_hits.labels(cache_name).set(self.hits)
-                response_cache_evicted.labels(cache_name).set(self.evicted_size)
-                response_cache_total.labels(cache_name).set(self.hits + self.misses)
-            else:
-                cache_size.labels(cache_name).set(len(cache))
-                cache_hits.labels(cache_name).set(self.hits)
-                cache_evicted.labels(cache_name).set(self.evicted_size)
-                cache_total.labels(cache_name).set(self.hits + self.misses)
+            try:
+                if cache_type == "response_cache":
+                    response_cache_size.labels(cache_name).set(len(cache))
+                    response_cache_hits.labels(cache_name).set(self.hits)
+                    response_cache_evicted.labels(cache_name).set(self.evicted_size)
+                    response_cache_total.labels(cache_name).set(self.hits + self.misses)
+                else:
+                    cache_size.labels(cache_name).set(len(cache))
+                    cache_hits.labels(cache_name).set(self.hits)
+                    cache_evicted.labels(cache_name).set(self.evicted_size)
+                    cache_total.labels(cache_name).set(self.hits + self.misses)
+            except Exception as e:
+                logger.warn("Error calculating metrics for %s: %s", cache_name, e)
+                raise
 
             yield GaugeMetricFamily("__unused", "")
 
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index ce85b2ae11..9af4ec4aa8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,6 +16,8 @@
 import logging
 from collections import OrderedDict
 
+from six import itervalues
+
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache
 
@@ -54,11 +56,8 @@ class ExpiringCache(object):
 
         self.iterable = iterable
 
-        self._size_estimate = 0
-
         self.metrics = register_cache("expiring", cache_name, self)
 
-    def start(self):
         if not self._expiry_ms:
             # Don't bother starting the loop if things never expire
             return
@@ -75,16 +74,11 @@ class ExpiringCache(object):
         now = self._clock.time_msec()
         self._cache[key] = _CacheEntry(now, value)
 
-        if self.iterable:
-            self._size_estimate += len(value)
-
         # Evict if there are now too many items
         while self._max_len and len(self) > self._max_len:
             _key, value = self._cache.popitem(last=False)
             if self.iterable:
-                removed_len = len(value.value)
-                self.metrics.inc_evictions(removed_len)
-                self._size_estimate -= removed_len
+                self.metrics.inc_evictions(len(value.value))
             else:
                 self.metrics.inc_evictions()
 
@@ -135,7 +129,9 @@ class ExpiringCache(object):
         for k in keys_to_delete:
             value = self._cache.pop(k)
             if self.iterable:
-                self._size_estimate -= len(value.value)
+                self.metrics.inc_evictions(len(value.value))
+            else:
+                self.metrics.inc_evictions()
 
         logger.debug(
             "[%s] _prune_cache before: %d, after len: %d",
@@ -144,7 +140,7 @@ class ExpiringCache(object):
 
     def __len__(self):
         if self.iterable:
-            return self._size_estimate
+            return sum(len(entry.value) for entry in itervalues(self._cache))
         else:
             return len(self._cache)
 
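With the size estimate gone, an iterable cache's length is recomputed on demand by summing the lengths of its values, and pruning now counts evictions too. A rough usage sketch, assuming a HomeServer instance `hs` and the constructor arguments seen elsewhere in this changeset:

    from synapse.util.caches.expiringcache import ExpiringCache

    cache = ExpiringCache(
        cache_name="example_cache",      # illustrative name
        clock=hs.get_clock(),
        max_len=100,
        expiry_ms=60 * 60 * 1000,
        iterable=True,
    )
    cache["some_key"] = ["a", "b", "c"]
    assert len(cache) == 3               # sum of len(value) over all entries
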
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d37610..89224b26cc 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
 
     sentinel = Sentinel()
 
-    def __init__(self, name=None, parent_context=None):
+    def __init__(self, name=None, parent_context=None, request=None):
         self.previous_context = LoggingContext.current_context()
         self.name = name
 
@@ -218,6 +218,13 @@ class LoggingContext(object):
 
         self.parent_context = parent_context
 
+        if self.parent_context is not None:
+            self.parent_context.copy_to(self)
+
+        if request is not None:
+            # the request param overrides the request from the parent context
+            self.request = request
+
     def __str__(self):
         return "%s@%x" % (self.name, id(self))
 
@@ -256,9 +263,6 @@ class LoggingContext(object):
             )
         self.alive = True
 
-        if self.parent_context is not None:
-            self.parent_context.copy_to(self)
-
         return self
 
     def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
                 )
 
 
+def nested_logging_context(suffix, parent_context=None):
+    """Creates a new logging context as a child of another.
+
+    The nested logging context will have a 'request' made up of the parent context's
+    request, plus the given suffix.
+
+    CPU/db usage stats will be added to the parent context's stats on exit.
+
+    Normal usage looks like:
+
+        with nested_logging_context(suffix):
+            # ... do stuff
+
+    Args:
+        suffix (str): suffix to add to the parent context's 'request'.
+        parent_context (LoggingContext|None): parent context. Will use the current context
+            if None.
+
+    Returns:
+        LoggingContext: new logging context.
+    """
+    if parent_context is None:
+        parent_context = LoggingContext.current_context()
+    return LoggingContext(
+        parent_context=parent_context,
+        request=parent_context.request + "-" + suffix,
+    )
+
+
 def preserve_fn(f):
     """Function decorator which wraps the function with run_in_background"""
     def g(*args, **kwargs):
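Taken together with the constructor change above (parent fields are now copied at construction time, and an explicit `request` overrides the parent's), a hedged usage sketch; the transaction id and the `process` call are illustrative only:

    from synapse.util.logcontext import LoggingContext, nested_logging_context


    def handle_events(event_ids):
        with LoggingContext(name="handle_events", request="txn-1234"):
            for i, event_id in enumerate(event_ids):
                # child contexts log with request "txn-1234-0", "txn-1234-1", ...
                with nested_logging_context(str(i)):
                    process(event_id)  # hypothetical per-event work
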
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd74..26cce7d197 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
             else:
                 self.retry_interval = self.min_retry_interval
 
-            logger.debug(
+            logger.info(
                 "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                 self.destination, exc_type, exc_val, self.retry_interval
             )