diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 26649e70be..50baa8bf0a 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -23,14 +23,17 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.util import Clock
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.metrics import Measure
# period to cache .well-known results for by default
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
@@ -44,6 +47,8 @@ WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
# cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
+# magic value to mark an invalid well-known
+INVALID_WELL_KNOWN = object()
logger = logging.getLogger(__name__)
well_known_cache = TTLCache('well-known')
@@ -78,6 +83,8 @@ class MatrixFederationAgent(object):
_well_known_cache=well_known_cache,
):
self._reactor = reactor
+ self._clock = Clock(reactor)
+
self._tls_client_options_factory = tls_client_options_factory
if _srv_resolver is None:
_srv_resolver = SrvResolver()
@@ -98,6 +105,11 @@ class MatrixFederationAgent(object):
)
self._well_known_agent = _well_known_agent
+ # our cache of .well-known lookup results, mapping from server name
+ # to delegated name. The values can be:
+ # `bytes`: a valid server-name
+ # `None`: there is no .well-known here
+ # INVALID_WELL_KNWOWN: the .well-known here is invalid
self._well_known_cache = _well_known_cache
@defer.inlineCallbacks
@@ -152,12 +164,9 @@ class MatrixFederationAgent(object):
class EndpointFactory(object):
@staticmethod
def endpointForURI(_uri):
- logger.info(
- "Connecting to %s:%i",
- res.target_host.decode("ascii"),
- res.target_port,
+ ep = LoggingHostnameEndpoint(
+ self._reactor, res.target_host, res.target_port,
)
- ep = HostnameEndpoint(self._reactor, res.target_host, res.target_port)
if tls_options is not None:
ep = wrapClientTLS(tls_options, ep)
return ep
@@ -210,11 +219,7 @@ class MatrixFederationAgent(object):
target_port=parsed_uri.port,
))
- # try a SRV lookup
- service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
- server_list = yield self._srv_resolver.resolve_service(service_name)
-
- if not server_list and lookup_well_known:
+ if lookup_well_known:
# try a .well-known lookup
well_known_server = yield self._get_well_known(parsed_uri.host)
@@ -250,6 +255,10 @@ class MatrixFederationAgent(object):
res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
defer.returnValue(res)
+ # try a SRV lookup
+ service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
+ server_list = yield self._srv_resolver.resolve_service(service_name)
+
if not server_list:
target_host = parsed_uri.host
port = 8448
@@ -283,14 +292,35 @@ class MatrixFederationAgent(object):
None if there was no .well-known file.
"""
try:
- cached = self._well_known_cache[server_name]
- defer.returnValue(cached)
+ result = self._well_known_cache[server_name]
except KeyError:
- pass
+ # TODO: should we linearise so that we don't end up doing two .well-known
+ # requests for the same server in parallel?
+ with Measure(self._clock, "get_well_known"):
+ result, cache_period = yield self._do_get_well_known(server_name)
+
+ if cache_period > 0:
+ self._well_known_cache.set(server_name, result, cache_period)
+
+ if result == INVALID_WELL_KNOWN:
+ raise Exception("invalid .well-known on this server")
+
+ defer.returnValue(result)
- # TODO: should we linearise so that we don't end up doing two .well-known requests
- # for the same server in parallel?
+ @defer.inlineCallbacks
+ def _do_get_well_known(self, server_name):
+ """Actually fetch and parse a .well-known, without checking the cache
+ Args:
+ server_name (bytes): name of the server, from the requested url
+
+ Returns:
+ Deferred[Tuple[bytes|None|object],int]:
+ result, cache period, where result is one of:
+ - the new server name from the .well-known (as a `bytes`)
+ - None if there was no .well-known file.
+ - INVALID_WELL_KNOWN if the .well-known was invalid
+ """
uri = b"https://%s/.well-known/matrix/server" % (server_name, )
uri_str = uri.decode("ascii")
logger.info("Fetching %s", uri_str)
@@ -308,9 +338,7 @@ class MatrixFederationAgent(object):
# after startup
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
-
- self._well_known_cache.set(server_name, None, cache_period)
- defer.returnValue(None)
+ defer.returnValue((None, cache_period))
try:
parsed_body = json.loads(body.decode('utf-8'))
@@ -320,7 +348,10 @@ class MatrixFederationAgent(object):
if "m.server" not in parsed_body:
raise Exception("Missing key 'm.server'")
except Exception as e:
- raise Exception("invalid .well-known response from %s: %s" % (uri_str, e,))
+ logger.info("invalid .well-known response from %s: %s", uri_str, e)
+ cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+ cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+ defer.returnValue((INVALID_WELL_KNOWN, cache_period))
result = parsed_body["m.server"].encode("ascii")
@@ -336,10 +367,20 @@ class MatrixFederationAgent(object):
else:
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
- if cache_period > 0:
- self._well_known_cache.set(server_name, result, cache_period)
+ defer.returnValue((result, cache_period))
- defer.returnValue(result)
+
+@implementer(IStreamClientEndpoint)
+class LoggingHostnameEndpoint(object):
+ """A wrapper for HostnameEndpint which logs when it connects"""
+ def __init__(self, reactor, host, port, *args, **kwargs):
+ self.host = host
+ self.port = port
+ self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
+
+ def connect(self, protocol_factory):
+ logger.info("Connecting to %s:%i", self.host, self.port)
+ return self.ep.connect(protocol_factory)
def _cache_period_from_headers(headers, time_now=time.time):
|