summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--docs/log_contexts.rst447
-rw-r--r--synapse/app/appservice.py8
-rw-r--r--synapse/app/client_reader.py8
-rw-r--r--synapse/app/federation_reader.py8
-rw-r--r--synapse/app/federation_sender.py8
-rwxr-xr-xsynapse/app/homeserver.py9
-rw-r--r--synapse/app/media_repository.py8
-rw-r--r--synapse/app/pusher.py9
-rw-r--r--synapse/app/synchrotron.py9
-rw-r--r--synapse/crypto/keyring.py9
-rw-r--r--synapse/events/snapshot.py26
-rw-r--r--synapse/handlers/federation.py15
-rw-r--r--synapse/state.py11
-rw-r--r--synapse/storage/background_updates.py15
-rw-r--r--synapse/storage/event_federation.py24
-rw-r--r--synapse/storage/events.py281
-rw-r--r--synapse/storage/state.py10
-rw-r--r--synapse/util/logcontext.py71
-rw-r--r--tests/util/test_log_context.py61
19 files changed, 882 insertions, 155 deletions
diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst
index 0046e171be..8d04a973de 100644
--- a/docs/log_contexts.rst
+++ b/docs/log_contexts.rst
@@ -1,10 +1,441 @@
-What do I do about "Unexpected logging context" debug log-lines everywhere?
+Log contexts
+============
 
-<Mjark> The logging context lives in thread local storage
-<Mjark> Sometimes it gets out of sync with what it should actually be, usually because something scheduled something to run on the reactor without preserving the logging context. 
-<Matthew> what is the impact of it getting out of sync? and how and when should we preserve log context?
-<Mjark> The impact is that some of the CPU and database metrics will be under-reported, and some log lines will be mis-attributed.
-<Mjark> It should happen auto-magically in all the APIs that do IO or otherwise defer to the reactor.
-<Erik> Mjark: the other place is if we branch, e.g. using defer.gatherResults
+.. contents::
 
-Unanswered: how and when should we preserve log context?
\ No newline at end of file
+To help track the processing of individual requests, synapse uses a
+'log context' to track which request it is handling at any given moment. This
+is done via a thread-local variable; a ``logging.Filter`` is then used to fish
+the information back out of the thread-local variable and add it to each log
+record.
+
+Logcontexts are also used for CPU and database accounting, so that we can track
+which requests were responsible for high CPU use or database activity.
+
+The ``synapse.util.logcontext`` module provides a facilities for managing the
+current log context (as well as providing the ``LoggingContextFilter`` class).
+
+Deferreds make the whole thing complicated, so this document describes how it
+all works, and how to write code which follows the rules.
+
+Logcontexts without Deferreds
+-----------------------------
+
+In the absence of any Deferred voodoo, things are simple enough. As with any
+code of this nature, the rule is that our function should leave things as it
+found them:
+
+.. code:: python
+
+    from synapse.util import logcontext         # omitted from future snippets
+
+    def handle_request(request_id):
+        request_context = logcontext.LoggingContext()
+
+        calling_context = logcontext.LoggingContext.current_context()
+        logcontext.LoggingContext.set_current_context(request_context)
+        try:
+            request_context.request = request_id
+            do_request_handling()
+            logger.debug("finished")
+        finally:
+            logcontext.LoggingContext.set_current_context(calling_context)
+
+    def do_request_handling():
+        logger.debug("phew")  # this will be logged against request_id
+
+
+LoggingContext implements the context management methods, so the above can be
+written much more succinctly as:
+
+.. code:: python
+
+    def handle_request(request_id):
+        with logcontext.LoggingContext() as request_context:
+            request_context.request = request_id
+            do_request_handling()
+            logger.debug("finished")
+
+    def do_request_handling():
+        logger.debug("phew")
+
+
+Using logcontexts with Deferreds
+--------------------------------
+
+Deferreds — and in particular, ``defer.inlineCallbacks`` — break
+the linear flow of code so that there is no longer a single entry point where
+we should set the logcontext and a single exit point where we should remove it.
+
+Consider the example above, where ``do_request_handling`` needs to do some
+blocking operation, and returns a deferred:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def handle_request(request_id):
+        with logcontext.LoggingContext() as request_context:
+            request_context.request = request_id
+            yield do_request_handling()
+            logger.debug("finished")
+
+
+In the above flow:
+
+* The logcontext is set
+* ``do_request_handling`` is called, and returns a deferred
+* ``handle_request`` yields the deferred
+* The ``inlineCallbacks`` wrapper of ``handle_request`` returns a deferred
+
+So we have stopped processing the request (and will probably go on to start
+processing the next), without clearing the logcontext.
+
+To circumvent this problem, synapse code assumes that, wherever you have a
+deferred, you will want to yield on it. To that end, whereever functions return
+a deferred, we adopt the following conventions:
+
+**Rules for functions returning deferreds:**
+
+  * If the deferred is already complete, the function returns with the same
+    logcontext it started with.
+  * If the deferred is incomplete, the function clears the logcontext before
+    returning; when the deferred completes, it restores the logcontext before
+    running any callbacks.
+
+That sounds complicated, but actually it means a lot of code (including the
+example above) "just works". There are two cases:
+
+* If ``do_request_handling`` returns a completed deferred, then the logcontext
+  will still be in place. In this case, execution will continue immediately
+  after the ``yield``; the "finished" line will be logged against the right
+  context, and the ``with`` block restores the original context before we
+  return to the caller.
+
+* If the returned deferred is incomplete, ``do_request_handling`` clears the
+  logcontext before returning. The logcontext is therefore clear when
+  ``handle_request`` yields the deferred. At that point, the ``inlineCallbacks``
+  wrapper adds a callback to the deferred, and returns another (incomplete)
+  deferred to the caller, and it is safe to begin processing the next request.
+
+  Once ``do_request_handling``'s deferred completes, it will reinstate the
+  logcontext, before running the callback added by the ``inlineCallbacks``
+  wrapper. That callback runs the second half of ``handle_request``, so again
+  the "finished" line will be logged against the right
+  context, and the ``with`` block restores the original context.
+
+As an aside, it's worth noting that ``handle_request`` follows our rules -
+though that only matters if the caller has its own logcontext which it cares
+about.
+
+The following sections describe pitfalls and helpful patterns when implementing
+these rules.
+
+Always yield your deferreds
+---------------------------
+
+Whenever you get a deferred back from a function, you should ``yield`` on it
+as soon as possible. (Returning it directly to your caller is ok too, if you're
+not doing ``inlineCallbacks``.) Do not pass go; do not do any logging; do not
+call any other functions.
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def fun():
+        logger.debug("starting")
+        yield do_some_stuff()       # just like this
+
+        d = more_stuff()
+        result = yield d            # also fine, of course
+
+        defer.returnValue(result)
+
+    def nonInlineCallbacksFun():
+        logger.debug("just a wrapper really")
+        return do_some_stuff()      # this is ok too - the caller will yield on
+                                    # it anyway.
+
+Provided this pattern is followed all the way back up to the callchain to where
+the logcontext was set, this will make things work out ok: provided
+``do_some_stuff`` and ``more_stuff`` follow the rules above, then so will
+``fun`` (as wrapped by ``inlineCallbacks``) and ``nonInlineCallbacksFun``.
+
+It's all too easy to forget to ``yield``: for instance if we forgot that
+``do_some_stuff`` returned a deferred, we might plough on regardless. This
+leads to a mess; it will probably work itself out eventually, but not before
+a load of stuff has been logged against the wrong content. (Normally, other
+things will break, more obviously, if you forget to ``yield``, so this tends
+not to be a major problem in practice.)
+
+Of course sometimes you need to do something a bit fancier with your Deferreds
+- not all code follows the linear A-then-B-then-C pattern. Notes on
+implementing more complex patterns are in later sections.
+
+Where you create a new Deferred, make it follow the rules
+---------------------------------------------------------
+
+Most of the time, a Deferred comes from another synapse function. Sometimes,
+though, we need to make up a new Deferred, or we get a Deferred back from
+external code. We need to make it follow our rules.
+
+The easy way to do it is with a combination of ``defer.inlineCallbacks``, and
+``logcontext.PreserveLoggingContext``. Suppose we want to implement ``sleep``,
+which returns a deferred which will run its callbacks after a given number of
+seconds. That might look like:
+
+.. code:: python
+
+    # not a logcontext-rules-compliant function
+    def get_sleep_deferred(seconds):
+        d = defer.Deferred()
+        reactor.callLater(seconds, d.callback, None)
+        return d
+
+That doesn't follow the rules, but we can fix it by wrapping it with
+``PreserveLoggingContext`` and ``yield`` ing on it:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def sleep(seconds):
+        with PreserveLoggingContext():
+            yield get_sleep_deferred(seconds)
+
+This technique works equally for external functions which return deferreds,
+or deferreds we have made ourselves.
+
+XXX: think this is what ``preserve_context_over_deferred`` is supposed to do,
+though it is broken, in that it only restores the logcontext for the duration
+of the callbacks, which doesn't comply with the logcontext rules.
+
+Fire-and-forget
+---------------
+
+Sometimes you want to fire off a chain of execution, but not wait for its
+result. That might look a bit like this:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def do_request_handling():
+        yield foreground_operation()
+
+        # *don't* do this
+        background_operation()
+
+        logger.debug("Request handling complete")
+
+    @defer.inlineCallbacks
+    def background_operation():
+        yield first_background_step()
+        logger.debug("Completed first step")
+        yield second_background_step()
+        logger.debug("Completed second step")
+
+The above code does a couple of steps in the background after
+``do_request_handling`` has finished. The log lines are still logged against
+the ``request_context`` logcontext, which may or may not be desirable. There
+are two big problems with the above, however. The first problem is that, if
+``background_operation`` returns an incomplete Deferred, it will expect its
+caller to ``yield`` immediately, so will have cleared the logcontext. In this
+example, that means that 'Request handling complete' will be logged without any
+context.
+
+The second problem, which is potentially even worse, is that when the Deferred
+returned by ``background_operation`` completes, it will restore the original
+logcontext. There is nothing waiting on that Deferred, so the logcontext will
+leak into the reactor and possibly get attached to some arbitrary future
+operation.
+
+There are two potential solutions to this.
+
+One option is to surround the call to ``background_operation`` with a
+``PreserveLoggingContext`` call. That will reset the logcontext before
+starting ``background_operation`` (so the context restored when the deferred
+completes will be the empty logcontext), and will restore the current
+logcontext before continuing the foreground process:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def do_request_handling():
+        yield foreground_operation()
+
+        # start background_operation off in the empty logcontext, to
+        # avoid leaking the current context into the reactor.
+        with PreserveLoggingContext():
+            background_operation()
+
+        # this will now be logged against the request context
+        logger.debug("Request handling complete")
+
+Obviously that option means that the operations done in
+``background_operation`` would be not be logged against a logcontext (though
+that might be fixed by setting a different logcontext via a ``with
+LoggingContext(...)`` in ``background_operation``).
+
+The second option is to use ``logcontext.preserve_fn``, which wraps a function
+so that it doesn't reset the logcontext even when it returns an incomplete
+deferred, and adds a callback to the returned deferred to reset the
+logcontext. In other words, it turns a function that follows the Synapse rules
+about logcontexts and Deferreds into one which behaves more like an external
+function — the opposite operation to that described in the previous section.
+It can be used like this:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def do_request_handling():
+        yield foreground_operation()
+
+        logcontext.preserve_fn(background_operation)()
+
+        # this will now be logged against the request context
+        logger.debug("Request handling complete")
+
+XXX: I think ``preserve_context_over_fn`` is supposed to do the first option,
+but the fact that it does ``preserve_context_over_deferred`` on its results
+means that its use is fraught with difficulty.
+
+Passing synapse deferreds into third-party functions
+----------------------------------------------------
+
+A typical example of this is where we want to collect together two or more
+deferred via ``defer.gatherResults``:
+
+.. code:: python
+
+    d1 = operation1()
+    d2 = operation2()
+    d3 = defer.gatherResults([d1, d2])
+
+This is really a variation of the fire-and-forget problem above, in that we are
+firing off ``d1`` and ``d2`` without yielding on them. The difference
+is that we now have third-party code attached to their callbacks. Anyway either
+technique given in the `Fire-and-forget`_ section will work.
+
+Of course, the new Deferred returned by ``gatherResults`` needs to be wrapped
+in order to make it follow the logcontext rules before we can yield it, as
+described in `Where you create a new Deferred, make it follow the rules`_.
+
+So, option one: reset the logcontext before starting the operations to be
+gathered:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def do_request_handling():
+        with PreserveLoggingContext():
+            d1 = operation1()
+            d2 = operation2()
+            result = yield defer.gatherResults([d1, d2])
+
+In this case particularly, though, option two, of using
+``logcontext.preserve_fn`` almost certainly makes more sense, so that
+``operation1`` and ``operation2`` are both logged against the original
+logcontext. This looks like:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def do_request_handling():
+        d1 = logcontext.preserve_fn(operation1)()
+        d2 = logcontext.preserve_fn(operation2)()
+
+        with PreserveLoggingContext():
+            result = yield defer.gatherResults([d1, d2])
+
+
+Was all this really necessary?
+------------------------------
+
+The conventions used work fine for a linear flow where everything happens in
+series via ``defer.inlineCallbacks`` and ``yield``, but are certainly tricky to
+follow for any more exotic flows. It's hard not to wonder if we could have done
+something else.
+
+We're not going to rewrite Synapse now, so the following is entirely of
+academic interest, but I'd like to record some thoughts on an alternative
+approach.
+
+I briefly prototyped some code following an alternative set of rules. I think
+it would work, but I certainly didn't get as far as thinking how it would
+interact with concepts as complicated as the cache descriptors.
+
+My alternative rules were:
+
+* functions always preserve the logcontext of their caller, whether or not they
+  are returning a Deferred.
+
+* Deferreds returned by synapse functions run their callbacks in the same
+  context as the function was orignally called in.
+
+The main point of this scheme is that everywhere that sets the logcontext is
+responsible for clearing it before returning control to the reactor.
+
+So, for example, if you were the function which started a ``with
+LoggingContext`` block, you wouldn't ``yield`` within it — instead you'd start
+off the background process, and then leave the ``with`` block to wait for it:
+
+.. code:: python
+
+    def handle_request(request_id):
+        with logcontext.LoggingContext() as request_context:
+            request_context.request = request_id
+            d = do_request_handling()
+
+        def cb(r):
+            logger.debug("finished")
+
+        d.addCallback(cb)
+        return d
+
+(in general, mixing ``with LoggingContext`` blocks and
+``defer.inlineCallbacks`` in the same function leads to slighly
+counter-intuitive code, under this scheme).
+
+Because we leave the original ``with`` block as soon as the Deferred is
+returned (as opposed to waiting for it to be resolved, as we do today), the
+logcontext is cleared before control passes back to the reactor; so if there is
+some code within ``do_request_handling`` which needs to wait for a Deferred to
+complete, there is no need for it to worry about clearing the logcontext before
+doing so:
+
+.. code:: python
+
+    def handle_request():
+        r = do_some_stuff()
+        r.addCallback(do_some_more_stuff)
+        return r
+
+— and provided ``do_some_stuff`` follows the rules of returning a Deferred which
+runs its callbacks in the original logcontext, all is happy.
+
+The business of a Deferred which runs its callbacks in the original logcontext
+isn't hard to achieve — we have it today, in the shape of
+``logcontext._PreservingContextDeferred``:
+
+.. code:: python
+
+    def do_some_stuff():
+        deferred = do_some_io()
+        pcd = _PreservingContextDeferred(LoggingContext.current_context())
+        deferred.chainDeferred(pcd)
+        return pcd
+
+It turns out that, thanks to the way that Deferreds chain together, we
+automatically get the property of a context-preserving deferred with
+``defer.inlineCallbacks``, provided the final Defered the function ``yields``
+on has that property. So we can just write:
+
+.. code:: python
+
+    @defer.inlineCallbacks
+    def handle_request():
+        yield do_some_stuff()
+        yield do_some_more_stuff()
+
+To conclude: I think this scheme would have worked equally well, with less
+danger of messing it up, and probably made some more esoteric code easier to
+write. But again — changing the conventions of the entire Synapse codebase is
+not a sensible option for the marginal improvement offered.
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 83ee3e3ce3..a6f1e7594e 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -187,7 +187,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 7ed0de4117..a821a6ce62 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index ca742de6b2..e52b0f240d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -31,7 +31,7 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -184,7 +184,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cf5b196e6..76c4cc54d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 0b9d78c13c..2cdd2d39ff 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -52,7 +52,7 @@ from synapse.api.urls import (
 )
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
@@ -456,7 +456,12 @@ def run(hs):
     def in_thread():
         # Uncomment to enable tracing of log context changes.
         # sys.settrace(logcontext_tracer)
-        with LoggingContext("run"):
+
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             change_resource_limit(hs.config.soft_file_limit)
             if hs.config.gc_thresholds:
                 gc.set_threshold(*hs.config.gc_thresholds)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index c5579d9e38..3c7984a237 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -190,7 +190,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index b025db54d4..ab682e52ec 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine
 from synapse.storage import DataStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -275,7 +276,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9f93b311d1..34e34e5580 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -47,7 +47,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -500,7 +501,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index d7211ee9b3..80f27f8c53 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -96,10 +96,11 @@ class Keyring(object):
         verify_requests = []
 
         for server_name, json_object in server_and_json:
-            logger.debug("Verifying for %s", server_name)
 
             key_ids = signature_ids(json_object, server_name)
             if not key_ids:
+                logger.warn("Request from %s: no supported signature keys",
+                            server_name)
                 deferred = defer.fail(SynapseError(
                     400,
                     "Not signed with a supported algorithm",
@@ -108,6 +109,9 @@ class Keyring(object):
             else:
                 deferred = defer.Deferred()
 
+            logger.debug("Verifying for %s with key_ids %s",
+                         server_name, key_ids)
+
             verify_request = VerifyKeyRequest(
                 server_name, key_ids, json_object, deferred
             )
@@ -142,6 +146,9 @@ class Keyring(object):
 
             json_object = verify_request.json_object
 
+            logger.debug("Got key %s %s:%s for server %s, verifying" % (
+                key_id, verify_key.alg, verify_key.version, server_name,
+            ))
             try:
                 verify_signed_json(json_object, server_name, verify_key)
             except:
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 11605b34a3..6be18880b9 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -15,6 +15,32 @@
 
 
 class EventContext(object):
+    """
+    Attributes:
+        current_state_ids (dict[(str, str), str]):
+            The current state map including the current event.
+            (type, state_key) -> event_id
+
+        prev_state_ids (dict[(str, str), str]):
+            The current state map excluding the current event.
+            (type, state_key) -> event_id
+
+        state_group (int): state group id
+        rejected (bool|str): A rejection reason if the event was rejected, else
+            False
+
+        push_actions (list[(str, list[object])]): list of (user_id, actions)
+            tuples
+
+        prev_group (int): Previously persisted state group. ``None`` for an
+            outlier.
+        delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
+            (type, state_key) -> event_id. ``None`` for an outlier.
+
+        prev_state_events (?): XXX: is this ever set to anything other than
+            the empty list?
+    """
+
     __slots__ = [
         "current_state_ids",
         "prev_state_ids",
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0cd5501b05..888dd01240 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -933,8 +933,9 @@ class FederationHandler(BaseHandler):
             # lots of requests for missing prev_events which we do actually
             # have. Hence we fire off the deferred, but don't wait for it.
 
-            synapse.util.logcontext.reset_context_after_deferred(
-                self._handle_queued_pdus(room_queue))
+            synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+                room_queue
+            )
 
         defer.returnValue(True)
 
@@ -1537,7 +1538,17 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
+        """
+
+        Args:
+            origin:
+            event:
+            state:
+            auth_events:
 
+        Returns:
+            Deferred, which resolves to synapse.events.snapshot.EventContext
+        """
         context = yield self.state_handler.compute_event_context(
             event, old_state=state,
         )
diff --git a/synapse/state.py b/synapse/state.py
index 383d32b163..9a523a1b89 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -177,17 +177,12 @@ class StateHandler(object):
 
     @defer.inlineCallbacks
     def compute_event_context(self, event, old_state=None):
-        """ Fills out the context with the `current state` of the graph. The
-        `current state` here is defined to be the state of the event graph
-        just before the event - i.e. it never includes `event`
-
-        If `event` has `auth_events` then this will also fill out the
-        `auth_events` field on `context` from the `current_state`.
+        """Build an EventContext structure for the event.
 
         Args:
-            event (EventBase)
+            event (synapse.events.EventBase):
         Returns:
-            an EventContext
+            synapse.events.snapshot.EventContext:
         """
         context = EventContext()
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 94b2bcc54a..813ad59e56 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import synapse.util.async
 
 from ._base import SQLBaseStore
 from . import engines
@@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_performance = {}
         self._background_update_queue = []
         self._background_update_handlers = {}
-        self._background_update_timer = None
 
     @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        assert self._background_update_timer is None, \
-            "background updates already running"
-
         logger.info("Starting background schema updates")
 
         while True:
-            sleep = defer.Deferred()
-            self._background_update_timer = self._clock.call_later(
-                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
-            )
-            try:
-                yield sleep
-            finally:
-                self._background_update_timer = None
+            yield synapse.util.async.sleep(
+                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
 
             try:
                 result = yield self.do_next_background_update(
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 256e50dc20..0d97de2fe7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -201,19 +201,19 @@ class EventFederationStore(SQLBaseStore):
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
         min_depth = self._get_min_depth_interaction(txn, room_id)
 
-        do_insert = depth < min_depth if min_depth else True
+        if min_depth and depth >= min_depth:
+            return
 
-        if do_insert:
-            self._simple_upsert_txn(
-                txn,
-                table="room_depth",
-                keyvalues={
-                    "room_id": room_id,
-                },
-                values={
-                    "min_depth": depth,
-                },
-            )
+        self._simple_upsert_txn(
+            txn,
+            table="room_depth",
+            keyvalues={
+                "room_id": room_id,
+            },
+            values={
+                "min_depth": depth,
+            },
+        )
 
     def _handle_mult_prev_events(self, txn, events):
         """
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e8a7717640..3c8393bfe8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
 from functools import wraps
 
-import synapse
 import synapse.metrics
 
-
 import logging
 import math
 import ujson as json
 
+# these are only included to make the type annotations work
+from synapse.events import EventBase    # noqa: F401
+from synapse.events.snapshot import EventContext   # noqa: F401
+
 logger = logging.getLogger(__name__)
 
 
@@ -82,6 +84,11 @@ class _EventPeristenceQueue(object):
 
     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
+
+        Args:
+            room_id (str):
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
         """
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
@@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False):
+        """
+
+        Args:
+            event (EventBase):
+            context (EventContext):
+            backfilled (bool):
+
+        Returns:
+            Deferred: resolves to (int, int): the stream ordering of ``event``,
+            and the stream ordering of the latest persisted event
+        """
         deferred = self._event_persist_queue.add_to_queue(
             event.room_id, [(event, context)],
             backfilled=backfilled,
@@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _persist_events(self, events_and_contexts, backfilled=False,
                         delete_existing=False):
+        """Persist events to db
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
+            delete_existing (bool):
+
+        Returns:
+            Deferred: resolves when the events have been persisted
+        """
         if not events_and_contexts:
             return
 
@@ -554,11 +582,91 @@ class EventsStore(SQLBaseStore):
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
 
-        If delete_existing is True then existing events will be purged from the
-        database before insertion. This is useful when retrying due to IntegrityError.
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]):
+                events to persist
+            backfilled (bool): True if the events were backfilled
+            delete_existing (bool): True to purge existing table rows for the
+                events from the database. This is useful when retrying due to
+                IntegrityError.
+            current_state_for_room (dict[str, (list[str], list[str])]):
+                The current-state delta for each room. For each room, a tuple
+                (to_delete, to_insert), being a list of event ids to be removed
+                from the current state, and a list of event ids to be added to
+                the current state.
+            new_forward_extremeties (dict[str, list[str]]):
+                The new forward extremities for each room. For each room, a
+                list of the event ids which are the forward extremities.
+
         """
+        self._update_current_state_txn(txn, current_state_for_room)
+
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
-        for room_id, current_state_tuple in current_state_for_room.iteritems():
+        self._update_forward_extremities_txn(
+            txn,
+            new_forward_extremities=new_forward_extremeties,
+            max_stream_order=max_stream_order,
+        )
+
+        # Ensure that we don't have the same event twice.
+        events_and_contexts = self._filter_events_and_contexts_for_duplicates(
+            events_and_contexts,
+        )
+
+        self._update_room_depths_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+        # _update_outliers_txn filters out any events which have already been
+        # persisted, and returns the filtered list.
+        events_and_contexts = self._update_outliers_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only events that we haven't
+        # seen before.
+
+        if delete_existing:
+            # For paranoia reasons, we go and delete all the existing entries
+            # for these events so we can reinsert them.
+            # This gets around any problems with some tables already having
+            # entries.
+            self._delete_existing_rows_txn(
+                txn,
+                events_and_contexts=events_and_contexts,
+            )
+
+        self._store_event_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # Insert into the state_groups, state_groups_state, and
+        # event_to_state_groups tables.
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+        # _store_rejected_events_txn filters out any events which were
+        # rejected, and returns the filtered list.
+        events_and_contexts = self._store_rejected_events_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only ones that weren't
+        # rejected.
+
+        self._update_metadata_tables_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+    def _update_current_state_txn(self, txn, state_delta_by_room):
+        for room_id, current_state_tuple in state_delta_by_room.iteritems():
                 to_delete, to_insert = current_state_tuple
                 txn.executemany(
                     "DELETE FROM current_state_events WHERE event_id = ?",
@@ -608,7 +716,9 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_current_state_ids, (room_id,)
                 )
 
-        for room_id, new_extrem in new_forward_extremeties.items():
+    def _update_forward_extremities_txn(self, txn, new_forward_extremities,
+                                        max_stream_order):
+        for room_id, new_extrem in new_forward_extremities.items():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -626,7 +736,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.items()
                 for ev_id in new_extrem
             ],
         )
@@ -643,13 +753,22 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.items()
                 for event_id in new_extrem
             ]
         )
 
-        # Ensure that we don't have the same event twice.
-        # Pick the earliest non-outlier if there is one, else the earliest one.
+    @classmethod
+    def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
+        """Ensure that we don't have the same event twice.
+
+        Pick the earliest non-outlier if there is one, else the earliest one.
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+        Returns:
+            list[(EventBase, EventContext)]: filtered list
+        """
         new_events_and_contexts = OrderedDict()
         for event, context in events_and_contexts:
             prev_event_context = new_events_and_contexts.get(event.event_id)
@@ -662,9 +781,17 @@ class EventsStore(SQLBaseStore):
                         new_events_and_contexts[event.event_id] = (event, context)
             else:
                 new_events_and_contexts[event.event_id] = (event, context)
+        return new_events_and_contexts.values()
 
-        events_and_contexts = new_events_and_contexts.values()
+    def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
+        """Update min_depth for each room
 
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -683,6 +810,21 @@ class EventsStore(SQLBaseStore):
         for room_id, depth in depth_updates.items():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
+    def _update_outliers_txn(self, txn, events_and_contexts):
+        """Update any outliers with new event info.
+
+        This turns outliers into ex-outliers (unless the new event was
+        rejected).
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)] new list, without events which
+            are already in the events table.
+        """
         txn.execute(
             "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
                 ",".join(["?"] * len(events_and_contexts)),
@@ -697,19 +839,16 @@ class EventsStore(SQLBaseStore):
 
         to_remove = set()
         for event, context in events_and_contexts:
-            if context.rejected:
-                # If the event is rejected then we don't care if the event
-                # was an outlier or not.
-                if event.event_id in have_persisted:
-                    # If we have already seen the event then ignore it.
-                    to_remove.add(event)
-                continue
-
             if event.event_id not in have_persisted:
                 continue
 
             to_remove.add(event)
 
+            if context.rejected:
+                # If the event is rejected then we don't care if the event
+                # was an outlier or not.
+                continue
+
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
                 # We received a copy of an event that we had already stored as
@@ -764,37 +903,19 @@ class EventsStore(SQLBaseStore):
                 # event isn't an outlier any more.
                 self._update_backward_extremeties(txn, [event])
 
-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
+    @classmethod
+    def _delete_existing_rows_txn(cls, txn, events_and_contexts):
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
             return
 
-        # From this point onwards the events are only events that we haven't
-        # seen before.
+        logger.info("Deleting existing")
 
-        def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
-
-        if delete_existing:
-            # For paranoia reasons, we go and delete all the existing entries
-            # for these events so we can reinsert them.
-            # This gets around any problems with some tables already having
-            # entries.
-
-            logger.info("Deleting existing")
-
-            for table in (
+        for table in (
                 "events",
                 "event_auth",
                 "event_json",
@@ -817,11 +938,34 @@ class EventsStore(SQLBaseStore):
                 "redactions",
                 "room_memberships",
                 "topics"
-            ):
-                txn.executemany(
-                    "DELETE FROM %s WHERE event_id = ?" % (table,),
-                    [(ev.event_id,) for ev, _ in events_and_contexts]
-                )
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
+    def _store_event_txn(self, txn, events_and_contexts):
+        """Insert new events into the event and event_json tables
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+        """
+
+        if not events_and_contexts:
+            # nothing to do here
+            return
+
+        def event_dict(event):
+            return {
+                k: v
+                for k, v in event.get_dict().items()
+                if k not in [
+                    "redacted",
+                    "redacted_because",
+                ]
+            }
 
         self._simple_insert_many_txn(
             txn,
@@ -865,6 +1009,19 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+    def _store_rejected_events_txn(self, txn, events_and_contexts):
+        """Add rows to the 'rejections' table for received events which were
+        rejected
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)] new list, without the rejected
+                events.
+        """
         # Remove the rejected events from the list now that we've added them
         # to the events table and the events_json table.
         to_remove = set()
@@ -876,17 +1033,24 @@ class EventsStore(SQLBaseStore):
                 )
                 to_remove.add(event)
 
-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
+    def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+        """Update all the miscellaneous tables for new events
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
+
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
             return
 
-        # From this point onwards the events are only ones that weren't rejected.
-
         for event, context in events_and_contexts:
             # Insert all the push actions into the event_push_actions table.
             if context.push_actions:
@@ -915,10 +1079,6 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
-        # Insert into the state_groups, state_groups_state, and
-        # event_to_state_groups tables.
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
-
         # Update the event_forward_extremities, event_backward_extremities and
         # event_edges tables.
         self._handle_mult_prev_events(
@@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore):
         # Prefill the event cache
         self._add_to_cache(txn, events_and_contexts)
 
-        if backfilled:
-            # Backfilled events come before the current state so we don't need
-            # to update the current state table
-            return
-
-        return
-
     def _add_to_cache(self, txn, events_and_contexts):
         to_prefill = []
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 27f1ec89ec..1b42bea07a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -136,6 +136,16 @@ class StateStore(SQLBaseStore):
                 continue
 
             if context.current_state_ids is None:
+                # AFAIK, this can never happen
+                logger.error(
+                    "Non-outlier event %s had current_state_ids==None",
+                    event.event_id)
+                continue
+
+            # if the event was rejected, just give it the same state as its
+            # predecessor.
+            if context.rejected:
+                state_groups[event.event_id] = context.prev_group
                 continue
 
             state_groups[event.event_id] = context.state_group
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d73670f9f2..ff67b1d794 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -12,6 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+""" Thread-local-alike tracking of log contexts within synapse
+
+This module provides objects and utilities for tracking contexts through
+synapse code, so that log lines can include a request identifier, and so that
+CPU and database activity can be accounted for against the request that caused
+them.
+
+See doc/log_contexts.rst for details on how this works.
+"""
+
 from twisted.internet import defer
 
 import threading
@@ -308,47 +318,44 @@ def preserve_context_over_deferred(deferred, context=None):
     return d
 
 
-def reset_context_after_deferred(deferred):
-    """If the deferred is incomplete, add a callback which will reset the
-    context.
-
-    This is useful when you want to fire off a deferred, but don't want to
-    wait for it to complete. (The deferred will restore the current log context
-    when it completes, so if you don't do anything, it will leak log context.)
-
-    (If this feels asymmetric, consider it this way: we are effectively forking
-    a new thread of execution. We are probably currently within a
-    ``with LoggingContext()`` block, which is supposed to have a single entry
-    and exit point. But by spawning off another deferred, we are effectively
-    adding a new exit point.)
+def preserve_fn(f):
+    """Wraps a function, to ensure that the current context is restored after
+    return from the function, and that the sentinel context is set once the
+    deferred returned by the funtion completes.
 
-    Args:
-        deferred (defer.Deferred): deferred
+    Useful for wrapping functions that return a deferred which you don't yield
+    on.
     """
     def reset_context(result):
         LoggingContext.set_current_context(LoggingContext.sentinel)
         return result
 
-    if not deferred.called:
-        deferred.addBoth(reset_context)
-
-
-def preserve_fn(f):
-    """Ensures that function is called with correct context and that context is
-    restored after return. Useful for wrapping functions that return a deferred
-    which you don't yield on.
-    """
+    # XXX: why is this here rather than inside g? surely we want to preserve
+    # the context from the time the function was called, not when it was
+    # wrapped?
     current = LoggingContext.current_context()
 
     def g(*args, **kwargs):
-        with PreserveLoggingContext(current):
-            res = f(*args, **kwargs)
-            if isinstance(res, defer.Deferred):
-                return preserve_context_over_deferred(
-                    res, context=LoggingContext.sentinel
-                )
-            else:
-                return res
+        res = f(*args, **kwargs)
+        if isinstance(res, defer.Deferred) and not res.called:
+            # The function will have reset the context before returning, so
+            # we need to restore it now.
+            LoggingContext.set_current_context(current)
+
+            # The original context will be restored when the deferred
+            # completes, but there is nothing waiting for it, so it will
+            # get leaked into the reactor or some other function which
+            # wasn't expecting it. We therefore need to reset the context
+            # here.
+            #
+            # (If this feels asymmetric, consider it this way: we are
+            # effectively forking a new thread of execution. We are
+            # probably currently within a ``with LoggingContext()`` block,
+            # which is supposed to have a single entry and exit point. But
+            # by spawning off another deferred, we are effectively
+            # adding a new exit point.)
+            res.addBoth(reset_context)
+        return res
     return g
 
 
diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py
index 65a330a0e9..9ffe209c4d 100644
--- a/tests/util/test_log_context.py
+++ b/tests/util/test_log_context.py
@@ -1,8 +1,10 @@
+import twisted.python.failure
 from twisted.internet import defer
 from twisted.internet import reactor
 from .. import unittest
 
 from synapse.util.async import sleep
+from synapse.util import logcontext
 from synapse.util.logcontext import LoggingContext
 
 
@@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase):
             context_one.test_key = "one"
             yield sleep(0)
             self._check_test_key("one")
+
+    def _test_preserve_fn(self, function):
+        sentinel_context = LoggingContext.current_context()
+
+        callback_completed = [False]
+
+        @defer.inlineCallbacks
+        def cb():
+            context_one.test_key = "one"
+            yield function()
+            self._check_test_key("one")
+
+            callback_completed[0] = True
+
+        with LoggingContext() as context_one:
+            context_one.test_key = "one"
+
+            # fire off function, but don't wait on it.
+            logcontext.preserve_fn(cb)()
+
+            self._check_test_key("one")
+
+        # now wait for the function under test to have run, and check that
+        # the logcontext is left in a sane state.
+        d2 = defer.Deferred()
+
+        def check_logcontext():
+            if not callback_completed[0]:
+                reactor.callLater(0.01, check_logcontext)
+                return
+
+            # make sure that the context was reset before it got thrown back
+            # into the reactor
+            try:
+                self.assertIs(LoggingContext.current_context(),
+                              sentinel_context)
+                d2.callback(None)
+            except BaseException:
+                d2.errback(twisted.python.failure.Failure())
+
+        reactor.callLater(0.01, check_logcontext)
+
+        # test is done once d2 finishes
+        return d2
+
+    def test_preserve_fn_with_blocking_fn(self):
+        @defer.inlineCallbacks
+        def blocking_function():
+            yield sleep(0)
+
+        return self._test_preserve_fn(blocking_function)
+
+    def test_preserve_fn_with_non_blocking_fn(self):
+        @defer.inlineCallbacks
+        def nonblocking_function():
+            with logcontext.PreserveLoggingContext():
+                yield defer.succeed(None)
+
+        return self._test_preserve_fn(nonblocking_function)