diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-03-16 14:03:16 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-03-16 14:03:16 +0000 |
commit | d04fa1f7121d996e05bd4def14951d89eb47d1ab (patch) | |
tree | 44f4ad91cd595e0e6ca0a0f0e816d91df96f6c43 /synapse/appservice/scheduler.py | |
parent | Replace EventGrouper for ServiceQueuer to move to push-based txns. Fix tests ... (diff) | |
download | synapse-d04fa1f7121d996e05bd4def14951d89eb47d1ab.tar.xz |
Implement ServiceQueuer with tests.
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r-- | synapse/appservice/scheduler.py | 46 |
1 files changed, 30 insertions, 16 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 54c42d1b94..3cedd479a2 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -83,7 +83,6 @@ class AppServiceScheduler(object): self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered ) self.txn_ctrl.add_recoverers(recoverers) - self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) @@ -96,24 +95,37 @@ class _ServiceQueuer(object): """ def __init__(self, txn_ctrl): - self.groups = {} # dict of {service: [events]} + self.queued_events = {} # dict of {service_id: [events]} + self.pending_requests = {} # dict of {service_id: Deferred} self.txn_ctrl = txn_ctrl def enqueue(self, service, event): - # if nothing in queue for this service, send event immediately and add - # callbacks. - self.txn_ctrl.send(service, [event]) - - # else add to queue for this service - - if service not in self.groups: - self.groups[service] = [] - self.groups[service].append(event) - - def drain_groups(self): - groups = self.groups - self.groups = {} - return groups + # if this service isn't being sent something + if not self.pending_requests.get(service.id): + self._send_request(service, [event]) + else: + # add to queue for this service + if service.id not in self.queued_events: + self.queued_events[service.id] = [] + self.queued_events[service.id].append(event) + + def _send_request(self, service, events): + # send request and add callbacks + d = self.txn_ctrl.send(service, events) + d.addCallback(self._on_request_finish) + d.addErrback(self._on_request_fail) + self.pending_requests[service.id] = d + + def _on_request_finish(self, service): + self.pending_requests[service.id] = None + # if there are queued events, then send them. + if (service.id in self.queued_events + and len(self.queued_events[service.id]) > 0): + self._send_request(service, self.queued_events[service.id]) + self.queued_events[service.id] = [] + + def _on_request_fail(self, err): + logger.error("AS request failed: %s", err) class _TransactionController(object): @@ -142,6 +154,8 @@ class _TransactionController(object): self._start_recoverer(service) except Exception as e: logger.exception(e) + # request has finished + defer.returnValue(service) @defer.inlineCallbacks def on_recovered(self, recoverer): |