summary refs log blame commit diff
path: root/synapse/util/retryutils.py
blob: d81f2527d7be29e1996e2f28a6f843a0e9ed691b (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
                                     










                                                                          
              
             
                                      
 
                              
                                                   
                                     

                                    
                                                        



                                                                
                                          
                          
 
                                        
                                                                                  


                                                                                
                                                                          
                                                                             
                                                                           
                        
                                               
           
                                                        
                             




                                            





                                 





                                                                               
         


                                                                      
                                                                                      
 

                  
                                                                        
                         
                                             


                                                        
                     
                                          
                                                                          
                     

                                                     

                                    
                                                                       




                                              



                                                                                   


                                   
                   
                                              
                 

     
                              
                 





                                        
      



                                                                           
             


                                                                          
                                                                      
                                                                  
                                                                         
                                                    
 
                                                                          
                                            
           


                                      
                                    
                                            
                                            
                                                    
 
                                
            




                                                
                              




                                                                            
                                                       


                                                                              
                                                                          
                                                                           
                        
                                                           
                                      
                                            



                                      
 
                          
                                        

                                       

                                                                                     
                                  
                                   
                                         

                                   

                                                                                     
 
                                                             
                 
                                                        
 
                        
                                                                             


                                    
             
                                                       
 

                                               
                                                
                
                                                               


                                        
                 
                             
                                                                             
 
                                                    
                                                                      
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
from types import TracebackType
from typing import Any, Optional, Type

import synapse.logging.context
from synapse.api.errors import CodeMessageException
from synapse.storage import DataStore
from synapse.util import Clock

logger = logging.getLogger(__name__)

# the initial backoff, after the first transaction fails
MIN_RETRY_INTERVAL = 10 * 60 * 1000

# how much we multiply the backoff by after each subsequent fail
RETRY_MULTIPLIER = 5

# a cap on the backoff. (Essentially none)
MAX_RETRY_INTERVAL = 2**62


class NotRetryingDestination(Exception):
    def __init__(self, retry_last_ts: int, retry_interval: int, destination: str):
        """Raised by the limiter (and federation client) to indicate that we are
        are deliberately not attempting to contact a given server.

        Args:
            retry_last_ts: the unix ts in milliseconds of our last attempt
                to contact the server.  0 indicates that the last attempt was
                successful or that we've never actually attempted to connect.
            retry_interval: the time in milliseconds to wait until the next
                attempt.
            destination: the domain in question
        """

        msg = "Not retrying server %s." % (destination,)
        super().__init__(msg)

        self.retry_last_ts = retry_last_ts
        self.retry_interval = retry_interval
        self.destination = destination


async def get_retry_limiter(
    destination: str,
    clock: Clock,
    store: DataStore,
    ignore_backoff: bool = False,
    **kwargs: Any,
) -> "RetryDestinationLimiter":
    """For a given destination check if we have previously failed to
    send a request there and are waiting before retrying the destination.
    If we are not ready to retry the destination, this will raise a
    NotRetryingDestination exception. Otherwise, will return a Context Manager
    that will mark the destination as down if an exception is thrown (excluding
    CodeMessageException with code < 500)

    Args:
        destination: name of homeserver
        clock: timing source
        store: datastore
        ignore_backoff: true to ignore the historical backoff data and
            try the request anyway. We will still reset the retry_interval on success.

    Example usage:

        try:
            limiter = await get_retry_limiter(destination, clock, store)
            with limiter:
                response = await do_request()
        except NotRetryingDestination:
            # We aren't ready to retry that destination.
            raise
    """
    failure_ts = None
    retry_last_ts, retry_interval = (0, 0)

    retry_timings = await store.get_destination_retry_timings(destination)

    if retry_timings:
        failure_ts = retry_timings.failure_ts
        retry_last_ts = retry_timings.retry_last_ts
        retry_interval = retry_timings.retry_interval

        now = int(clock.time_msec())

        if not ignore_backoff and retry_last_ts + retry_interval > now:
            raise NotRetryingDestination(
                retry_last_ts=retry_last_ts,
                retry_interval=retry_interval,
                destination=destination,
            )

    # if we are ignoring the backoff data, we should also not increment the backoff
    # when we get another failure - otherwise a server can very quickly reach the
    # maximum backoff even though it might only have been down briefly
    backoff_on_failure = not ignore_backoff

    return RetryDestinationLimiter(
        destination,
        clock,
        store,
        failure_ts,
        retry_interval,
        backoff_on_failure=backoff_on_failure,
        **kwargs,
    )


class RetryDestinationLimiter:
    def __init__(
        self,
        destination: str,
        clock: Clock,
        store: DataStore,
        failure_ts: Optional[int],
        retry_interval: int,
        backoff_on_404: bool = False,
        backoff_on_failure: bool = True,
    ):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination
            clock
            store
            failure_ts: when this destination started failing (in ms since
                the epoch), or zero if the last request was successful
            retry_interval: The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            backoff_on_404: Back off if we get a 404

            backoff_on_failure: set to False if we should not increase the
                retry interval on a failure.
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        self.failure_ts = failure_ts
        self.retry_interval = retry_interval
        self.backoff_on_404 = backoff_on_404
        self.backoff_on_failure = backoff_on_failure

    def __enter__(self) -> None:
        pass

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        valid_err_code = False
        if exc_type is None:
            valid_err_code = True
        elif not issubclass(exc_type, Exception):
            # avoid treating exceptions which don't derive from Exception as
            # failures; this is mostly so as not to catch defer._DefGen.
            valid_err_code = True
        elif isinstance(exc_val, CodeMessageException):
            # Some error codes are perfectly fine for some APIs, whereas other
            # APIs may expect to never received e.g. a 404. It's important to
            # handle 404 as some remote servers will return a 404 when the HS
            # has been decommissioned.
            # If we get a 401, then we should probably back off since they
            # won't accept our requests for at least a while.
            # 429 is us being aggressively rate limited, so lets rate limit
            # ourselves.
            if exc_val.code == 404 and self.backoff_on_404:
                valid_err_code = False
            elif exc_val.code in (401, 429):
                valid_err_code = False
            elif exc_val.code < 500:
                valid_err_code = True
            else:
                valid_err_code = False

        if valid_err_code:
            # We connected successfully.
            if not self.retry_interval:
                return

            logger.debug(
                "Connection to %s was successful; clearing backoff", self.destination
            )
            self.failure_ts = None
            retry_last_ts = 0
            self.retry_interval = 0
        elif not self.backoff_on_failure:
            return
        else:
            # We couldn't connect.
            if self.retry_interval:
                self.retry_interval = int(
                    self.retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4)
                )

                if self.retry_interval >= MAX_RETRY_INTERVAL:
                    self.retry_interval = MAX_RETRY_INTERVAL
            else:
                self.retry_interval = MIN_RETRY_INTERVAL

            logger.info(
                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                self.destination,
                exc_type,
                exc_val,
                self.retry_interval,
            )
            retry_last_ts = int(self.clock.time_msec())

            if self.failure_ts is None:
                self.failure_ts = retry_last_ts

        async def store_retry_timings() -> None:
            try:
                await self.store.set_destination_retry_timings(
                    self.destination,
                    self.failure_ts,
                    retry_last_ts,
                    self.retry_interval,
                )
            except Exception:
                logger.exception("Failed to store destination_retry_timings")

        # we deliberately do this in the background.
        synapse.logging.context.run_in_background(store_retry_timings)