summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/appservice/scheduler.py61
-rw-r--r--tests/appservice/test_scheduler.py2
2 files changed, 31 insertions, 32 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 9afc8fd754..f130d4367d 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,9 +48,12 @@ UP & quit           +---------- YES                       SUCCESS
 This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
+from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
-from twisted.internet import defer
+from synapse.util.logcontext import preserve_fn
+from synapse.util.metrics import Measure
+
 import logging
 
 logger = logging.getLogger(__name__)
@@ -73,7 +76,7 @@ class ApplicationServiceScheduler(object):
         self.txn_ctrl = _TransactionController(
             self.clock, self.store, self.as_api, create_recoverer
         )
-        self.queuer = _ServiceQueuer(self.txn_ctrl)
+        self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
 
     @defer.inlineCallbacks
     def start(self):
@@ -94,38 +97,36 @@ class _ServiceQueuer(object):
     this schedules any other events in the queue to run.
     """
 
-    def __init__(self, txn_ctrl):
+    def __init__(self, txn_ctrl, clock):
         self.queued_events = {}  # dict of {service_id: [events]}
-        self.pending_requests = {}  # dict of {service_id: Deferred}
+        self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
+        self.clock = clock
 
     def enqueue(self, service, event):
         # if this service isn't being sent something
-        if not self.pending_requests.get(service.id):
-            self._send_request(service, [event])
-        else:
-            # add to queue for this service
-            if service.id not in self.queued_events:
-                self.queued_events[service.id] = []
-            self.queued_events[service.id].append(event)
-
-    def _send_request(self, service, events):
-        # send request and add callbacks
-        d = self.txn_ctrl.send(service, events)
-        d.addBoth(self._on_request_finish)
-        d.addErrback(self._on_request_fail)
-        self.pending_requests[service.id] = d
-
-    def _on_request_finish(self, service):
-        self.pending_requests[service.id] = None
-        # if there are queued events, then send them.
-        if (service.id in self.queued_events
-                and len(self.queued_events[service.id]) > 0):
-            self._send_request(service, self.queued_events[service.id])
-            self.queued_events[service.id] = []
-
-    def _on_request_fail(self, err):
-        logger.error("AS request failed: %s", err)
+        self.queued_events.setdefault(service.id, []).append(event)
+        preserve_fn(self._send_request)(service)
+
+    @defer.inlineCallbacks
+    def _send_request(self, service):
+        if service.id in self.requests_in_flight:
+            return
+
+        with Measure(self.clock, "_ServiceQueuer._send_request"):
+            self.requests_in_flight.add(service.id)
+            try:
+                while True:
+                    events = self.queued_events.pop(service.id, [])
+                    if not events:
+                        return
+
+                    try:
+                        yield self.txn_ctrl.send(service, events)
+                    except:
+                        logger.exception("AS request failed")
+            finally:
+                self.requests_in_flight.discard(service.id)
 
 
 class _TransactionController(object):
@@ -155,8 +156,6 @@ class _TransactionController(object):
         except Exception as e:
             logger.exception(e)
             self._start_recoverer(service)
-        # request has finished
-        defer.returnValue(service)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 631a229332..e5a902f734 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -193,7 +193,7 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
 
     def setUp(self):
         self.txn_ctrl = Mock()
-        self.queuer = _ServiceQueuer(self.txn_ctrl)
+        self.queuer = _ServiceQueuer(self.txn_ctrl, MockClock())
 
     def test_send_single_event_no_queue(self):
         # Expect the event to be sent immediately.