From 09a135b0391a7a65c24c8c7b046cb6573dee3947 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 29 Oct 2019 15:02:23 +0000 Subject: Make concurrently_execute work with async/await --- synapse/util/async_helpers.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse/util/async_helpers.py') diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 804dbca443..7659eaeb42 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -138,7 +138,7 @@ def concurrently_execute(func, args, limit): the number of concurrent executions. Args: - func (func): Function to execute, should return a deferred. + func (func): Function to execute, should return a deferred or coroutine. args (list): List of arguments to pass to func, each invocation of func gets a signle argument. limit (int): Maximum number of conccurent executions. @@ -148,11 +148,10 @@ def concurrently_execute(func, args, limit): """ it = iter(args) - @defer.inlineCallbacks - def _concurrently_execute_inner(): + async def _concurrently_execute_inner(): try: while True: - yield func(next(it)) + await maybe_awaitable(func(next(it))) except StopIteration: pass -- cgit 1.4.1 From 326b3dace77aeb36e516ea9b04ba1baa171bcb47 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Oct 2019 11:35:46 +0000 Subject: Make ObservableDeferred.observe() always return deferred. This makes it easier to use in an async/await world. Also fixes a bug where cache descriptors would occaisonally return a raw value rather than a deferred. --- synapse/util/async_helpers.py | 7 ++----- tests/storage/test__base.py | 2 +- tests/util/caches/test_descriptors.py | 4 ++-- 3 files changed, 5 insertions(+), 8 deletions(-) (limited to 'synapse/util/async_helpers.py') diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 7659eaeb42..fd75ba27ad 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -86,11 +86,8 @@ class ObservableDeferred(object): deferred.addCallbacks(callback, errback) - def observe(self): + def observe(self) -> defer.Deferred: """Observe the underlying deferred. - - Can return either a deferred if the underlying deferred is still pending - (or has failed), or the actual value. Callers may need to use maybeDeferred. """ if not self._result: d = defer.Deferred() @@ -105,7 +102,7 @@ class ObservableDeferred(object): return d else: success, res = self._result - return res if success else defer.fail(res) + return defer.succeed(res) if success else defer.fail(res) def observers(self): return self._observers diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index dd49a14524..9b81b536f5 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -197,7 +197,7 @@ class CacheDecoratorTestCase(unittest.TestCase): a.func.prefill(("foo",), ObservableDeferred(d)) - self.assertEquals(a.func("foo"), d.result) + self.assertEquals(a.func("foo").result, d.result) self.assertEquals(callcount[0], 0) @defer.inlineCallbacks diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index f907903511..39e360fe24 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -310,14 +310,14 @@ class DescriptorTestCase(unittest.TestCase): obj.mock.return_value = ["spam", "eggs"] r = obj.fn(1, 2) - self.assertEqual(r, ["spam", "eggs"]) + self.assertEqual(r.result, ["spam", "eggs"]) obj.mock.assert_called_once_with(1, 2) obj.mock.reset_mock() # a call with different params should call the mock again obj.mock.return_value = ["chips"] r = obj.fn(1, 3) - self.assertEqual(r, ["chips"]) + self.assertEqual(r.result, ["chips"]) obj.mock.assert_called_once_with(1, 3) obj.mock.reset_mock() -- cgit 1.4.1 From 6e677403b7c71511374b1532b56e0c411840f87b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Oct 2019 11:52:04 +0000 Subject: Clarify docstring --- synapse/util/async_helpers.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'synapse/util/async_helpers.py') diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index fd75ba27ad..b60a604474 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -88,6 +88,10 @@ class ObservableDeferred(object): def observe(self) -> defer.Deferred: """Observe the underlying deferred. + + This returns a brand new deferred that is resolved when the underlying + deferred is resolved. Interacting with the returned deferred does not + effect the underdlying deferred. """ if not self._result: d = defer.Deferred() -- cgit 1.4.1