diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2019-02-26 14:23:40 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2019-02-26 14:23:40 +0000 |
commit | 802884d4ee06ca8e42f46f64e6da7c188d43dc69 (patch) | |
tree | 6767e6e142d75e5500092a829d488583fcedef51 /synapse/appservice/scheduler.py | |
parent | Add changelog (diff) | |
parent | Merge pull request #4745 from matrix-org/revert-4736-anoa/public_rooms_federate (diff) | |
download | synapse-802884d4ee06ca8e42f46f64e6da7c188d43dc69.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/public_rooms_federate_develop
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r-- | synapse/appservice/scheduler.py | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 2430814796..685f15c061 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -53,8 +53,8 @@ import logging from twisted.internet import defer from synapse.appservice import ApplicationServiceState +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import run_in_background -from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -104,27 +104,34 @@ class _ServiceQueuer(object): self.clock = clock def enqueue(self, service, event): - # if this service isn't being sent something self.queued_events.setdefault(service.id, []).append(event) - run_in_background(self._send_request, service) - @defer.inlineCallbacks - def _send_request(self, service): + # start a sender for this appservice if we don't already have one + if service.id in self.requests_in_flight: return + run_as_background_process( + "as-sender-%s" % (service.id, ), + self._send_request, service, + ) + + @defer.inlineCallbacks + def _send_request(self, service): + # sanity-check: we shouldn't get here if this service already has a sender + # running. + assert(service.id not in self.requests_in_flight) + self.requests_in_flight.add(service.id) try: while True: events = self.queued_events.pop(service.id, []) if not events: return - - with Measure(self.clock, "servicequeuer.send"): - try: - yield self.txn_ctrl.send(service, events) - except Exception: - logger.exception("AS request failed") + try: + yield self.txn_ctrl.send(service, events) + except Exception: + logger.exception("AS request failed") finally: self.requests_in_flight.discard(service.id) @@ -223,7 +230,12 @@ class _Recoverer(object): self.backoff_counter = 1 def recover(self): - self.clock.call_later((2 ** self.backoff_counter), self.retry) + def _retry(): + run_as_background_process( + "as-recoverer-%s" % (self.service.id,), + self.retry, + ) + self.clock.call_later((2 ** self.backoff_counter), _retry) def _backoff(self): # cap the backoff to be around 8.5min => (2^9) = 512 secs |