From e6e136decca12648933f974e4151fb936ad9e1fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Aug 2019 18:04:46 +0100 Subject: Retry well known on fail. If we have recently seen a valid well-known for a domain we want to retry on (non-final) errors a few times, to handle temporary blips in networking/etc. --- .../federation/test_matrix_federation_agent.py | 79 +++++++++++++--------- 1 file changed, 46 insertions(+), 33 deletions(-) (limited to 'tests') diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 2c568788b3..4d3f31d18c 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -73,8 +73,6 @@ class MatrixFederationAgentTests(TestCase): self.mock_resolver = Mock() - self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds) - config_dict = default_config("test", parse=False) config_dict["federation_custom_ca_list"] = [get_test_ca_cert_file()] @@ -82,11 +80,21 @@ class MatrixFederationAgentTests(TestCase): config.parse_config_dict(config_dict, "", "") self.tls_factory = ClientTLSOptionsFactory(config) + + self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds) + self.had_well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds) + self.well_known_resolver = WellKnownResolver( + self.reactor, + Agent(self.reactor, contextFactory=self.tls_factory), + well_known_cache=self.well_known_cache, + had_well_known_cache=self.had_well_known_cache, + ) + self.agent = MatrixFederationAgent( reactor=self.reactor, tls_client_options_factory=self.tls_factory, _srv_resolver=self.mock_resolver, - _well_known_cache=self.well_known_cache, + _well_known_resolver=self.well_known_resolver, ) def _make_connection(self, client_factory, expected_sni): @@ -701,11 +709,18 @@ class MatrixFederationAgentTests(TestCase): config = default_config("test", parse=True) + # Build a new agent and WellKnownResolver with a different tls factory + tls_factory = ClientTLSOptionsFactory(config) agent = MatrixFederationAgent( reactor=self.reactor, - tls_client_options_factory=ClientTLSOptionsFactory(config), + tls_client_options_factory=tls_factory, _srv_resolver=self.mock_resolver, - _well_known_cache=self.well_known_cache, + _well_known_resolver=WellKnownResolver( + self.reactor, + Agent(self.reactor, contextFactory=tls_factory), + well_known_cache=self.well_known_cache, + had_well_known_cache=self.had_well_known_cache, + ), ) test_d = agent.request(b"GET", b"matrix://testserv/foo/bar") @@ -932,15 +947,9 @@ class MatrixFederationAgentTests(TestCase): self.successResultOf(test_d) def test_well_known_cache(self): - well_known_resolver = WellKnownResolver( - self.reactor, - Agent(self.reactor, contextFactory=self.tls_factory), - well_known_cache=self.well_known_cache, - ) - self.reactor.lookups["testserv"] = "1.2.3.4" - fetch_d = well_known_resolver.get_well_known(b"testserv") + fetch_d = self.well_known_resolver.get_well_known(b"testserv") # there should be an attempt to connect on port 443 for the .well-known clients = self.reactor.tcpClients @@ -963,7 +972,7 @@ class MatrixFederationAgentTests(TestCase): well_known_server.loseConnection() # repeat the request: it should hit the cache - fetch_d = well_known_resolver.get_well_known(b"testserv") + fetch_d = self.well_known_resolver.get_well_known(b"testserv") r = self.successResultOf(fetch_d) self.assertEqual(r.delegated_server, b"target-server") @@ -971,7 +980,7 @@ class MatrixFederationAgentTests(TestCase): self.reactor.pump((1000.0,)) # now it should connect again - fetch_d = well_known_resolver.get_well_known(b"testserv") + fetch_d = self.well_known_resolver.get_well_known(b"testserv") self.assertEqual(len(clients), 1) (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) @@ -992,15 +1001,9 @@ class MatrixFederationAgentTests(TestCase): it ignores transient errors. """ - well_known_resolver = WellKnownResolver( - self.reactor, - Agent(self.reactor, contextFactory=self.tls_factory), - well_known_cache=self.well_known_cache, - ) - self.reactor.lookups["testserv"] = "1.2.3.4" - fetch_d = well_known_resolver.get_well_known(b"testserv") + fetch_d = self.well_known_resolver.get_well_known(b"testserv") # there should be an attempt to connect on port 443 for the .well-known clients = self.reactor.tcpClients @@ -1026,27 +1029,37 @@ class MatrixFederationAgentTests(TestCase): # another lookup. self.reactor.pump((900.0,)) - fetch_d = well_known_resolver.get_well_known(b"testserv") - clients = self.reactor.tcpClients - (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + fetch_d = self.well_known_resolver.get_well_known(b"testserv") - # fonx the connection attempt, this will be treated as a temporary - # failure. - client_factory.clientConnectionFailed(None, Exception("nope")) + # The resolver may retry a few times, so fonx all requests that come along + attempts = 0 + while self.reactor.tcpClients: + clients = self.reactor.tcpClients + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) - # attemptdelay on the hostnameendpoint is 0.3, so takes that long before the - # .well-known request fails. - self.reactor.pump((0.4,)) + attempts += 1 + + # fonx the connection attempt, this will be treated as a temporary + # failure. + client_factory.clientConnectionFailed(None, Exception("nope")) + + # There's a few sleeps involved, so we have to pump the reactor a + # bit. + self.reactor.pump((1.0, 1.0)) + + # We expect to see more than one attempt as there was previously a valid + # well known. + self.assertGreater(attempts, 1) # Resolver should return cached value, despite the lookup failing. r = self.successResultOf(fetch_d) self.assertEqual(r.delegated_server, b"target-server") - # Expire the cache and repeat the request - self.reactor.pump((100.0,)) + # Expire both caches and repeat the request + self.reactor.pump((10000.0,)) # Repated the request, this time it should fail if the lookup fails. - fetch_d = well_known_resolver.get_well_known(b"testserv") + fetch_d = self.well_known_resolver.get_well_known(b"testserv") clients = self.reactor.tcpClients (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) -- cgit 1.5.1 From 1dec31560e5712306e368a0adc6d9f84f924bdc9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2019 11:46:00 +0100 Subject: Change jitter to be a factor rather than absolute value --- synapse/http/federation/well_known_resolver.py | 23 +++++++++++----------- .../federation/test_matrix_federation_agent.py | 4 ++-- 2 files changed, 14 insertions(+), 13 deletions(-) (limited to 'tests') diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index c846003886..5e9b0befb0 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -32,8 +32,8 @@ from synapse.util.metrics import Measure # period to cache .well-known results for by default WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600 -# jitter to add to the .well-known default cache ttl -WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60 +# jitter factor to add to the .well-known default cache ttls +WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1 # period to cache failure to fetch .well-known for WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600 @@ -133,16 +133,14 @@ class WellKnownResolver(object): # We have recently seen a valid well-known record for this # server, so we cache the lack of well-known for a shorter time. cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD - cache_period += random.uniform( - 0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER - ) else: - # add some randomness to the TTL to avoid a stampeding herd every hour - # after startup cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD - cache_period += random.uniform( - 0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER - ) + + # add some randomness to the TTL to avoid a stampeding herd + cache_period *= random.uniform( + 1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, + 1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, + ) if cache_period > 0: self._well_known_cache.set(server_name, result, cache_period) @@ -194,7 +192,10 @@ class WellKnownResolver(object): cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD # add some randomness to the TTL to avoid a stampeding herd every 24 hours # after startup - cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER) + cache_period *= random.uniform( + 1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, + 1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER, + ) else: cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD) cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD) diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 4d3f31d18c..c55aad8e11 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -551,7 +551,7 @@ class MatrixFederationAgentTests(TestCase): self.assertEqual(self.well_known_cache[b"testserv"], b"target-server") # check the cache expires - self.reactor.pump((25 * 3600,)) + self.reactor.pump((48 * 3600,)) self.well_known_cache.expire() self.assertNotIn(b"testserv", self.well_known_cache) @@ -639,7 +639,7 @@ class MatrixFederationAgentTests(TestCase): self.assertEqual(self.well_known_cache[b"testserv"], b"target-server") # check the cache expires - self.reactor.pump((25 * 3600,)) + self.reactor.pump((48 * 3600,)) self.well_known_cache.expire() self.assertNotIn(b"testserv", self.well_known_cache) -- cgit 1.5.1 From 501994582899ad9d790029b3d7c48ba32f5720a9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 20 Aug 2019 11:20:10 +0100 Subject: Refactor the Appservice scheduler code Get rid of the labyrinthine `recoverer_fn` code, and clean up the startup code (it seemed to be previously inexplicably split between `ApplicationServiceScheduler.start` and `_Recoverer.start`). Add some docstrings too. --- changelog.d/5886.misc | 1 + synapse/appservice/scheduler.py | 110 ++++++++++++++++++++++--------------- tests/appservice/test_scheduler.py | 6 +- 3 files changed, 68 insertions(+), 49 deletions(-) create mode 100644 changelog.d/5886.misc (limited to 'tests') diff --git a/changelog.d/5886.misc b/changelog.d/5886.misc new file mode 100644 index 0000000000..22adba3d85 --- /dev/null +++ b/changelog.d/5886.misc @@ -0,0 +1 @@ +Refactor the Appservice scheduler code. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 42a350bff8..03a14402c5 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object): self.store = hs.get_datastore() self.as_api = hs.get_application_service_api() - def create_recoverer(service, callback): - return _Recoverer(self.clock, self.store, self.as_api, service, callback) - - self.txn_ctrl = _TransactionController( - self.clock, self.store, self.as_api, create_recoverer - ) + self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api) self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock) @defer.inlineCallbacks def start(self): logger.info("Starting appservice scheduler") + # check for any DOWN ASes and start recoverers for them. - recoverers = yield _Recoverer.start( - self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered + services = yield self.store.get_appservices_by_state( + ApplicationServiceState.DOWN ) - self.txn_ctrl.add_recoverers(recoverers) + + for service in services: + self.txn_ctrl.start_recoverer(service) def submit_event_for_as(self, service, event): self.queuer.enqueue(service, event) class _ServiceQueuer(object): - """Queues events for the same application service together, sending - transactions as soon as possible. Once a transaction is sent successfully, - this schedules any other events in the queue to run. + """Queue of events waiting to be sent to appservices. + + Groups events into transactions per-appservice, and sends them on to the + TransactionController. Makes sure that we only have one transaction in flight per + appservice at a given time. """ def __init__(self, txn_ctrl, clock): self.queued_events = {} # dict of {service_id: [events]} + + # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock @@ -136,13 +138,29 @@ class _ServiceQueuer(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, recoverer_fn): + """Transaction manager. + + Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer + if a transaction fails. + + (Note we have only have one of these in the homeserver.) + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + """ + + def __init__(self, clock, store, as_api): self.clock = clock self.store = store self.as_api = as_api - self.recoverer_fn = recoverer_fn - # keep track of how many recoverers there are - self.recoverers = [] + + # map from service id to recoverer instance + self.recoverers = {} + + # for UTs + self.RECOVERER_CLASS = _Recoverer @defer.inlineCallbacks def send(self, service, events): @@ -154,42 +172,45 @@ class _TransactionController(object): if sent: yield txn.complete(self.store) else: - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) except Exception: logger.exception("Error creating appservice transaction") - run_in_background(self._start_recoverer, service) + run_in_background(self._on_txn_fail, service) @defer.inlineCallbacks def on_recovered(self, recoverer): - self.recoverers.remove(recoverer) logger.info( "Successfully recovered application service AS ID %s", recoverer.service.id ) + self.recoverers.pop(recoverer.service.id) logger.info("Remaining active recoverers: %s", len(self.recoverers)) yield self.store.set_appservice_state( recoverer.service, ApplicationServiceState.UP ) - def add_recoverers(self, recoverers): - for r in recoverers: - self.recoverers.append(r) - if len(recoverers) > 0: - logger.info("New active recoverers: %s", len(self.recoverers)) - @defer.inlineCallbacks - def _start_recoverer(self, service): + def _on_txn_fail(self, service): try: yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN) - logger.info( - "Application service falling behind. Starting recoverer. AS ID %s", - service.id, - ) - recoverer = self.recoverer_fn(service, self.on_recovered) - self.add_recoverers([recoverer]) - recoverer.recover() + self.start_recoverer(service) except Exception: logger.exception("Error starting AS recoverer") + def start_recoverer(self, service): + """Start a Recoverer for the given service + + Args: + service (synapse.appservice.ApplicationService): + """ + logger.info("Starting recoverer for AS ID %s", service.id) + assert service.id not in self.recoverers + recoverer = self.RECOVERER_CLASS( + self.clock, self.store, self.as_api, service, self.on_recovered + ) + self.recoverers[service.id] = recoverer + recoverer.recover() + logger.info("Now %i active recoverers", len(self.recoverers)) + @defer.inlineCallbacks def _is_service_up(self, service): state = yield self.store.get_appservice_state(service) @@ -197,18 +218,17 @@ class _TransactionController(object): class _Recoverer(object): - @staticmethod - @defer.inlineCallbacks - def start(clock, store, as_api, callback): - services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN) - recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services] - for r in recoverers: - logger.info( - "Starting recoverer for AS ID %s which was marked as " "DOWN", - r.service.id, - ) - r.recover() - return recoverers + """Manages retries and backoff for a DOWN appservice. + + We have one of these for each appservice which is currently considered DOWN. + + Args: + clock (synapse.util.Clock): + store (synapse.storage.DataStore): + as_api (synapse.appservice.api.ApplicationServiceApi): + service (synapse.appservice.ApplicationService): the service we are managing + callback (callable[_Recoverer]): called once the service recovers. + """ def __init__(self, clock, store, as_api, service, callback): self.clock = clock diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 04b8c2c07c..52f89d3f83 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -37,11 +37,9 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.recoverer = Mock() self.recoverer_fn = Mock(return_value=self.recoverer) self.txnctrl = _TransactionController( - clock=self.clock, - store=self.store, - as_api=self.as_api, - recoverer_fn=self.recoverer_fn, + clock=self.clock, store=self.store, as_api=self.as_api ) + self.txnctrl.RECOVERER_CLASS = self.recoverer_fn def test_single_service_up_txn_sent(self): # Test: The AS is up and the txn is successfully sent. -- cgit 1.5.1