diff options
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r-- | synapse/appservice/scheduler.py | 63 |
1 files changed, 12 insertions, 51 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 2b3aa3b0ea..50ad3b8e83 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -88,45 +88,6 @@ class AppServiceScheduler(object): self.event_grouper.on_receive(service, event) -class AppServiceTransaction(object): - """Represents an application service transaction.""" - - def __init__(self, service, id, events): - self.service = service - self.id = id - self.events = events - - def send(self, as_api): - """Sends this transaction using the provided AS API interface. - - Args: - as_api(ApplicationServiceApi): The API to use to send. - Returns: - A Deferred which resolves to True if the transaction was sent. - """ - return as_api.push_bulk( - service=self.service, - events=self.events, - txn_id=self.id - ) - - def complete(self, store): - """Completes this transaction as successful. - - Marks this transaction ID on the application service and removes the - transaction contents from the database. - - Args: - store: The database store to operate on. - Returns: - A Deferred which resolves to True if the transaction was completed. - """ - return store.complete_appservice_txn( - service=self.service, - txn_id=self.id - ) - - class _EventGrouper(object): """Groups events for the same application service together. """ @@ -156,14 +117,18 @@ class _TransactionController(object): # keep track of how many recoverers there are self.recoverers = [] + @defer.inlineCallbacks def start_polling(self): groups = self.event_grouper.drain_groups() for service in groups: - txn_id = self._get_next_txn_id(service) - txn = AppServiceTransaction(service, txn_id, groups[service]) - self._store_txn(txn) - if self._is_service_up(service): - if txn.send(self.as_api): + txn = yield self.store.create_appservice_txn( + service=service, + events=groups[service] + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: txn.complete(self.store) else: self._start_recoverer(service) @@ -207,14 +172,10 @@ class _TransactionController(object): logger.error("Failed to apply appservice state DOWN to service %s", service) + @defer.inlineCallbacks def _is_service_up(self, service): - pass - - def _get_next_txn_id(self, service): - pass # TODO work out the next txn_id for this service - - def _store_txn(self, txn): - pass + state = yield self.store.get_appservice_state(service) + defer.returnValue(state == ApplicationServiceState.UP) class _Recoverer(object): |