diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index a750261e77..f73e95393c 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -16,8 +16,6 @@ import inspect
import logging
from twisted.internet import defer
-from twisted.internet.defer import Deferred, fail, succeed
-from twisted.python import failure
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -29,11 +27,6 @@ def user_left_room(distributor, user, room_id):
distributor.fire("user_left_room", user=user, room_id=room_id)
-# XXX: this is no longer used. We should probably kill it.
-def user_joined_room(distributor, user, room_id):
- distributor.fire("user_joined_room", user=user, room_id=room_id)
-
-
class Distributor:
"""A central dispatch point for loosely-connected pieces of code to
register, observe, and fire signals.
@@ -81,28 +74,6 @@ class Distributor:
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
-def maybeAwaitableDeferred(f, *args, **kw):
- """
- Invoke a function that may or may not return a Deferred or an Awaitable.
-
- This is a modified version of twisted.internet.defer.maybeDeferred.
- """
- try:
- result = f(*args, **kw)
- except Exception:
- return fail(failure.Failure(captureVars=Deferred.debug))
-
- if isinstance(result, Deferred):
- return result
- # Handle the additional case of an awaitable being returned.
- elif inspect.isawaitable(result):
- return defer.ensureDeferred(result)
- elif isinstance(result, failure.Failure):
- return fail(result)
- else:
- return succeed(result)
-
-
class Signal:
"""A Signal is a dispatch point that stores a list of callables as
observers of it.
@@ -132,22 +103,17 @@ class Signal:
Returns a Deferred that will complete when all the observers have
completed."""
- def do(observer):
- def eb(failure):
+ async def do(observer):
+ try:
+ result = observer(*args, **kwargs)
+ if inspect.isawaitable(result):
+ result = await result
+ return result
+ except Exception as e:
logger.warning(
- "%s signal observer %s failed: %r",
- self.name,
- observer,
- failure,
- exc_info=(
- failure.type,
- failure.value,
- failure.getTracebackObject(),
- ),
+ "%s signal observer %s failed: %r", self.name, observer, e,
)
- return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb)
-
deferreds = [run_in_background(do, o) for o in self.observers]
return make_deferred_yieldable(
|