summary refs log tree commit diff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2019-02-26 14:23:40 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2019-02-26 14:23:40 +0000
commit802884d4ee06ca8e42f46f64e6da7c188d43dc69 (patch)
tree6767e6e142d75e5500092a829d488583fcedef51 /synapse/appservice/scheduler.py
parentAdd changelog (diff)
parentMerge pull request #4745 from matrix-org/revert-4736-anoa/public_rooms_federate (diff)
downloadsynapse-802884d4ee06ca8e42f46f64e6da7c188d43dc69.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/public_rooms_federate_develop
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--synapse/appservice/scheduler.py36
1 files changed, 24 insertions, 12 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 2430814796..685f15c061 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -53,8 +53,8 @@ import logging
 from twisted.internet import defer
 
 from synapse.appservice import ApplicationServiceState
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import run_in_background
-from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -104,27 +104,34 @@ class _ServiceQueuer(object):
         self.clock = clock
 
     def enqueue(self, service, event):
-        # if this service isn't being sent something
         self.queued_events.setdefault(service.id, []).append(event)
-        run_in_background(self._send_request, service)
 
-    @defer.inlineCallbacks
-    def _send_request(self, service):
+        # start a sender for this appservice if we don't already have one
+
         if service.id in self.requests_in_flight:
             return
 
+        run_as_background_process(
+            "as-sender-%s" % (service.id, ),
+            self._send_request, service,
+        )
+
+    @defer.inlineCallbacks
+    def _send_request(self, service):
+        # sanity-check: we shouldn't get here if this service already has a sender
+        # running.
+        assert(service.id not in self.requests_in_flight)
+
         self.requests_in_flight.add(service.id)
         try:
             while True:
                 events = self.queued_events.pop(service.id, [])
                 if not events:
                     return
-
-                with Measure(self.clock, "servicequeuer.send"):
-                    try:
-                        yield self.txn_ctrl.send(service, events)
-                    except Exception:
-                        logger.exception("AS request failed")
+                try:
+                    yield self.txn_ctrl.send(service, events)
+                except Exception:
+                    logger.exception("AS request failed")
         finally:
             self.requests_in_flight.discard(service.id)
 
@@ -223,7 +230,12 @@ class _Recoverer(object):
         self.backoff_counter = 1
 
     def recover(self):
-        self.clock.call_later((2 ** self.backoff_counter), self.retry)
+        def _retry():
+            run_as_background_process(
+                "as-recoverer-%s" % (self.service.id,),
+                self.retry,
+            )
+        self.clock.call_later((2 ** self.backoff_counter), _retry)
 
     def _backoff(self):
         # cap the backoff to be around 8.5min => (2^9) = 512 secs