diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 45af8d3eeb..a750261e77 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -12,10 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import inspect
import logging
from twisted.internet import defer
+from twisted.internet.defer import Deferred, fail, succeed
+from twisted.python import failure
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -32,14 +34,14 @@ def user_joined_room(distributor, user, room_id):
distributor.fire("user_joined_room", user=user, room_id=room_id)
-class Distributor(object):
+class Distributor:
"""A central dispatch point for loosely-connected pieces of code to
register, observe, and fire signals.
Signals are named simply by strings.
TODO(paul): It would be nice to give signals stronger object identities,
- so we can attach metadata, docstrings, detect typoes, etc... But this
+ so we can attach metadata, docstrings, detect typos, etc... But this
model will do for today.
"""
@@ -79,7 +81,29 @@ class Distributor(object):
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
-class Signal(object):
+def maybeAwaitableDeferred(f, *args, **kw):
+ """
+ Invoke a function that may or may not return a Deferred or an Awaitable.
+
+ This is a modified version of twisted.internet.defer.maybeDeferred.
+ """
+ try:
+ result = f(*args, **kw)
+ except Exception:
+ return fail(failure.Failure(captureVars=Deferred.debug))
+
+ if isinstance(result, Deferred):
+ return result
+ # Handle the additional case of an awaitable being returned.
+ elif inspect.isawaitable(result):
+ return defer.ensureDeferred(result)
+ elif isinstance(result, failure.Failure):
+ return fail(result)
+ else:
+ return succeed(result)
+
+
+class Signal:
"""A Signal is a dispatch point that stores a list of callables as
observers of it.
@@ -122,7 +146,7 @@ class Signal(object):
),
)
- return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
+ return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb)
deferreds = [run_in_background(do, o) for o in self.observers]
|