diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst
index 0046e171be..8d04a973de 100644
--- a/docs/log_contexts.rst
+++ b/docs/log_contexts.rst
@@ -1,10 +1,441 @@
-What do I do about "Unexpected logging context" debug log-lines everywhere?
+Log contexts
+============
-<Mjark> The logging context lives in thread local storage
-<Mjark> Sometimes it gets out of sync with what it should actually be, usually because something scheduled something to run on the reactor without preserving the logging context.
-<Matthew> what is the impact of it getting out of sync? and how and when should we preserve log context?
-<Mjark> The impact is that some of the CPU and database metrics will be under-reported, and some log lines will be mis-attributed.
-<Mjark> It should happen auto-magically in all the APIs that do IO or otherwise defer to the reactor.
-<Erik> Mjark: the other place is if we branch, e.g. using defer.gatherResults
+.. contents::
-Unanswered: how and when should we preserve log context?
\ No newline at end of file
+To help track the processing of individual requests, synapse uses a
+'log context' to track which request it is handling at any given moment. This
+is done via a thread-local variable; a ``logging.Filter`` is then used to fish
+the information back out of the thread-local variable and add it to each log
+record.
+
+Logcontexts are also used for CPU and database accounting, so that we can track
+which requests were responsible for high CPU use or database activity.
+
+The ``synapse.util.logcontext`` module provides a facilities for managing the
+current log context (as well as providing the ``LoggingContextFilter`` class).
+
+Deferreds make the whole thing complicated, so this document describes how it
+all works, and how to write code which follows the rules.
+
+Logcontexts without Deferreds
+-----------------------------
+
+In the absence of any Deferred voodoo, things are simple enough. As with any
+code of this nature, the rule is that our function should leave things as it
+found them:
+
+.. code:: python
+
+ from synapse.util import logcontext # omitted from future snippets
+
+ def handle_request(request_id):
+ request_context = logcontext.LoggingContext()
+
+ calling_context = logcontext.LoggingContext.current_context()
+ logcontext.LoggingContext.set_current_context(request_context)
+ try:
+ request_context.request = request_id
+ do_request_handling()
+ logger.debug("finished")
+ finally:
+ logcontext.LoggingContext.set_current_context(calling_context)
+
+ def do_request_handling():
+ logger.debug("phew") # this will be logged against request_id
+
+
+LoggingContext implements the context management methods, so the above can be
+written much more succinctly as:
+
+.. code:: python
+
+ def handle_request(request_id):
+ with logcontext.LoggingContext() as request_context:
+ request_context.request = request_id
+ do_request_handling()
+ logger.debug("finished")
+
+ def do_request_handling():
+ logger.debug("phew")
+
+
+Using logcontexts with Deferreds
+--------------------------------
+
+Deferreds — and in particular, ``defer.inlineCallbacks`` — break
+the linear flow of code so that there is no longer a single entry point where
+we should set the logcontext and a single exit point where we should remove it.
+
+Consider the example above, where ``do_request_handling`` needs to do some
+blocking operation, and returns a deferred:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def handle_request(request_id):
+ with logcontext.LoggingContext() as request_context:
+ request_context.request = request_id
+ yield do_request_handling()
+ logger.debug("finished")
+
+
+In the above flow:
+
+* The logcontext is set
+* ``do_request_handling`` is called, and returns a deferred
+* ``handle_request`` yields the deferred
+* The ``inlineCallbacks`` wrapper of ``handle_request`` returns a deferred
+
+So we have stopped processing the request (and will probably go on to start
+processing the next), without clearing the logcontext.
+
+To circumvent this problem, synapse code assumes that, wherever you have a
+deferred, you will want to yield on it. To that end, whereever functions return
+a deferred, we adopt the following conventions:
+
+**Rules for functions returning deferreds:**
+
+ * If the deferred is already complete, the function returns with the same
+ logcontext it started with.
+ * If the deferred is incomplete, the function clears the logcontext before
+ returning; when the deferred completes, it restores the logcontext before
+ running any callbacks.
+
+That sounds complicated, but actually it means a lot of code (including the
+example above) "just works". There are two cases:
+
+* If ``do_request_handling`` returns a completed deferred, then the logcontext
+ will still be in place. In this case, execution will continue immediately
+ after the ``yield``; the "finished" line will be logged against the right
+ context, and the ``with`` block restores the original context before we
+ return to the caller.
+
+* If the returned deferred is incomplete, ``do_request_handling`` clears the
+ logcontext before returning. The logcontext is therefore clear when
+ ``handle_request`` yields the deferred. At that point, the ``inlineCallbacks``
+ wrapper adds a callback to the deferred, and returns another (incomplete)
+ deferred to the caller, and it is safe to begin processing the next request.
+
+ Once ``do_request_handling``'s deferred completes, it will reinstate the
+ logcontext, before running the callback added by the ``inlineCallbacks``
+ wrapper. That callback runs the second half of ``handle_request``, so again
+ the "finished" line will be logged against the right
+ context, and the ``with`` block restores the original context.
+
+As an aside, it's worth noting that ``handle_request`` follows our rules -
+though that only matters if the caller has its own logcontext which it cares
+about.
+
+The following sections describe pitfalls and helpful patterns when implementing
+these rules.
+
+Always yield your deferreds
+---------------------------
+
+Whenever you get a deferred back from a function, you should ``yield`` on it
+as soon as possible. (Returning it directly to your caller is ok too, if you're
+not doing ``inlineCallbacks``.) Do not pass go; do not do any logging; do not
+call any other functions.
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def fun():
+ logger.debug("starting")
+ yield do_some_stuff() # just like this
+
+ d = more_stuff()
+ result = yield d # also fine, of course
+
+ defer.returnValue(result)
+
+ def nonInlineCallbacksFun():
+ logger.debug("just a wrapper really")
+ return do_some_stuff() # this is ok too - the caller will yield on
+ # it anyway.
+
+Provided this pattern is followed all the way back up to the callchain to where
+the logcontext was set, this will make things work out ok: provided
+``do_some_stuff`` and ``more_stuff`` follow the rules above, then so will
+``fun`` (as wrapped by ``inlineCallbacks``) and ``nonInlineCallbacksFun``.
+
+It's all too easy to forget to ``yield``: for instance if we forgot that
+``do_some_stuff`` returned a deferred, we might plough on regardless. This
+leads to a mess; it will probably work itself out eventually, but not before
+a load of stuff has been logged against the wrong content. (Normally, other
+things will break, more obviously, if you forget to ``yield``, so this tends
+not to be a major problem in practice.)
+
+Of course sometimes you need to do something a bit fancier with your Deferreds
+- not all code follows the linear A-then-B-then-C pattern. Notes on
+implementing more complex patterns are in later sections.
+
+Where you create a new Deferred, make it follow the rules
+---------------------------------------------------------
+
+Most of the time, a Deferred comes from another synapse function. Sometimes,
+though, we need to make up a new Deferred, or we get a Deferred back from
+external code. We need to make it follow our rules.
+
+The easy way to do it is with a combination of ``defer.inlineCallbacks``, and
+``logcontext.PreserveLoggingContext``. Suppose we want to implement ``sleep``,
+which returns a deferred which will run its callbacks after a given number of
+seconds. That might look like:
+
+.. code:: python
+
+ # not a logcontext-rules-compliant function
+ def get_sleep_deferred(seconds):
+ d = defer.Deferred()
+ reactor.callLater(seconds, d.callback, None)
+ return d
+
+That doesn't follow the rules, but we can fix it by wrapping it with
+``PreserveLoggingContext`` and ``yield`` ing on it:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def sleep(seconds):
+ with PreserveLoggingContext():
+ yield get_sleep_deferred(seconds)
+
+This technique works equally for external functions which return deferreds,
+or deferreds we have made ourselves.
+
+XXX: think this is what ``preserve_context_over_deferred`` is supposed to do,
+though it is broken, in that it only restores the logcontext for the duration
+of the callbacks, which doesn't comply with the logcontext rules.
+
+Fire-and-forget
+---------------
+
+Sometimes you want to fire off a chain of execution, but not wait for its
+result. That might look a bit like this:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def do_request_handling():
+ yield foreground_operation()
+
+ # *don't* do this
+ background_operation()
+
+ logger.debug("Request handling complete")
+
+ @defer.inlineCallbacks
+ def background_operation():
+ yield first_background_step()
+ logger.debug("Completed first step")
+ yield second_background_step()
+ logger.debug("Completed second step")
+
+The above code does a couple of steps in the background after
+``do_request_handling`` has finished. The log lines are still logged against
+the ``request_context`` logcontext, which may or may not be desirable. There
+are two big problems with the above, however. The first problem is that, if
+``background_operation`` returns an incomplete Deferred, it will expect its
+caller to ``yield`` immediately, so will have cleared the logcontext. In this
+example, that means that 'Request handling complete' will be logged without any
+context.
+
+The second problem, which is potentially even worse, is that when the Deferred
+returned by ``background_operation`` completes, it will restore the original
+logcontext. There is nothing waiting on that Deferred, so the logcontext will
+leak into the reactor and possibly get attached to some arbitrary future
+operation.
+
+There are two potential solutions to this.
+
+One option is to surround the call to ``background_operation`` with a
+``PreserveLoggingContext`` call. That will reset the logcontext before
+starting ``background_operation`` (so the context restored when the deferred
+completes will be the empty logcontext), and will restore the current
+logcontext before continuing the foreground process:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def do_request_handling():
+ yield foreground_operation()
+
+ # start background_operation off in the empty logcontext, to
+ # avoid leaking the current context into the reactor.
+ with PreserveLoggingContext():
+ background_operation()
+
+ # this will now be logged against the request context
+ logger.debug("Request handling complete")
+
+Obviously that option means that the operations done in
+``background_operation`` would be not be logged against a logcontext (though
+that might be fixed by setting a different logcontext via a ``with
+LoggingContext(...)`` in ``background_operation``).
+
+The second option is to use ``logcontext.preserve_fn``, which wraps a function
+so that it doesn't reset the logcontext even when it returns an incomplete
+deferred, and adds a callback to the returned deferred to reset the
+logcontext. In other words, it turns a function that follows the Synapse rules
+about logcontexts and Deferreds into one which behaves more like an external
+function — the opposite operation to that described in the previous section.
+It can be used like this:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def do_request_handling():
+ yield foreground_operation()
+
+ logcontext.preserve_fn(background_operation)()
+
+ # this will now be logged against the request context
+ logger.debug("Request handling complete")
+
+XXX: I think ``preserve_context_over_fn`` is supposed to do the first option,
+but the fact that it does ``preserve_context_over_deferred`` on its results
+means that its use is fraught with difficulty.
+
+Passing synapse deferreds into third-party functions
+----------------------------------------------------
+
+A typical example of this is where we want to collect together two or more
+deferred via ``defer.gatherResults``:
+
+.. code:: python
+
+ d1 = operation1()
+ d2 = operation2()
+ d3 = defer.gatherResults([d1, d2])
+
+This is really a variation of the fire-and-forget problem above, in that we are
+firing off ``d1`` and ``d2`` without yielding on them. The difference
+is that we now have third-party code attached to their callbacks. Anyway either
+technique given in the `Fire-and-forget`_ section will work.
+
+Of course, the new Deferred returned by ``gatherResults`` needs to be wrapped
+in order to make it follow the logcontext rules before we can yield it, as
+described in `Where you create a new Deferred, make it follow the rules`_.
+
+So, option one: reset the logcontext before starting the operations to be
+gathered:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def do_request_handling():
+ with PreserveLoggingContext():
+ d1 = operation1()
+ d2 = operation2()
+ result = yield defer.gatherResults([d1, d2])
+
+In this case particularly, though, option two, of using
+``logcontext.preserve_fn`` almost certainly makes more sense, so that
+``operation1`` and ``operation2`` are both logged against the original
+logcontext. This looks like:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def do_request_handling():
+ d1 = logcontext.preserve_fn(operation1)()
+ d2 = logcontext.preserve_fn(operation2)()
+
+ with PreserveLoggingContext():
+ result = yield defer.gatherResults([d1, d2])
+
+
+Was all this really necessary?
+------------------------------
+
+The conventions used work fine for a linear flow where everything happens in
+series via ``defer.inlineCallbacks`` and ``yield``, but are certainly tricky to
+follow for any more exotic flows. It's hard not to wonder if we could have done
+something else.
+
+We're not going to rewrite Synapse now, so the following is entirely of
+academic interest, but I'd like to record some thoughts on an alternative
+approach.
+
+I briefly prototyped some code following an alternative set of rules. I think
+it would work, but I certainly didn't get as far as thinking how it would
+interact with concepts as complicated as the cache descriptors.
+
+My alternative rules were:
+
+* functions always preserve the logcontext of their caller, whether or not they
+ are returning a Deferred.
+
+* Deferreds returned by synapse functions run their callbacks in the same
+ context as the function was orignally called in.
+
+The main point of this scheme is that everywhere that sets the logcontext is
+responsible for clearing it before returning control to the reactor.
+
+So, for example, if you were the function which started a ``with
+LoggingContext`` block, you wouldn't ``yield`` within it — instead you'd start
+off the background process, and then leave the ``with`` block to wait for it:
+
+.. code:: python
+
+ def handle_request(request_id):
+ with logcontext.LoggingContext() as request_context:
+ request_context.request = request_id
+ d = do_request_handling()
+
+ def cb(r):
+ logger.debug("finished")
+
+ d.addCallback(cb)
+ return d
+
+(in general, mixing ``with LoggingContext`` blocks and
+``defer.inlineCallbacks`` in the same function leads to slighly
+counter-intuitive code, under this scheme).
+
+Because we leave the original ``with`` block as soon as the Deferred is
+returned (as opposed to waiting for it to be resolved, as we do today), the
+logcontext is cleared before control passes back to the reactor; so if there is
+some code within ``do_request_handling`` which needs to wait for a Deferred to
+complete, there is no need for it to worry about clearing the logcontext before
+doing so:
+
+.. code:: python
+
+ def handle_request():
+ r = do_some_stuff()
+ r.addCallback(do_some_more_stuff)
+ return r
+
+— and provided ``do_some_stuff`` follows the rules of returning a Deferred which
+runs its callbacks in the original logcontext, all is happy.
+
+The business of a Deferred which runs its callbacks in the original logcontext
+isn't hard to achieve — we have it today, in the shape of
+``logcontext._PreservingContextDeferred``:
+
+.. code:: python
+
+ def do_some_stuff():
+ deferred = do_some_io()
+ pcd = _PreservingContextDeferred(LoggingContext.current_context())
+ deferred.chainDeferred(pcd)
+ return pcd
+
+It turns out that, thanks to the way that Deferreds chain together, we
+automatically get the property of a context-preserving deferred with
+``defer.inlineCallbacks``, provided the final Defered the function ``yields``
+on has that property. So we can just write:
+
+.. code:: python
+
+ @defer.inlineCallbacks
+ def handle_request():
+ yield do_some_stuff()
+ yield do_some_more_stuff()
+
+To conclude: I think this scheme would have worked equally well, with less
+danger of messing it up, and probably made some more esoteric code easier to
+write. But again — changing the conventions of the entire Synapse codebase is
+not a sensible option for the marginal improvement offered.
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 83ee3e3ce3..a6f1e7594e 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -187,7 +187,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 7ed0de4117..a821a6ce62 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index ca742de6b2..e52b0f240d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -31,7 +31,7 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -184,7 +184,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cf5b196e6..76c4cc54d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 0b9d78c13c..2cdd2d39ff 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -52,7 +52,7 @@ from synapse.api.urls import (
)
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
@@ -456,7 +456,12 @@ def run(hs):
def in_thread():
# Uncomment to enable tracing of log context changes.
# sys.settrace(logcontext_tracer)
- with LoggingContext("run"):
+
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
change_resource_limit(hs.config.soft_file_limit)
if hs.config.gc_thresholds:
gc.set_threshold(*hs.config.gc_thresholds)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index c5579d9e38..3c7984a237 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -190,7 +190,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index b025db54d4..ab682e52ec 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine
from synapse.storage import DataStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+ PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -275,7 +276,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9f93b311d1..34e34e5580 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -47,7 +47,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+ PreserveLoggingContext
from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
@@ -500,7 +501,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners)
def run():
- with LoggingContext("run"):
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
logger.info("Running")
change_resource_limit(config.soft_file_limit)
if config.gc_thresholds:
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index d7211ee9b3..80f27f8c53 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -96,10 +96,11 @@ class Keyring(object):
verify_requests = []
for server_name, json_object in server_and_json:
- logger.debug("Verifying for %s", server_name)
key_ids = signature_ids(json_object, server_name)
if not key_ids:
+ logger.warn("Request from %s: no supported signature keys",
+ server_name)
deferred = defer.fail(SynapseError(
400,
"Not signed with a supported algorithm",
@@ -108,6 +109,9 @@ class Keyring(object):
else:
deferred = defer.Deferred()
+ logger.debug("Verifying for %s with key_ids %s",
+ server_name, key_ids)
+
verify_request = VerifyKeyRequest(
server_name, key_ids, json_object, deferred
)
@@ -142,6 +146,9 @@ class Keyring(object):
json_object = verify_request.json_object
+ logger.debug("Got key %s %s:%s for server %s, verifying" % (
+ key_id, verify_key.alg, verify_key.version, server_name,
+ ))
try:
verify_signed_json(json_object, server_name, verify_key)
except:
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 11605b34a3..6be18880b9 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -15,6 +15,32 @@
class EventContext(object):
+ """
+ Attributes:
+ current_state_ids (dict[(str, str), str]):
+ The current state map including the current event.
+ (type, state_key) -> event_id
+
+ prev_state_ids (dict[(str, str), str]):
+ The current state map excluding the current event.
+ (type, state_key) -> event_id
+
+ state_group (int): state group id
+ rejected (bool|str): A rejection reason if the event was rejected, else
+ False
+
+ push_actions (list[(str, list[object])]): list of (user_id, actions)
+ tuples
+
+ prev_group (int): Previously persisted state group. ``None`` for an
+ outlier.
+ delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
+ (type, state_key) -> event_id. ``None`` for an outlier.
+
+ prev_state_events (?): XXX: is this ever set to anything other than
+ the empty list?
+ """
+
__slots__ = [
"current_state_ids",
"prev_state_ids",
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0cd5501b05..888dd01240 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -933,8 +933,9 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
- synapse.util.logcontext.reset_context_after_deferred(
- self._handle_queued_pdus(room_queue))
+ synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+ room_queue
+ )
defer.returnValue(True)
@@ -1537,7 +1538,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
+ """
+
+ Args:
+ origin:
+ event:
+ state:
+ auth_events:
+ Returns:
+ Deferred, which resolves to synapse.events.snapshot.EventContext
+ """
context = yield self.state_handler.compute_event_context(
event, old_state=state,
)
diff --git a/synapse/state.py b/synapse/state.py
index 383d32b163..9a523a1b89 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -177,17 +177,12 @@ class StateHandler(object):
@defer.inlineCallbacks
def compute_event_context(self, event, old_state=None):
- """ Fills out the context with the `current state` of the graph. The
- `current state` here is defined to be the state of the event graph
- just before the event - i.e. it never includes `event`
-
- If `event` has `auth_events` then this will also fill out the
- `auth_events` field on `context` from the `current_state`.
+ """Build an EventContext structure for the event.
Args:
- event (EventBase)
+ event (synapse.events.EventBase):
Returns:
- an EventContext
+ synapse.events.snapshot.EventContext:
"""
context = EventContext()
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 94b2bcc54a..813ad59e56 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import synapse.util.async
from ._base import SQLBaseStore
from . import engines
@@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
- self._background_update_timer = None
@defer.inlineCallbacks
def start_doing_background_updates(self):
- assert self._background_update_timer is None, \
- "background updates already running"
-
logger.info("Starting background schema updates")
while True:
- sleep = defer.Deferred()
- self._background_update_timer = self._clock.call_later(
- self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
- )
- try:
- yield sleep
- finally:
- self._background_update_timer = None
+ yield synapse.util.async.sleep(
+ self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
try:
result = yield self.do_next_background_update(
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 256e50dc20..0d97de2fe7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -201,19 +201,19 @@ class EventFederationStore(SQLBaseStore):
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
min_depth = self._get_min_depth_interaction(txn, room_id)
- do_insert = depth < min_depth if min_depth else True
+ if min_depth and depth >= min_depth:
+ return
- if do_insert:
- self._simple_upsert_txn(
- txn,
- table="room_depth",
- keyvalues={
- "room_id": room_id,
- },
- values={
- "min_depth": depth,
- },
- )
+ self._simple_upsert_txn(
+ txn,
+ table="room_depth",
+ keyvalues={
+ "room_id": room_id,
+ },
+ values={
+ "min_depth": depth,
+ },
+ )
def _handle_mult_prev_events(self, txn, events):
"""
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index e8a7717640..3c8393bfe8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json
from collections import deque, namedtuple, OrderedDict
from functools import wraps
-import synapse
import synapse.metrics
-
import logging
import math
import ujson as json
+# these are only included to make the type annotations work
+from synapse.events import EventBase # noqa: F401
+from synapse.events.snapshot import EventContext # noqa: F401
+
logger = logging.getLogger(__name__)
@@ -82,6 +84,11 @@ class _EventPeristenceQueue(object):
def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options.
+
+ Args:
+ room_id (str):
+ events_and_contexts (list[(EventBase, EventContext)]):
+ backfilled (bool):
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
@@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False):
+ """
+
+ Args:
+ event (EventBase):
+ context (EventContext):
+ backfilled (bool):
+
+ Returns:
+ Deferred: resolves to (int, int): the stream ordering of ``event``,
+ and the stream ordering of the latest persisted event
+ """
deferred = self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)],
backfilled=backfilled,
@@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
def _persist_events(self, events_and_contexts, backfilled=False,
delete_existing=False):
+ """Persist events to db
+
+ Args:
+ events_and_contexts (list[(EventBase, EventContext)]):
+ backfilled (bool):
+ delete_existing (bool):
+
+ Returns:
+ Deferred: resolves when the events have been persisted
+ """
if not events_and_contexts:
return
@@ -554,11 +582,91 @@ class EventsStore(SQLBaseStore):
and the rejections table. Things reading from those table will need to check
whether the event was rejected.
- If delete_existing is True then existing events will be purged from the
- database before insertion. This is useful when retrying due to IntegrityError.
+ Args:
+ txn (twisted.enterprise.adbapi.Connection): db connection
+ events_and_contexts (list[(EventBase, EventContext)]):
+ events to persist
+ backfilled (bool): True if the events were backfilled
+ delete_existing (bool): True to purge existing table rows for the
+ events from the database. This is useful when retrying due to
+ IntegrityError.
+ current_state_for_room (dict[str, (list[str], list[str])]):
+ The current-state delta for each room. For each room, a tuple
+ (to_delete, to_insert), being a list of event ids to be removed
+ from the current state, and a list of event ids to be added to
+ the current state.
+ new_forward_extremeties (dict[str, list[str]]):
+ The new forward extremities for each room. For each room, a
+ list of the event ids which are the forward extremities.
+
"""
+ self._update_current_state_txn(txn, current_state_for_room)
+
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
- for room_id, current_state_tuple in current_state_for_room.iteritems():
+ self._update_forward_extremities_txn(
+ txn,
+ new_forward_extremities=new_forward_extremeties,
+ max_stream_order=max_stream_order,
+ )
+
+ # Ensure that we don't have the same event twice.
+ events_and_contexts = self._filter_events_and_contexts_for_duplicates(
+ events_and_contexts,
+ )
+
+ self._update_room_depths_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ backfilled=backfilled,
+ )
+
+ # _update_outliers_txn filters out any events which have already been
+ # persisted, and returns the filtered list.
+ events_and_contexts = self._update_outliers_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ )
+
+ # From this point onwards the events are only events that we haven't
+ # seen before.
+
+ if delete_existing:
+ # For paranoia reasons, we go and delete all the existing entries
+ # for these events so we can reinsert them.
+ # This gets around any problems with some tables already having
+ # entries.
+ self._delete_existing_rows_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ )
+
+ self._store_event_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ )
+
+ # Insert into the state_groups, state_groups_state, and
+ # event_to_state_groups tables.
+ self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+ # _store_rejected_events_txn filters out any events which were
+ # rejected, and returns the filtered list.
+ events_and_contexts = self._store_rejected_events_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ )
+
+ # From this point onwards the events are only ones that weren't
+ # rejected.
+
+ self._update_metadata_tables_txn(
+ txn,
+ events_and_contexts=events_and_contexts,
+ backfilled=backfilled,
+ )
+
+ def _update_current_state_txn(self, txn, state_delta_by_room):
+ for room_id, current_state_tuple in state_delta_by_room.iteritems():
to_delete, to_insert = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
@@ -608,7 +716,9 @@ class EventsStore(SQLBaseStore):
txn, self.get_current_state_ids, (room_id,)
)
- for room_id, new_extrem in new_forward_extremeties.items():
+ def _update_forward_extremities_txn(self, txn, new_forward_extremities,
+ max_stream_order):
+ for room_id, new_extrem in new_forward_extremities.items():
self._simple_delete_txn(
txn,
table="event_forward_extremities",
@@ -626,7 +736,7 @@ class EventsStore(SQLBaseStore):
"event_id": ev_id,
"room_id": room_id,
}
- for room_id, new_extrem in new_forward_extremeties.items()
+ for room_id, new_extrem in new_forward_extremities.items()
for ev_id in new_extrem
],
)
@@ -643,13 +753,22 @@ class EventsStore(SQLBaseStore):
"event_id": event_id,
"stream_ordering": max_stream_order,
}
- for room_id, new_extrem in new_forward_extremeties.items()
+ for room_id, new_extrem in new_forward_extremities.items()
for event_id in new_extrem
]
)
- # Ensure that we don't have the same event twice.
- # Pick the earliest non-outlier if there is one, else the earliest one.
+ @classmethod
+ def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
+ """Ensure that we don't have the same event twice.
+
+ Pick the earliest non-outlier if there is one, else the earliest one.
+
+ Args:
+ events_and_contexts (list[(EventBase, EventContext)]):
+ Returns:
+ list[(EventBase, EventContext)]: filtered list
+ """
new_events_and_contexts = OrderedDict()
for event, context in events_and_contexts:
prev_event_context = new_events_and_contexts.get(event.event_id)
@@ -662,9 +781,17 @@ class EventsStore(SQLBaseStore):
new_events_and_contexts[event.event_id] = (event, context)
else:
new_events_and_contexts[event.event_id] = (event, context)
+ return new_events_and_contexts.values()
- events_and_contexts = new_events_and_contexts.values()
+ def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
+ """Update min_depth for each room
+ Args:
+ txn (twisted.enterprise.adbapi.Connection): db connection
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+ backfilled (bool): True if the events were backfilled
+ """
depth_updates = {}
for event, context in events_and_contexts:
# Remove the any existing cache entries for the event_ids
@@ -683,6 +810,21 @@ class EventsStore(SQLBaseStore):
for room_id, depth in depth_updates.items():
self._update_min_depth_for_room_txn(txn, room_id, depth)
+ def _update_outliers_txn(self, txn, events_and_contexts):
+ """Update any outliers with new event info.
+
+ This turns outliers into ex-outliers (unless the new event was
+ rejected).
+
+ Args:
+ txn (twisted.enterprise.adbapi.Connection): db connection
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+
+ Returns:
+ list[(EventBase, EventContext)] new list, without events which
+ are already in the events table.
+ """
txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
",".join(["?"] * len(events_and_contexts)),
@@ -697,19 +839,16 @@ class EventsStore(SQLBaseStore):
to_remove = set()
for event, context in events_and_contexts:
- if context.rejected:
- # If the event is rejected then we don't care if the event
- # was an outlier or not.
- if event.event_id in have_persisted:
- # If we have already seen the event then ignore it.
- to_remove.add(event)
- continue
-
if event.event_id not in have_persisted:
continue
to_remove.add(event)
+ if context.rejected:
+ # If the event is rejected then we don't care if the event
+ # was an outlier or not.
+ continue
+
outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted:
# We received a copy of an event that we had already stored as
@@ -764,37 +903,19 @@ class EventsStore(SQLBaseStore):
# event isn't an outlier any more.
self._update_backward_extremeties(txn, [event])
- events_and_contexts = [
+ return [
ec for ec in events_and_contexts if ec[0] not in to_remove
]
+ @classmethod
+ def _delete_existing_rows_txn(cls, txn, events_and_contexts):
if not events_and_contexts:
- # Make sure we don't pass an empty list to functions that expect to
- # be storing at least one element.
+ # nothing to do here
return
- # From this point onwards the events are only events that we haven't
- # seen before.
+ logger.info("Deleting existing")
- def event_dict(event):
- return {
- k: v
- for k, v in event.get_dict().items()
- if k not in [
- "redacted",
- "redacted_because",
- ]
- }
-
- if delete_existing:
- # For paranoia reasons, we go and delete all the existing entries
- # for these events so we can reinsert them.
- # This gets around any problems with some tables already having
- # entries.
-
- logger.info("Deleting existing")
-
- for table in (
+ for table in (
"events",
"event_auth",
"event_json",
@@ -817,11 +938,34 @@ class EventsStore(SQLBaseStore):
"redactions",
"room_memberships",
"topics"
- ):
- txn.executemany(
- "DELETE FROM %s WHERE event_id = ?" % (table,),
- [(ev.event_id,) for ev, _ in events_and_contexts]
- )
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE event_id = ?" % (table,),
+ [(ev.event_id,) for ev, _ in events_and_contexts]
+ )
+
+ def _store_event_txn(self, txn, events_and_contexts):
+ """Insert new events into the event and event_json tables
+
+ Args:
+ txn (twisted.enterprise.adbapi.Connection): db connection
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+ """
+
+ if not events_and_contexts:
+ # nothing to do here
+ return
+
+ def event_dict(event):
+ return {
+ k: v
+ for k, v in event.get_dict().items()
+ if k not in [
+ "redacted",
+ "redacted_because",
+ ]
+ }
self._simple_insert_many_txn(
txn,
@@ -865,6 +1009,19 @@ class EventsStore(SQLBaseStore):
],
)
+ def _store_rejected_events_txn(self, txn, events_and_contexts):
+ """Add rows to the 'rejections' table for received events which were
+ rejected
+
+ Args:
+ txn (twisted.enterprise.adbapi.Connection): db connection
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+
+ Returns:
+ list[(EventBase, EventContext)] new list, without the rejected
+ events.
+ """
# Remove the rejected events from the list now that we've added them
# to the events table and the events_json table.
to_remove = set()
@@ -876,17 +1033,24 @@ class EventsStore(SQLBaseStore):
)
to_remove.add(event)
- events_and_contexts = [
+ return [
ec for ec in events_and_contexts if ec[0] not in to_remove
]
+ def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+ """Update all the miscellaneous tables for new events
+
+ Args:
+ txn (twisted.enterprise.adbapi.Connection): db connection
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+ backfilled (bool): True if the events were backfilled
+ """
+
if not events_and_contexts:
- # Make sure we don't pass an empty list to functions that expect to
- # be storing at least one element.
+ # nothing to do here
return
- # From this point onwards the events are only ones that weren't rejected.
-
for event, context in events_and_contexts:
# Insert all the push actions into the event_push_actions table.
if context.push_actions:
@@ -915,10 +1079,6 @@ class EventsStore(SQLBaseStore):
],
)
- # Insert into the state_groups, state_groups_state, and
- # event_to_state_groups tables.
- self._store_mult_state_groups_txn(txn, events_and_contexts)
-
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
@@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore):
# Prefill the event cache
self._add_to_cache(txn, events_and_contexts)
- if backfilled:
- # Backfilled events come before the current state so we don't need
- # to update the current state table
- return
-
- return
-
def _add_to_cache(self, txn, events_and_contexts):
to_prefill = []
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 27f1ec89ec..1b42bea07a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -136,6 +136,16 @@ class StateStore(SQLBaseStore):
continue
if context.current_state_ids is None:
+ # AFAIK, this can never happen
+ logger.error(
+ "Non-outlier event %s had current_state_ids==None",
+ event.event_id)
+ continue
+
+ # if the event was rejected, just give it the same state as its
+ # predecessor.
+ if context.rejected:
+ state_groups[event.event_id] = context.prev_group
continue
state_groups[event.event_id] = context.state_group
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d73670f9f2..ff67b1d794 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -12,6 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+""" Thread-local-alike tracking of log contexts within synapse
+
+This module provides objects and utilities for tracking contexts through
+synapse code, so that log lines can include a request identifier, and so that
+CPU and database activity can be accounted for against the request that caused
+them.
+
+See doc/log_contexts.rst for details on how this works.
+"""
+
from twisted.internet import defer
import threading
@@ -308,47 +318,44 @@ def preserve_context_over_deferred(deferred, context=None):
return d
-def reset_context_after_deferred(deferred):
- """If the deferred is incomplete, add a callback which will reset the
- context.
-
- This is useful when you want to fire off a deferred, but don't want to
- wait for it to complete. (The deferred will restore the current log context
- when it completes, so if you don't do anything, it will leak log context.)
-
- (If this feels asymmetric, consider it this way: we are effectively forking
- a new thread of execution. We are probably currently within a
- ``with LoggingContext()`` block, which is supposed to have a single entry
- and exit point. But by spawning off another deferred, we are effectively
- adding a new exit point.)
+def preserve_fn(f):
+ """Wraps a function, to ensure that the current context is restored after
+ return from the function, and that the sentinel context is set once the
+ deferred returned by the funtion completes.
- Args:
- deferred (defer.Deferred): deferred
+ Useful for wrapping functions that return a deferred which you don't yield
+ on.
"""
def reset_context(result):
LoggingContext.set_current_context(LoggingContext.sentinel)
return result
- if not deferred.called:
- deferred.addBoth(reset_context)
-
-
-def preserve_fn(f):
- """Ensures that function is called with correct context and that context is
- restored after return. Useful for wrapping functions that return a deferred
- which you don't yield on.
- """
+ # XXX: why is this here rather than inside g? surely we want to preserve
+ # the context from the time the function was called, not when it was
+ # wrapped?
current = LoggingContext.current_context()
def g(*args, **kwargs):
- with PreserveLoggingContext(current):
- res = f(*args, **kwargs)
- if isinstance(res, defer.Deferred):
- return preserve_context_over_deferred(
- res, context=LoggingContext.sentinel
- )
- else:
- return res
+ res = f(*args, **kwargs)
+ if isinstance(res, defer.Deferred) and not res.called:
+ # The function will have reset the context before returning, so
+ # we need to restore it now.
+ LoggingContext.set_current_context(current)
+
+ # The original context will be restored when the deferred
+ # completes, but there is nothing waiting for it, so it will
+ # get leaked into the reactor or some other function which
+ # wasn't expecting it. We therefore need to reset the context
+ # here.
+ #
+ # (If this feels asymmetric, consider it this way: we are
+ # effectively forking a new thread of execution. We are
+ # probably currently within a ``with LoggingContext()`` block,
+ # which is supposed to have a single entry and exit point. But
+ # by spawning off another deferred, we are effectively
+ # adding a new exit point.)
+ res.addBoth(reset_context)
+ return res
return g
diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py
index 65a330a0e9..9ffe209c4d 100644
--- a/tests/util/test_log_context.py
+++ b/tests/util/test_log_context.py
@@ -1,8 +1,10 @@
+import twisted.python.failure
from twisted.internet import defer
from twisted.internet import reactor
from .. import unittest
from synapse.util.async import sleep
+from synapse.util import logcontext
from synapse.util.logcontext import LoggingContext
@@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase):
context_one.test_key = "one"
yield sleep(0)
self._check_test_key("one")
+
+ def _test_preserve_fn(self, function):
+ sentinel_context = LoggingContext.current_context()
+
+ callback_completed = [False]
+
+ @defer.inlineCallbacks
+ def cb():
+ context_one.test_key = "one"
+ yield function()
+ self._check_test_key("one")
+
+ callback_completed[0] = True
+
+ with LoggingContext() as context_one:
+ context_one.test_key = "one"
+
+ # fire off function, but don't wait on it.
+ logcontext.preserve_fn(cb)()
+
+ self._check_test_key("one")
+
+ # now wait for the function under test to have run, and check that
+ # the logcontext is left in a sane state.
+ d2 = defer.Deferred()
+
+ def check_logcontext():
+ if not callback_completed[0]:
+ reactor.callLater(0.01, check_logcontext)
+ return
+
+ # make sure that the context was reset before it got thrown back
+ # into the reactor
+ try:
+ self.assertIs(LoggingContext.current_context(),
+ sentinel_context)
+ d2.callback(None)
+ except BaseException:
+ d2.errback(twisted.python.failure.Failure())
+
+ reactor.callLater(0.01, check_logcontext)
+
+ # test is done once d2 finishes
+ return d2
+
+ def test_preserve_fn_with_blocking_fn(self):
+ @defer.inlineCallbacks
+ def blocking_function():
+ yield sleep(0)
+
+ return self._test_preserve_fn(blocking_function)
+
+ def test_preserve_fn_with_non_blocking_fn(self):
+ @defer.inlineCallbacks
+ def nonblocking_function():
+ with logcontext.PreserveLoggingContext():
+ yield defer.succeed(None)
+
+ return self._test_preserve_fn(nonblocking_function)
|