| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
 | # -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.util.logcontext
from twisted.internet import defer
from synapse.api.errors import CodeMessageException
import logging
import random
logger = logging.getLogger(__name__)
class NotRetryingDestination(Exception):
    """Signals that we are deliberately not contacting a remote server."""

    def __init__(self, retry_last_ts, retry_interval, destination):
        """Raised by the limiter (and federation client) to indicate that we
        are deliberately not attempting to contact a given server.

        Args:
            retry_last_ts (int): unix timestamp, in milliseconds, of our last
                attempt to contact the server. 0 means the last attempt
                succeeded, or that we have never attempted to connect.
            retry_interval (int): how long, in milliseconds, to wait before
                the next attempt.
            destination (str): the domain in question.
        """
        super(NotRetryingDestination, self).__init__(
            "Not retrying server %s." % (destination,)
        )

        # Kept on the exception so that callers can inspect the backoff state.
        self.destination = destination
        self.retry_interval = retry_interval
        self.retry_last_ts = retry_last_ts
@defer.inlineCallbacks
def get_retry_limiter(destination, clock, store, ignore_backoff=False,
                      **kwargs):
    """Check whether we are currently backing off from a destination and, if
    not, hand back a limiter to wrap the request in.

    The stored retry timings for the destination are looked up. If a previous
    failure means we are not yet due to retry, NotRetryingDestination is
    raised. Otherwise the deferred resolves to a context manager
    (RetryDestinationLimiter) which marks the destination as down if an
    exception is thrown inside it (excluding CodeMessageException with
    code < 500).

    Args:
        destination (str): name of homeserver
        clock (synapse.util.clock): timing source
        store (synapse.storage.transactions.TransactionStore): datastore
        ignore_backoff (bool): true to ignore the historical backoff data and
            try the request anyway. The next retry_interval is still updated
            on success/failure.
        **kwargs: passed through unchanged to RetryDestinationLimiter.

    Raises:
        NotRetryingDestination: if the destination is still inside its
            backoff window and ignore_backoff is not set.

    Example usage:
        try:
            limiter = yield get_retry_limiter(destination, clock, store)
            with limiter:
                response = yield do_request()
        except NotRetryingDestination:
            # We aren't ready to retry that destination.
            raise
    """
    timings = yield store.get_destination_retry_timings(destination)

    # With no stored timings we behave as if the last attempt succeeded.
    last_attempt_ts = 0
    interval = 0

    if timings:
        last_attempt_ts = timings["retry_last_ts"]
        interval = timings["retry_interval"]

        current_ts = int(clock.time_msec())

        if not ignore_backoff:
            if last_attempt_ts + interval > current_ts:
                raise NotRetryingDestination(
                    retry_last_ts=last_attempt_ts,
                    retry_interval=interval,
                    destination=destination,
                )

    limiter = RetryDestinationLimiter(
        destination, clock, store, interval, **kwargs
    )
    defer.returnValue(limiter)
class RetryDestinationLimiter(object):
    """Context manager which records whether a request to a destination
    succeeded, and updates the stored backoff for that destination to match.
    """

    def __init__(self, destination, clock, store, retry_interval,
                 min_retry_interval=10 * 60 * 1000,
                 max_retry_interval=24 * 60 * 60 * 1000,
                 multiplier_retry_interval=5, backoff_on_404=False):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination (str)
            clock (Clock)
            store (DataStore)
            retry_interval (int): The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            min_retry_interval (int): The minimum retry interval to use after
                a failed request, in milliseconds.
            max_retry_interval (int): The maximum retry interval to use after
                a failed request, in milliseconds.
            multiplier_retry_interval (int): The multiplier to use to increase
                the retry interval after a failed request.
            backoff_on_404 (bool): Back off if we get a 404
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        self.retry_interval = retry_interval
        self.min_retry_interval = min_retry_interval
        self.max_retry_interval = max_retry_interval
        self.multiplier_retry_interval = multiplier_retry_interval
        self.backoff_on_404 = backoff_on_404

    def __enter__(self):
        pass

    def _next_failure_interval(self):
        """Work out the retry interval to use after a failed request.

        Does not modify any state on the limiter.

        Returns:
            int: the new interval in milliseconds. This is
            min_retry_interval if we were not previously backing off;
            otherwise the current interval scaled by
            multiplier_retry_interval and a random jitter factor in
            [0.8, 1.4], capped at max_retry_interval.
        """
        if not self.retry_interval:
            return self.min_retry_interval

        # The jitter has to be applied as a float and only the *product*
        # truncated: int(random.uniform(0.8, 1.4)) is always 0 or 1, which
        # would either wipe out the backoff entirely or add no jitter at all.
        jittered = int(
            self.retry_interval
            * self.multiplier_retry_interval
            * random.uniform(0.8, 1.4)
        )
        return min(jittered, self.max_retry_interval)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Classify the outcome of the wrapped request and persist the new
        retry timings.

        The exception (if any) is never suppressed: this always returns None.
        The write to the store is kicked off in the background and failures
        to store are logged rather than raised.
        """
        valid_err_code = False
        if exc_type is None:
            valid_err_code = True
        elif not issubclass(exc_type, Exception):
            # avoid treating exceptions which don't derive from Exception as
            # failures; this is mostly so as not to catch the exception
            # twisted uses internally to implement defer.returnValue.
            valid_err_code = True
        elif issubclass(exc_type, CodeMessageException):
            # Some error codes are perfectly fine for some APIs, whereas other
            # APIs may expect to never receive e.g. a 404. It's important to
            # handle 404 as some remote servers will return a 404 when the HS
            # has been decommissioned.
            # If we get a 401, then we should probably back off since they
            # won't accept our requests for at least a while.
            # 429 is us being aggressively rate limited, so let's rate limit
            # ourselves.
            if exc_val.code == 404 and self.backoff_on_404:
                valid_err_code = False
            elif exc_val.code in (401, 429):
                valid_err_code = False
            elif exc_val.code < 500:
                valid_err_code = True
            else:
                valid_err_code = False

        if valid_err_code:
            # We connected successfully.
            if not self.retry_interval:
                # Nothing was stored as a backoff, so there is nothing to
                # clear: skip the database write.
                return

            logger.debug("Connection to %s was successful; clearing backoff",
                         self.destination)
            retry_last_ts = 0
            self.retry_interval = 0
        else:
            # We couldn't connect.
            self.retry_interval = self._next_failure_interval()

            logger.debug(
                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                self.destination, exc_type, exc_val, self.retry_interval
            )
            retry_last_ts = int(self.clock.time_msec())

        @defer.inlineCallbacks
        def store_retry_timings():
            try:
                yield self.store.set_destination_retry_timings(
                    self.destination, retry_last_ts, self.retry_interval
                )
            except Exception:
                logger.exception(
                    "Failed to store set_destination_retry_timings",
                )

        # we deliberately do this in the background.
        synapse.util.logcontext.preserve_fn(store_retry_timings)()
 |