diff --git a/.gitignore b/.gitignore
index 1033124f1d..37f0028d66 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ dbs/
dist/
docs/build/
*.egg-info
+pip-wheel-metadata/
cmdclient_config.json
homeserver*.db
diff --git a/changelog.d/4495.feature b/changelog.d/4495.feature
new file mode 100644
index 0000000000..fc2b5daf63
--- /dev/null
+++ b/changelog.d/4495.feature
@@ -0,0 +1 @@
+Synapse will now reload TLS certificates from disk upon SIGHUP.
diff --git a/changelog.d/4516.feature b/changelog.d/4516.feature
new file mode 100644
index 0000000000..bda713adf9
--- /dev/null
+++ b/changelog.d/4516.feature
@@ -0,0 +1 @@
+Implement MSC1708 (.well-known routing for server-server federation)
\ No newline at end of file
diff --git a/changelog.d/4519.misc b/changelog.d/4519.misc
new file mode 100644
index 0000000000..897e783d28
--- /dev/null
+++ b/changelog.d/4519.misc
@@ -0,0 +1 @@
+Fix code to comply with linting in PyFlakes 3.7.1.
diff --git a/changelog.d/4520.feature b/changelog.d/4520.feature
new file mode 100644
index 0000000000..bda713adf9
--- /dev/null
+++ b/changelog.d/4520.feature
@@ -0,0 +1 @@
+Implement MSC1708 (.well-known routing for server-server federation)
\ No newline at end of file
diff --git a/changelog.d/4521.feature b/changelog.d/4521.feature
new file mode 100644
index 0000000000..bda713adf9
--- /dev/null
+++ b/changelog.d/4521.feature
@@ -0,0 +1 @@
+Implement MSC1708 (.well-known routing for server-server federation)
\ No newline at end of file
diff --git a/changelog.d/4523.feature b/changelog.d/4523.feature
new file mode 100644
index 0000000000..9538c64f08
--- /dev/null
+++ b/changelog.d/4523.feature
@@ -0,0 +1 @@
+Add support for room version 3
diff --git a/changelog.d/4524.feature b/changelog.d/4524.feature
new file mode 100644
index 0000000000..fc2b5daf63
--- /dev/null
+++ b/changelog.d/4524.feature
@@ -0,0 +1 @@
+Synapse will now reload TLS certificates from disk upon SIGHUP.
diff --git a/changelog.d/4525.feature b/changelog.d/4525.feature
new file mode 100644
index 0000000000..c7f595cec2
--- /dev/null
+++ b/changelog.d/4525.feature
@@ -0,0 +1 @@
+ Synapse can now automatically provision TLS certificates via ACME (the protocol used by CAs like Let's Encrypt).
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py
index 4c3abf06fe..6e93f5a0c6 100644
--- a/synapse/_scripts/register_new_matrix_user.py
+++ b/synapse/_scripts/register_new_matrix_user.py
@@ -46,7 +46,7 @@ def request_registration(
# Get the nonce
r = requests.get(url, verify=False)
- if r.status_code is not 200:
+ if r.status_code != 200:
_print("ERROR! Received %d %s" % (r.status_code, r.reason))
if 400 <= r.status_code < 500:
try:
@@ -84,7 +84,7 @@ def request_registration(
_print("Sending registration request...")
r = requests.post(url, json=data, verify=False)
- if r.status_code is not 200:
+ if r.status_code != 200:
_print("ERROR! Received %d %s" % (r.status_code, r.reason))
if 400 <= r.status_code < 500:
try:
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index b45adafdd3..f56f5fcc13 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -12,15 +12,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import logging
import sys
from synapse import python_dependencies # noqa: E402
sys.dont_write_bytecode = True
+logger = logging.getLogger(__name__)
+
try:
python_dependencies.check_requirements()
except python_dependencies.DependencyException as e:
sys.stderr.writelines(e.message)
sys.exit(1)
+
+
+def check_bind_error(e, address, bind_addresses):
+ """
+ This method checks an exception occurred while binding on 0.0.0.0.
+ If :: is specified in the bind addresses a warning is shown.
+ The exception is still raised otherwise.
+
+ Binding on both 0.0.0.0 and :: causes an exception on Linux and macOS
+ because :: binds on both IPv4 and IPv6 (as per RFC 3493).
+ When binding on 0.0.0.0 after :: this can safely be ignored.
+
+ Args:
+ e (Exception): Exception that was caught.
+ address (str): Address on which binding was attempted.
+ bind_addresses (list): Addresses on which the service listens.
+ """
+ if address == '0.0.0.0' and '::' in bind_addresses:
+ logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]')
+ else:
+ raise e
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 18584226e9..5b97a54d45 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -22,6 +22,7 @@ from daemonize import Daemonize
from twisted.internet import error, reactor
+from synapse.app import check_bind_error
from synapse.util import PreserveLoggingContext
from synapse.util.rlimit import change_resource_limit
@@ -143,6 +144,9 @@ def listen_metrics(bind_addresses, port):
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses
+
+ Returns:
+ list (empty)
"""
for address in bind_addresses:
try:
@@ -155,42 +159,33 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
except error.CannotListenError as e:
check_bind_error(e, address, bind_addresses)
+ logger.info("Synapse now listening on TCP port %d", port)
+ return []
+
def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
"""
- Create an SSL socket for a port and several addresses
+ Create an TLS-over-TCP socket for a port and several addresses
+
+ Returns:
+ list of twisted.internet.tcp.Port listening for TLS connections
"""
+ r = []
for address in bind_addresses:
try:
- reactor.listenSSL(
- port,
- factory,
- context_factory,
- backlog,
- address
+ r.append(
+ reactor.listenSSL(
+ port,
+ factory,
+ context_factory,
+ backlog,
+ address
+ )
)
except error.CannotListenError as e:
check_bind_error(e, address, bind_addresses)
-
-def check_bind_error(e, address, bind_addresses):
- """
- This method checks an exception occurred while binding on 0.0.0.0.
- If :: is specified in the bind addresses a warning is shown.
- The exception is still raised otherwise.
-
- Binding on both 0.0.0.0 and :: causes an exception on Linux and macOS
- because :: binds on both IPv4 and IPv6 (as per RFC 3493).
- When binding on 0.0.0.0 after :: this can safely be ignored.
-
- Args:
- e (Exception): Exception that was caught.
- address (str): Address on which binding was attempted.
- bind_addresses (list): Addresses on which the service listens.
- """
- if address == '0.0.0.0' and '::' in bind_addresses:
- logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]')
- else:
- raise e
+ logger.info("Synapse now listening on port %d (TLS)", port)
+ return r
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index ffc49d77cc..250a17cef8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -17,6 +17,7 @@
import gc
import logging
import os
+import signal
import sys
import traceback
@@ -27,6 +28,7 @@ from prometheus_client import Gauge
from twisted.application import service
from twisted.internet import defer, reactor
+from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.web.resource import EncodingResourceWrapper, NoResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@@ -84,6 +86,7 @@ def gz_wrap(r):
class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore
+ _listening_services = []
def _listener_http(self, config, listener_config):
port = listener_config["port"]
@@ -92,7 +95,9 @@ class SynapseHomeServer(HomeServer):
site_tag = listener_config.get("tag", port)
if tls and config.no_tls:
- return
+ raise ConfigError(
+ "Listener on port %i has TLS enabled, but no_tls is set" % (port,),
+ )
resources = {}
for res in listener_config["resources"]:
@@ -121,7 +126,7 @@ class SynapseHomeServer(HomeServer):
root_resource = create_resource_tree(resources, root_resource)
if tls:
- listen_ssl(
+ return listen_ssl(
bind_addresses,
port,
SynapseSite(
@@ -135,7 +140,7 @@ class SynapseHomeServer(HomeServer):
)
else:
- listen_tcp(
+ return listen_tcp(
bind_addresses,
port,
SynapseSite(
@@ -146,7 +151,6 @@ class SynapseHomeServer(HomeServer):
self.version_string,
)
)
- logger.info("Synapse now listening on port %d", port)
def _configure_named_resource(self, name, compress=False):
"""Build a resource map for a named resource
@@ -242,7 +246,9 @@ class SynapseHomeServer(HomeServer):
for listener in config.listeners:
if listener["type"] == "http":
- self._listener_http(config, listener)
+ self._listening_services.extend(
+ self._listener_http(config, listener)
+ )
elif listener["type"] == "manhole":
listen_tcp(
listener["bind_addresses"],
@@ -322,7 +328,19 @@ def setup(config_options):
# generating config files and shouldn't try to continue.
sys.exit(0)
- synapse.config.logger.setup_logging(config, use_worker_options=False)
+ sighup_callbacks = []
+ synapse.config.logger.setup_logging(
+ config,
+ use_worker_options=False,
+ register_sighup=sighup_callbacks.append
+ )
+
+ def handle_sighup(*args, **kwargs):
+ for i in sighup_callbacks:
+ i(*args, **kwargs)
+
+ if hasattr(signal, "SIGHUP"):
+ signal.signal(signal.SIGHUP, handle_sighup)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
@@ -359,6 +377,31 @@ def setup(config_options):
hs.setup()
+ def refresh_certificate(*args):
+ """
+ Refresh the TLS certificates that Synapse is using by re-reading them
+ from disk and updating the TLS context factories to use them.
+ """
+ logging.info("Reloading certificate from disk...")
+ hs.config.read_certificate_from_disk()
+ hs.tls_server_context_factory = context_factory.ServerContextFactory(config)
+ hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
+ config
+ )
+ logging.info("Certificate reloaded.")
+
+ logging.info("Updating context factories...")
+ for i in hs._listening_services:
+ if isinstance(i.factory, TLSMemoryBIOFactory):
+ i.factory = TLSMemoryBIOFactory(
+ hs.tls_server_context_factory,
+ False,
+ i.factory.wrappedFactory
+ )
+ logging.info("Context factories updated.")
+
+ sighup_callbacks.append(refresh_certificate)
+
@defer.inlineCallbacks
def start():
try:
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index f87efecbf8..a795e39b1a 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -127,7 +127,7 @@ class LoggingConfig(Config):
)
-def setup_logging(config, use_worker_options=False):
+def setup_logging(config, use_worker_options=False, register_sighup=None):
""" Set up python logging
Args:
@@ -136,7 +136,16 @@ def setup_logging(config, use_worker_options=False):
use_worker_options (bool): True to use 'worker_log_config' and
'worker_log_file' options instead of 'log_config' and 'log_file'.
+
+ register_sighup (func | None): Function to call to register a
+ sighup handler.
"""
+ if not register_sighup:
+ if getattr(signal, "SIGHUP"):
+ register_sighup = lambda x: signal.signal(signal.SIGHUP, x)
+ else:
+ register_sighup = lambda x: None
+
log_config = (config.worker_log_config if use_worker_options
else config.log_config)
log_file = (config.worker_log_file if use_worker_options
@@ -198,13 +207,7 @@ def setup_logging(config, use_worker_options=False):
load_log_config()
- # TODO(paul): obviously this is a terrible mechanism for
- # stealing SIGHUP, because it means no other part of synapse
- # can use it instead. If we want to catch SIGHUP anywhere
- # else as well, I'd suggest we find a nicer way to broadcast
- # it around.
- if getattr(signal, "SIGHUP"):
- signal.signal(signal.SIGHUP, sighup)
+ register_sighup(sighup)
# make sure that the first thing we log is a thing we can grep backwards
# for
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 734f612db7..5f63676d9c 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -31,13 +31,16 @@ logger = logging.getLogger()
class TlsConfig(Config):
def read_config(self, config):
- acme_config = config.get("acme", {})
+ acme_config = config.get("acme", None)
+ if acme_config is None:
+ acme_config = {}
+
self.acme_enabled = acme_config.get("enabled", False)
self.acme_url = acme_config.get(
"url", "https://acme-v01.api.letsencrypt.org/directory"
)
- self.acme_port = acme_config.get("port", 8449)
- self.acme_bind_addresses = acme_config.get("bind_addresses", ["127.0.0.1"])
+ self.acme_port = acme_config.get("port", 80)
+ self.acme_bind_addresses = acme_config.get("bind_addresses", ['::', '0.0.0.0'])
self.acme_reprovision_threshold = acme_config.get("reprovision_threshold", 30)
self.tls_certificate_file = self.abspath(config.get("tls_certificate_path"))
@@ -126,21 +129,80 @@ class TlsConfig(Config):
tls_certificate_path = base_key_name + ".tls.crt"
tls_private_key_path = base_key_name + ".tls.key"
+ # this is to avoid the max line length. Sorrynotsorry
+ proxypassline = (
+ 'ProxyPass /.well-known/acme-challenge '
+ 'http://localhost:8009/.well-known/acme-challenge'
+ )
+
return (
"""\
- # PEM encoded X509 certificate for TLS.
- # This certificate, as of Synapse 1.0, will need to be a valid
- # and verifiable certificate, with a root that is available in
- # the root store of other servers you wish to federate to. Any
- # required intermediary certificates can be appended after the
- # primary certificate in hierarchical order.
+ # PEM-encoded X509 certificate for TLS.
+ # This certificate, as of Synapse 1.0, will need to be a valid and verifiable
+ # certificate, signed by a recognised Certificate Authority.
+ #
+ # See 'ACME support' below to enable auto-provisioning this certificate via
+ # Let's Encrypt.
+ #
tls_certificate_path: "%(tls_certificate_path)s"
- # PEM encoded private key for TLS
+ # PEM-encoded private key for TLS
tls_private_key_path: "%(tls_private_key_path)s"
- # Don't bind to the https port
- no_tls: False
+ # ACME support: This will configure Synapse to request a valid TLS certificate
+ # for your configured `server_name` via Let's Encrypt.
+ #
+ # Note that provisioning a certificate in this way requires port 80 to be
+ # routed to Synapse so that it can complete the http-01 ACME challenge.
+ # By default, if you enable ACME support, Synapse will attempt to listen on
+ # port 80 for incoming http-01 challenges - however, this will likely fail
+ # with 'Permission denied' or a similar error.
+ #
+ # There are a couple of potential solutions to this:
+ #
+ # * If you already have an Apache, Nginx, or similar listening on port 80,
+ # you can configure Synapse to use an alternate port, and have your web
+ # server forward the requests. For example, assuming you set 'port: 8009'
+ # below, on Apache, you would write:
+ #
+ # %(proxypassline)s
+ #
+ # * Alternatively, you can use something like `authbind` to give Synapse
+ # permission to listen on port 80.
+ #
+ acme:
+ # ACME support is disabled by default. Uncomment the following line
+ # to enable it.
+ #
+ # enabled: true
+
+ # Endpoint to use to request certificates. If you only want to test,
+ # use Let's Encrypt's staging url:
+ # https://acme-staging.api.letsencrypt.org/directory
+ #
+ # url: https://acme-v01.api.letsencrypt.org/directory
+
+ # Port number to listen on for the HTTP-01 challenge. Change this if
+ # you are forwarding connections through Apache/Nginx/etc.
+ #
+ # port: 80
+
+ # Local addresses to listen on for incoming connections.
+ # Again, you may want to change this if you are forwarding connections
+ # through Apache/Nginx/etc.
+ #
+ # bind_addresses: ['::', '0.0.0.0']
+
+ # How many days remaining on a certificate before it is renewed.
+ #
+ # reprovision_threshold: 30
+
+ # If your server runs behind a reverse-proxy which terminates TLS connections
+ # (for both client and federation connections), it may be useful to disable
+ # All TLS support for incoming connections. Setting no_tls to False will
+ # do so (and avoid the need to give synapse a TLS private key).
+ #
+ # no_tls: False
# List of allowed TLS fingerprints for this server to publish along
# with the signing keys for this server. Other matrix servers that
@@ -170,20 +232,6 @@ class TlsConfig(Config):
tls_fingerprints: []
# tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}]
- ## Support for ACME certificate auto-provisioning.
- # acme:
- # enabled: false
- ## ACME path.
- ## If you only want to test, use the staging url:
- ## https://acme-staging.api.letsencrypt.org/directory
- # url: 'https://acme-v01.api.letsencrypt.org/directory'
- ## Port number (to listen for the HTTP-01 challenge).
- ## Using port 80 requires utilising something like authbind, or proxying to it.
- # port: 8449
- ## Hosts to bind to.
- # bind_addresses: ['127.0.0.1']
- ## How many days remaining on a certificate before it is renewed.
- # reprovision_threshold: 30
"""
% locals()
)
diff --git a/synapse/handlers/acme.py b/synapse/handlers/acme.py
index 73ea7ed018..dd0b217965 100644
--- a/synapse/handlers/acme.py
+++ b/synapse/handlers/acme.py
@@ -18,13 +18,16 @@ import logging
import attr
from zope.interface import implementer
+import twisted
+import twisted.internet.error
from twisted.internet import defer
-from twisted.internet.endpoints import serverFromString
from twisted.python.filepath import FilePath
from twisted.python.url import URL
from twisted.web import server, static
from twisted.web.resource import Resource
+from synapse.app import check_bind_error
+
logger = logging.getLogger(__name__)
try:
@@ -96,16 +99,19 @@ class AcmeHandler(object):
srv = server.Site(responder_resource)
- listeners = []
-
- for host in self.hs.config.acme_bind_addresses:
+ bind_addresses = self.hs.config.acme_bind_addresses
+ for host in bind_addresses:
logger.info(
- "Listening for ACME requests on %s:%s", host, self.hs.config.acme_port
- )
- endpoint = serverFromString(
- self.reactor, "tcp:%s:interface=%s" % (self.hs.config.acme_port, host)
+ "Listening for ACME requests on %s:%i", host, self.hs.config.acme_port,
)
- listeners.append(endpoint.listen(srv))
+ try:
+ self.reactor.listenTCP(
+ self.hs.config.acme_port,
+ srv,
+ interface=host,
+ )
+ except twisted.internet.error.CannotListenError as e:
+ check_bind_error(e, host, bind_addresses)
# Make sure we are registered to the ACME server. There's no public API
# for this, it is usually triggered by startService, but since we don't
@@ -114,9 +120,6 @@ class AcmeHandler(object):
self._issuer._registered = False
yield self._issuer._ensure_registered()
- # Return a Deferred that will fire when all the servers have started up.
- yield defer.DeferredList(listeners, fireOnOneErrback=True, consumeErrors=True)
-
@defer.inlineCallbacks
def provision_certificate(self):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 0699731c13..6bb254f899 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -57,8 +57,8 @@ class DirectoryHandler(BaseHandler):
# general association creation for both human users and app services
for wchar in string.whitespace:
- if wchar in room_alias.localpart:
- raise SynapseError(400, "Invalid characters in room alias")
+ if wchar in room_alias.localpart:
+ raise SynapseError(400, "Invalid characters in room alias")
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f89dabb9eb..083f2e0ac3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.store = hs.get_datastore() # type: synapse.storage.DataStore
+ self.store = hs.get_datastore()
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index f81fcd4301..26649e70be 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -14,6 +14,8 @@
# limitations under the License.
import json
import logging
+import random
+import time
import attr
from netaddr import IPAddress
@@ -21,14 +23,30 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.web.client import URI, Agent, HTTPConnectionPool, readBody
+from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
+from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.util.caches.ttlcache import TTLCache
from synapse.util.logcontext import make_deferred_yieldable
+# period to cache .well-known results for by default
+WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
+
+# jitter to add to the .well-known default cache ttl
+WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
+
+# period to cache failure to fetch .well-known for
+WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
+
+# cap for .well-known cache period
+WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
+
+
logger = logging.getLogger(__name__)
+well_known_cache = TTLCache('well-known')
@implementer(IAgent)
@@ -57,6 +75,7 @@ class MatrixFederationAgent(object):
self, reactor, tls_client_options_factory,
_well_known_tls_policy=None,
_srv_resolver=None,
+ _well_known_cache=well_known_cache,
):
self._reactor = reactor
self._tls_client_options_factory = tls_client_options_factory
@@ -74,9 +93,13 @@ class MatrixFederationAgent(object):
# the param is called 'contextFactory', but actually passing a
# contextfactory is deprecated, and it expects an IPolicyForHTTPS.
agent_args['contextFactory'] = _well_known_tls_policy
- _well_known_agent = Agent(self._reactor, pool=self._pool, **agent_args)
+ _well_known_agent = RedirectAgent(
+ Agent(self._reactor, pool=self._pool, **agent_args),
+ )
self._well_known_agent = _well_known_agent
+ self._well_known_cache = _well_known_cache
+
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):
"""
@@ -259,7 +282,14 @@ class MatrixFederationAgent(object):
Deferred[bytes|None]: either the new server name, from the .well-known, or
None if there was no .well-known file.
"""
- # FIXME: add a cache
+ try:
+ cached = self._well_known_cache[server_name]
+ defer.returnValue(cached)
+ except KeyError:
+ pass
+
+ # TODO: should we linearise so that we don't end up doing two .well-known requests
+ # for the same server in parallel?
uri = b"https://%s/.well-known/matrix/server" % (server_name, )
uri_str = uri.decode("ascii")
@@ -268,14 +298,18 @@ class MatrixFederationAgent(object):
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri),
)
+ body = yield make_deferred_yieldable(readBody(response))
+ if response.code != 200:
+ raise Exception("Non-200 response %s" % (response.code, ))
except Exception as e:
- logger.info("Connection error fetching %s: %s", uri_str, e)
- defer.returnValue(None)
+ logger.info("Error fetching %s: %s", uri_str, e)
- body = yield make_deferred_yieldable(readBody(response))
+ # add some randomness to the TTL to avoid a stampeding herd every hour
+ # after startup
+ cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
+ cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
- if response.code != 200:
- logger.info("Error response %i from %s", response.code, uri_str)
+ self._well_known_cache.set(server_name, None, cache_period)
defer.returnValue(None)
try:
@@ -287,7 +321,63 @@ class MatrixFederationAgent(object):
raise Exception("Missing key 'm.server'")
except Exception as e:
raise Exception("invalid .well-known response from %s: %s" % (uri_str, e,))
- defer.returnValue(parsed_body["m.server"].encode("ascii"))
+
+ result = parsed_body["m.server"].encode("ascii")
+
+ cache_period = _cache_period_from_headers(
+ response.headers,
+ time_now=self._reactor.seconds,
+ )
+ if cache_period is None:
+ cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
+ # add some randomness to the TTL to avoid a stampeding herd every 24 hours
+ # after startup
+ cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
+ else:
+ cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
+
+ if cache_period > 0:
+ self._well_known_cache.set(server_name, result, cache_period)
+
+ defer.returnValue(result)
+
+
+def _cache_period_from_headers(headers, time_now=time.time):
+ cache_controls = _parse_cache_control(headers)
+
+ if b'no-store' in cache_controls:
+ return 0
+
+ if b'max-age' in cache_controls:
+ try:
+ max_age = int(cache_controls[b'max-age'])
+ return max_age
+ except ValueError:
+ pass
+
+ expires = headers.getRawHeaders(b'expires')
+ if expires is not None:
+ try:
+ expires_date = stringToDatetime(expires[-1])
+ return expires_date - time_now()
+ except ValueError:
+ # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
+ # especially the value "0", as representing a time in the past (i.e.,
+ # "already expired").
+ return 0
+
+ return None
+
+
+def _parse_cache_control(headers):
+ cache_controls = {}
+ for hdr in headers.getRawHeaders(b'cache-control', []):
+ for directive in hdr.split(b','):
+ splits = [x.strip() for x in directive.split(b'=', 1)]
+ k = splits[0].lower()
+ v = splits[1] if len(splits) > 1 else None
+ cache_controls[k] = v
+ return cache_controls
@attr.s
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index ecbf364a5e..8bd96b1178 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -84,7 +84,7 @@ def _rule_to_template(rule):
templaterule["pattern"] = thecond["pattern"]
if unscoped_rule_id:
- templaterule['rule_id'] = unscoped_rule_id
+ templaterule['rule_id'] = unscoped_rule_id
if 'default' in rule:
templaterule['default'] = rule['default']
return templaterule
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 5e5376cf58..e81456ab2b 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -127,7 +127,10 @@ class ReplicationEndpoint(object):
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
- url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
+ url_args = [
+ urllib.parse.quote(kwargs[name], safe='')
+ for name in cls.PATH_ARGS
+ ]
if cls.CACHE:
txn_id = random_string(10)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 24329879e5..42cd3c83ad 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -317,7 +317,7 @@ class DataStore(RoomMemberStore, RoomStore,
thirty_days_ago_in_secs))
for row in txn:
- if row[0] is 'unknown':
+ if row[0] == 'unknown':
pass
results[row[0]] = row[1]
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3e1915fb87..81b250480d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -904,106 +904,106 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
- to_delete, to_insert = current_state_tuple
-
- # First we add entries to the current_state_delta_stream. We
- # do this before updating the current_state_events table so
- # that we can use it to calculate the `prev_event_id`. (This
- # allows us to not have to pull out the existing state
- # unnecessarily).
- sql = """
- INSERT INTO current_state_delta_stream
- (stream_id, room_id, type, state_key, event_id, prev_event_id)
- SELECT ?, ?, ?, ?, ?, (
- SELECT event_id FROM current_state_events
- WHERE room_id = ? AND type = ? AND state_key = ?
- )
- """
- txn.executemany(sql, (
- (
- max_stream_order, room_id, etype, state_key, None,
- room_id, etype, state_key,
- )
- for etype, state_key in to_delete
- # We sanity check that we're deleting rather than updating
- if (etype, state_key) not in to_insert
- ))
- txn.executemany(sql, (
- (
- max_stream_order, room_id, etype, state_key, ev_id,
- room_id, etype, state_key,
- )
- for (etype, state_key), ev_id in iteritems(to_insert)
- ))
+ to_delete, to_insert = current_state_tuple
- # Now we actually update the current_state_events table
-
- txn.executemany(
- "DELETE FROM current_state_events"
- " WHERE room_id = ? AND type = ? AND state_key = ?",
- (
- (room_id, etype, state_key)
- for etype, state_key in itertools.chain(to_delete, to_insert)
- ),
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
)
-
- self._simple_insert_many_txn(
- txn,
- table="current_state_events",
- values=[
- {
- "event_id": ev_id,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- }
- for key, ev_id in iteritems(to_insert)
- ],
+ """
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, None,
+ room_id, etype, state_key,
)
-
- txn.call_after(
- self._curr_state_delta_stream_cache.entity_has_changed,
- room_id, max_stream_order,
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ))
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, ev_id,
+ room_id, etype, state_key,
)
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ))
- # Invalidate the various caches
-
- # Figure out the changes of membership to invalidate the
- # `get_rooms_for_user` cache.
- # We find out which membership events we may have deleted
- # and which we have added, then we invlidate the caches for all
- # those users.
- members_changed = set(
- state_key
- for ev_type, state_key in itertools.chain(to_delete, to_insert)
- if ev_type == EventTypes.Member
- )
+ # Now we actually update the current_state_events table
- for member in members_changed:
- self._invalidate_cache_and_stream(
- txn, self.get_rooms_for_user_with_stream_ordering, (member,)
- )
+ txn.executemany(
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
+ )
- for host in set(get_domain_from_id(u) for u in members_changed):
- self._invalidate_cache_and_stream(
- txn, self.is_host_joined, (room_id, host)
- )
- self._invalidate_cache_and_stream(
- txn, self.was_host_joined, (room_id, host)
- )
+ self._simple_insert_many_txn(
+ txn,
+ table="current_state_events",
+ values=[
+ {
+ "event_id": ev_id,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ }
+ for key, ev_id in iteritems(to_insert)
+ ],
+ )
+
+ txn.call_after(
+ self._curr_state_delta_stream_cache.entity_has_changed,
+ room_id, max_stream_order,
+ )
+
+ # Invalidate the various caches
+
+ # Figure out the changes of membership to invalidate the
+ # `get_rooms_for_user` cache.
+ # We find out which membership events we may have deleted
+ # and which we have added, then we invlidate the caches for all
+ # those users.
+ members_changed = set(
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
+ if ev_type == EventTypes.Member
+ )
+ for member in members_changed:
self._invalidate_cache_and_stream(
- txn, self.get_users_in_room, (room_id,)
+ txn, self.get_rooms_for_user_with_stream_ordering, (member,)
)
+ for host in set(get_domain_from_id(u) for u in members_changed):
self._invalidate_cache_and_stream(
- txn, self.get_room_summary, (room_id,)
+ txn, self.is_host_joined, (room_id, host)
)
-
self._invalidate_cache_and_stream(
- txn, self.get_current_state_ids, (room_id,)
+ txn, self.was_host_joined, (room_id, host)
)
+ self._invalidate_cache_and_stream(
+ txn, self.get_users_in_room, (room_id,)
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_room_summary, (room_id,)
+ )
+
+ self._invalidate_cache_and_stream(
+ txn, self.get_current_state_ids, (room_id,)
+ )
+
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order):
for room_id, new_extrem in iteritems(new_forward_extremities):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index ebe1429acb..57dae324c7 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -220,7 +220,7 @@ class EventsWorkerStore(SQLBaseStore):
defer.returnValue(events)
def _invalidate_get_event_cache(self, event_id):
- self._get_event_cache.invalidate((event_id,))
+ self._get_event_cache.invalidate((event_id,))
def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
"""Fetch events from the caches
diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py
new file mode 100644
index 0000000000..5ba1862506
--- /dev/null
+++ b/synapse/util/caches/ttlcache.py
@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+import attr
+from sortedcontainers import SortedList
+
+from synapse.util.caches import register_cache
+
+logger = logging.getLogger(__name__)
+
+SENTINEL = object()
+
+
+class TTLCache(object):
+ """A key/value cache implementation where each entry has its own TTL"""
+
+ def __init__(self, cache_name, timer=time.time):
+ # map from key to _CacheEntry
+ self._data = {}
+
+ # the _CacheEntries, sorted by expiry time
+ self._expiry_list = SortedList()
+
+ self._timer = timer
+
+ self._metrics = register_cache("ttl", cache_name, self)
+
+ def set(self, key, value, ttl):
+ """Add/update an entry in the cache
+
+ Args:
+ key: key for this entry
+ value: value for this entry
+ ttl (float): TTL for this entry, in seconds
+ """
+ expiry = self._timer() + ttl
+
+ self.expire()
+ e = self._data.pop(key, SENTINEL)
+ if e != SENTINEL:
+ self._expiry_list.remove(e)
+
+ entry = _CacheEntry(expiry_time=expiry, key=key, value=value)
+ self._data[key] = entry
+ self._expiry_list.add(entry)
+
+ def get(self, key, default=SENTINEL):
+ """Get a value from the cache
+
+ Args:
+ key: key to look up
+ default: default value to return, if key is not found. If not set, and the
+ key is not found, a KeyError will be raised
+
+ Returns:
+ value from the cache, or the default
+ """
+ self.expire()
+ e = self._data.get(key, SENTINEL)
+ if e == SENTINEL:
+ self._metrics.inc_misses()
+ if default == SENTINEL:
+ raise KeyError(key)
+ return default
+ self._metrics.inc_hits()
+ return e.value
+
+ def get_with_expiry(self, key):
+ """Get a value, and its expiry time, from the cache
+
+ Args:
+ key: key to look up
+
+ Returns:
+ Tuple[Any, float]: the value from the cache, and the expiry time
+
+ Raises:
+ KeyError if the entry is not found
+ """
+ self.expire()
+ try:
+ e = self._data[key]
+ except KeyError:
+ self._metrics.inc_misses()
+ raise
+ self._metrics.inc_hits()
+ return e.value, e.expiry_time
+
+ def pop(self, key, default=SENTINEL):
+ """Remove a value from the cache
+
+ If key is in the cache, remove it and return its value, else return default.
+ If default is not given and key is not in the cache, a KeyError is raised.
+
+ Args:
+ key: key to look up
+ default: default value to return, if key is not found. If not set, and the
+ key is not found, a KeyError will be raised
+
+ Returns:
+ value from the cache, or the default
+ """
+ self.expire()
+ e = self._data.pop(key, SENTINEL)
+ if e == SENTINEL:
+ self._metrics.inc_misses()
+ if default == SENTINEL:
+ raise KeyError(key)
+ return default
+ self._expiry_list.remove(e)
+ self._metrics.inc_hits()
+ return e.value
+
+ def __getitem__(self, key):
+ return self.get(key)
+
+ def __delitem__(self, key):
+ self.pop(key)
+
+ def __contains__(self, key):
+ return key in self._data
+
+ def __len__(self):
+ self.expire()
+ return len(self._data)
+
+ def expire(self):
+ """Run the expiry on the cache. Any entries whose expiry times are due will
+ be removed
+ """
+ now = self._timer()
+ while self._expiry_list:
+ first_entry = self._expiry_list[0]
+ if first_entry.expiry_time - now > 0.0:
+ break
+ del self._data[first_entry.key]
+ del self._expiry_list[0]
+
+
+@attr.s(frozen=True, slots=True)
+class _CacheEntry(object):
+ """TTLCache entry"""
+ # expiry_time is the first attribute, so that entries are sorted by expiry.
+ expiry_time = attr.ib()
+ key = attr.ib()
+ value = attr.ib()
diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py
index 11ea8ef10c..7b2800021f 100644
--- a/tests/http/federation/test_matrix_federation_agent.py
+++ b/tests/http/federation/test_matrix_federation_agent.py
@@ -24,11 +24,16 @@ from twisted.internet._sslverify import ClientTLSOptions, OpenSSLCertificateOpti
from twisted.internet.protocol import Factory
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.web.http import HTTPChannel
+from twisted.web.http_headers import Headers
from twisted.web.iweb import IPolicyForHTTPS
from synapse.crypto.context_factory import ClientTLSOptionsFactory
-from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.federation.matrix_federation_agent import (
+ MatrixFederationAgent,
+ _cache_period_from_headers,
+)
from synapse.http.federation.srv_resolver import Server
+from synapse.util.caches.ttlcache import TTLCache
from synapse.util.logcontext import LoggingContext
from tests.http import ServerTLSContext
@@ -44,11 +49,14 @@ class MatrixFederationAgentTests(TestCase):
self.mock_resolver = Mock()
+ self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
+
self.agent = MatrixFederationAgent(
reactor=self.reactor,
tls_client_options_factory=ClientTLSOptionsFactory(None),
_well_known_tls_policy=TrustingTLSPolicyForHTTPS(),
_srv_resolver=self.mock_resolver,
+ _well_known_cache=self.well_known_cache,
)
def _make_connection(self, client_factory, expected_sni):
@@ -115,7 +123,9 @@ class MatrixFederationAgentTests(TestCase):
finally:
_check_logcontext(context)
- def _handle_well_known_connection(self, client_factory, expected_sni, target_server):
+ def _handle_well_known_connection(
+ self, client_factory, expected_sni, target_server, response_headers={},
+ ):
"""Handle an outgoing HTTPs connection: wire it up to a server, check that the
request is for a .well-known, and send the response.
@@ -124,6 +134,8 @@ class MatrixFederationAgentTests(TestCase):
expected_sni (bytes): SNI that we expect the outgoing connection to send
target_server (bytes): target server that we should redirect to in the
.well-known response.
+ Returns:
+ HTTPChannel: server impl
"""
# make the connection for .well-known
well_known_server = self._make_connection(
@@ -133,9 +145,10 @@ class MatrixFederationAgentTests(TestCase):
# check the .well-known request and send a response
self.assertEqual(len(well_known_server.requests), 1)
request = well_known_server.requests[0]
- self._send_well_known_response(request, target_server)
+ self._send_well_known_response(request, target_server, headers=response_headers)
+ return well_known_server
- def _send_well_known_response(self, request, target_server):
+ def _send_well_known_response(self, request, target_server, headers={}):
"""Check that an incoming request looks like a valid .well-known request, and
send back the response.
"""
@@ -146,6 +159,8 @@ class MatrixFederationAgentTests(TestCase):
[b'testserv'],
)
# send back a response
+ for k, v in headers.items():
+ request.setHeader(k, v)
request.write(b'{ "m.server": "%s" }' % (target_server,))
request.finish()
@@ -448,6 +463,110 @@ class MatrixFederationAgentTests(TestCase):
self.reactor.pump((0.1,))
self.successResultOf(test_d)
+ self.assertEqual(self.well_known_cache[b"testserv"], b"target-server")
+
+ # check the cache expires
+ self.reactor.pump((25 * 3600,))
+ self.well_known_cache.expire()
+ self.assertNotIn(b"testserv", self.well_known_cache)
+
+ def test_get_well_known_redirect(self):
+ """Test the behaviour when the server name has no port and no SRV record, but
+ the .well-known has a 300 redirect
+ """
+ self.mock_resolver.resolve_service.side_effect = lambda _: []
+ self.reactor.lookups["testserv"] = "1.2.3.4"
+ self.reactor.lookups["target-server"] = "1::f"
+
+ test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+
+ # Nothing happened yet
+ self.assertNoResult(test_d)
+
+ self.mock_resolver.resolve_service.assert_called_once_with(
+ b"_matrix._tcp.testserv",
+ )
+ self.mock_resolver.resolve_service.reset_mock()
+
+ # there should be an attempt to connect on port 443 for the .well-known
+ clients = self.reactor.tcpClients
+ self.assertEqual(len(clients), 1)
+ (host, port, client_factory, _timeout, _bindAddress) = clients.pop()
+ self.assertEqual(host, '1.2.3.4')
+ self.assertEqual(port, 443)
+
+ redirect_server = self._make_connection(
+ client_factory,
+ expected_sni=b"testserv",
+ )
+
+ # send a 302 redirect
+ self.assertEqual(len(redirect_server.requests), 1)
+ request = redirect_server.requests[0]
+ request.redirect(b'https://testserv/even_better_known')
+ request.finish()
+
+ self.reactor.pump((0.1, ))
+
+ # now there should be another connection
+ clients = self.reactor.tcpClients
+ self.assertEqual(len(clients), 1)
+ (host, port, client_factory, _timeout, _bindAddress) = clients.pop()
+ self.assertEqual(host, '1.2.3.4')
+ self.assertEqual(port, 443)
+
+ well_known_server = self._make_connection(
+ client_factory,
+ expected_sni=b"testserv",
+ )
+
+ self.assertEqual(len(well_known_server.requests), 1, "No request after 302")
+ request = well_known_server.requests[0]
+ self.assertEqual(request.method, b'GET')
+ self.assertEqual(request.path, b'/even_better_known')
+ request.write(b'{ "m.server": "target-server" }')
+ request.finish()
+
+ self.reactor.pump((0.1, ))
+
+ # there should be another SRV lookup
+ self.mock_resolver.resolve_service.assert_called_once_with(
+ b"_matrix._tcp.target-server",
+ )
+
+ # now we should get a connection to the target server
+ self.assertEqual(len(clients), 1)
+ (host, port, client_factory, _timeout, _bindAddress) = clients[0]
+ self.assertEqual(host, '1::f')
+ self.assertEqual(port, 8448)
+
+ # make a test server, and wire up the client
+ http_server = self._make_connection(
+ client_factory,
+ expected_sni=b'target-server',
+ )
+
+ self.assertEqual(len(http_server.requests), 1)
+ request = http_server.requests[0]
+ self.assertEqual(request.method, b'GET')
+ self.assertEqual(request.path, b'/foo/bar')
+ self.assertEqual(
+ request.requestHeaders.getRawHeaders(b'host'),
+ [b'target-server'],
+ )
+
+ # finish the request
+ request.finish()
+ self.reactor.pump((0.1,))
+ self.successResultOf(test_d)
+
+ self.assertEqual(self.well_known_cache[b"testserv"], b"target-server")
+
+ # check the cache expires
+ self.reactor.pump((25 * 3600,))
+ self.well_known_cache.expire()
+ self.assertNotIn(b"testserv", self.well_known_cache)
+
def test_get_hostname_srv(self):
"""
Test the behaviour when there is a single SRV record
@@ -661,6 +780,126 @@ class MatrixFederationAgentTests(TestCase):
self.reactor.pump((0.1,))
self.successResultOf(test_d)
+ @defer.inlineCallbacks
+ def do_get_well_known(self, serv):
+ try:
+ result = yield self.agent._get_well_known(serv)
+ logger.info("Result from well-known fetch: %s", result)
+ except Exception as e:
+ logger.warning("Error fetching well-known: %s", e)
+ raise
+ defer.returnValue(result)
+
+ def test_well_known_cache(self):
+ self.reactor.lookups["testserv"] = "1.2.3.4"
+
+ fetch_d = self.do_get_well_known(b'testserv')
+
+ # there should be an attempt to connect on port 443 for the .well-known
+ clients = self.reactor.tcpClients
+ self.assertEqual(len(clients), 1)
+ (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+ self.assertEqual(host, '1.2.3.4')
+ self.assertEqual(port, 443)
+
+ well_known_server = self._handle_well_known_connection(
+ client_factory,
+ expected_sni=b"testserv",
+ response_headers={b'Cache-Control': b'max-age=10'},
+ target_server=b"target-server",
+ )
+
+ r = self.successResultOf(fetch_d)
+ self.assertEqual(r, b'target-server')
+
+ # close the tcp connection
+ well_known_server.loseConnection()
+
+ # repeat the request: it should hit the cache
+ fetch_d = self.do_get_well_known(b'testserv')
+ r = self.successResultOf(fetch_d)
+ self.assertEqual(r, b'target-server')
+
+ # expire the cache
+ self.reactor.pump((10.0,))
+
+ # now it should connect again
+ fetch_d = self.do_get_well_known(b'testserv')
+
+ self.assertEqual(len(clients), 1)
+ (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+ self.assertEqual(host, '1.2.3.4')
+ self.assertEqual(port, 443)
+
+ self._handle_well_known_connection(
+ client_factory,
+ expected_sni=b"testserv",
+ target_server=b"other-server",
+ )
+
+ r = self.successResultOf(fetch_d)
+ self.assertEqual(r, b'other-server')
+
+
+class TestCachePeriodFromHeaders(TestCase):
+ def test_cache_control(self):
+ # uppercase
+ self.assertEqual(
+ _cache_period_from_headers(
+ Headers({b'Cache-Control': [b'foo, Max-Age = 100, bar']}),
+ ), 100,
+ )
+
+ # missing value
+ self.assertIsNone(_cache_period_from_headers(
+ Headers({b'Cache-Control': [b'max-age=, bar']}),
+ ))
+
+ # hackernews: bogus due to semicolon
+ self.assertIsNone(_cache_period_from_headers(
+ Headers({b'Cache-Control': [b'private; max-age=0']}),
+ ))
+
+ # github
+ self.assertEqual(
+ _cache_period_from_headers(
+ Headers({b'Cache-Control': [b'max-age=0, private, must-revalidate']}),
+ ), 0,
+ )
+
+ # google
+ self.assertEqual(
+ _cache_period_from_headers(
+ Headers({b'cache-control': [b'private, max-age=0']}),
+ ), 0,
+ )
+
+ def test_expires(self):
+ self.assertEqual(
+ _cache_period_from_headers(
+ Headers({b'Expires': [b'Wed, 30 Jan 2019 07:35:33 GMT']}),
+ time_now=lambda: 1548833700
+ ), 33,
+ )
+
+ # cache-control overrides expires
+ self.assertEqual(
+ _cache_period_from_headers(
+ Headers({
+ b'cache-control': [b'max-age=10'],
+ b'Expires': [b'Wed, 30 Jan 2019 07:35:33 GMT']
+ }),
+ time_now=lambda: 1548833700
+ ), 10,
+ )
+
+ # invalid expires means immediate expiry
+ self.assertEqual(
+ _cache_period_from_headers(
+ Headers({b'Expires': [b'0']}),
+ ), 0,
+ )
+
def _check_logcontext(context):
current = LoggingContext.current_context()
diff --git a/tests/server.py b/tests/server.py
index 3d7ae9875c..fc1e76d146 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -360,6 +360,7 @@ class FakeTransport(object):
"""
disconnecting = False
+ disconnected = False
buffer = attr.ib(default=b'')
producer = attr.ib(default=None)
@@ -370,14 +371,16 @@ class FakeTransport(object):
return None
def loseConnection(self, reason=None):
- logger.info("FakeTransport: loseConnection(%s)", reason)
if not self.disconnecting:
+ logger.info("FakeTransport: loseConnection(%s)", reason)
self.disconnecting = True
if self._protocol:
self._protocol.connectionLost(reason)
+ self.disconnected = True
def abortConnection(self):
- self.disconnecting = True
+ logger.info("FakeTransport: abortConnection()")
+ self.loseConnection()
def pauseProducing(self):
if not self.producer:
@@ -416,9 +419,16 @@ class FakeTransport(object):
# TLSMemoryBIOProtocol
return
+ if self.disconnected:
+ return
+ logger.info("%s->%s: %s", self._protocol, self.other, self.buffer)
+
if getattr(self.other, "transport") is not None:
- self.other.dataReceived(self.buffer)
- self.buffer = b""
+ try:
+ self.other.dataReceived(self.buffer)
+ self.buffer = b""
+ except Exception as e:
+ logger.warning("Exception writing to protocol: %s", e)
return
self._reactor.callLater(0.0, _write)
diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py
index 81403727c5..5568a607c7 100644
--- a/tests/storage/test_background_update.py
+++ b/tests/storage/test_background_update.py
@@ -11,7 +11,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
def setUp(self):
hs = yield setup_test_homeserver(
self.addCleanup
- ) # type: synapse.server.HomeServer
+ )
self.store = hs.get_datastore()
self.clock = hs.get_clock()
diff --git a/tests/storage/test_end_to_end_keys.py b/tests/storage/test_end_to_end_keys.py
index b83f7336d3..11fb8c0c19 100644
--- a/tests/storage/test_end_to_end_keys.py
+++ b/tests/storage/test_end_to_end_keys.py
@@ -20,9 +20,6 @@ import tests.utils
class EndToEndKeyStoreTestCase(tests.unittest.TestCase):
- def __init__(self, *args, **kwargs):
- super(EndToEndKeyStoreTestCase, self).__init__(*args, **kwargs)
- self.store = None # type: synapse.storage.DataStore
@defer.inlineCallbacks
def setUp(self):
diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py
index 47f4a8ceac..0d2dc9f325 100644
--- a/tests/storage/test_keys.py
+++ b/tests/storage/test_keys.py
@@ -22,9 +22,6 @@ import tests.utils
class KeyStoreTestCase(tests.unittest.TestCase):
- def __init__(self, *args, **kwargs):
- super(KeyStoreTestCase, self).__init__(*args, **kwargs)
- self.store = None # type: synapse.storage.keys.KeyStore
@defer.inlineCallbacks
def setUp(self):
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index a1f99134dc..99cd3e09eb 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -28,9 +28,6 @@ logger = logging.getLogger(__name__)
class StateStoreTestCase(tests.unittest.TestCase):
- def __init__(self, *args, **kwargs):
- super(StateStoreTestCase, self).__init__(*args, **kwargs)
- self.store = None # type: synapse.storage.DataStore
@defer.inlineCallbacks
def setUp(self):
diff --git a/tests/util/caches/test_ttlcache.py b/tests/util/caches/test_ttlcache.py
new file mode 100644
index 0000000000..03b3c15db6
--- /dev/null
+++ b/tests/util/caches/test_ttlcache.py
@@ -0,0 +1,83 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from mock import Mock
+
+from synapse.util.caches.ttlcache import TTLCache
+
+from tests import unittest
+
+
+class CacheTestCase(unittest.TestCase):
+ def setUp(self):
+ self.mock_timer = Mock(side_effect=lambda: 100.0)
+ self.cache = TTLCache("test_cache", self.mock_timer)
+
+ def test_get(self):
+ """simple set/get tests"""
+ self.cache.set('one', '1', 10)
+ self.cache.set('two', '2', 20)
+ self.cache.set('three', '3', 30)
+
+ self.assertEqual(len(self.cache), 3)
+
+ self.assertTrue('one' in self.cache)
+ self.assertEqual(self.cache.get('one'), '1')
+ self.assertEqual(self.cache['one'], '1')
+ self.assertEqual(self.cache.get_with_expiry('one'), ('1', 110))
+ self.assertEqual(self.cache._metrics.hits, 3)
+ self.assertEqual(self.cache._metrics.misses, 0)
+
+ self.cache.set('two', '2.5', 20)
+ self.assertEqual(self.cache['two'], '2.5')
+ self.assertEqual(self.cache._metrics.hits, 4)
+
+ # non-existent-item tests
+ self.assertEqual(self.cache.get('four', '4'), '4')
+ self.assertIs(self.cache.get('four', None), None)
+
+ with self.assertRaises(KeyError):
+ self.cache['four']
+
+ with self.assertRaises(KeyError):
+ self.cache.get('four')
+
+ with self.assertRaises(KeyError):
+ self.cache.get_with_expiry('four')
+
+ self.assertEqual(self.cache._metrics.hits, 4)
+ self.assertEqual(self.cache._metrics.misses, 5)
+
+ def test_expiry(self):
+ self.cache.set('one', '1', 10)
+ self.cache.set('two', '2', 20)
+ self.cache.set('three', '3', 30)
+
+ self.assertEqual(len(self.cache), 3)
+ self.assertEqual(self.cache['one'], '1')
+ self.assertEqual(self.cache['two'], '2')
+
+ # enough for the first entry to expire, but not the rest
+ self.mock_timer.side_effect = lambda: 110.0
+
+ self.assertEqual(len(self.cache), 2)
+ self.assertFalse('one' in self.cache)
+ self.assertEqual(self.cache['two'], '2')
+ self.assertEqual(self.cache['three'], '3')
+
+ self.assertEqual(self.cache.get_with_expiry('two'), ('2', 120))
+
+ self.assertEqual(self.cache._metrics.hits, 5)
+ self.assertEqual(self.cache._metrics.misses, 0)
|