From 95ccb6e2ec57f2150a697ea9cde030e8f78d6db9 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 19 Jul 2018 20:58:18 +1000 Subject: Don't spew errors because we can't save metrics (#3563) --- synapse/util/logcontext.py | 11 +++++++++++ synapse/util/metrics.py | 19 +++++++++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index f6c7175f74..8dcae50b39 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -99,6 +99,17 @@ class ContextResourceUsage(object): self.db_sched_duration_sec = 0 self.evt_db_fetch_count = 0 + def __repr__(self): + return ("") % ( + self.ru_stime, + self.ru_utime, + self.db_txn_count, + self.db_txn_duration_sec, + self.db_sched_duration_sec, + self.evt_db_fetch_count,) + def __iadd__(self, other): """Add another ContextResourceUsage's stats to this one's. diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 6ba7107896..97f1267380 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -104,12 +104,19 @@ class Measure(object): logger.warn("Expected context. (%r)", self.name) return - usage = context.get_resource_usage() - self.start_usage - block_ru_utime.labels(self.name).inc(usage.ru_utime) - block_ru_stime.labels(self.name).inc(usage.ru_stime) - block_db_txn_count.labels(self.name).inc(usage.db_txn_count) - block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) - block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + current = context.get_resource_usage() + usage = current - self.start_usage + try: + block_ru_utime.labels(self.name).inc(usage.ru_utime) + block_ru_stime.labels(self.name).inc(usage.ru_stime) + block_db_txn_count.labels(self.name).inc(usage.db_txn_count) + block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) + block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + except ValueError: + logger.warn( + "Failed to save metrics! OLD: %r, NEW: %r", + self.start_usage, current + ) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) -- cgit 1.4.1 From d7275eecf3abcd5f7c645e5da47c7e7310350333 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 12:37:12 +0100 Subject: Add a sleep to the Limiter to fix stack overflows. Fixes #3570 --- synapse/util/async.py | 23 ++++++++++++++++++++--- tests/util/test_limiter.py | 8 ++++---- 2 files changed, 24 insertions(+), 7 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 5d0fb39130..7d5acecb1c 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -248,11 +248,15 @@ class Limiter(object): # do some work. """ - def __init__(self, max_count): + def __init__(self, max_count, clock=None): """ Args: max_count(int): The maximum number of concurrent access """ + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + self._clock = clock self.max_count = max_count # key_to_defer is a map from the key to a 2 element list where @@ -277,10 +281,23 @@ class Limiter(object): with PreserveLoggingContext(): yield new_defer logger.info("Acquired limiter lock for key %r", key) + entry[0] += 1 + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (This needs to happen while we hold the lock, and the context manager's exit + # code must be synchronous, so this is the only sensible place.) + yield self._clock.sleep(0) + else: logger.info("Acquired uncontended limiter lock for key %r", key) - - entry[0] += 1 + entry[0] += 1 @contextmanager def _ctx_manager(): diff --git a/tests/util/test_limiter.py b/tests/util/test_limiter.py index a5a767b1ff..f7b942f5c1 100644 --- a/tests/util/test_limiter.py +++ b/tests/util/test_limiter.py @@ -48,21 +48,21 @@ class LimiterTestCase(unittest.TestCase): self.assertFalse(d4.called) self.assertFalse(d5.called) - self.assertTrue(d4.called) + cm4 = yield d4 self.assertFalse(d5.called) with cm3: self.assertFalse(d5.called) - self.assertTrue(d5.called) + cm5 = yield d5 with cm2: pass - with (yield d4): + with cm4: pass - with (yield d5): + with cm5: pass d6 = limiter.queue(key) -- cgit 1.4.1 From 8462c26485fb4f19fc52accc05870c0ea4c8eb6a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 12:43:23 +0100 Subject: Improvements to the Limiter * give them names, to improve logging * use a deque rather than a list for efficiency --- synapse/handlers/message.py | 2 +- synapse/util/async.py | 33 ++++++++++++++++++++------------- 2 files changed, 21 insertions(+), 14 deletions(-) (limited to 'synapse/util') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a39b852ceb..8c12c6990f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -427,7 +427,7 @@ class EventCreationHandler(object): # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5) + self.limiter = Limiter(max_count=5, name="room_event_creation_limit") self.action_generator = hs.get_action_generator() diff --git a/synapse/util/async.py b/synapse/util/async.py index 7d5acecb1c..22071ddef7 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import logging from contextlib import contextmanager @@ -248,11 +249,16 @@ class Limiter(object): # do some work. """ - def __init__(self, max_count, clock=None): + def __init__(self, max_count=1, name=None, clock=None): """ Args: - max_count(int): The maximum number of concurrent access + max_count(int): The maximum number of concurrent accesses """ + if name is None: + self.name = id(self) + else: + self.name = name + if not clock: from twisted.internet import reactor clock = Clock(reactor) @@ -260,14 +266,14 @@ class Limiter(object): self.max_count = max_count # key_to_defer is a map from the key to a 2 element list where - # the first element is the number of things executing - # the second element is a list of deferreds for the things blocked from + # the first element is the number of things executing, and + # the second element is a deque of deferreds for the things blocked from # executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, []]) + entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -277,10 +283,10 @@ class Limiter(object): new_defer = defer.Deferred() entry[1].append(new_defer) - logger.info("Waiting to acquire limiter lock for key %r", key) - with PreserveLoggingContext(): - yield new_defer - logger.info("Acquired limiter lock for key %r", key) + logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key) + yield make_deferred_yieldable(new_defer) + + logger.info("Acquired limiter lock %r for key %r", self.name, key) entry[0] += 1 # if the code holding the lock completes synchronously, then it @@ -296,7 +302,7 @@ class Limiter(object): yield self._clock.sleep(0) else: - logger.info("Acquired uncontended limiter lock for key %r", key) + logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key) entry[0] += 1 @contextmanager @@ -304,15 +310,16 @@ class Limiter(object): try: yield finally: - logger.info("Releasing limiter lock for key %r", key) + logger.info("Releasing limiter lock %r for key %r", self.name, key) # We've finished executing so check if there are any things # blocked waiting to execute and start one of them entry[0] -= 1 if entry[1]: - next_def = entry[1].pop(0) + next_def = entry[1].popleft() + # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): next_def.callback(None) elif entry[0] == 0: -- cgit 1.4.1 From 7c712f95bbc7f405355d5714c92d65551f64fec2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 13:11:43 +0100 Subject: Combine Limiter and Linearizer Linearizer was effectively a Limiter with max_count=1, so rather than maintaining two sets of code, let's combine them. --- synapse/handlers/message.py | 4 +- synapse/util/async.py | 99 +++++-------------------------------------- tests/util/test_limiter.py | 70 ------------------------------ tests/util/test_linearizer.py | 47 ++++++++++++++++++++ 4 files changed, 59 insertions(+), 161 deletions(-) delete mode 100644 tests/util/test_limiter.py (limited to 'synapse/util') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 8c12c6990f..abc07ea87c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import send_event_to_master from synapse.types import RoomAlias, RoomStreamToken, UserID -from synapse.util.async import Limiter, ReadWriteLock +from synapse.util.async import Linearizer, ReadWriteLock from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func @@ -427,7 +427,7 @@ class EventCreationHandler(object): # We arbitrarily limit concurrent event creation for a room to 5. # This is to stop us from diverging history *too* much. - self.limiter = Limiter(max_count=5, name="room_event_creation_limit") + self.limiter = Linearizer(max_count=5, name="room_event_creation_limit") self.action_generator = hs.get_action_generator() diff --git a/synapse/util/async.py b/synapse/util/async.py index 22071ddef7..5a50d9700f 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -157,91 +157,8 @@ def concurrently_execute(func, args, limit): class Linearizer(object): - """Linearizes access to resources based on a key. Useful to ensure only one - thing is happening at a time on a given resource. - - Example: - - with (yield linearizer.queue("test_key")): - # do some work. - - """ - def __init__(self, name=None, clock=None): - if name is None: - self.name = id(self) - else: - self.name = name - self.key_to_defer = {} - - if not clock: - from twisted.internet import reactor - clock = Clock(reactor) - self._clock = clock - - @defer.inlineCallbacks - def queue(self, key): - # If there is already a deferred in the queue, we pull it out so that - # we can wait on it later. - # Then we replace it with a deferred that we resolve *after* the - # context manager has exited. - # We only return the context manager after the previous deferred has - # resolved. - # This all has the net effect of creating a chain of deferreds that - # wait for the previous deferred before starting their work. - current_defer = self.key_to_defer.get(key) - - new_defer = defer.Deferred() - self.key_to_defer[key] = new_defer - - if current_defer: - logger.info( - "Waiting to acquire linearizer lock %r for key %r", self.name, key - ) - try: - with PreserveLoggingContext(): - yield current_defer - except Exception: - logger.exception("Unexpected exception in Linearizer") - - logger.info("Acquired linearizer lock %r for key %r", self.name, - key) - - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (There's no particular need for it to happen before we return - # the context manager, but it needs to happen while we hold the - # lock, and the context manager's exit code must be synchronous, - # so actually this is the only sensible place. - yield self._clock.sleep(0) - - else: - logger.info("Acquired uncontended linearizer lock %r for key %r", - self.name, key) - - @contextmanager - def _ctx_manager(): - try: - yield - finally: - logger.info("Releasing linearizer lock %r for key %r", self.name, key) - with PreserveLoggingContext(): - new_defer.callback(None) - current_d = self.key_to_defer.get(key) - if current_d is new_defer: - self.key_to_defer.pop(key, None) - - defer.returnValue(_ctx_manager()) - - -class Limiter(object): """Limits concurrent access to resources based on a key. Useful to ensure - only a few thing happen at a time on a given resource. + only a few things happen at a time on a given resource. Example: @@ -249,7 +166,7 @@ class Limiter(object): # do some work. """ - def __init__(self, max_count=1, name=None, clock=None): + def __init__(self, name=None, max_count=1, clock=None): """ Args: max_count(int): The maximum number of concurrent accesses @@ -283,10 +200,12 @@ class Limiter(object): new_defer = defer.Deferred() entry[1].append(new_defer) - logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key) + logger.info( + "Waiting to acquire linearizer lock %r for key %r", self.name, key, + ) yield make_deferred_yieldable(new_defer) - logger.info("Acquired limiter lock %r for key %r", self.name, key) + logger.info("Acquired linearizer lock %r for key %r", self.name, key) entry[0] += 1 # if the code holding the lock completes synchronously, then it @@ -302,7 +221,9 @@ class Limiter(object): yield self._clock.sleep(0) else: - logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key) + logger.info( + "Acquired uncontended linearizer lock %r for key %r", self.name, key, + ) entry[0] += 1 @contextmanager @@ -310,7 +231,7 @@ class Limiter(object): try: yield finally: - logger.info("Releasing limiter lock %r for key %r", self.name, key) + logger.info("Releasing linearizer lock %r for key %r", self.name, key) # We've finished executing so check if there are any things # blocked waiting to execute and start one of them diff --git a/tests/util/test_limiter.py b/tests/util/test_limiter.py deleted file mode 100644 index f7b942f5c1..0000000000 --- a/tests/util/test_limiter.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from twisted.internet import defer - -from synapse.util.async import Limiter - -from tests import unittest - - -class LimiterTestCase(unittest.TestCase): - - @defer.inlineCallbacks - def test_limiter(self): - limiter = Limiter(3) - - key = object() - - d1 = limiter.queue(key) - cm1 = yield d1 - - d2 = limiter.queue(key) - cm2 = yield d2 - - d3 = limiter.queue(key) - cm3 = yield d3 - - d4 = limiter.queue(key) - self.assertFalse(d4.called) - - d5 = limiter.queue(key) - self.assertFalse(d5.called) - - with cm1: - self.assertFalse(d4.called) - self.assertFalse(d5.called) - - cm4 = yield d4 - self.assertFalse(d5.called) - - with cm3: - self.assertFalse(d5.called) - - cm5 = yield d5 - - with cm2: - pass - - with cm4: - pass - - with cm5: - pass - - d6 = limiter.queue(key) - with (yield d6): - pass diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c95907b32c..c9563124f9 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -65,3 +66,49 @@ class LinearizerTestCase(unittest.TestCase): func(i) return func(1000) + + @defer.inlineCallbacks + def test_multiple_entries(self): + limiter = Linearizer(max_count=3) + + key = object() + + d1 = limiter.queue(key) + cm1 = yield d1 + + d2 = limiter.queue(key) + cm2 = yield d2 + + d3 = limiter.queue(key) + cm3 = yield d3 + + d4 = limiter.queue(key) + self.assertFalse(d4.called) + + d5 = limiter.queue(key) + self.assertFalse(d5.called) + + with cm1: + self.assertFalse(d4.called) + self.assertFalse(d5.called) + + cm4 = yield d4 + self.assertFalse(d5.called) + + with cm3: + self.assertFalse(d5.called) + + cm5 = yield d5 + + with cm2: + pass + + with cm4: + pass + + with cm5: + pass + + d6 = limiter.queue(key) + with (yield d6): + pass -- cgit 1.4.1 From 3d6df846580ce6ef8769945e6990af2f44251e40 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 13:59:55 +0100 Subject: Test and fix support for cancellation in Linearizer --- synapse/util/async.py | 28 ++++++++++++++++++++++------ tests/util/test_linearizer.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 6 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/async.py b/synapse/util/async.py index 5a50d9700f..a7094e2fb4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -184,13 +184,13 @@ class Linearizer(object): # key_to_defer is a map from the key to a 2 element list where # the first element is the number of things executing, and - # the second element is a deque of deferreds for the things blocked from - # executing. + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -198,12 +198,28 @@ class Linearizer(object): # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() - entry[1].append(new_defer) + entry[1][new_defer] = 1 logger.info( "Waiting to acquire linearizer lock %r for key %r", self.name, key, ) - yield make_deferred_yieldable(new_defer) + try: + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise logger.info("Acquired linearizer lock %r for key %r", self.name, key) entry[0] += 1 @@ -238,7 +254,7 @@ class Linearizer(object): entry[0] -= 1 if entry[1]: - next_def = entry[1].popleft() + (next_def, _) = entry[1].popitem(last=False) # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c9563124f9..4729bd5a0a 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -17,6 +17,7 @@ from six.moves import range from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError from synapse.util import Clock, logcontext from synapse.util.async import Linearizer @@ -112,3 +113,33 @@ class LinearizerTestCase(unittest.TestCase): d6 = limiter.queue(key) with (yield d6): pass + + @defer.inlineCallbacks + def test_cancellation(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + d3 = linearizer.queue(key) + self.assertFalse(d3.called) + + d2.cancel() + + with cm1: + pass + + self.assertTrue(d2.called) + try: + yield d2 + self.fail("Expected d2 to raise CancelledError") + except CancelledError: + pass + + with (yield d3): + pass -- cgit 1.4.1