From f299c5414c2dd300103b0e11e7114123d8eb58a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Aug 2019 15:30:04 +0100 Subject: Refactor MatrixFederationAgent to retry SRV. This refactors MatrixFederationAgent to move the SRV lookup into the endpoint code; this has two benefits: 1. It's easier to retry different host/ports in the same way as HostnameEndpoint. 2. We avoid SRV lookups if we have a free connection in the pool --- synapse/http/federation/srv_resolver.py | 35 ++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) (limited to 'synapse/http/federation/srv_resolver.py') diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index b32188766d..bbda0a23f4 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) SERVER_CACHE = {} -@attr.s +@attr.s(slots=True, frozen=True) class Server(object): """ Our record of an individual server which can be tried to reach a destination. @@ -83,6 +83,35 @@ def pick_server_from_list(server_list): raise RuntimeError("pick_server_from_list got to end of eligible server list.") +def _sort_server_list(server_list): + """Given a list of SRV records sort them into priority order and shuffle + each priority with the given weight. + """ + priority_map = {} + + for server in server_list: + priority_map.setdefault(server.priority, []).append(server) + + results = [] + for priority in sorted(priority_map): + servers = priority_map.pop(priority) + + while servers: + total_weight = sum(s.weight for s in servers) + target_weight = random.randint(0, total_weight) + + for s in servers: + target_weight -= s.weight + + if target_weight <= 0: + break + + results.append(s) + servers.remove(s) + + return results + + class SrvResolver(object): """Interface to the dns client to do SRV lookups, with result caching. 
@@ -120,7 +149,7 @@ class SrvResolver(object): if cache_entry: if all(s.expires > now for s in cache_entry): servers = list(cache_entry) - return servers + return _sort_server_list(servers) try: answers, _, _ = yield make_deferred_yieldable( @@ -169,4 +198,4 @@ class SrvResolver(object): ) self._cache[service_name] = list(servers) - return servers + return _sort_server_list(servers) -- cgit 1.4.1 From 1f9df1cc7ba7027aef3a38d01909a928ecf2a8c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2019 11:49:44 +0100 Subject: Fixup _sort_server_list to be slightly more efficient Also document that we are using the algorithm described in RFC2782 and ensure we handle zero weight correctly. --- synapse/http/federation/srv_resolver.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'synapse/http/federation/srv_resolver.py') diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index bbda0a23f4..110b112e85 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -94,10 +94,18 @@ def _sort_server_list(server_list): results = [] for priority in sorted(priority_map): - servers = priority_map.pop(priority) + servers = priority_map[priority] + # This algorithms follows the algorithm described in RFC2782. + # + # N.B. Weights can be zero, which means that you should pick that server + # last *or* that its the only server in this priority. + + # We sort to ensure zero weighted items are first. 
+ servers.sort(key=lambda s: s.weight) + + total_weight = sum(s.weight for s in servers) while servers: - total_weight = sum(s.weight for s in servers) target_weight = random.randint(0, total_weight) for s in servers: @@ -108,6 +116,7 @@ def _sort_server_list(server_list): results.append(s) servers.remove(s) + total_weight -= s.weight return results -- cgit 1.4.1 From 74f016d343fe270ab3affe79cc82266d94120e5c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Aug 2019 11:50:12 +0100 Subject: Remove now unused pick_server_from_list --- synapse/http/federation/srv_resolver.py | 30 ------------------------------ 1 file changed, 30 deletions(-) (limited to 'synapse/http/federation/srv_resolver.py') diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index 110b112e85..c8ca3fd0e9 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -53,36 +53,6 @@ class Server(object): expires = attr.ib(default=0) -def pick_server_from_list(server_list): - """Randomly choose a server from the server list - - Args: - server_list (list[Server]): list of candidate servers - - Returns: - Tuple[bytes, int]: (host, port) pair for the chosen server - """ - if not server_list: - raise RuntimeError("pick_server_from_list called with empty list") - - # TODO: currently we only use the lowest-priority servers. We should maintain a - # cache of servers known to be "down" and filter them out - - min_priority = min(s.priority for s in server_list) - eligible_servers = list(s for s in server_list if s.priority == min_priority) - total_weight = sum(s.weight for s in eligible_servers) - target_weight = random.randint(0, total_weight) - - for s in eligible_servers: - target_weight -= s.weight - - if target_weight <= 0: - return s.host, s.port - - # this should be impossible. 
- raise RuntimeError("pick_server_from_list got to end of eligible server list.") - - def _sort_server_list(server_list): """Given a list of SRV records sort them into priority order and shuffle each priority with the given weight. -- cgit 1.4.1 From 91caa5b4303bfa0b4604ecf95d56ae72a7074b0b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 27 Aug 2019 13:56:42 +0100 Subject: Fix off by one error in SRV result shuffling --- synapse/http/federation/srv_resolver.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'synapse/http/federation/srv_resolver.py') diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index c8ca3fd0e9..3fe4ffb9e5 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -66,17 +66,18 @@ def _sort_server_list(server_list): for priority in sorted(priority_map): servers = priority_map[priority] - # This algorithms follows the algorithm described in RFC2782. + # This algorithms roughly follows the algorithm described in RFC2782, + # changed to remove an off-by-one error. # - # N.B. Weights can be zero, which means that you should pick that server - # last *or* that its the only server in this priority. - - # We sort to ensure zero weighted items are first. - servers.sort(key=lambda s: s.weight) + # N.B. Weights can be zero, which means that they should be picked + # rarely. total_weight = sum(s.weight for s in servers) - while servers: - target_weight = random.randint(0, total_weight) + + # Total weight can become zero if there are only zero weight servers + # left, which we handle by just shuffling and appending to the results. 
+ while servers and total_weight: + target_weight = random.randint(1, total_weight) for s in servers: target_weight -= s.weight @@ -88,6 +89,10 @@ def _sort_server_list(server_list): servers.remove(s) total_weight -= s.weight + if servers: + random.shuffle(servers) + results.extend(servers) + return results -- cgit 1.4.1