summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-01-30 10:55:25 +0000
committerGitHub <noreply@github.com>2019-01-30 10:55:25 +0000
commitbc5f6e1797057e3acea692b926305982390424a3 (patch)
treefadca57abf7ce57ffc08d55af79fbc602f87a1f6 /synapse
parentFix flake8 (#4519) (diff)
downloadsynapse-bc5f6e1797057e3acea692b926305982390424a3.tar.xz
Add a caching layer to .well-known responses (#4516)
Diffstat (limited to 'synapse')
-rw-r--r--synapse/http/federation/matrix_federation_agent.py90
-rw-r--r--synapse/util/caches/ttlcache.py161
2 files changed, 249 insertions, 2 deletions
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index f81fcd4301..29804efe10 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 import json
 import logging
+import random
+import time
 
 import attr
 from netaddr import IPAddress
@@ -22,13 +24,29 @@ from zope.interface import implementer
 from twisted.internet import defer
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.web.client import URI, Agent, HTTPConnectionPool, readBody
+from twisted.web.http import stringToDatetime
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IAgent
 
 from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.util.caches.ttlcache import TTLCache
 from synapse.util.logcontext import make_deferred_yieldable
 
+# period to cache .well-known results for by default
+WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
+
+# jitter to add to the .well-known default cache ttl
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+
+# period to cache failure to fetch .well-known for
+WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
+
+# cap for .well-known cache period
+WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
+
+
 logger = logging.getLogger(__name__)
+well_known_cache = TTLCache('well-known')
 
 
 @implementer(IAgent)
@@ -57,6 +75,7 @@ class MatrixFederationAgent(object):
         self, reactor, tls_client_options_factory,
         _well_known_tls_policy=None,
         _srv_resolver=None,
+        _well_known_cache=well_known_cache,
     ):
         self._reactor = reactor
         self._tls_client_options_factory = tls_client_options_factory
@@ -77,6 +96,8 @@ class MatrixFederationAgent(object):
         _well_known_agent = Agent(self._reactor, pool=self._pool, **agent_args)
         self._well_known_agent = _well_known_agent
 
+        self._well_known_cache = _well_known_cache
+
     @defer.inlineCallbacks
     def request(self, method, uri, headers=None, bodyProducer=None):
         """
@@ -259,7 +280,14 @@ class MatrixFederationAgent(object):
             Deferred[bytes|None]: either the new server name, from the .well-known, or
                 None if there was no .well-known file.
         """
-        # FIXME: add a cache
+        try:
+            cached = self._well_known_cache[server_name]
+            defer.returnValue(cached)
+        except KeyError:
+            pass
+
+        # TODO: should we linearise so that we don't end up doing two .well-known requests
+        # for the same server in parallel?
 
         uri = b"https://%s/.well-known/matrix/server" % (server_name, )
         uri_str = uri.decode("ascii")
@@ -270,12 +298,14 @@ class MatrixFederationAgent(object):
             )
         except Exception as e:
             logger.info("Connection error fetching %s: %s", uri_str, e)
+            self._well_known_cache.set(server_name, None, WELL_KNOWN_INVALID_CACHE_PERIOD)
             defer.returnValue(None)
 
         body = yield make_deferred_yieldable(readBody(response))
 
         if response.code != 200:
             logger.info("Error response %i from %s", response.code, uri_str)
+            self._well_known_cache.set(server_name, None, WELL_KNOWN_INVALID_CACHE_PERIOD)
             defer.returnValue(None)
 
         try:
@@ -287,7 +317,63 @@ class MatrixFederationAgent(object):
                 raise Exception("Missing key 'm.server'")
         except Exception as e:
             raise Exception("invalid .well-known response from %s: %s" % (uri_str, e,))
-        defer.returnValue(parsed_body["m.server"].encode("ascii"))
+
+        result = parsed_body["m.server"].encode("ascii")
+
+        cache_period = _cache_period_from_headers(
+            response.headers,
+            time_now=self._reactor.seconds,
+        )
+        if cache_period is None:
+            cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
+            # add some randomness to the TTL to avoid a stampeding herd every hour after
+            # startup
+            cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+        else:
+            cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
+
+        if cache_period > 0:
+            self._well_known_cache.set(server_name, result, cache_period)
+
+        defer.returnValue(result)
+
+
+def _cache_period_from_headers(headers, time_now=time.time):
+    cache_controls = _parse_cache_control(headers)
+
+    if b'no-store' in cache_controls:
+        return 0
+
+    if b'max-age' in cache_controls:
+        try:
+            max_age = int(cache_controls[b'max-age'])
+            return max_age
+        except ValueError:
+            pass
+
+    expires = headers.getRawHeaders(b'expires')
+    if expires is not None:
+        try:
+            expires_date = stringToDatetime(expires[-1])
+            return expires_date - time_now()
+        except ValueError:
+            # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
+            # especially the value "0", as representing a time in the past (i.e.,
+            # "already expired").
+            return 0
+
+    return None
+
+
+def _parse_cache_control(headers):
+    cache_controls = {}
+    for hdr in headers.getRawHeaders(b'cache-control', []):
+        for directive in hdr.split(b','):
+            splits = [x.strip() for x in directive.split(b'=', 1)]
+            k = splits[0].lower()
+            v = splits[1] if len(splits) > 1 else None
+            cache_controls[k] = v
+    return cache_controls
 
 
 @attr.s
diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py
new file mode 100644
index 0000000000..5ba1862506
--- /dev/null
+++ b/synapse/util/caches/ttlcache.py
@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+import attr
+from sortedcontainers import SortedList
+
+from synapse.util.caches import register_cache
+
+logger = logging.getLogger(__name__)
+
+SENTINEL = object()
+
+
+class TTLCache(object):
+    """A key/value cache implementation where each entry has its own TTL"""
+
+    def __init__(self, cache_name, timer=time.time):
+        # map from key to _CacheEntry
+        self._data = {}
+
+        # the _CacheEntries, sorted by expiry time
+        self._expiry_list = SortedList()
+
+        self._timer = timer
+
+        self._metrics = register_cache("ttl", cache_name, self)
+
+    def set(self, key, value, ttl):
+        """Add/update an entry in the cache
+
+        Args:
+            key: key for this entry
+            value: value for this entry
+            ttl (float): TTL for this entry, in seconds
+        """
+        expiry = self._timer() + ttl
+
+        self.expire()
+        e = self._data.pop(key, SENTINEL)
+        if e != SENTINEL:
+            self._expiry_list.remove(e)
+
+        entry = _CacheEntry(expiry_time=expiry, key=key, value=value)
+        self._data[key] = entry
+        self._expiry_list.add(entry)
+
+    def get(self, key, default=SENTINEL):
+        """Get a value from the cache
+
+        Args:
+            key: key to look up
+            default: default value to return, if key is not found. If not set, and the
+                key is not found, a KeyError will be raised
+
+        Returns:
+            value from the cache, or the default
+        """
+        self.expire()
+        e = self._data.get(key, SENTINEL)
+        if e == SENTINEL:
+            self._metrics.inc_misses()
+            if default == SENTINEL:
+                raise KeyError(key)
+            return default
+        self._metrics.inc_hits()
+        return e.value
+
+    def get_with_expiry(self, key):
+        """Get a value, and its expiry time, from the cache
+
+        Args:
+            key: key to look up
+
+        Returns:
+            Tuple[Any, float]: the value from the cache, and the expiry time
+
+        Raises:
+            KeyError if the entry is not found
+        """
+        self.expire()
+        try:
+            e = self._data[key]
+        except KeyError:
+            self._metrics.inc_misses()
+            raise
+        self._metrics.inc_hits()
+        return e.value, e.expiry_time
+
+    def pop(self, key, default=SENTINEL):
+        """Remove a value from the cache
+
+        If key is in the cache, remove it and return its value, else return default.
+        If default is not given and key is not in the cache, a KeyError is raised.
+
+        Args:
+            key: key to look up
+            default: default value to return, if key is not found. If not set, and the
+                key is not found, a KeyError will be raised
+
+        Returns:
+            value from the cache, or the default
+        """
+        self.expire()
+        e = self._data.pop(key, SENTINEL)
+        if e == SENTINEL:
+            self._metrics.inc_misses()
+            if default == SENTINEL:
+                raise KeyError(key)
+            return default
+        self._expiry_list.remove(e)
+        self._metrics.inc_hits()
+        return e.value
+
+    def __getitem__(self, key):
+        return self.get(key)
+
+    def __delitem__(self, key):
+        self.pop(key)
+
+    def __contains__(self, key):
+        return key in self._data
+
+    def __len__(self):
+        self.expire()
+        return len(self._data)
+
+    def expire(self):
+        """Run the expiry on the cache. Any entries whose expiry times are due will
+        be removed
+        """
+        now = self._timer()
+        while self._expiry_list:
+            first_entry = self._expiry_list[0]
+            if first_entry.expiry_time - now > 0.0:
+                break
+            del self._data[first_entry.key]
+            del self._expiry_list[0]
+
+
+@attr.s(frozen=True, slots=True)
+class _CacheEntry(object):
+    """TTLCache entry"""
+    # expiry_time is the first attribute, so that entries are sorted by expiry.
+    expiry_time = attr.ib()
+    key = attr.ib()
+    value = attr.ib()