From 0b03a9770829247055fe8eaf66c24bb1892a3c50 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 17:56:41 +0100 Subject: Add module_loader.py --- synapse/util/module_loader.py | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 synapse/util/module_loader.py (limited to 'synapse/util') diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py new file mode 100644 index 0000000000..b4464790ee --- /dev/null +++ b/synapse/util/module_loader.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import importlib + +from synapse.config._base import ConfigError + +def load_module(provider): + """ Loads a module with its config + Take a dict with keys 'module' (the module name) and 'config' + (the config dict). + + Returns + Tuple of (provider class, parsed config object) + """ + # We need to import the module, and then pick the class out of + # that, so we split based on the last dot. + module, clz = provider['module'].rsplit(".", 1) + module = importlib.import_module(module) + provider_class = getattr(module, clz) + + try: + provider_config = provider_class.parse_config(provider["config"]) + except Exception as e: + raise ConfigError( + "Failed to parse config for %r: %r" % (provider['module'], e) + ) + + return (provider_class, provider_config) -- cgit 1.5.1 From 9fd086e506ae3cb3db7f1b1c7317c7602a4d71e3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 17:59:46 +0100 Subject: unnecessary parens --- synapse/util/module_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index b4464790ee..4b51d7a77b 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -38,4 +38,4 @@ def load_module(provider): "Failed to parse config for %r: %r" % (provider['module'], e) ) - return (provider_class, provider_config) + return provider_class, provider_config -- cgit 1.5.1 From 8ad5f34908df99804b27bd045fde5b9d5625d784 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 26 Sep 2017 19:21:41 +0100 Subject: pep8 --- synapse/util/module_loader.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/util') diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 4b51d7a77b..4288312b8a 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -17,6 +17,7 @@ import importlib from synapse.config._base import ConfigError + def load_module(provider): """ Loads a module with its config Take a dict with keys 'module' (the module name) and 'config' -- cgit 1.5.1 From 148428ce763978583da2b1d3c435ec321df45855 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 6 Oct 2017 22:24:28 +0100 Subject: Fix logcontext handling for concurrently_execute Avoid preserve_context_over_deferred, which is broken. --- synapse/util/async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 1453faf0ef..bb252f75d7 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from .logcontext import ( PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, ) -from synapse.util import unwrapFirstError +from synapse.util import logcontext, unwrapFirstError from contextlib import contextmanager @@ -155,7 +155,7 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return preserve_context_over_deferred(defer.gatherResults([ + return logcontext.make_deferred_yieldable(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) ], consumeErrors=True)).addErrback(unwrapFirstError) -- cgit 1.5.1 From 3cc852d339ad1cdcb0a435c76b44182bdb81dfe9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 9 Oct 2017 17:44:42 +0100 Subject: Fancy logformatter to format exceptions better This is a bit of an experimental change at this point; the idea is to see if it helps us track down where our stack overflows are coming from by logging the stack when the exception was caught and turned into a Failure. (We'll also need https://github.com/richvdh/twisted/commit/edf27044200e74680ea67c525768e36dc9d9af2b). If we deploy this, we'll be able to enable it via the log config yaml. --- synapse/util/logformatter.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 synapse/util/logformatter.py (limited to 'synapse/util') diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py new file mode 100644 index 0000000000..60504162e9 --- /dev/null +++ b/synapse/util/logformatter.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import StringIO +import logging +import traceback + + +class LogFormatter(logging.Formatter): + """Log formatter which gives more detail for exceptions + + This is the same as the standard log formatter, except that when logging + exceptions [typically via log.foo("msg", exc_info=1)], it prints the + sequence that led up to the point at which the exception was caught. + (Normally only stack frames between the point the exception was raised and + where it was caught are logged). + """ + def __init__(self, *args, **kwargs): + super(LogFormatter, self).__init__(*args, **kwargs) + + def formatException(self, ei): + sio = StringIO.StringIO() + sio.write("Capture point (most recent call last):\n") + traceback.print_stack(ei[2].tb_frame.f_back, None, sio) + traceback.print_exception(ei[0], ei[1], ei[2], None, sio) + s = sio.getvalue() + sio.close() + if s[-1:] == "\n": + s = s[:-1] + return s -- cgit 1.5.1 From 4fad8efbfb1726c72bdd7cbbacc894b8701efec3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 11 Oct 2017 15:05:05 +0100 Subject: Fix stackoverflow and logcontexts from linearizer 1. make it not blow out the stack when there are more than 50 things waiting for a lock. Fixes https://github.com/matrix-org/synapse/issues/2505. 2. Make it not mess up the log contexts. --- synapse/util/async.py | 24 ++++++++++++++++++++++-- tests/util/test_linearizer.py | 28 ++++++++++++++++++++++++---- 2 files changed, 46 insertions(+), 6 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index bb252f75d7..0fd5b42523 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -203,7 +203,26 @@ class Linearizer(object): except: logger.exception("Unexpected exception in Linearizer") - logger.info("Acquired linearizer lock %r for key %r", self.name, key) + logger.info("Acquired linearizer lock %r for key %r", self.name, + key) + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (There's no particular need for it to happen before we return + # the context manager, but it needs to happen while we hold the + # lock, and the context manager's exit code must be synchronous, + # so actually this is the only sensible place. + yield run_on_reactor() + + else: + logger.info("Acquired uncontended linearizer lock %r for key %r", + self.name, key) @contextmanager def _ctx_manager(): @@ -211,7 +230,8 @@ class Linearizer(object): yield finally: logger.info("Releasing linearizer lock %r for key %r", self.name, key) - new_defer.callback(None) + with PreserveLoggingContext(): + new_defer.callback(None) current_d = self.key_to_defer.get(key) if current_d is new_defer: self.key_to_defer.pop(key, None) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index afcba482f9..793a88e462 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - +from synapse.util import async, logcontext from tests import unittest from twisted.internet import defer @@ -38,7 +37,28 @@ class LinearizerTestCase(unittest.TestCase): with cm1: self.assertFalse(d2.called) - self.assertTrue(d2.called) - with (yield d2): pass + + def test_lots_of_queued_things(self): + # we have one slow thing, and lots of fast things queued up behind it. + # it should *not* explode the stack. + linearizer = Linearizer() + + @defer.inlineCallbacks + def func(i, sleep=False): + with logcontext.LoggingContext("func(%s)" % i) as lc: + with (yield linearizer.queue("")): + self.assertEqual( + logcontext.LoggingContext.current_context(), lc) + if sleep: + yield async.sleep(0) + + self.assertEqual( + logcontext.LoggingContext.current_context(), lc) + + func(0, sleep=True) + for i in xrange(1, 100): + func(i) + + return func(1000) -- cgit 1.5.1 From f30c4ed2bc2255dc7182bd026fb6437afec735a5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 11 Oct 2017 17:26:17 +0100 Subject: logformatter: fix AttributeError make sure we have the relevant fields before we try to log them. --- synapse/util/logformatter.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py index 60504162e9..cdbc4bffd7 100644 --- a/synapse/util/logformatter.py +++ b/synapse/util/logformatter.py @@ -33,9 +33,17 @@ class LogFormatter(logging.Formatter): def formatException(self, ei): sio = StringIO.StringIO() - sio.write("Capture point (most recent call last):\n") - traceback.print_stack(ei[2].tb_frame.f_back, None, sio) - traceback.print_exception(ei[0], ei[1], ei[2], None, sio) + (typ, val, tb) = ei + + # log the stack above the exception capture point if possible, but + # check that we actually have an f_back attribute to work around + # https://twistedmatrix.com/trac/ticket/9305 + + if tb and hasattr(tb.tb_frame, 'f_back'): + sio.write("Capture point (most recent call last):\n") + traceback.print_stack(tb.tb_frame.f_back, None, sio) + + traceback.print_exception(typ, val, tb, None, sio) s = sio.getvalue() sio.close() if s[-1:] == "\n": -- cgit 1.5.1 From 2e9f5ea31a9c66eceb6276c5241cc6537cb0ae4c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 17 Oct 2017 10:59:30 +0100 Subject: Fix logcontext handling for persist_events * don't use preserve_context_over_deferred, which is known broken. * remove a redundant preserve_fn. * add/improve some comments --- synapse/storage/events.py | 24 +++++++++++++++++------- synapse/util/async.py | 5 +++++ 2 files changed, 22 insertions(+), 7 deletions(-) (limited to 'synapse/util') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4f0b43c36d..637640ec2a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -21,7 +21,7 @@ from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred from synapse.util.logcontext import ( - preserve_fn, PreserveLoggingContext, preserve_context_over_deferred + preserve_fn, PreserveLoggingContext, make_deferred_yieldable ) from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -88,13 +88,23 @@ class _EventPeristenceQueue(object): def add_to_queue(self, room_id, events_and_contexts, backfilled): """Add events to the queue, with the given persist_event options. + NB: due to the normal usage pattern of this method, it does *not* + follow the synapse logcontext rules, and leaves the logcontext in + place whether or not the returned deferred is ready. + Args: room_id (str): events_and_contexts (list[(EventBase, EventContext)]): backfilled (bool): + + Returns: + defer.Deferred: a deferred which will resolve once the events are + persisted. Runs its callbacks *without* a logcontext. """ queue = self._event_persist_queues.setdefault(room_id, deque()) if queue: + # if the last item in the queue has the same `backfilled` setting, + # we can just add these new events to that item. end_item = queue[-1] if end_item.backfilled == backfilled: end_item.events_and_contexts.extend(events_and_contexts) @@ -113,11 +123,11 @@ class _EventPeristenceQueue(object): def handle_queue(self, room_id, per_item_callback): """Attempts to handle the queue for a room if not already being handled. - The given callback will be invoked with for each item in the queue,1 + The given callback will be invoked with for each item in the queue, of type _EventPersistQueueItem. The per_item_callback will continuously be called with new items, unless the queue becomnes empty. The return value of the function will be given to the deferreds waiting on the item, - exceptions will be passed to the deferres as well. + exceptions will be passed to the deferreds as well. This function should therefore be called whenever anything is added to the queue. @@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.iteritems(): - d = preserve_fn(self._event_persist_queue.add_to_queue)( + d = self._event_persist_queue.add_to_queue( room_id, evs_ctxs, backfilled=backfilled, ) @@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore): for room_id in partitioned: self._maybe_start_persisting(room_id) - return preserve_context_over_deferred( + return make_deferred_yieldable( defer.gatherResults(deferreds, consumeErrors=True) ) @@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield preserve_context_over_deferred(deferred) + yield make_deferred_yieldable(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1526,7 +1536,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield preserve_context_over_deferred(defer.gatherResults( + res = yield make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], diff --git a/synapse/util/async.py b/synapse/util/async.py index 0fd5b42523..a0a9039475 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -53,6 +53,11 @@ class ObservableDeferred(object): Cancelling or otherwise resolving an observer will not affect the original ObservableDeferred. + + NB that it does not attempt to do anything with logcontexts; in general + you should probably make_deferred_yieldable the deferreds + returned by `observe`, and ensure that the original deferred runs its + callbacks in the sentinel logcontext. """ __slots__ = ["_deferred", "_observers", "_result"] -- cgit 1.5.1