diff options
author | Erik Johnston <erikj@jki.re> | 2016-08-17 14:37:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-08-17 14:37:21 +0100 |
commit | 973d67a033bad7a9ab2cabfb4910eb0a65e49378 (patch) | |
tree | 4ef4b2c8e1249a92ec3af04de548ca9c054334fc /synapse/appservice/scheduler.py | |
parent | Merge pull request #1018 from matrix-org/erikj/dead_appservice (diff) | |
parent | Clean up _ServiceQueuer (diff) | |
download | synapse-973d67a033bad7a9ab2cabfb4910eb0a65e49378.tar.xz |
Merge pull request #1019 from matrix-org/erikj/appservice_clean
Clean up _ServiceQueuer
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r-- | synapse/appservice/scheduler.py | 61 |
1 files changed, 30 insertions, 31 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 9afc8fd754..f130d4367d 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,9 +48,12 @@ UP & quit +---------- YES SUCCESS This is all tied together by the AppServiceScheduler which DIs the required components. """ +from twisted.internet import defer from synapse.appservice import ApplicationServiceState -from twisted.internet import defer +from synapse.util.logcontext import preserve_fn +from synapse.util.metrics import Measure + import logging logger = logging.getLogger(__name__) @@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object): self.txn_ctrl = _TransactionController( self.clock, self.store, self.as_api, create_recoverer ) - self.queuer = _ServiceQueuer(self.txn_ctrl) + self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) @defer.inlineCallbacks def start(self): @@ -94,38 +97,36 @@ class _ServiceQueuer(object): this schedules any other events in the queue to run. """ - def __init__(self, txn_ctrl): + def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} - self.pending_requests = {} # dict of {service_id: Deferred} + self.requests_in_flight = set() self.txn_ctrl = txn_ctrl + self.clock = clock def enqueue(self, service, event): # if this service isn't being sent something - if not self.pending_requests.get(service.id): - self._send_request(service, [event]) - else: - # add to queue for this service - if service.id not in self.queued_events: - self.queued_events[service.id] = [] - self.queued_events[service.id].append(event) - - def _send_request(self, service, events): - # send request and add callbacks - d = self.txn_ctrl.send(service, events) - d.addBoth(self._on_request_finish) - d.addErrback(self._on_request_fail) - self.pending_requests[service.id] = d - - def _on_request_finish(self, service): - self.pending_requests[service.id] = None - # if there are queued events, then send them. - if (service.id in self.queued_events - and len(self.queued_events[service.id]) > 0): - self._send_request(service, self.queued_events[service.id]) - self.queued_events[service.id] = [] - - def _on_request_fail(self, err): - logger.error("AS request failed: %s", err) + self.queued_events.setdefault(service.id, []).append(event) + preserve_fn(self._send_request)(service) + + @defer.inlineCallbacks + def _send_request(self, service): + if service.id in self.requests_in_flight: + return + + with Measure(self.clock, "_ServiceQueuer._send_request"): + self.requests_in_flight.add(service.id) + try: + while True: + events = self.queued_events.pop(service.id, []) + if not events: + return + + try: + yield self.txn_ctrl.send(service, events) + except: + logger.exception("AS request failed") + finally: + self.requests_in_flight.discard(service.id) class _TransactionController(object): @@ -155,8 +156,6 @@ class _TransactionController(object): except Exception as e: logger.exception(e) self._start_recoverer(service) - # request has finished - defer.returnValue(service) @defer.inlineCallbacks def on_recovered(self, recoverer): |