diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 59a870e271..54c42d1b94 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -16,11 +16,11 @@
This module controls the reliability for application service transactions.
The nominal flow through this module looks like:
- _________
----ASa[e]-->| Event |
-----ASb[e]->| Grouper |<-poll 1/s--+
---ASa[e]--->|_________| | ASa[e,e] ASb[e]
- V
+ __________
+1---ASa[e]-->| Service |--> Queue ASa[f]
+2----ASb[e]->| Queuer |
+3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
+ V
-````````- +------------+
|````````|<--StoreTxn-|Transaction |
|Database| | Controller |---> SEND TO AS
@@ -66,14 +66,14 @@ class AppServiceScheduler(object):
self.clock = clock
self.store = store
self.as_api = as_api
- self.event_grouper = _EventGrouper()
def create_recoverer(service, callback):
return _Recoverer(clock, store, as_api, service, callback)
self.txn_ctrl = _TransactionController(
- clock, store, as_api, self.event_grouper, create_recoverer
+ clock, store, as_api, create_recoverer
)
+ self.queuer = _ServiceQueuer(self.txn_ctrl)
@defer.inlineCallbacks
def start(self):
@@ -86,17 +86,26 @@ class AppServiceScheduler(object):
self.txn_ctrl.start_polling()
def submit_event_for_as(self, service, event):
- self.event_grouper.enqueue(service, event)
+ self.queuer.enqueue(service, event)
-class _EventGrouper(object):
- """Groups events for the same application service together.
+class _ServiceQueuer(object):
+ """Queues events for the same application service together, sending
+ transactions as soon as possible. Once a transaction is sent successfully,
+ this schedules any other events in the queue to run.
"""
- def __init__(self):
+ def __init__(self, txn_ctrl):
self.groups = {} # dict of {service: [events]}
+ self.txn_ctrl = txn_ctrl
def enqueue(self, service, event):
+ # if nothing in queue for this service, send event immediately and add
+ # callbacks.
+ self.txn_ctrl.send(service, [event])
+
+ # else add to queue for this service
+
if service not in self.groups:
self.groups[service] = []
self.groups[service].append(event)
@@ -109,34 +118,30 @@ class _EventGrouper(object):
class _TransactionController(object):
- def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+ def __init__(self, clock, store, as_api, recoverer_fn):
self.clock = clock
self.store = store
self.as_api = as_api
- self.event_grouper = event_grouper
self.recoverer_fn = recoverer_fn
# keep track of how many recoverers there are
self.recoverers = []
@defer.inlineCallbacks
- def start_polling(self):
+ def send(self, service, events):
try:
- groups = self.event_grouper.drain_groups()
- for service in groups:
- txn = yield self.store.create_appservice_txn(
- service=service,
- events=groups[service]
- )
- service_is_up = yield self._is_service_up(service)
- if service_is_up:
- sent = yield txn.send(self.as_api)
- if sent:
- txn.complete(self.store)
- else:
- self._start_recoverer(service)
+ txn = yield self.store.create_appservice_txn(
+ service=service,
+ events=events
+ )
+ service_is_up = yield self._is_service_up(service)
+ if service_is_up:
+ sent = yield txn.send(self.as_api)
+ if sent:
+ txn.complete(self.store)
+ else:
+ self._start_recoverer(service)
except Exception as e:
logger.exception(e)
- self.clock.call_later(1, self.start_polling)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
|