From 2b8f1a956c6a1d767a3b60e84e7d0afe5857fb0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 17:20:56 +0000 Subject: Add per server retry limiting. Factor out the pre destination retry logic from TransactionQueue so it can be reused in both get_pdu and crypto.keyring --- synapse/util/retryutils.py | 108 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 synapse/util/retryutils.py (limited to 'synapse/util') diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py new file mode 100644 index 0000000000..bba524ebd8 --- /dev/null +++ b/synapse/util/retryutils.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import logging + + +logger = logging.getLogger(__name__) + + +class NotRetryingDestination(Exception): + def __init__(self, retry_last_ts, retry_interval, destination): + msg = "Not retrying server %s." % (destination,) + super(NotRetryingDestination, self).__init__(msg) + + self.retry_last_ts = retry_last_ts + self.retry_interval = retry_interval + self.destination = destination + + +@defer.inlineCallbacks +def get_retry_limiter(destination, clock, store, **kwargs): + retry_last_ts, retry_interval = (0, 0) + + retry_timings = yield store.get_destination_retry_timings( + destination + ) + + if retry_timings: + retry_last_ts, retry_interval = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + + now = int(clock.time_msec()) + + if retry_last_ts + retry_interval > now: + raise NotRetryingDestination( + retry_last_ts=retry_last_ts, + retry_interval=retry_interval, + destination=destination, + ) + + defer.returnValue( + RetryDestinationLimiter( + destination, + clock, + store, + retry_interval, + **kwargs + ) + ) + + +class RetryDestinationLimiter(object): + def __init__(self, destination, clock, store, retry_interval, + min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, + multiplier_retry_interval=2): + self.clock = clock + self.store = store + self.destination = destination + + self.retry_interval = retry_interval + self.min_retry_interval = min_retry_interval + self.max_retry_interval = max_retry_interval + self.multiplier_retry_interval = multiplier_retry_interval + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + def err(self, failure): + logger.exception( + "Failed to store set_destination_retry_timings", + failure.value + ) + + if exc_type is None and exc_val is None and exc_tb is None: + # We connected successfully. + retry_last_ts = 0 + self.retry_interval = 0 + else: + # We couldn't connect. + if self.retry_interval: + self.retry_interval *= self.multiplier_retry_interval + + if self.retry_interval >= self.max_retry_interval: + self.retry_interval = self.max_retry_interval + else: + self.retry_interval = self.min_retry_interval + + retry_last_ts = int(self._clock.time_msec()), + + self.store.set_destination_retry_timings( + self.destination, retry_last_ts, self.retry_interval + ).addErrback(err) -- cgit 1.4.1 From f91263b1e03a6e30480af1ece357c27439a8c6b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 17:37:51 +0000 Subject: Remove spurious self --- synapse/util/retryutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index bba524ebd8..9ab119f29e 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -81,7 +81,7 @@ class RetryDestinationLimiter(object): pass def __exit__(self, exc_type, exc_val, exc_tb): - def err(self, failure): + def err(failure): logger.exception( "Failed to store set_destination_retry_timings", failure.value -- cgit 1.4.1 From c8436b38a08c29fd7027e040564678ec8a635a14 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 17:38:38 +0000 Subject: Only update destination_retry_timings if we have succeeded when retrying --- synapse/util/retryutils.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 9ab119f29e..888b7ef2e9 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -89,6 +89,9 @@ class RetryDestinationLimiter(object): if exc_type is None and exc_val is None and exc_tb is None: # We connected successfully. + if not self.retry_interval: + return + retry_last_ts = 0 self.retry_interval = 0 else: -- cgit 1.4.1 From 9371019133bf16cec163d58fe69aa701c8ca5305 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 18:13:34 +0000 Subject: Try to only back off if we think we failed to connect to the remote --- synapse/crypto/keyring.py | 108 ++++++++++++++++---------------- synapse/federation/transaction_queue.py | 66 +++++++++---------- synapse/util/retryutils.py | 10 ++- 3 files changed, 95 insertions(+), 89 deletions(-) (limited to 'synapse/util') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index ea00c830c0..828aced44a 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -101,63 +101,63 @@ class Keyring(object): server_name, self.hs.tls_context_factory ) - # Check the response. + # Check the response. - x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, tls_certificate - ) + x509_certificate_bytes = crypto.dump_certificate( + crypto.FILETYPE_ASN1, tls_certificate + ) - if ("signatures" not in response - or server_name not in response["signatures"]): - raise ValueError("Key response not signed by remote server") - - if "tls_certificate" not in response: - raise ValueError("Key response missing TLS certificate") - - tls_certificate_b64 = response["tls_certificate"] - - if encode_base64(x509_certificate_bytes) != tls_certificate_b64: - raise ValueError("TLS certificate doesn't match") - - verify_keys = {} - for key_id, key_base64 in response["verify_keys"].items(): - if is_signing_algorithm_supported(key_id): - key_bytes = decode_base64(key_base64) - verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_keys[key_id] = verify_key - - for key_id in response["signatures"][server_name]: - if key_id not in response["verify_keys"]: - raise ValueError( - "Key response must include verification keys for all" - " signatures" - ) - if key_id in verify_keys: - verify_signed_json( - response, - server_name, - verify_keys[key_id] - ) - - # Cache the result in the datastore. - - time_now_ms = self.clock.time_msec() - - yield self.store.store_server_certificate( - server_name, - server_name, - time_now_ms, - tls_certificate, - ) + if ("signatures" not in response + or server_name not in response["signatures"]): + raise ValueError("Key response not signed by remote server") + + if "tls_certificate" not in response: + raise ValueError("Key response missing TLS certificate") - for key_id, key in verify_keys.items(): - yield self.store.store_server_verify_key( - server_name, server_name, time_now_ms, key + tls_certificate_b64 = response["tls_certificate"] + + if encode_base64(x509_certificate_bytes) != tls_certificate_b64: + raise ValueError("TLS certificate doesn't match") + + verify_keys = {} + for key_id, key_base64 in response["verify_keys"].items(): + if is_signing_algorithm_supported(key_id): + key_bytes = decode_base64(key_base64) + verify_key = decode_verify_key_bytes(key_id, key_bytes) + verify_keys[key_id] = verify_key + + for key_id in response["signatures"][server_name]: + if key_id not in response["verify_keys"]: + raise ValueError( + "Key response must include verification keys for all" + " signatures" + ) + if key_id in verify_keys: + verify_signed_json( + response, + server_name, + verify_keys[key_id] ) - for key_id in key_ids: - if key_id in verify_keys: - defer.returnValue(verify_keys[key_id]) - return + # Cache the result in the datastore. + + time_now_ms = self.clock.time_msec() + + yield self.store.store_server_certificate( + server_name, + server_name, + time_now_ms, + tls_certificate, + ) + + for key_id, key in verify_keys.items(): + yield self.store.store_server_verify_key( + server_name, server_name, time_now_ms, key + ) + + for key_id in key_ids: + if key_id in verify_keys: + defer.returnValue(verify_keys[key_id]) + return - raise ValueError("No verification key found for given key ids") + raise ValueError("No verification key found for given key ids") diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7d02afe163..dc0f30cb64 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -167,15 +167,6 @@ class TransactionQueue(object): logger.info("TX [%s] Nothing to send", destination) return - logger.debug( - "TX [%s] Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, - len(pending_pdus), - len(pending_edus), - len(pending_failures) - ) - # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -194,32 +185,41 @@ class TransactionQueue(object): self.store, ) - with limiter: - self.pending_transactions[destination] = 1 + logger.debug( + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) + + self.pending_transactions[destination] = 1 - logger.debug("TX [%s] Persisting transaction...", destination) + logger.debug("TX [%s] Persisting transaction...", destination) - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - self._next_txn_id += 1 + self._next_txn_id += 1 - yield self.transaction_actions.prepare_to_send(transaction) + yield self.transaction_actions.prepare_to_send(transaction) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) + with limiter: # Actually send the transaction # FIXME (erikj): This is a bit of a hack to make the Pdu age @@ -249,11 +249,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( - transaction, code, response - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] Yielding to callbacks...", destination) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 888b7ef2e9..d190100c8c 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.api.errors import CodeMessageException + import logging @@ -67,7 +69,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): class RetryDestinationLimiter(object): def __init__(self, destination, clock, store, retry_interval, min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, - multiplier_retry_interval=2): + multiplier_retry_interval=2,): self.clock = clock self.store = store self.destination = destination @@ -87,7 +89,11 @@ class RetryDestinationLimiter(object): failure.value ) - if exc_type is None and exc_val is None and exc_tb is None: + valid_err_code = False + if exc_type is CodeMessageException: + valid_err_code = 0 <= exc_val.code < 500 + + if exc_type is None or valid_err_code: # We connected successfully. if not self.retry_interval: return -- cgit 1.4.1 From d77912ff4483fa63308eeefb70da93b8c327b740 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 10:09:54 +0000 Subject: Docs. --- synapse/util/retryutils.py | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index d190100c8c..08285e811d 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -35,6 +35,23 @@ class NotRetryingDestination(Exception): @defer.inlineCallbacks def get_retry_limiter(destination, clock, store, **kwargs): + """For a given destination check if we have previously failed to + send a request there and are waiting before retrying the destination. + If we are not ready to retry the destination, this will raise a + NotRetryingDestination exception. Otherwise, will return a Context Manager + that will mark the destination as down if an exception is thrown (excluding + CodeMessageException with code < 500) + + Example usage: + + try: + limiter = yield get_retry_limiter(destination, clock, store) + with limiter: + response = yield do_request() + except NotRetryingDestination: + # We aren't ready to retry that destination. + raise + """ retry_last_ts, retry_interval = (0, 0) retry_timings = yield store.get_destination_retry_timings( @@ -68,8 +85,23 @@ def get_retry_limiter(destination, clock, store, **kwargs): class RetryDestinationLimiter(object): def __init__(self, destination, clock, store, retry_interval, - min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, + min_retry_interval=5000, max_retry_interval=60 * 60 * 1000, multiplier_retry_interval=2,): + """ + Args: + destination (str) + clock (Clock) + store (DataStore) + retry_interval (int): The next retry interval taken from the + database in milliseconds, or zero if the last request was + successful. + min_retry_interval (int): The minimum retry interval to use after + a failed request, in milliseconds. + max_retry_interval (int): The maximum retry interval to use after + a failed request, in milliseconds. + multiplier_retry_interval (int): The multiplier to use to increase + the retry interval after a failed request. + """ self.clock = clock self.store = store self.destination = destination -- cgit 1.4.1 From 4fd176a41db9f3b9073db172e20c8ede40b5f5f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 10:11:24 +0000 Subject: More docs --- synapse/util/retryutils.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 08285e811d..366f7c48a7 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -87,7 +87,11 @@ class RetryDestinationLimiter(object): def __init__(self, destination, clock, store, retry_interval, min_retry_interval=5000, max_retry_interval=60 * 60 * 1000, multiplier_retry_interval=2,): - """ + """Marks the destination as "down" if an exception is thrown in the + context, except for CodeMessageException with code < 500. + + If no exception is raised, marks the destination as "up". + Args: destination (str) clock (Clock) -- cgit 1.4.1