summary refs log tree commit diff
path: root/synapse/util/retryutils.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-02-19 10:38:48 +0000
committerErik Johnston <erik@matrix.org>2015-02-19 10:38:48 +0000
commit8321e8a2e0381f52f3f434223db58f6ea280d89e (patch)
tree13c5600cbeb56c7c2837dd2df329f10a239f91ac /synapse/util/retryutils.py
parentMerge pull request #73 from matrix-org/hotfixes-v0.7.0f (diff)
parentUpdate release date (diff)
downloadsynapse-8321e8a2e0381f52f3f434223db58f6ea280d89e.tar.xz
Merge branch 'release-v0.7.1' of github.com:matrix-org/synapse
Diffstat (limited to 'synapse/util/retryutils.py')
-rw-r--r--synapse/util/retryutils.py153
1 files changed, 153 insertions, 0 deletions
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
new file mode 100644
index 0000000000..4e82232796
--- /dev/null
+++ b/synapse/util/retryutils.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import CodeMessageException
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class NotRetryingDestination(Exception):
+    def __init__(self, retry_last_ts, retry_interval, destination):
+        msg = "Not retrying server %s." % (destination,)
+        super(NotRetryingDestination, self).__init__(msg)
+
+        self.retry_last_ts = retry_last_ts
+        self.retry_interval = retry_interval
+        self.destination = destination
+
+
+@defer.inlineCallbacks
+def get_retry_limiter(destination, clock, store, **kwargs):
+    """For a given destination check if we have previously failed to
+    send a request there and are waiting before retrying the destination.
+    If we are not ready to retry the destination, this will raise a
+    NotRetryingDestination exception. Otherwise, will return a Context Manager
+    that will mark the destination as down if an exception is thrown (excluding
+    CodeMessageException with code < 500)
+
+    Example usage:
+
+        try:
+            limiter = yield get_retry_limiter(destination, clock, store)
+            with limiter:
+                response = yield do_request()
+        except NotRetryingDestination:
+            # We aren't ready to retry that destination.
+            raise
+    """
+    retry_last_ts, retry_interval = (0, 0)
+
+    retry_timings = yield store.get_destination_retry_timings(
+        destination
+    )
+
+    if retry_timings:
+        retry_last_ts, retry_interval = (
+            retry_timings.retry_last_ts, retry_timings.retry_interval
+        )
+
+        now = int(clock.time_msec())
+
+        if retry_last_ts + retry_interval > now:
+            raise NotRetryingDestination(
+                retry_last_ts=retry_last_ts,
+                retry_interval=retry_interval,
+                destination=destination,
+            )
+
+    defer.returnValue(
+        RetryDestinationLimiter(
+            destination,
+            clock,
+            store,
+            retry_interval,
+            **kwargs
+        )
+    )
+
+
+class RetryDestinationLimiter(object):
+    def __init__(self, destination, clock, store, retry_interval,
+                 min_retry_interval=5000, max_retry_interval=60 * 60 * 1000,
+                 multiplier_retry_interval=2,):
+        """Marks the destination as "down" if an exception is thrown in the
+        context, except for CodeMessageException with code < 500.
+
+        If no exception is raised, marks the destination as "up".
+
+        Args:
+            destination (str)
+            clock (Clock)
+            store (DataStore)
+            retry_interval (int): The next retry interval taken from the
+                database in milliseconds, or zero if the last request was
+                successful.
+            min_retry_interval (int): The minimum retry interval to use after
+                a failed request, in milliseconds.
+            max_retry_interval (int): The maximum retry interval to use after
+                a failed request, in milliseconds.
+            multiplier_retry_interval (int): The multiplier to use to increase
+                the retry interval after a failed request.
+        """
+        self.clock = clock
+        self.store = store
+        self.destination = destination
+
+        self.retry_interval = retry_interval
+        self.min_retry_interval = min_retry_interval
+        self.max_retry_interval = max_retry_interval
+        self.multiplier_retry_interval = multiplier_retry_interval
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        def err(failure):
+            logger.exception(
+                "Failed to store set_destination_retry_timings",
+                failure.value
+            )
+
+        valid_err_code = False
+        if exc_type is CodeMessageException:
+            valid_err_code = 0 <= exc_val.code < 500
+
+        if exc_type is None or valid_err_code:
+            # We connected successfully.
+            if not self.retry_interval:
+                return
+
+            retry_last_ts = 0
+            self.retry_interval = 0
+        else:
+            # We couldn't connect.
+            if self.retry_interval:
+                self.retry_interval *= self.multiplier_retry_interval
+
+                if self.retry_interval >= self.max_retry_interval:
+                    self.retry_interval = self.max_retry_interval
+            else:
+                self.retry_interval = self.min_retry_interval
+
+            retry_last_ts = int(self.clock.time_msec())
+
+        self.store.set_destination_retry_timings(
+            self.destination, retry_last_ts, self.retry_interval
+        ).addErrback(err)