summary refs log tree commit diff
path: root/synapse/http/federation/srv_resolver.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/federation/srv_resolver.py')
-rw-r--r--synapse/http/federation/srv_resolver.py167
1 files changed, 106 insertions, 61 deletions
diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py
index c49b82c394..71830c549d 100644
--- a/synapse/http/federation/srv_resolver.py
+++ b/synapse/http/federation/srv_resolver.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+import random
 import time
 
 import attr
@@ -51,74 +52,118 @@ class Server(object):
     expires = attr.ib(default=0)
 
 
-@defer.inlineCallbacks
-def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=time):
-    """Look up a SRV record, with caching
+def pick_server_from_list(server_list):
+    """Randomly choose a server from the server list
+
+    Args:
+        server_list (list[Server]): list of candidate servers
+
+    Returns:
+        Tuple[bytes, int]: (host, port) pair for the chosen server
+    """
+    if not server_list:
+        raise RuntimeError("pick_server_from_list called with empty list")
+
+    # TODO: currently we only use the lowest-priority servers. We should maintain a
+    # cache of servers known to be "down" and filter them out
+
+    min_priority = min(s.priority for s in server_list)
+    eligible_servers = list(s for s in server_list if s.priority == min_priority)
+    total_weight = sum(s.weight for s in eligible_servers)
+    target_weight = random.randint(0, total_weight)
+
+    for s in eligible_servers:
+        target_weight -= s.weight
+
+        if target_weight <= 0:
+            return s.host, s.port
+
+    # this should be impossible.
+    raise RuntimeError(
+        "pick_server_from_list got to end of eligible server list.",
+    )
+
+
+class SrvResolver(object):
+    """Interface to the dns client to do SRV lookups, with result caching.
 
     The default resolver in twisted.names doesn't do any caching (it has a CacheResolver,
     but the cache never gets populated), so we add our own caching layer here.
 
     Args:
-        service_name (unicode|bytes): record to look up
         dns_client (twisted.internet.interfaces.IResolver): twisted resolver impl
         cache (dict): cache object
-        clock (object): clock implementation. must provide a time() method.
-
-    Returns:
-        Deferred[list[Server]]: a list of the SRV records, or an empty list if none found
+        get_time (callable): clock implementation. Should return seconds since the epoch
     """
-    # TODO: the dns client handles both unicode names (encoding via idna) and pre-encoded
-    # byteses; however they will obviously end up as separate entries in the cache. We
-    # should pick one form and stick with it.
-    cache_entry = cache.get(service_name, None)
-    if cache_entry:
-        if all(s.expires > int(clock.time()) for s in cache_entry):
-            servers = list(cache_entry)
-            defer.returnValue(servers)
-
-    try:
-        answers, _, _ = yield make_deferred_yieldable(
-            dns_client.lookupService(service_name),
-        )
-    except DNSNameError:
-        # TODO: cache this. We can get the SOA out of the exception, and use
-        # the negative-TTL value.
-        defer.returnValue([])
-    except DomainError as e:
-        # We failed to resolve the name (other than a NameError)
-        # Try something in the cache, else rereaise
-        cache_entry = cache.get(service_name, None)
+    def __init__(self, dns_client=client, cache=SERVER_CACHE, get_time=time.time):
+        self._dns_client = dns_client
+        self._cache = cache
+        self._get_time = get_time
+
+    @defer.inlineCallbacks
+    def resolve_service(self, service_name):
+        """Look up a SRV record
+
+        Args:
+            service_name (bytes): record to look up
+
+        Returns:
+            Deferred[list[Server]]:
+                a list of the SRV records, or an empty list if none found
+        """
+        now = int(self._get_time())
+
+        if not isinstance(service_name, bytes):
+            raise TypeError("%r is not a byte string" % (service_name,))
+
+        cache_entry = self._cache.get(service_name, None)
         if cache_entry:
-            logger.warn(
-                "Failed to resolve %r, falling back to cache. %r",
-                service_name, e
+            if all(s.expires > now for s in cache_entry):
+                servers = list(cache_entry)
+                defer.returnValue(servers)
+
+        try:
+            answers, _, _ = yield make_deferred_yieldable(
+                self._dns_client.lookupService(service_name),
             )
-            defer.returnValue(list(cache_entry))
-        else:
-            raise e
-
-    if (len(answers) == 1
-            and answers[0].type == dns.SRV
-            and answers[0].payload
-            and answers[0].payload.target == dns.Name(b'.')):
-        raise ConnectError("Service %s unavailable" % service_name)
-
-    servers = []
-
-    for answer in answers:
-        if answer.type != dns.SRV or not answer.payload:
-            continue
-
-        payload = answer.payload
-
-        servers.append(Server(
-            host=payload.target.name,
-            port=payload.port,
-            priority=payload.priority,
-            weight=payload.weight,
-            expires=int(clock.time()) + answer.ttl,
-        ))
-
-    servers.sort()  # FIXME: get rid of this (it's broken by the attrs change)
-    cache[service_name] = list(servers)
-    defer.returnValue(servers)
+        except DNSNameError:
+            # TODO: cache this. We can get the SOA out of the exception, and use
+            # the negative-TTL value.
+            defer.returnValue([])
+        except DomainError as e:
+            # We failed to resolve the name (other than a NameError)
+            # Try something in the cache, else rereaise
+            cache_entry = self._cache.get(service_name, None)
+            if cache_entry:
+                logger.warn(
+                    "Failed to resolve %r, falling back to cache. %r",
+                    service_name, e
+                )
+                defer.returnValue(list(cache_entry))
+            else:
+                raise e
+
+        if (len(answers) == 1
+                and answers[0].type == dns.SRV
+                and answers[0].payload
+                and answers[0].payload.target == dns.Name(b'.')):
+            raise ConnectError("Service %s unavailable" % service_name)
+
+        servers = []
+
+        for answer in answers:
+            if answer.type != dns.SRV or not answer.payload:
+                continue
+
+            payload = answer.payload
+
+            servers.append(Server(
+                host=payload.target.name,
+                port=payload.port,
+                priority=payload.priority,
+                weight=payload.weight,
+                expires=now + answer.ttl,
+            ))
+
+        self._cache[service_name] = list(servers)
+        defer.returnValue(servers)