summary refs log tree commit diff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-03-16 14:03:16 +0000
committerKegan Dougal <kegan@matrix.org>2015-03-16 14:03:16 +0000
commitd04fa1f7121d996e05bd4def14951d89eb47d1ab (patch)
tree44f4ad91cd595e0e6ca0a0f0e816d91df96f6c43 /synapse/appservice/scheduler.py
parentReplace EventGrouper for ServiceQueuer to move to push-based txns. Fix tests ... (diff)
downloadsynapse-d04fa1f7121d996e05bd4def14951d89eb47d1ab.tar.xz
Implement ServiceQueuer with tests.
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--synapse/appservice/scheduler.py46
1 files changed, 30 insertions, 16 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 54c42d1b94..3cedd479a2 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -83,7 +83,6 @@ class AppServiceScheduler(object):
             self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
         )
         self.txn_ctrl.add_recoverers(recoverers)
-        self.txn_ctrl.start_polling()
 
     def submit_event_for_as(self, service, event):
         self.queuer.enqueue(service, event)
@@ -96,24 +95,37 @@ class _ServiceQueuer(object):
     """
 
     def __init__(self, txn_ctrl):
-        self.groups = {}  # dict of {service: [events]}
+        self.queued_events = {}  # dict of {service_id: [events]}
+        self.pending_requests = {}  # dict of {service_id: Deferred}
         self.txn_ctrl = txn_ctrl
 
     def enqueue(self, service, event):
-        # if nothing in queue for this service, send event immediately and add
-        # callbacks.
-        self.txn_ctrl.send(service, [event])
-
-        # else add to queue for this service
-
-        if service not in self.groups:
-            self.groups[service] = []
-        self.groups[service].append(event)
-
-    def drain_groups(self):
-        groups = self.groups
-        self.groups = {}
-        return groups
+        # if this service isn't being sent something
+        if not self.pending_requests.get(service.id):
+            self._send_request(service, [event])
+        else:
+            # add to queue for this service
+            if service.id not in self.queued_events:
+                self.queued_events[service.id] = []
+            self.queued_events[service.id].append(event)
+
+    def _send_request(self, service, events):
+        # send request and add callbacks
+        d = self.txn_ctrl.send(service, events)
+        d.addCallback(self._on_request_finish)
+        d.addErrback(self._on_request_fail)
+        self.pending_requests[service.id] = d
+
+    def _on_request_finish(self, service):
+        self.pending_requests[service.id] = None
+        # if there are queued events, then send them.
+        if (service.id in self.queued_events
+                and len(self.queued_events[service.id]) > 0):
+            self._send_request(service, self.queued_events[service.id])
+            self.queued_events[service.id] = []
+
+    def _on_request_fail(self, err):
+        logger.error("AS request failed: %s", err)
 
 
 class _TransactionController(object):
@@ -142,6 +154,8 @@ class _TransactionController(object):
                     self._start_recoverer(service)
         except Exception as e:
             logger.exception(e)
+        # request has finished
+        defer.returnValue(service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):