diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 2430814796..685f15c061 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -53,8 +53,8 @@ import logging
from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import run_in_background
-from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -104,27 +104,34 @@ class _ServiceQueuer(object):
self.clock = clock
def enqueue(self, service, event):
- # if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
- run_in_background(self._send_request, service)
- @defer.inlineCallbacks
- def _send_request(self, service):
+ # start a sender for this appservice if we don't already have one
+
if service.id in self.requests_in_flight:
return
+ run_as_background_process(
+ "as-sender-%s" % (service.id, ),
+ self._send_request, service,
+ )
+
+ @defer.inlineCallbacks
+ def _send_request(self, service):
+ # sanity-check: we shouldn't get here if this service already has a sender
+ # running.
+ assert(service.id not in self.requests_in_flight)
+
self.requests_in_flight.add(service.id)
try:
while True:
events = self.queued_events.pop(service.id, [])
if not events:
return
-
- with Measure(self.clock, "servicequeuer.send"):
- try:
- yield self.txn_ctrl.send(service, events)
- except Exception:
- logger.exception("AS request failed")
+ try:
+ yield self.txn_ctrl.send(service, events)
+ except Exception:
+ logger.exception("AS request failed")
finally:
self.requests_in_flight.discard(service.id)
@@ -223,7 +230,12 @@ class _Recoverer(object):
self.backoff_counter = 1
def recover(self):
- self.clock.call_later((2 ** self.backoff_counter), self.retry)
+ def _retry():
+ run_as_background_process(
+ "as-recoverer-%s" % (self.service.id,),
+ self.retry,
+ )
+ self.clock.call_later((2 ** self.backoff_counter), _retry)
def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs
|