summary refs log tree commit diff
path: root/synapse/util/retryutils.py
blob: ab58eb528bf90716d441f7d80a1d04d40a719cd6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from synapse.api.errors import CodeMessageException

from synapse.util.async import run_on_reactor

import logging


logger = logging.getLogger(__name__)


class NotRetryingDestination(Exception):
    """Signals that a destination should not be contacted right now.

    The retry timings and the destination name handed to the constructor are
    kept on the instance so that whoever catches the exception can inspect
    them.

    Attributes:
        retry_last_ts: The `retry_last_ts` value given by the raiser.
        retry_interval: The `retry_interval` value given by the raiser.
        destination: The destination that is not being retried.
    """

    def __init__(self, retry_last_ts, retry_interval, destination):
        super(NotRetryingDestination, self).__init__(
            "Not retrying server %s." % (destination,)
        )

        self.destination = destination
        self.retry_interval = retry_interval
        self.retry_last_ts = retry_last_ts


@defer.inlineCallbacks
def get_retry_limiter(destination, clock, store, **kwargs):
    """Build a retry limiter for `destination`, unless it is still backing off.

    The stored retry timings for the destination are looked up in `store`. If
    there are timings and the back-off window they describe
    (`retry_last_ts + retry_interval`) has not yet elapsed according to
    `clock`, a NotRetryingDestination exception is raised. Otherwise the
    Deferred fires with a RetryDestinationLimiter context manager, seeded with
    the stored retry interval (or zero when nothing is stored). Any extra
    keyword arguments are forwarded to RetryDestinationLimiter.

    Example usage:

        try:
            limiter = yield get_retry_limiter(destination, clock, store)
            with limiter:
                response = yield do_request()
        except NotRetryingDestination:
            # We aren't ready to retry that destination.
            raise
    """
    timings = yield store.get_destination_retry_timings(destination)

    if timings:
        last_ts = timings.retry_last_ts
        interval = timings.retry_interval

        current_ms = int(clock.time_msec())

        # Still inside the back-off window: refuse to hand out a limiter.
        if last_ts + interval > current_ms:
            raise NotRetryingDestination(
                retry_last_ts=last_ts,
                retry_interval=interval,
                destination=destination,
            )
    else:
        # Nothing stored for this destination, so start from a zero interval.
        interval = 0

    limiter = RetryDestinationLimiter(
        destination, clock, store, interval, **kwargs
    )
    defer.returnValue(limiter)


class RetryDestinationLimiter(object):
    """Context manager that tracks whether a destination is "up" or "down".

    On leaving the `with` block the outcome is used to compute a new retry
    interval, which is then written to the store via
    `set_destination_retry_timings`.
    """

    def __init__(self, destination, clock, store, retry_interval,
                 min_retry_interval=5000, max_retry_interval=60 * 60 * 1000,
                 multiplier_retry_interval=2,):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination (str)
            clock (Clock)
            store (DataStore)
            retry_interval (int): The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            min_retry_interval (int): The minimum retry interval to use after
                a failed request, in milliseconds.
            max_retry_interval (int): The maximum retry interval to use after
                a failed request, in milliseconds.
            multiplier_retry_interval (int): The multiplier to use to increase
                the retry interval after a failed request.
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        self.retry_interval = retry_interval
        self.min_retry_interval = min_retry_interval
        self.max_retry_interval = max_retry_interval
        self.multiplier_retry_interval = multiplier_retry_interval

    def __enter__(self):
        """Enter the context; nothing to set up, and nothing is returned."""
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Update the destination's retry timings from the block's outcome.

        A clean exit, or a CodeMessageException (including subclasses) whose
        code is in [0, 500), counts as success: the stored timings are reset
        to zero, unless the interval was already zero, in which case nothing
        is written. Any other exception counts as failure: the interval is
        multiplied by `multiplier_retry_interval` (capped at
        `max_retry_interval`), or set to `min_retry_interval` if it was zero,
        and the current time is recorded as the last retry timestamp.

        Always returns None, so an exception raised inside the block is never
        suppressed.
        """
        def err(failure):
            # This runs as an errback, outside of any `except` block, so
            # `logger.exception` would find no active exception to report.
            # Pass the failure's own exception details to the logger instead.
            logger.error(
                "Failed to store set_destination_retry_timings: %s",
                failure.value,
                exc_info=(
                    failure.type, failure.value, failure.getTracebackObject()
                ),
            )

        # Use issubclass rather than an identity check so that subclasses of
        # CodeMessageException are recognised too.
        valid_err_code = False
        if (exc_type is not None
                and issubclass(exc_type, CodeMessageException)):
            valid_err_code = 0 <= exc_val.code < 500

        if exc_type is None or valid_err_code:
            # We connected successfully.
            if not self.retry_interval:
                # Already marked as up; avoid a pointless write.
                return

            retry_last_ts = 0
            self.retry_interval = 0
        else:
            # We couldn't connect.
            if self.retry_interval:
                # Back off further, but never beyond the configured maximum.
                self.retry_interval = min(
                    self.retry_interval * self.multiplier_retry_interval,
                    self.max_retry_interval,
                )
            else:
                self.retry_interval = self.min_retry_interval

            retry_last_ts = int(self.clock.time_msec())

        # Fire-and-forget: the write is not waited on, failures are logged.
        self.store.set_destination_retry_timings(
            self.destination, retry_last_ts, self.retry_interval
        ).addErrback(err)