From 73a5f06652c6966eead46eded1d68f6f3522b54a Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 13 Mar 2017 17:27:51 +0000 Subject: Support registration / login with phone number Changes from https://github.com/matrix-org/synapse/pull/1971 --- synapse/util/msisdn.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 synapse/util/msisdn.py (limited to 'synapse/util') diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py new file mode 100644 index 0000000000..607161e7f0 --- /dev/null +++ b/synapse/util/msisdn.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import phonenumbers +from synapse.api.errors import SynapseError + + +def phone_number_to_msisdn(country, number): + """ + Takes an ISO-3166-1 2 letter country code and phone number and + returns an msisdn representing the canonical version of that + phone number. + Args: + country (str): ISO-3166-1 2 letter country code + number (str): Phone number in a national or international format + + Returns: + (str) The canonical form of the phone number, as an msisdn + Raises: + SynapseError if the number could not be parsed. + """ + try: + phoneNumber = phonenumbers.parse(number, country) + except phonenumbers.NumberParseException: + raise SynapseError(400, "Unable to parse phone number") + return phonenumbers.format_number( + phoneNumber, phonenumbers.PhoneNumberFormat.E164 + )[1:] -- cgit 1.4.1 From b5d1c68beb24c314006398052070448d67bb4983 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Mar 2017 02:21:07 +0000 Subject: Implement reset_context_after_deferred to correctly reset the context when we fire off a deferred we aren't going to wait for. --- synapse/util/logcontext.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 6c83eb213d..d73670f9f2 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -308,6 +308,31 @@ def preserve_context_over_deferred(deferred, context=None): return d +def reset_context_after_deferred(deferred): + """If the deferred is incomplete, add a callback which will reset the + context. + + This is useful when you want to fire off a deferred, but don't want to + wait for it to complete. (The deferred will restore the current log context + when it completes, so if you don't do anything, it will leak log context.) + + (If this feels asymmetric, consider it this way: we are effectively forking + a new thread of execution. We are probably currently within a + ``with LoggingContext()`` block, which is supposed to have a single entry + and exit point. But by spawning off another deferred, we are effectively + adding a new exit point.) + + Args: + deferred (defer.Deferred): deferred + """ + def reset_context(result): + LoggingContext.set_current_context(LoggingContext.sentinel) + return result + + if not deferred.called: + deferred.addBoth(reset_context) + + def preserve_fn(f): """Ensures that function is called with correct context and that context is restored after return. Useful for wrapping functions that return a deferred -- cgit 1.4.1 From 29ed09e80a8c7ddeebe3f257a336e4c387a06c88 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 Mar 2017 12:16:55 +0000 Subject: Fix assertion to stop transaction queue getting wedged ... and update some docstrings to correctly reflect the types being used. get_new_device_msgs_for_remote can return a long under some circumstances, which was being stored in last_device_list_stream_id_by_dest, and was then upsetting things on the next loop. --- synapse/federation/transaction_queue.py | 5 +++++ synapse/replication/slave/storage/_slaved_id_tracker.py | 5 +++++ synapse/storage/deviceinbox.py | 6 +++--- synapse/storage/devices.py | 2 +- synapse/storage/util/id_generators.py | 14 ++++++++++++++ synapse/util/caches/stream_change_cache.py | 2 +- 6 files changed, 29 insertions(+), 5 deletions(-) (limited to 'synapse/util') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 90235ff098..c802dd67a3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -99,7 +99,12 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + # destination -> stream_id of last successfully sent to-device message. + # NB: may be a long or an int. self.last_device_stream_id_by_dest = {} + + # destination -> stream_id of last successfully sent device list + # update. self.last_device_list_stream_id_by_dest = {} # HACK to get unique tx id diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py index 24b5c79d4a..9d1d173b2f 100644 --- a/synapse/replication/slave/storage/_slaved_id_tracker.py +++ b/synapse/replication/slave/storage/_slaved_id_tracker.py @@ -27,4 +27,9 @@ class SlavedIdTracker(object): self._current = (max if self.step > 0 else min)(self._current, new_id) def get_current_token(self): + """ + + Returns: + int + """ return self._current diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 5c7db5e5f6..7925cb5f1b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore): """ Args: destination(str): The name of the remote server. - last_stream_id(int): The last position of the device message stream + last_stream_id(int|long): The last position of the device message stream that the server sent up to. - current_stream_id(int): The current position of the device + current_stream_id(int|long): The current position of the device message stream. Returns: - Deferred ([dict], int): List of messages for the device and where + Deferred ([dict], int|long): List of messages for the device and where in the stream the messages got to. """ diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 563071b7a9..e545b62e39 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -308,7 +308,7 @@ class DeviceStore(SQLBaseStore): """Get stream of updates to send to remote servers Returns: - (now_stream_id, [ { updates }, .. ]) + (int, list[dict]): current stream id and list of updates """ now_stream_id = self._device_list_id_gen.get_current_token() diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 46cf93ff87..95031dc9ec 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -30,6 +30,17 @@ class IdGenerator(object): def _load_current_id(db_conn, table, column, step=1): + """ + + Args: + db_conn (object): + table (str): + column (str): + step (int): + + Returns: + int + """ cur = db_conn.cursor() if step == 1: cur.execute("SELECT MAX(%s) FROM %s" % (column, table,)) @@ -131,6 +142,9 @@ class StreamIdGenerator(object): def get_current_token(self): """Returns the maximum stream id such that all stream ids less than or equal to it have been successfully persisted. + + Returns: + int """ with self._lock: if self._unfinished_ids: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b72bb0ff02..70fe00ce0b 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -50,7 +50,7 @@ class StreamChangeCache(object): def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) is int + assert type(stream_pos) is int or type(stream_pos) is long if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() -- cgit 1.4.1 From d2d146a3143e94c5f68caadc3dbeff4452f86853 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 15:11:26 +0000 Subject: Logcontext docs --- docs/log_contexts.rst | 447 ++++++++++++++++++++++++++++++++++++++++++++- synapse/util/logcontext.py | 10 + 2 files changed, 449 insertions(+), 8 deletions(-) (limited to 'synapse/util') diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index 0046e171be..a5f59745e6 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -1,10 +1,441 @@ -What do I do about "Unexpected logging context" debug log-lines everywhere? +Log contexts +============ - The logging context lives in thread local storage - Sometimes it gets out of sync with what it should actually be, usually because something scheduled something to run on the reactor without preserving the logging context. - what is the impact of it getting out of sync? and how and when should we preserve log context? - The impact is that some of the CPU and database metrics will be under-reported, and some log lines will be mis-attributed. - It should happen auto-magically in all the APIs that do IO or otherwise defer to the reactor. - Mjark: the other place is if we branch, e.g. using defer.gatherResults +.. contents:: -Unanswered: how and when should we preserve log context? \ No newline at end of file +To help track the processing of individual requests, synapse uses a +'log context' to track which request it is handling at any given moment. This +is done via a thread-local variable; a ``logging.Filter`` is then used to fish +the information back out of the thread-local variable and add it to each log +record. + +Logcontexts are also used for CPU and database accounting, so that we can track +which requests were responsible for high CPU use or database activity. + +The ``synapse.util.logcontext`` module provides a facilities for managing the +current log context (as well as providing the ``LoggingContextFilter`` class). + +Deferreds make the whole thing complicated, so this document describes how it +all works, and how to write code which follows the rules. + +Logcontexts without Deferreds +----------------------------- + +In the absence of any Deferred voodoo, things are simple enough. As with any +code of this nature, the rule is that our function should leave things as it +found them: + +.. code:: python + + from synapse.util import logcontext # omitted from future snippets + + def handle_request(request_id): + request_context = logcontext.LoggingContext() + + calling_context = logcontext.LoggingContext.current_context() + logcontext.LoggingContext.set_current_context(request_context) + try: + request_context.request = request_id + do_request_handling() + logger.debug("finished") + finally: + logcontext.LoggingContext.set_current_context(calling_context) + + def do_request_handling(): + logger.debug("phew") # this will be logged against request_id + + +LoggingContext implements the context management methods, so the above can be +written much more succinctly as: + +.. code:: python + + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + do_request_handling() + logger.debug("finished") + + def do_request_handling(): + logger.debug("phew") + + +Using logcontexts with Deferreds +-------------------------------- + +Deferreds — and in particular, ``defer.inlineCallbacks`` — break +the linear flow of code so that there is no longer a single entry point where +we should set the logcontext and a single exit point where we should remove it. + +Consider the example above, where ``do_request_handling`` needs to do some +blocking operation, and returns a deferred: + +.. code:: python + + @defer.inlineCallbacks + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + yield do_request_handling() + logger.debug("finished") + + +In the above flow: + +* The logcontext is set +* ``do_request_handling`` is called, and returns a deferred +* ``handle_request`` yields the deferred +* The ``inlineCallbacks`` wrapper of ``handle_request`` returns a deferred + +So we have stopped processing the request (and will probably go on to start +processing the next), without clearing the logcontext. + +To circumvent this problem, synapse code assumes that, wherever you have a +deferred, you will want to yield on it. To that end, whereever functions return +a deferred, we adopt the following conventions: + +.. note:: Rules for functions returning deferreds: + + * If the deferred is already complete, the function returns with the same + logcontext it started with. + * If the deferred is incomplete, the function clears the logcontext before + returning; when the deferred completes, it restores the logcontext before + running any callbacks. + +That sounds complicated, but actually it means a lot of code (including the +example above) "just works". There are two cases: + +* If ``do_request_handling`` returns a completed deferred, then the logcontext + will still be in place. In this case, execution will continue immediately + after the ``yield``; the "finished" line will be logged against the right + context, and the ``with`` block restores the original context before we + return to the caller. + +* If the returned deferred is incomplete, ``do_request_handling`` clears the + logcontext before returning. The logcontext is therefore clear when + ``handle_request`` yields the deferred. At that point, the ``inlineCallbacks`` + wrapper adds a callback to the deferred, and returns another (incomplete) + deferred to the caller, and it is safe to begin processing the next request. + + Once ``do_request_handling``'s deferred completes, it will reinstate the + logcontext, before running the callback added by the ``inlineCallbacks`` + wrapper. That callback runs the second half of ``handle_request``, so again + the "finished" line will be logged against the right + context, and the ``with`` block restores the original context. + +As an aside, it's worth noting that ``handle_request`` follows our rules - +though that only matters if the caller has its own logcontext which it cares +about. + +The following sections describe pitfalls and helpful patterns when implementing +these rules. + +Always yield your deferreds +--------------------------- + +Whenever you get a deferred back from a function, you should ``yield`` on it +as soon as possible. (Returning it directly to your caller is ok too, if you're +not doing ``inlineCallbacks``.) Do not pass go; do not do any logging; do not +call any other functions. + +.. code:: python + + @defer.inlineCallbacks + def fun(): + logger.debug("starting") + yield do_some_stuff() # just like this + + d = more_stuff() + result = yield d # also fine, of course + + defer.returnValue(result) + + def nonInlineCallbacksFun(): + logger.debug("just a wrapper really") + return do_some_stuff() # this is ok too - the caller will yield on + # it anyway. + +Provided this pattern is followed all the way back up to the callchain to where +the logcontext was set, this will make things work out ok: provided +``do_some_stuff`` and ``more_stuff`` follow the rules above, then so will +``fun`` (as wrapped by ``inlineCallbacks``) and ``nonInlineCallbacksFun``. + +It's all too easy to forget to ``yield``: for instance if we forgot that +``do_some_stuff`` returned a deferred, we might plough on regardless. This +leads to a mess; it will probably work itself out eventually, but not before +a load of stuff has been logged against the wrong content. (Normally, other +things will break, more obviously, if you forget to ``yield``, so this tends +not to be a major problem in practice.) + +Of course sometimes you need to do something a bit fancier with your Deferreds +- not all code follows the linear A-then-B-then-C pattern. Notes on +implementing more complex patterns are in later sections. + +Where you create a new Deferred, make it follow the rules +--------------------------------------------------------- + +Most of the time, a Deferred comes from another synapse function. Sometimes, +though, we need to make up a new Deferred, or we get a Deferred back from +external code. We need to make it follow our rules. + +The easy way to do it is with a combination of ``defer.inlineCallbacks``, and +``logcontext.PreserveLoggingContext``. Suppose we want to implement ``sleep``, +which returns a deferred which will run its callbacks after a given number of +seconds. That might look like: + +.. code:: python + + # not a logcontext-rules-compliant function + def get_sleep_deferred(seconds): + d = defer.Deferred() + reactor.callLater(seconds, d.callback, None) + return d + +That doesn't follow the rules, but we can fix it by wrapping it with +``PreserveLoggingContext`` and ``yield`` ing on it: + +.. code:: python + + @defer.inlineCallbacks + def sleep(seconds): + with PreserveLoggingContext(): + yield get_sleep_deferred(seconds) + +This technique works equally for external functions which return deferreds, +or deferreds we have made ourselves. + +XXX: think this is what ``preserve_context_over_deferred`` is supposed to do, +though it is broken, in that it only restores the logcontext for the duration +of the callbacks, which doesn't comply with the logcontext rules. + +Fire-and-forget +--------------- + +Sometimes you want to fire off a chain of execution, but not wait for its +result. That might look a bit like this: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + # *don't* do this + background_operation() + + logger.debug("Request handling complete") + + @defer.inlineCallbacks + def background_operation(): + yield first_background_step() + logger.debug("Completed first step") + yield second_background_step() + logger.debug("Completed second step") + +The above code does a couple of steps in the background after +``do_request_handling`` has finished. The log lines are still logged against +the ``request_context`` logcontext, which may or may not be desirable. There +are two big problems with the above, however. The first problem is that, if +``background_operation`` returns an incomplete Deferred, it will expect its +caller to ``yield`` immediately, so will have cleared the logcontext. In this +example, that means that 'Request handling complete' will be logged without any +context. + +The second problem, which is potentially even worse, is that when the Deferred +returned by ``background_operation`` completes, it will restore the original +logcontext. There is nothing waiting on that Deferred, so the logcontext will +leak into the reactor and possibly get attached to some arbitrary future +operation. + +There are two potential solutions to this. + +One option is to surround the call to ``background_operation`` with a +``PreserveLoggingContext`` call. That will reset the logcontext before +starting ``background_operation`` (so the context restored when the deferred +completes will be the empty logcontext), and will restore the current +logcontext before continuing the foreground process: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + # start background_operation off in the empty logcontext, to + # avoid leaking the current context into the reactor. + with PreserveLoggingContext(): + background_operation() + + # this will now be logged against the request context + logger.debug("Request handling complete") + +Obviously that option means that the operations done in +``background_operation`` would be not be logged against a logcontext (though +that might be fixed by setting a different logcontext via a ``with +LoggingContext(...)`` in ``background_operation``). + +The second option is to use ``logcontext.preserve_fn``, which wraps a function +so that it doesn't reset the logcontext even when it returns an incomplete +deferred, and adds a callback to the returned deferred to reset the +logcontext. In other words, it turns a function that follows the Synapse rules +about logcontexts and Deferreds into one which behaves more like an external +function — the opposite operation to that described in the previous section. +It can be used like this: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + yield foreground_operation() + + logcontext.preserve_fn(background_operation)() + + # this will now be logged against the request context + logger.debug("Request handling complete") + +XXX: I think ``preserve_context_over_fn`` is supposed to do the first option, +but the fact that it does ``preserve_context_over_deferred`` on its results +means that its use is fraught with difficulty. + +Passing synapse deferreds into third-party functions +---------------------------------------------------- + +A typical example of this is where we want to collect together two or more +deferred via ``defer.gatherResults``: + +.. code:: python + + d1 = operation1() + d2 = operation2() + d3 = defer.gatherResults([d1, d2]) + +This is really a variation of the fire-and-forget problem above, in that we are +firing off ``d1`` and ``d2`` without yielding on them. The difference +is that we now have third-party code attached to their callbacks. Anyway either +technique given in the `Fire-and-forget`_ section will work. + +Of course, the new Deferred returned by ``gatherResults`` needs to be wrapped +in order to make it follow the logcontext rules before we can yield it, as +described in `Where you create a new Deferred, make it follow the rules`_. + +So, option one: reset the logcontext before starting the operations to be +gathered: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + with PreserveLoggingContext(): + d1 = operation1() + d2 = operation2() + result = yield defer.gatherResults([d1, d2]) + +In this case particularly, though, option two, of using +``logcontext.preserve.fn`` almost certainly makes more sense, so that +``operation1`` and ``operation2`` are both logged against the original +logcontext. This looks like: + +.. code:: python + + @defer.inlineCallbacks + def do_request_handling(): + d1 = logcontext.preserve_fn(operation1)() + d2 = logcontext.preserve_fn(operation2)() + + with PreserveLoggingContext(): + result = yield defer.gatherResults([d1, d2]) + + +Was all this really necessary? +------------------------------ + +The conventions used work fine for a linear flow where everything happens in +series via ``defer.inlineCallbacks`` and ``yield``, but are certainly tricky to +follow for any more exotic flows. It's hard not to wonder if we could have done +something else. + +We're not going to rewrite Synapse now, so the following is entirely of +acadamic interest, but I'd like to record some thoughts on an alternative +approach. + +I briefly prototyped some code following an alternative set of rules. I think +it would work, but I certainly didn't get as far as thinking how it would +interact with concepts as complicated as the cache descriptors. + +My alternative rules were: + +* functions always preserve the logcontext of their caller, whether or not they + are returning a Deferred. + +* Deferreds returned by synapse functions run their callbacks in the same + context as the function was orignally called in. + +The main point of this scheme is that everywhere that sets the logcontext is +responsible for clearing it before returning control to the reactor. + +So, for example, if you were the function which started a ``with +LoggingContext`` block, you wouldn't ``yield`` within it — instead you'd start +off the background process, and then leave the ``with`` block to wait for it: + +.. code:: python + + def handle_request(request_id): + with logcontext.LoggingContext() as request_context: + request_context.request = request_id + d = do_request_handling() + + def cb(r): + logger.debug("finished") + + d.addCallback(cb) + return d + +(in general, mixing ``with LoggingContext`` blocks and +``defer.inlineCallbacks`` in the same function leads to slighly +counter-intuitive code, under this scheme). + +Because we leave the original ``with`` block as soon as the Deferred is +returned (as opposed to waiting for it to be resolved, as we do today), the +logcontext is cleared before control passes back to the reactor; so if there is +some code within ``do_request_handling`` which needs to wait for a Deferred to +complete, there is no need for it to worry about clearing the logcontext before +doing so: + +.. code:: python + + def handle_request(): + r = do_some_stuff() + r.addCallback(do_some_more_stuff) + return r + +— and provided ``do_some_stuff`` follows the rules of returning a Deferred which +runs its callbacks in the original logcontext, all is happy. + +The business of a Deferred which runs its callbacks in the original logcontext +isn't hard to achieve — we have it today, in the shape of +``logcontext._PreservingContextDeferred``: + +.. code:: python + + def do_some_stuff(): + deferred = do_some_io() + pcd = _PreservingContextDeferred(LoggingContext.current_context()) + deferred.chainDeferred(pcd) + return pcd + +It turns out that, thanks to the way that Deferreds chain together, we +automatically get the property of a context-preserving deferred with +``defer.inlineCallbacks``, provided the final Defered the function ``yields`` +on has that property. So we can just write: + +.. code:: python + + @defer.inlineCallbacks + def handle_request(): + yield do_some_stuff() + yield do_some_more_stuff() + +To conclude: I think this scheme would have worked equally well, with less +danger of messing it up, and probably made some more esoteric code easier to +write. But again — changing the conventions of the entire Synapse codebase is +not a sensible option for the marginal improvement offered. diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d73670f9f2..1135a683af 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -12,6 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" Thread-local-alike tracking of log contexts within synapse + +This module provides objects and utilities for tracking contexts through +synapse code, so that log lines can include a request identifier, and so that +CPU and database activity can be accounted for against the request that caused +them. + +See doc/log_contexts.rst for details on how this works. +""" + from twisted.internet import defer import threading -- cgit 1.4.1 From f40c2db05ae5e76428c97f1e194fe7d843913054 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Mar 2017 20:56:54 +0000 Subject: Stop preserve_fn leaking context into the reactor Fix a bug in ``logcontext.preserve_fn`` which made it leak context into the reactor, and add a test for it. Also, get rid of ``logcontext.reset_context_after_deferred``, which tried to do the same thing but had its own, different, set of bugs. --- synapse/handlers/federation.py | 5 ++-- synapse/util/logcontext.py | 61 ++++++++++++++++++++---------------------- tests/util/test_log_context.py | 61 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 34 deletions(-) (limited to 'synapse/util') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0cd5501b05..6157204924 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -933,8 +933,9 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - synapse.util.logcontext.reset_context_after_deferred( - self._handle_queued_pdus(room_queue)) + synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( + room_queue + ) defer.returnValue(True) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index d73670f9f2..7cbe390b15 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -308,47 +308,44 @@ def preserve_context_over_deferred(deferred, context=None): return d -def reset_context_after_deferred(deferred): - """If the deferred is incomplete, add a callback which will reset the - context. - - This is useful when you want to fire off a deferred, but don't want to - wait for it to complete. (The deferred will restore the current log context - when it completes, so if you don't do anything, it will leak log context.) - - (If this feels asymmetric, consider it this way: we are effectively forking - a new thread of execution. We are probably currently within a - ``with LoggingContext()`` block, which is supposed to have a single entry - and exit point. But by spawning off another deferred, we are effectively - adding a new exit point.) +def preserve_fn(f): + """Wraps a function, to ensure that the current context is restored after + return from the function, and that the sentinel context is set once the + deferred returned by the funtion completes. - Args: - deferred (defer.Deferred): deferred + Useful for wrapping functions that return a deferred which you don't yield + on. """ def reset_context(result): LoggingContext.set_current_context(LoggingContext.sentinel) return result - if not deferred.called: - deferred.addBoth(reset_context) - - -def preserve_fn(f): - """Ensures that function is called with correct context and that context is - restored after return. Useful for wrapping functions that return a deferred - which you don't yield on. - """ + # XXX: why is this here rather than inside g? surely we want to preserve + # the context from the time the function was called, not when it was + # wrapped? current = LoggingContext.current_context() def g(*args, **kwargs): - with PreserveLoggingContext(current): - res = f(*args, **kwargs) - if isinstance(res, defer.Deferred): - return preserve_context_over_deferred( - res, context=LoggingContext.sentinel - ) - else: - return res + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred) and not res.called: + # The function will have reset the context before returning, so + # we need to restore it now. + LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(reset_context) + return res return g diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py index 65a330a0e9..9ffe209c4d 100644 --- a/tests/util/test_log_context.py +++ b/tests/util/test_log_context.py @@ -1,8 +1,10 @@ +import twisted.python.failure from twisted.internet import defer from twisted.internet import reactor from .. import unittest from synapse.util.async import sleep +from synapse.util import logcontext from synapse.util.logcontext import LoggingContext @@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase): context_one.test_key = "one" yield sleep(0) self._check_test_key("one") + + def _test_preserve_fn(self, function): + sentinel_context = LoggingContext.current_context() + + callback_completed = [False] + + @defer.inlineCallbacks + def cb(): + context_one.test_key = "one" + yield function() + self._check_test_key("one") + + callback_completed[0] = True + + with LoggingContext() as context_one: + context_one.test_key = "one" + + # fire off function, but don't wait on it. + logcontext.preserve_fn(cb)() + + self._check_test_key("one") + + # now wait for the function under test to have run, and check that + # the logcontext is left in a sane state. + d2 = defer.Deferred() + + def check_logcontext(): + if not callback_completed[0]: + reactor.callLater(0.01, check_logcontext) + return + + # make sure that the context was reset before it got thrown back + # into the reactor + try: + self.assertIs(LoggingContext.current_context(), + sentinel_context) + d2.callback(None) + except BaseException: + d2.errback(twisted.python.failure.Failure()) + + reactor.callLater(0.01, check_logcontext) + + # test is done once d2 finishes + return d2 + + def test_preserve_fn_with_blocking_fn(self): + @defer.inlineCallbacks + def blocking_function(): + yield sleep(0) + + return self._test_preserve_fn(blocking_function) + + def test_preserve_fn_with_non_blocking_fn(self): + @defer.inlineCallbacks + def nonblocking_function(): + with logcontext.PreserveLoggingContext(): + yield defer.succeed(None) + + return self._test_preserve_fn(nonblocking_function) -- cgit 1.4.1 From 95f21c7a66b256e9e5330edd0f35a83b177c84a8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 22 Mar 2017 13:54:20 +0000 Subject: Fix caching of remote servers' signature keys The `@cached` decorator on `KeyStore._get_server_verify_key` was missing its `num_args` parameter, which meant that it was returning the wrong key for any server which had more than one recorded key. By way of a fix, change the default for `num_args` to be *all* arguments. To implement that, factor out a common base class for `CacheDescriptor` and `CacheListDescriptor`. --- synapse/util/caches/descriptors.py | 135 ++++++++++++++++++---------------- tests/storage/test_keys.py | 53 +++++++++++++ tests/util/caches/__init__.py | 14 ++++ tests/util/caches/test_descriptors.py | 86 ++++++++++++++++++++++ 4 files changed, 225 insertions(+), 63 deletions(-) create mode 100644 tests/storage/test_keys.py create mode 100644 tests/util/caches/__init__.py create mode 100644 tests/util/caches/test_descriptors.py (limited to 'synapse/util') diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 998de70d29..19595df422 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -189,7 +189,55 @@ class Cache(object): self.cache.clear() -class CacheDescriptor(object): +class _CacheDescriptorBase(object): + def __init__(self, orig, num_args, inlineCallbacks, cache_context=False): + self.orig = orig + + if inlineCallbacks: + self.function_to_call = defer.inlineCallbacks(orig) + else: + self.function_to_call = orig + + arg_spec = inspect.getargspec(orig) + all_args = arg_spec.args + + if "cache_context" in all_args: + if not cache_context: + raise ValueError( + "Cannot have a 'cache_context' arg without setting" + " cache_context=True" + ) + elif cache_context: + raise ValueError( + "Cannot have cache_context=True without having an arg" + " named `cache_context`" + ) + + if num_args is None: + num_args = len(all_args) - 1 + if cache_context: + num_args -= 1 + + if len(all_args) < num_args + 1: + raise Exception( + "Not enough explicit positional arguments to key off for %r: " + "got %i args, but wanted %i. (@cached cannot key off *args or " + "**kwargs)" + % (orig.__name__, len(all_args), num_args) + ) + + self.num_args = num_args + self.arg_names = all_args[1:num_args + 1] + + if "cache_context" in self.arg_names: + raise Exception( + "cache_context arg cannot be included among the cache keys" + ) + + self.add_cache_context = cache_context + + +class CacheDescriptor(_CacheDescriptorBase): """ A method decorator that applies a memoizing cache around the function. This caches deferreds, rather than the results themselves. Deferreds that @@ -217,52 +265,24 @@ class CacheDescriptor(object): r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate) defer.returnValue(r1 + r2) + Args: + num_args (int): number of positional arguments (excluding ``self`` and + ``cache_context``) to use as cache keys. Defaults to all named + args of the function. """ - def __init__(self, orig, max_entries=1000, num_args=1, tree=False, + def __init__(self, orig, max_entries=1000, num_args=None, tree=False, inlineCallbacks=False, cache_context=False, iterable=False): - max_entries = int(max_entries * CACHE_SIZE_FACTOR) - self.orig = orig + super(CacheDescriptor, self).__init__( + orig, num_args=num_args, inlineCallbacks=inlineCallbacks, + cache_context=cache_context) - if inlineCallbacks: - self.function_to_call = defer.inlineCallbacks(orig) - else: - self.function_to_call = orig + max_entries = int(max_entries * CACHE_SIZE_FACTOR) self.max_entries = max_entries - self.num_args = num_args self.tree = tree - self.iterable = iterable - all_args = inspect.getargspec(orig) - self.arg_names = all_args.args[1:num_args + 1] - - if "cache_context" in all_args.args: - if not cache_context: - raise ValueError( - "Cannot have a 'cache_context' arg without setting" - " cache_context=True" - ) - try: - self.arg_names.remove("cache_context") - except ValueError: - pass - elif cache_context: - raise ValueError( - "Cannot have cache_context=True without having an arg" - " named `cache_context`" - ) - - self.add_cache_context = cache_context - - if len(self.arg_names) < self.num_args: - raise Exception( - "Not enough explicit positional arguments to key off of for %r." - " (@cached cannot key off of *args or **kwargs)" - % (orig.__name__,) - ) - def __get__(self, obj, objtype=None): cache = Cache( name=self.orig.__name__, @@ -338,48 +358,36 @@ class CacheDescriptor(object): return wrapped -class CacheListDescriptor(object): +class CacheListDescriptor(_CacheDescriptorBase): """Wraps an existing cache to support bulk fetching of keys. Given a list of keys it looks in the cache to find any hits, then passes the list of missing keys to the wrapped fucntion. """ - def __init__(self, orig, cached_method_name, list_name, num_args=1, + def __init__(self, orig, cached_method_name, list_name, num_args=None, inlineCallbacks=False): """ Args: orig (function) - method_name (str); The name of the chached method. + cached_method_name (str): The name of the chached method. list_name (str): Name of the argument which is the bulk lookup list - num_args (int) + num_args (int): number of positional arguments (excluding ``self``, + but including list_name) to use as cache keys. Defaults to all + named args of the function. inlineCallbacks (bool): Whether orig is a generator that should be wrapped by defer.inlineCallbacks """ - self.orig = orig + super(CacheListDescriptor, self).__init__( + orig, num_args=num_args, inlineCallbacks=inlineCallbacks) - if inlineCallbacks: - self.function_to_call = defer.inlineCallbacks(orig) - else: - self.function_to_call = orig - - self.num_args = num_args self.list_name = list_name - self.arg_names = inspect.getargspec(orig).args[1:num_args + 1] self.list_pos = self.arg_names.index(self.list_name) - self.cached_method_name = cached_method_name self.sentinel = object() - if len(self.arg_names) < self.num_args: - raise Exception( - "Not enough explicit positional arguments to key off of for %r." - " (@cached cannot key off of *args or **kwars)" - % (orig.__name__,) - ) - if self.list_name not in self.arg_names: raise Exception( "Couldn't see arguments %r for %r." @@ -487,7 +495,7 @@ class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))): self.cache.invalidate(self.key) -def cached(max_entries=1000, num_args=1, tree=False, cache_context=False, +def cached(max_entries=1000, num_args=None, tree=False, cache_context=False, iterable=False): return lambda orig: CacheDescriptor( orig, @@ -499,8 +507,8 @@ def cached(max_entries=1000, num_args=1, tree=False, cache_context=False, ) -def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False, - iterable=False): +def cachedInlineCallbacks(max_entries=1000, num_args=None, tree=False, + cache_context=False, iterable=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, @@ -512,7 +520,7 @@ def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_contex ) -def cachedList(cached_method_name, list_name, num_args=1, inlineCallbacks=False): +def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=False): """Creates a descriptor that wraps a function in a `CacheListDescriptor`. Used to do batch lookups for an already created cache. A single argument @@ -525,7 +533,8 @@ def cachedList(cached_method_name, list_name, num_args=1, inlineCallbacks=False) cache (Cache): The underlying cache to use. list_name (str): The name of the argument that is the list to use to do batch lookups in the cache. - num_args (int): Number of arguments to use as the key in the cache. + num_args (int): Number of arguments to use as the key in the cache + (including list_name). Defaults to all named parameters. inlineCallbacks (bool): Should the function be wrapped in an `defer.inlineCallbacks`? diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py new file mode 100644 index 0000000000..0be790d8f8 --- /dev/null +++ b/tests/storage/test_keys.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import signedjson.key +from twisted.internet import defer + +import tests.unittest +import tests.utils + + +class KeyStoreTestCase(tests.unittest.TestCase): + def __init__(self, *args, **kwargs): + super(KeyStoreTestCase, self).__init__(*args, **kwargs) + self.store = None # type: synapse.storage.keys.KeyStore + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_get_server_verify_keys(self): + key1 = signedjson.key.decode_verify_key_base64( + "ed25519", "key1", "fP5l4JzpZPq/zdbBg5xx6lQGAAOM9/3w94cqiJ5jPrw" + ) + key2 = signedjson.key.decode_verify_key_base64( + "ed25519", "key2", "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw" + ) + yield self.store.store_server_verify_key( + "server1", "from_server", 0, key1 + ) + yield self.store.store_server_verify_key( + "server1", "from_server", 0, key2 + ) + + res = yield self.store.get_server_verify_keys( + "server1", ["ed25519:key1", "ed25519:key2", "ed25519:key3"]) + + self.assertEqual(len(res.keys()), 2) + self.assertEqual(res["ed25519:key1"].version, "key1") + self.assertEqual(res["ed25519:key2"].version, "key2") diff --git a/tests/util/caches/__init__.py b/tests/util/caches/__init__.py new file mode 100644 index 0000000000..451dae3b6c --- /dev/null +++ b/tests/util/caches/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py new file mode 100644 index 0000000000..419281054d --- /dev/null +++ b/tests/util/caches/test_descriptors.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import mock +from twisted.internet import defer +from synapse.util.caches import descriptors +from tests import unittest + + +class DescriptorTestCase(unittest.TestCase): + @defer.inlineCallbacks + def test_cache(self): + class Cls(object): + def __init__(self): + self.mock = mock.Mock() + + @descriptors.cached() + def fn(self, arg1, arg2): + return self.mock(arg1, arg2) + + obj = Cls() + + obj.mock.return_value = 'fish' + r = yield obj.fn(1, 2) + self.assertEqual(r, 'fish') + obj.mock.assert_called_once_with(1, 2) + obj.mock.reset_mock() + + # a call with different params should call the mock again + obj.mock.return_value = 'chips' + r = yield obj.fn(1, 3) + self.assertEqual(r, 'chips') + obj.mock.assert_called_once_with(1, 3) + obj.mock.reset_mock() + + # the two values should now be cached + r = yield obj.fn(1, 2) + self.assertEqual(r, 'fish') + r = yield obj.fn(1, 3) + self.assertEqual(r, 'chips') + obj.mock.assert_not_called() + + @defer.inlineCallbacks + def test_cache_num_args(self): + """Only the first num_args arguments should matter to the cache""" + + class Cls(object): + def __init__(self): + self.mock = mock.Mock() + + @descriptors.cached(num_args=1) + def fn(self, arg1, arg2): + return self.mock(arg1, arg2) + + obj = Cls() + obj.mock.return_value = 'fish' + r = yield obj.fn(1, 2) + self.assertEqual(r, 'fish') + obj.mock.assert_called_once_with(1, 2) + obj.mock.reset_mock() + + # a call with different params should call the mock again + obj.mock.return_value = 'chips' + r = yield obj.fn(2, 3) + self.assertEqual(r, 'chips') + obj.mock.assert_called_once_with(2, 3) + obj.mock.reset_mock() + + # the two values should now be cached; we should be able to vary + # the second argument and still get the cached result. + r = yield obj.fn(1, 4) + self.assertEqual(r, 'fish') + r = yield obj.fn(2, 5) + self.assertEqual(r, 'chips') + obj.mock.assert_not_called() -- cgit 1.4.1 From 19b9366d73585e815e59e7a6d07d6e307af5e0fb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Mar 2017 00:17:46 +0000 Subject: Fix a couple of logcontext leaks Use preserve_fn to correctly manage the logcontexts around things we don't want to yield on. --- synapse/api/auth.py | 5 ++--- synapse/util/retryutils.py | 5 +++-- 2 files changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/util') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 03a215ab1b..9dbc7993df 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -23,7 +23,7 @@ from synapse import event_auth from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes from synapse.types import UserID -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util import logcontext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -209,8 +209,7 @@ class Auth(object): default=[""] )[0] if user and access_token and ip_addr: - preserve_context_over_fn( - self.store.insert_client_ip, + logcontext.preserve_fn(self.store.insert_client_ip)( user=user, access_token=access_token, ip=ip_addr, diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 153ef001ad..b68e8c4e9f 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import synapse.util.logcontext from twisted.internet import defer from synapse.api.errors import CodeMessageException @@ -173,4 +173,5 @@ class RetryDestinationLimiter(object): "Failed to store set_destination_retry_timings", ) - store_retry_timings() + # we deliberately do this in the background. + synapse.util.logcontext.preserve_fn(store_retry_timings)() -- cgit 1.4.1 From 4bd597d9fcb8e6c6888ee3e8fa683ba812272997 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Mar 2017 00:12:21 +0000 Subject: push federation retry limiter down to matrixfederationclient rather than having to instrument everywhere we make a federation call, make the MatrixFederationHttpClient manage the retry limiter. --- synapse/crypto/keyring.py | 39 +++--- synapse/federation/federation_client.py | 33 ++--- synapse/federation/transaction_queue.py | 216 +++++++++++++----------------- synapse/federation/transport/client.py | 1 + synapse/handlers/e2e_keys.py | 32 ++--- synapse/http/matrixfederationclient.py | 228 ++++++++++++++++++-------------- synapse/util/retryutils.py | 16 ++- tests/handlers/test_typing.py | 2 + 8 files changed, 280 insertions(+), 287 deletions(-) (limited to 'synapse/util') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 80f27f8c53..c4bc4f4d31 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -15,7 +15,6 @@ from synapse.crypto.keyclient import fetch_server_key from synapse.api.errors import SynapseError, Codes -from synapse.util.retryutils import get_retry_limiter from synapse.util import unwrapFirstError from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( @@ -363,30 +362,24 @@ class Keyring(object): def get_keys_from_server(self, server_name_and_key_ids): @defer.inlineCallbacks def get_key(server_name, key_ids): - limiter = yield get_retry_limiter( - server_name, - self.clock, - self.store, - ) - with limiter: - keys = None - try: - keys = yield self.get_server_verify_key_v2_direct( - server_name, key_ids - ) - except Exception as e: - logger.info( - "Unable to get key %r for %r directly: %s %s", - key_ids, server_name, - type(e).__name__, str(e.message), - ) + keys = None + try: + keys = yield self.get_server_verify_key_v2_direct( + server_name, key_ids + ) + except Exception as e: + logger.info( + "Unable to get key %r for %r directly: %s %s", + key_ids, server_name, + type(e).__name__, str(e.message), + ) - if not keys: - keys = yield self.get_server_verify_key_v1_direct( - server_name, key_ids - ) + if not keys: + keys = yield self.get_server_verify_key_v1_direct( + server_name, key_ids + ) - keys = {server_name: keys} + keys = {server_name: keys} defer.returnValue(keys) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5dcd4eecce..dc44727b36 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.events import FrozenEvent, builder import synapse.metrics -from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +from synapse.util.retryutils import NotRetryingDestination import copy import itertools @@ -234,31 +234,24 @@ class FederationClient(FederationBase): continue try: - limiter = yield get_retry_limiter( - destination, - self._clock, - self.store, + transaction_data = yield self.transport_layer.get_event( + destination, event_id, timeout=timeout, ) - with limiter: - transaction_data = yield self.transport_layer.get_event( - destination, event_id, timeout=timeout, - ) - - logger.debug("transaction_data %r", transaction_data) + logger.debug("transaction_data %r", transaction_data) - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - if pdu_list and pdu_list[0]: - pdu = pdu_list[0] + if pdu_list and pdu_list[0]: + pdu = pdu_list[0] - # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] + # Check signatures are correct. + signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] - break + break pdu_attempts[destination] = now diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c802dd67a3..d7ecefcc64 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import datetime from twisted.internet import defer @@ -22,9 +22,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn -from synapse.util.retryutils import ( - get_retry_limiter, NotRetryingDestination, -) +from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state @@ -312,13 +310,6 @@ class TransactionQueue(object): yield run_on_reactor() while True: - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - backoff_on_404=True, # If we get a 404 the other side has gone - ) - device_message_edus, device_stream_id, dev_list_id = ( yield self._get_new_device_messages(destination) ) @@ -374,7 +365,6 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, - limiter=limiter, ) if success: # Remove the acknowledged device messages from the database @@ -392,12 +382,24 @@ class TransactionQueue(object): self.last_device_list_stream_id_by_dest[destination] = dev_list_id else: break - except NotRetryingDestination: + except NotRetryingDestination as e: logger.debug( - "TX [%s] not ready for retry yet - " + "TX [%s] not ready for retry yet (next retry at %s) - " "dropping transaction for now", destination, + datetime.datetime.fromtimestamp( + (e.retry_last_ts + e.retry_interval) / 1000.0 + ), + ) + except Exception as e: + logger.warn( + "TX [%s] Failed to send transaction: %s", + destination, + e, ) + for p in pending_pdus: + logger.info("Failed to send event %s to %s", p.event_id, + destination) finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -437,7 +439,7 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures, limiter): + pending_failures): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -447,132 +449,104 @@ class TransactionQueue(object): success = True - try: - logger.debug("TX [%s] _attempt_new_transaction", destination) + logger.debug("TX [%s] _attempt_new_transaction", destination) - txn_id = str(self._next_txn_id) + txn_id = str(self._next_txn_id) - logger.debug( - "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, txn_id, - len(pdus), - len(edus), - len(failures) - ) + logger.debug( + "TX [%s] {%s} Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, txn_id, + len(pdus), + len(edus), + len(failures) + ) - logger.debug("TX [%s] Persisting transaction...", destination) + logger.debug("TX [%s] Persisting transaction...", destination) - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - self._next_txn_id += 1 + self._next_txn_id += 1 - yield self.transaction_actions.prepare_to_send(transaction) + yield self.transaction_actions.prepare_to_send(transaction) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d, failures: %d)", - destination, txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - len(failures), - ) + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", + destination, txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + len(failures), + ) - with limiter: - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) - except HttpResponseException as e: - code = e.code - response = e.response - - if e.code in (401, 404, 429) or 500 <= e.code: - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + + if response: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, ) - raise e + except HttpResponseException as e: + code = e.code + response = e.response + if e.code in (401, 404, 429) or 500 <= e.code: logger.info( "TX [%s] {%s} got %d response", destination, txn_id, code ) + raise e - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - - yield self.transaction_actions.delivered( - transaction, code, response - ) + logger.info( + "TX [%s] {%s} got %d response", + destination, txn_id, code + ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) - if code != 200: - for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination - ) - success = False - except RuntimeError as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - success = False + logger.debug("TX [%s] Marked as delivered", destination) + if code != 200: for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - except Exception as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) - + logger.info( + "Failed to send event %s to %s", p.event_id, destination + ) success = False - for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - defer.returnValue(success) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f49e8a2cc4..cc9bc7f14b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -163,6 +163,7 @@ class TransportLayerClient(object): data=json_data, json_data_callback=json_data_callback, long_retries=True, + backoff_on_404=True, # If we get a 404 the other side has gone ) logger.debug( diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index e40495d1ab..a33135de67 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred -from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -121,15 +121,11 @@ class E2eKeysHandler(object): def do_remote_query(destination): destination_query = remote_queries_not_in_cache[destination] try: - limiter = yield get_retry_limiter( - destination, self.clock, self.store + remote_result = yield self.federation.query_client_keys( + destination, + {"device_keys": destination_query}, + timeout=timeout ) - with limiter: - remote_result = yield self.federation.query_client_keys( - destination, - {"device_keys": destination_query}, - timeout=timeout - ) for user_id, keys in remote_result["device_keys"].items(): if user_id in destination_query: @@ -239,18 +235,14 @@ class E2eKeysHandler(object): def claim_client_keys(destination): device_keys = remote_queries[destination] try: - limiter = yield get_retry_limiter( - destination, self.clock, self.store + remote_result = yield self.federation.claim_client_keys( + destination, + {"one_time_keys": device_keys}, + timeout=timeout ) - with limiter: - remote_result = yield self.federation.claim_client_keys( - destination, - {"one_time_keys": device_keys}, - timeout=timeout - ) - for user_id, keys in remote_result["one_time_keys"].items(): - if user_id in device_keys: - json_result[user_id] = keys + for user_id, keys in remote_result["one_time_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys except CodeMessageException as e: failures[destination] = { "status": e.code, "message": e.message diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index f15903f862..b0885dc979 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - +import synapse.util.retryutils from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, HTTPConnectionPool, Agent @@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object): reactor, MatrixFederationEndpointFactory(hs), pool=pool ) self.clock = hs.get_clock() + self._store = hs.get_datastore() self.version_string = hs.version_string self._next_id = 1 @@ -106,133 +106,143 @@ class MatrixFederationHttpClient(object): def _request(self, destination, method, path, body_callback, headers_dict={}, param_bytes=b"", query_bytes=b"", retry_on_dns_fail=True, - timeout=None, long_retries=False): + timeout=None, long_retries=False, backoff_on_404=False): """ Creates and sends a request to the given server Args: destination (str): The remote server to send the HTTP request to. method (str): HTTP method path (str): The HTTP path + backoff_on_404 (bool): Back off if we get a 404 Returns: Deferred: resolves with the http response object on success. Fails with ``HTTPRequestException``: if we get an HTTP response - code >= 300. + code >= 300. + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ + limiter = yield synapse.util.retryutils.get_retry_limiter( + destination, + self.clock, + self._store, + backoff_on_404=backoff_on_404, + ) + destination = destination.encode("ascii") path_bytes = path.encode("ascii") + with limiter: + headers_dict[b"User-Agent"] = [self.version_string] + headers_dict[b"Host"] = [destination] - headers_dict[b"User-Agent"] = [self.version_string] - headers_dict[b"Host"] = [destination] + url_bytes = self._create_url( + destination, path_bytes, param_bytes, query_bytes + ) - url_bytes = self._create_url( - destination, path_bytes, param_bytes, query_bytes - ) + txn_id = "%s-O-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) - txn_id = "%s-O-%s" % (method, self._next_id) - self._next_id = (self._next_id + 1) % (sys.maxint - 1) + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes + ) - outbound_logger.info( - "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url_bytes - ) + # XXX: Would be much nicer to retry only at the transaction-layer + # (once we have reliable transactions in place) + if long_retries: + retries_left = MAX_LONG_RETRIES + else: + retries_left = MAX_SHORT_RETRIES - # XXX: Would be much nicer to retry only at the transaction-layer - # (once we have reliable transactions in place) - if long_retries: - retries_left = MAX_LONG_RETRIES - else: - retries_left = MAX_SHORT_RETRIES + http_url_bytes = urlparse.urlunparse( + ("", "", path_bytes, param_bytes, query_bytes, "") + ) - http_url_bytes = urlparse.urlunparse( - ("", "", path_bytes, param_bytes, query_bytes, "") - ) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, http_url_bytes, headers_dict) + + try: + def send_request(): + request_deferred = preserve_context_over_fn( + self.agent.request, + method, + url_bytes, + Headers(headers_dict), + producer + ) + + return self.clock.time_bound_deferred( + request_deferred, + time_out=timeout / 1000. if timeout else 60, + ) + + response = yield preserve_context_over_fn(send_request) + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise - log_result = None - try: - while True: - producer = None - if body_callback: - producer = body_callback(method, http_url_bytes, headers_dict) - - try: - def send_request(): - request_deferred = preserve_context_over_fn( - self.agent.request, + logger.warn( + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, + destination, method, url_bytes, - Headers(headers_dict), - producer + type(e).__name__, + _flatten_response_never_received(e), ) - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), ) - response = yield preserve_context_over_fn(send_request) - - log_result = "%d %s" % (response.code, response.phrase,) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", - destination, - e - ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e - ) - raise - - logger.warn( - "{%s} Sending request failed to %s: %s %s: %s - %s", - txn_id, - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) - - log_result = "%s - %s" % ( - type(e).__name__, _flatten_response_never_received(e), - ) - - if retries_left and not timeout: - if long_retries: - delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) - delay = min(delay, 60) - delay *= random.uniform(0.8, 1.4) + if retries_left and not timeout: + if long_retries: + delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) + delay = min(delay, 60) + delay *= random.uniform(0.8, 1.4) + else: + delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) + delay = min(delay, 2) + delay *= random.uniform(0.8, 1.4) + + yield sleep(delay) + retries_left -= 1 else: - delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) - delay = min(delay, 2) - delay *= random.uniform(0.8, 1.4) - - yield sleep(delay) - retries_left -= 1 - else: - raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) - if 200 <= response.code < 300: - pass - else: - # :'( - # Update transactions table? - body = yield preserve_context_over_fn(readBody, response) - raise HttpResponseException( - response.code, response.phrase, body - ) + if 200 <= response.code < 300: + pass + else: + # :'( + # Update transactions table? + body = yield preserve_context_over_fn(readBody, response) + raise HttpResponseException( + response.code, response.phrase, body + ) - defer.returnValue(response) + defer.returnValue(response) def sign_request(self, destination, method, url_bytes, headers_dict, content=None): @@ -261,7 +271,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def put_json(self, destination, path, data={}, json_data_callback=None, - long_retries=False, timeout=None): + long_retries=False, timeout=None, backoff_on_404=False): """ Sends the specifed json data using PUT Args: @@ -276,11 +286,17 @@ class MatrixFederationHttpClient(object): retry for a short or long time. timeout(int): How long to try (in ms) the destination for before giving up. None indicates no timeout. + backoff_on_404 (bool): True if we should count a 404 response as + a failure of the server (and should therefore back off future + requests) Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. On a 4xx or 5xx error response a CodeMessageException is raised. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ if not json_data_callback: @@ -303,6 +319,7 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, long_retries=long_retries, timeout=timeout, + backoff_on_404=backoff_on_404, ) if 200 <= response.code < 300: @@ -332,6 +349,9 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. On a 4xx or 5xx error response a CodeMessageException is raised. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ def body_callback(method, url_bytes, headers_dict): @@ -377,6 +397,9 @@ class MatrixFederationHttpClient(object): The result of the deferred is a tuple of `(code, response)`, where `response` is a dict representing the decoded JSON body. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ logger.debug("get_json args: %s", args) @@ -426,6 +449,9 @@ class MatrixFederationHttpClient(object): Fails with ``HTTPRequestException`` if we get an HTTP response code >= 300 + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ encoded_args = {} diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 153ef001ad..7e5a952584 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -124,7 +124,13 @@ class RetryDestinationLimiter(object): def __exit__(self, exc_type, exc_val, exc_tb): valid_err_code = False - if exc_type is not None and issubclass(exc_type, CodeMessageException): + if exc_type is None: + valid_err_code = True + elif not issubclass(exc_type, Exception): + # avoid treating exceptions which don't derive from Exception as + # failures; this is mostly so as not to catch defer._DefGen. + valid_err_code = True + elif issubclass(exc_type, CodeMessageException): # Some error codes are perfectly fine for some APIs, whereas other # APIs may expect to never received e.g. a 404. It's important to # handle 404 as some remote servers will return a 404 when the HS @@ -142,11 +148,13 @@ class RetryDestinationLimiter(object): else: valid_err_code = False - if exc_type is None or valid_err_code: + if valid_err_code: # We connected successfully. if not self.retry_interval: return + logger.debug("Connection to %s was successful; clearing backoff", + self.destination) retry_last_ts = 0 self.retry_interval = 0 else: @@ -160,6 +168,10 @@ class RetryDestinationLimiter(object): else: self.retry_interval = self.min_retry_interval + logger.debug( + "Connection to %s was unsuccessful (%s(%s)); backoff now %i", + self.destination, exc_type, exc_val, self.retry_interval + ) retry_last_ts = int(self.clock.time_msec()) @defer.inlineCallbacks diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index f88d2be7c5..dbe50383da 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -192,6 +192,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ), json_data_callback=ANY, long_retries=True, + backoff_on_404=True, ), defer.succeed((200, "OK")) ) @@ -263,6 +264,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ), json_data_callback=ANY, long_retries=True, + backoff_on_404=True, ), defer.succeed((200, "OK")) ) -- cgit 1.4.1 From b88a323ffbe165b8639a2ef5f5791ed65cde3e4b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Mar 2017 12:03:46 +0000 Subject: Fix time_bound_deferred to throw the right exception Due to a failure to instantiate DeferredTimedOutError, time_bound_deferred would throw a CancelledError when the deferred timed out, which was rather confusing. --- synapse/util/__init__.py | 10 ++++++---- tests/util/test_clock.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) create mode 100644 tests/util/test_clock.py (limited to 'synapse/util') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 30fc480108..98a5a26ac5 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class DeferredTimedOutError(SynapseError): def __init__(self): - super(SynapseError).__init__(504, "Timed out") + super(SynapseError, self).__init__(504, "Timed out") def unwrapFirstError(failure): @@ -93,8 +93,10 @@ class Clock(object): ret_deferred = defer.Deferred() def timed_out_fn(): + e = DeferredTimedOutError() + try: - ret_deferred.errback(DeferredTimedOutError()) + ret_deferred.errback(e) except: pass @@ -114,7 +116,7 @@ class Clock(object): ret_deferred.addBoth(cancel) - def sucess(res): + def success(res): try: ret_deferred.callback(res) except: @@ -128,7 +130,7 @@ class Clock(object): except: pass - given_deferred.addCallbacks(callback=sucess, errback=err) + given_deferred.addCallbacks(callback=success, errback=err) timer = self.call_later(time_out, timed_out_fn) diff --git a/tests/util/test_clock.py b/tests/util/test_clock.py new file mode 100644 index 0000000000..9672603579 --- /dev/null +++ b/tests/util/test_clock.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse import util +from twisted.internet import defer +from tests import unittest + + +class ClockTestCase(unittest.TestCase): + @defer.inlineCallbacks + def test_time_bound_deferred(self): + # just a deferred which never resolves + slow_deferred = defer.Deferred() + + clock = util.Clock() + time_bound = clock.time_bound_deferred(slow_deferred, 0.001) + + try: + yield time_bound + self.fail("Expected timedout error, but got nothing") + except util.DeferredTimedOutError: + pass -- cgit 1.4.1 From 5a16cb4bf036c6b1914d6c6248ed640c289b59e3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 23 Mar 2017 11:10:36 +0000 Subject: Ignore backoff history for invites, aliases, and roomdirs Add a param to the federation client which lets us ignore historical backoff data for federation queries, and set it for a handful of operations. --- synapse/federation/federation_client.py | 7 +++++-- synapse/federation/transport/client.py | 6 +++++- synapse/handlers/directory.py | 1 + synapse/handlers/profile.py | 6 ++++-- synapse/http/matrixfederationclient.py | 33 ++++++++++++++++++++++++++------- synapse/util/retryutils.py | 13 +++++++++++-- 6 files changed, 52 insertions(+), 14 deletions(-) (limited to 'synapse/util') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index dc44727b36..deee0f4904 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -88,7 +88,7 @@ class FederationClient(FederationBase): @log_function def make_query(self, destination, query_type, args, - retry_on_dns_fail=False): + retry_on_dns_fail=False, ignore_backoff=False): """Sends a federation Query to a remote homeserver of the given type and arguments. @@ -98,6 +98,8 @@ class FederationClient(FederationBase): handler name used in register_query_handler(). args (dict): Mapping of strings to strings containing the details of the query request. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. Returns: a Deferred which will eventually yield a JSON object from the @@ -106,7 +108,8 @@ class FederationClient(FederationBase): sent_queries_counter.inc(query_type) return self.transport_layer.make_query( - destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail + destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail, + ignore_backoff=ignore_backoff, ) @log_function diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index cc9bc7f14b..15a03378f5 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -175,7 +175,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def make_query(self, destination, query_type, args, retry_on_dns_fail): + def make_query(self, destination, query_type, args, retry_on_dns_fail, + ignore_backoff=False): path = PREFIX + "/query/%s" % query_type content = yield self.client.get_json( @@ -184,6 +185,7 @@ class TransportLayerClient(object): args=args, retry_on_dns_fail=retry_on_dns_fail, timeout=10000, + ignore_backoff=ignore_backoff, ) defer.returnValue(content) @@ -243,6 +245,7 @@ class TransportLayerClient(object): destination=destination, path=path, data=content, + ignore_backoff=True, ) defer.returnValue(response) @@ -270,6 +273,7 @@ class TransportLayerClient(object): destination=remote_server, path=path, args=args, + ignore_backoff=True, ) defer.returnValue(response) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 1b5317edf5..943554ce98 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler): "room_alias": room_alias.to_string(), }, retry_on_dns_fail=False, + ignore_backoff=True, ) except CodeMessageException as e: logging.warn("Error retrieving alias") diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index abd1fb28cb..9bf638f818 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler): args={ "user_id": target_user.to_string(), "field": "displayname", - } + }, + ignore_backoff=True, ) except CodeMessageException as e: if e.code != 404: @@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler): args={ "user_id": target_user.to_string(), "field": "avatar_url", - } + }, + ignore_backoff=True, ) except CodeMessageException as e: if e.code != 404: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b0885dc979..f9e32ef03d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -106,12 +106,16 @@ class MatrixFederationHttpClient(object): def _request(self, destination, method, path, body_callback, headers_dict={}, param_bytes=b"", query_bytes=b"", retry_on_dns_fail=True, - timeout=None, long_retries=False, backoff_on_404=False): + timeout=None, long_retries=False, + ignore_backoff=False, + backoff_on_404=False): """ Creates and sends a request to the given server Args: destination (str): The remote server to send the HTTP request to. method (str): HTTP method path (str): The HTTP path + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. backoff_on_404 (bool): Back off if we get a 404 Returns: @@ -127,6 +131,7 @@ class MatrixFederationHttpClient(object): self.clock, self._store, backoff_on_404=backoff_on_404, + ignore_backoff=ignore_backoff, ) destination = destination.encode("ascii") @@ -271,7 +276,9 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def put_json(self, destination, path, data={}, json_data_callback=None, - long_retries=False, timeout=None, backoff_on_404=False): + long_retries=False, timeout=None, + ignore_backoff=False, + backoff_on_404=False): """ Sends the specifed json data using PUT Args: @@ -286,6 +293,8 @@ class MatrixFederationHttpClient(object): retry for a short or long time. timeout(int): How long to try (in ms) the destination for before giving up. None indicates no timeout. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. backoff_on_404 (bool): True if we should count a 404 response as a failure of the server (and should therefore back off future requests) @@ -319,6 +328,7 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, long_retries=long_retries, timeout=timeout, + ignore_backoff=ignore_backoff, backoff_on_404=backoff_on_404, ) @@ -331,7 +341,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def post_json(self, destination, path, data={}, long_retries=False, - timeout=None): + timeout=None, ignore_backoff=False): """ Sends the specifed json data using POST Args: @@ -344,7 +354,8 @@ class MatrixFederationHttpClient(object): retry for a short or long time. timeout(int): How long to try (in ms) the destination for before giving up. None indicates no timeout. - + ignore_backoff (bool): true to ignore the historical backoff data and + try the request anyway. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. On a 4xx or 5xx error response a @@ -368,6 +379,7 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, long_retries=long_retries, timeout=timeout, + ignore_backoff=ignore_backoff, ) if 200 <= response.code < 300: @@ -380,7 +392,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True, - timeout=None): + timeout=None, ignore_backoff=False): """ GETs some json from the given host homeserver and path Args: @@ -392,6 +404,8 @@ class MatrixFederationHttpClient(object): timeout (int): How long to try (in ms) the destination for before giving up. None indicates no timeout and that the request will be retried. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. Returns: Deferred: Succeeds when we get *any* HTTP response. @@ -424,6 +438,7 @@ class MatrixFederationHttpClient(object): body_callback=body_callback, retry_on_dns_fail=retry_on_dns_fail, timeout=timeout, + ignore_backoff=ignore_backoff, ) if 200 <= response.code < 300: @@ -436,13 +451,16 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def get_file(self, destination, path, output_stream, args={}, - retry_on_dns_fail=True, max_size=None): + retry_on_dns_fail=True, max_size=None, + ignore_backoff=False): """GETs a file from a given homeserver Args: destination (str): The remote server to send the HTTP request to. path (str): The HTTP path to GET. output_stream (file): File to write the response body to. args (dict): Optional dictionary used to create the query string. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. Returns: Deferred: resolves with an (int,dict) tuple of the file length and a dict of the response headers. @@ -473,7 +491,8 @@ class MatrixFederationHttpClient(object): path, query_bytes=query_bytes, body_callback=body_callback, - retry_on_dns_fail=retry_on_dns_fail + retry_on_dns_fail=retry_on_dns_fail, + ignore_backoff=ignore_backoff, ) headers = dict(response.headers.getAllRawHeaders()) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 7e5a952584..7f5299bd32 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -35,7 +35,8 @@ class NotRetryingDestination(Exception): @defer.inlineCallbacks -def get_retry_limiter(destination, clock, store, **kwargs): +def get_retry_limiter(destination, clock, store, ignore_backoff=False, + **kwargs): """For a given destination check if we have previously failed to send a request there and are waiting before retrying the destination. If we are not ready to retry the destination, this will raise a @@ -43,6 +44,14 @@ def get_retry_limiter(destination, clock, store, **kwargs): that will mark the destination as down if an exception is thrown (excluding CodeMessageException with code < 500) + Args: + destination (str): name of homeserver + clock (synapse.util.clock): timing source + store (synapse.storage.transactions.TransactionStore): datastore + ignore_backoff (bool): true to ignore the historical backoff data and + try the request anyway. We will still update the next + retry_interval on success/failure. + Example usage: try: @@ -66,7 +75,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): now = int(clock.time_msec()) - if retry_last_ts + retry_interval > now: + if not ignore_backoff and retry_last_ts + retry_interval > now: raise NotRetryingDestination( retry_last_ts=retry_last_ts, retry_interval=retry_interval, -- cgit 1.4.1 From f9b4bb05e05694f3000df2bc5331b1aaa501575c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 30 Mar 2017 13:22:24 +0100 Subject: Fix the logcontext handling in the cache wrappers (#2077) The cache wrappers had a habit of leaking the logcontext into the reactor while the lookup function was running, and then not restoring it correctly when the lookup function had completed. It's all the fault of `preserve_context_over_{fn,deferred}` which are basically a bit broken. --- docs/log_contexts.rst | 11 +++-- synapse/util/caches/descriptors.py | 30 ++++++------ synapse/util/logcontext.py | 23 +++++++++ tests/util/caches/test_descriptors.py | 91 +++++++++++++++++++++++++++++++++++ 4 files changed, 136 insertions(+), 19 deletions(-) (limited to 'synapse/util') diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index 8d04a973de..eb1784e700 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -204,9 +204,14 @@ That doesn't follow the rules, but we can fix it by wrapping it with This technique works equally for external functions which return deferreds, or deferreds we have made ourselves. -XXX: think this is what ``preserve_context_over_deferred`` is supposed to do, -though it is broken, in that it only restores the logcontext for the duration -of the callbacks, which doesn't comply with the logcontext rules. +You can also use ``logcontext.make_deferred_yieldable``, which just does the +boilerplate for you, so the above could be written: + +.. code:: python + + def sleep(seconds): + return logcontext.make_deferred_yieldable(get_sleep_deferred(seconds)) + Fire-and-forget --------------- diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 19595df422..5c30ed235d 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -15,12 +15,9 @@ import logging from synapse.util.async import ObservableDeferred -from synapse.util import unwrapFirstError +from synapse.util import unwrapFirstError, logcontext from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry -from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn -) from . import DEBUG_CACHES, register_cache @@ -328,11 +325,9 @@ class CacheDescriptor(_CacheDescriptorBase): defer.returnValue(cached_result) observer.addCallback(check_result) - return preserve_context_over_deferred(observer) except KeyError: ret = defer.maybeDeferred( - preserve_context_over_fn, - self.function_to_call, + logcontext.preserve_fn(self.function_to_call), obj, *args, **kwargs ) @@ -342,10 +337,11 @@ class CacheDescriptor(_CacheDescriptorBase): ret.addErrback(onErr) - ret = ObservableDeferred(ret, consumeErrors=True) - cache.set(cache_key, ret, callback=invalidate_callback) + result_d = ObservableDeferred(ret, consumeErrors=True) + cache.set(cache_key, result_d, callback=invalidate_callback) + observer = result_d.observe() - return preserve_context_over_deferred(ret.observe()) + return logcontext.make_deferred_yieldable(observer) wrapped.invalidate = cache.invalidate wrapped.invalidate_all = cache.invalidate_all @@ -362,7 +358,11 @@ class CacheListDescriptor(_CacheDescriptorBase): """Wraps an existing cache to support bulk fetching of keys. Given a list of keys it looks in the cache to find any hits, then passes - the list of missing keys to the wrapped fucntion. + the list of missing keys to the wrapped function. + + Once wrapped, the function returns either a Deferred which resolves to + the list of results, or (if all results were cached), just the list of + results. """ def __init__(self, orig, cached_method_name, list_name, num_args=None, @@ -433,8 +433,7 @@ class CacheListDescriptor(_CacheDescriptorBase): args_to_call[self.list_name] = missing ret_d = defer.maybeDeferred( - preserve_context_over_fn, - self.function_to_call, + logcontext.preserve_fn(self.function_to_call), **args_to_call ) @@ -443,8 +442,7 @@ class CacheListDescriptor(_CacheDescriptorBase): # We need to create deferreds for each arg in the list so that # we can insert the new deferred into the cache. for arg in missing: - with PreserveLoggingContext(): - observer = ret_d.observe() + observer = ret_d.observe() observer.addCallback(lambda r, arg: r.get(arg, None), arg) observer = ObservableDeferred(observer) @@ -471,7 +469,7 @@ class CacheListDescriptor(_CacheDescriptorBase): results.update(res) return results - return preserve_context_over_deferred(defer.gatherResults( + return logcontext.make_deferred_yieldable(defer.gatherResults( cached_defers.values(), consumeErrors=True, ).addCallback(update_results_dict).addErrback( diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index ff67b1d794..857afee7cb 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -310,6 +310,10 @@ def preserve_context_over_fn(fn, *args, **kwargs): def preserve_context_over_deferred(deferred, context=None): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. + + Deprecated: this almost certainly doesn't do want you want, ie make + the deferred follow the synapse logcontext rules: try + ``make_deferred_yieldable`` instead. """ if context is None: context = LoggingContext.current_context() @@ -359,6 +363,25 @@ def preserve_fn(f): return g +@defer.inlineCallbacks +def make_deferred_yieldable(deferred): + """Given a deferred, make it follow the Synapse logcontext rules: + + If the deferred has completed (or is not actually a Deferred), essentially + does nothing (just returns another completed deferred with the + result/failure). + + If the deferred has not yet completed, resets the logcontext before + returning a deferred. Then, when the deferred completes, restores the + current logcontext before running callbacks/errbacks. + + (This is more-or-less the opposite operation to preserve_fn.) + """ + with PreserveLoggingContext(): + r = yield deferred + defer.returnValue(r) + + # modules to ignore in `logcontext_tracer` _to_ignore = [ "synapse.util.logcontext", diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 419281054d..4414e86771 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -12,11 +12,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging + import mock +from synapse.api.errors import SynapseError +from synapse.util import async +from synapse.util import logcontext from twisted.internet import defer from synapse.util.caches import descriptors from tests import unittest +logger = logging.getLogger(__name__) + class DescriptorTestCase(unittest.TestCase): @defer.inlineCallbacks @@ -84,3 +91,87 @@ class DescriptorTestCase(unittest.TestCase): r = yield obj.fn(2, 5) self.assertEqual(r, 'chips') obj.mock.assert_not_called() + + def test_cache_logcontexts(self): + """Check that logcontexts are set and restored correctly when + using the cache.""" + + complete_lookup = defer.Deferred() + + class Cls(object): + @descriptors.cached() + def fn(self, arg1): + @defer.inlineCallbacks + def inner_fn(): + with logcontext.PreserveLoggingContext(): + yield complete_lookup + defer.returnValue(1) + + return inner_fn() + + @defer.inlineCallbacks + def do_lookup(): + with logcontext.LoggingContext() as c1: + c1.name = "c1" + r = yield obj.fn(1) + self.assertEqual(logcontext.LoggingContext.current_context(), + c1) + defer.returnValue(r) + + def check_result(r): + self.assertEqual(r, 1) + + obj = Cls() + + # set off a deferred which will do a cache lookup + d1 = do_lookup() + self.assertEqual(logcontext.LoggingContext.current_context(), + logcontext.LoggingContext.sentinel) + d1.addCallback(check_result) + + # and another + d2 = do_lookup() + self.assertEqual(logcontext.LoggingContext.current_context(), + logcontext.LoggingContext.sentinel) + d2.addCallback(check_result) + + # let the lookup complete + complete_lookup.callback(None) + + return defer.gatherResults([d1, d2]) + + def test_cache_logcontexts_with_exception(self): + """Check that the cache sets and restores logcontexts correctly when + the lookup function throws an exception""" + + class Cls(object): + @descriptors.cached() + def fn(self, arg1): + @defer.inlineCallbacks + def inner_fn(): + yield async.run_on_reactor() + raise SynapseError(400, "blah") + + return inner_fn() + + @defer.inlineCallbacks + def do_lookup(): + with logcontext.LoggingContext() as c1: + c1.name = "c1" + try: + yield obj.fn(1) + self.fail("No exception thrown") + except SynapseError: + pass + + self.assertEqual(logcontext.LoggingContext.current_context(), + c1) + + obj = Cls() + + # set off a deferred which will do a cache lookup + d1 = do_lookup() + self.assertEqual(logcontext.LoggingContext.current_context(), + logcontext.LoggingContext.sentinel) + + return d1 -- cgit 1.4.1