diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
new file mode 100644
index 0000000000..4e82232796
--- /dev/null
+++ b/synapse/util/retryutils.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import CodeMessageException
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class NotRetryingDestination(Exception):
+ def __init__(self, retry_last_ts, retry_interval, destination):
+ msg = "Not retrying server %s." % (destination,)
+ super(NotRetryingDestination, self).__init__(msg)
+
+ self.retry_last_ts = retry_last_ts
+ self.retry_interval = retry_interval
+ self.destination = destination
+
+
+@defer.inlineCallbacks
+def get_retry_limiter(destination, clock, store, **kwargs):
+ """For a given destination check if we have previously failed to
+ send a request there and are waiting before retrying the destination.
+ If we are not ready to retry the destination, this will raise a
+ NotRetryingDestination exception. Otherwise, will return a Context Manager
+ that will mark the destination as down if an exception is thrown (excluding
+ CodeMessageException with code < 500)
+
+ Example usage:
+
+ try:
+ limiter = yield get_retry_limiter(destination, clock, store)
+ with limiter:
+ response = yield do_request()
+ except NotRetryingDestination:
+ # We aren't ready to retry that destination.
+ raise
+ """
+ retry_last_ts, retry_interval = (0, 0)
+
+ retry_timings = yield store.get_destination_retry_timings(
+ destination
+ )
+
+ if retry_timings:
+ retry_last_ts, retry_interval = (
+ retry_timings.retry_last_ts, retry_timings.retry_interval
+ )
+
+ now = int(clock.time_msec())
+
+ if retry_last_ts + retry_interval > now:
+ raise NotRetryingDestination(
+ retry_last_ts=retry_last_ts,
+ retry_interval=retry_interval,
+ destination=destination,
+ )
+
+ defer.returnValue(
+ RetryDestinationLimiter(
+ destination,
+ clock,
+ store,
+ retry_interval,
+ **kwargs
+ )
+ )
+
+
+class RetryDestinationLimiter(object):
+ def __init__(self, destination, clock, store, retry_interval,
+ min_retry_interval=5000, max_retry_interval=60 * 60 * 1000,
+ multiplier_retry_interval=2,):
+ """Marks the destination as "down" if an exception is thrown in the
+ context, except for CodeMessageException with code < 500.
+
+ If no exception is raised, marks the destination as "up".
+
+ Args:
+ destination (str)
+ clock (Clock)
+ store (DataStore)
+ retry_interval (int): The next retry interval taken from the
+ database in milliseconds, or zero if the last request was
+ successful.
+ min_retry_interval (int): The minimum retry interval to use after
+ a failed request, in milliseconds.
+ max_retry_interval (int): The maximum retry interval to use after
+ a failed request, in milliseconds.
+ multiplier_retry_interval (int): The multiplier to use to increase
+ the retry interval after a failed request.
+ """
+ self.clock = clock
+ self.store = store
+ self.destination = destination
+
+ self.retry_interval = retry_interval
+ self.min_retry_interval = min_retry_interval
+ self.max_retry_interval = max_retry_interval
+ self.multiplier_retry_interval = multiplier_retry_interval
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ def err(failure):
+ logger.exception(
+ "Failed to store set_destination_retry_timings",
+ failure.value
+ )
+
+ valid_err_code = False
+ if exc_type is CodeMessageException:
+ valid_err_code = 0 <= exc_val.code < 500
+
+ if exc_type is None or valid_err_code:
+ # We connected successfully.
+ if not self.retry_interval:
+ return
+
+ retry_last_ts = 0
+ self.retry_interval = 0
+ else:
+ # We couldn't connect.
+ if self.retry_interval:
+ self.retry_interval *= self.multiplier_retry_interval
+
+ if self.retry_interval >= self.max_retry_interval:
+ self.retry_interval = self.max_retry_interval
+ else:
+ self.retry_interval = self.min_retry_interval
+
+ retry_last_ts = int(self.clock.time_msec())
+
+ self.store.set_destination_retry_timings(
+ self.destination, retry_last_ts, self.retry_interval
+ ).addErrback(err)
|