summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/scheduler.py61
1 files changed, 33 insertions, 28 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 59a870e271..54c42d1b94 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -16,11 +16,11 @@
 This module controls the reliability for application service transactions.
 
 The nominal flow through this module looks like:
-             _________
----ASa[e]-->|  Event  |
-----ASb[e]->| Grouper |<-poll 1/s--+
---ASa[e]--->|_________|            | ASa[e,e]  ASb[e]
-                                   V
+              __________
+1---ASa[e]-->|  Service |--> Queue ASa[f]
+2----ASb[e]->|  Queuer  |
+3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
+                                    V
       -````````-            +------------+
       |````````|<--StoreTxn-|Transaction |
       |Database|            | Controller |---> SEND TO AS
@@ -66,14 +66,14 @@ class AppServiceScheduler(object):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.event_grouper = _EventGrouper()
 
         def create_recoverer(service, callback):
             return _Recoverer(clock, store, as_api, service, callback)
 
         self.txn_ctrl = _TransactionController(
-            clock, store, as_api, self.event_grouper, create_recoverer
+            clock, store, as_api, create_recoverer
         )
+        self.queuer = _ServiceQueuer(self.txn_ctrl)
 
     @defer.inlineCallbacks
     def start(self):
@@ -86,17 +86,26 @@ class AppServiceScheduler(object):
         self.txn_ctrl.start_polling()
 
     def submit_event_for_as(self, service, event):
-        self.event_grouper.enqueue(service, event)
+        self.queuer.enqueue(service, event)
 
 
-class _EventGrouper(object):
-    """Groups events for the same application service together.
+class _ServiceQueuer(object):
+    """Queues events for the same application service together, sending
+    transactions as soon as possible. Once a transaction is sent successfully,
+    this schedules any other events in the queue to run.
     """
 
-    def __init__(self):
+    def __init__(self, txn_ctrl):
         self.groups = {}  # dict of {service: [events]}
+        self.txn_ctrl = txn_ctrl
 
     def enqueue(self, service, event):
+        # if nothing in queue for this service, send event immediately and add
+        # callbacks.
+        self.txn_ctrl.send(service, [event])
+
+        # else add to queue for this service
+
         if service not in self.groups:
             self.groups[service] = []
         self.groups[service].append(event)
@@ -109,34 +118,30 @@ class _EventGrouper(object):
 
 class _TransactionController(object):
 
-    def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+    def __init__(self, clock, store, as_api, recoverer_fn):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.event_grouper = event_grouper
         self.recoverer_fn = recoverer_fn
         # keep track of how many recoverers there are
         self.recoverers = []
 
     @defer.inlineCallbacks
-    def start_polling(self):
+    def send(self, service, events):
         try:
-            groups = self.event_grouper.drain_groups()
-            for service in groups:
-                txn = yield self.store.create_appservice_txn(
-                    service=service,
-                    events=groups[service]
-                )
-                service_is_up = yield self._is_service_up(service)
-                if service_is_up:
-                    sent = yield txn.send(self.as_api)
-                    if sent:
-                        txn.complete(self.store)
-                    else:
-                        self._start_recoverer(service)
+            txn = yield self.store.create_appservice_txn(
+                service=service,
+                events=events
+            )
+            service_is_up = yield self._is_service_up(service)
+            if service_is_up:
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    txn.complete(self.store)
+                else:
+                    self._start_recoverer(service)
         except Exception as e:
             logger.exception(e)
-        self.clock.call_later(1, self.start_polling)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):