diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 9afc8fd754..f130d4367d 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,9 +48,12 @@ UP & quit +---------- YES SUCCESS
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
+from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
-from twisted.internet import defer
+from synapse.util.logcontext import preserve_fn
+from synapse.util.metrics import Measure
+
import logging
logger = logging.getLogger(__name__)
@@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object):
self.txn_ctrl = _TransactionController(
self.clock, self.store, self.as_api, create_recoverer
)
- self.queuer = _ServiceQueuer(self.txn_ctrl)
+ self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
@@ -94,38 +97,36 @@ class _ServiceQueuer(object):
this schedules any other events in the queue to run.
"""
- def __init__(self, txn_ctrl):
+ def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
- self.pending_requests = {} # dict of {service_id: Deferred}
+ self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
+ self.clock = clock
def enqueue(self, service, event):
# if this service isn't being sent something
- if not self.pending_requests.get(service.id):
- self._send_request(service, [event])
- else:
- # add to queue for this service
- if service.id not in self.queued_events:
- self.queued_events[service.id] = []
- self.queued_events[service.id].append(event)
-
- def _send_request(self, service, events):
- # send request and add callbacks
- d = self.txn_ctrl.send(service, events)
- d.addBoth(self._on_request_finish)
- d.addErrback(self._on_request_fail)
- self.pending_requests[service.id] = d
-
- def _on_request_finish(self, service):
- self.pending_requests[service.id] = None
- # if there are queued events, then send them.
- if (service.id in self.queued_events
- and len(self.queued_events[service.id]) > 0):
- self._send_request(service, self.queued_events[service.id])
- self.queued_events[service.id] = []
-
- def _on_request_fail(self, err):
- logger.error("AS request failed: %s", err)
+ self.queued_events.setdefault(service.id, []).append(event)
+ preserve_fn(self._send_request)(service)
+
+ @defer.inlineCallbacks
+ def _send_request(self, service):
+ if service.id in self.requests_in_flight:
+ return
+
+ with Measure(self.clock, "_ServiceQueuer._send_request"):
+ self.requests_in_flight.add(service.id)
+ try:
+ while True:
+ events = self.queued_events.pop(service.id, [])
+ if not events:
+ return
+
+ try:
+ yield self.txn_ctrl.send(service, events)
+ except:
+ logger.exception("AS request failed")
+ finally:
+ self.requests_in_flight.discard(service.id)
class _TransactionController(object):
@@ -155,8 +156,6 @@ class _TransactionController(object):
except Exception as e:
logger.exception(e)
self._start_recoverer(service)
- # request has finished
- defer.returnValue(service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 631a229332..e5a902f734 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -193,7 +193,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
def setUp(self):
self.txn_ctrl = Mock()
- self.queuer = _ServiceQueuer(self.txn_ctrl)
+ self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock())
def test_send_single_event_no_queue(self):
# Expect the event to be sent immediately.
|