diff options
Diffstat (limited to 'synapse')
37 files changed, 862 insertions, 400 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 048d6e572f..f7bac0ea4e 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2018-9 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,4 +27,4 @@ try: except ImportError: pass -__version__ = "0.99.0" +__version__ = "0.99.1" diff --git a/synapse/api/constants.py b/synapse/api/constants.py index fedfb92b3e..f47c33a074 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -73,6 +73,7 @@ class EventTypes(object): RoomHistoryVisibility = "m.room.history_visibility" CanonicalAlias = "m.room.canonical_alias" RoomAvatar = "m.room.avatar" + RoomEncryption = "m.room.encryption" GuestAccess = "m.room.guest_access" # These are used for validation diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 5b97a54d45..73ca52bd8c 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -15,19 +15,36 @@ import gc import logging +import signal import sys +import traceback import psutil from daemonize import Daemonize from twisted.internet import error, reactor +from twisted.protocols.tls import TLSMemoryBIOFactory from synapse.app import check_bind_error +from synapse.crypto import context_factory from synapse.util import PreserveLoggingContext from synapse.util.rlimit import change_resource_limit logger = logging.getLogger(__name__) +_sighup_callbacks = [] + + +def register_sighup(func): + """ + Register a function to be called when a SIGHUP occurs. + + Args: + func (function): Function to be called when sent a SIGHUP signal. + Will be called with a single argument, the homeserver. + """ + _sighup_callbacks.append(func) + def start_worker_reactor(appname, config): """ Run the reactor in the main process @@ -136,9 +153,8 @@ def listen_metrics(bind_addresses, port): from prometheus_client import start_http_server for host in bind_addresses: - reactor.callInThread(start_http_server, int(port), - addr=host, registry=RegistryProxy) - logger.info("Metrics now reporting on %s:%d", host, port) + logger.info("Starting metrics listener on %s:%d", host, port) + start_http_server(port, addr=host, registry=RegistryProxy) def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): @@ -146,21 +162,23 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): Create a TCP socket for a port and several addresses Returns: - list (empty) + list[twisted.internet.tcp.Port]: listening for TCP connections """ + r = [] for address in bind_addresses: try: - reactor.listenTCP( - port, - factory, - backlog, - address + r.append( + reactor.listenTCP( + port, + factory, + backlog, + address + ) ) except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) - logger.info("Synapse now listening on TCP port %d", port) - return [] + return r def listen_ssl( @@ -187,5 +205,74 @@ def listen_ssl( except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) - logger.info("Synapse now listening on port %d (TLS)", port) return r + + +def refresh_certificate(hs): + """ + Refresh the TLS certificates that Synapse is using by re-reading them from + disk and updating the TLS context factories to use them. + """ + + if not hs.config.has_tls_listener(): + # attempt to reload the certs for the good of the tls_fingerprints + hs.config.read_certificate_from_disk(require_cert_and_key=False) + return + + hs.config.read_certificate_from_disk(require_cert_and_key=True) + hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config) + + if hs._listening_services: + logger.info("Updating context factories...") + for i in hs._listening_services: + # When you listenSSL, it doesn't make an SSL port but a TCP one with + # a TLS wrapping factory around the factory you actually want to get + # requests. This factory attribute is public but missing from + # Twisted's documentation. + if isinstance(i.factory, TLSMemoryBIOFactory): + addr = i.getHost() + logger.info( + "Replacing TLS context factory on [%s]:%i", addr.host, addr.port, + ) + # We want to replace TLS factories with a new one, with the new + # TLS configuration. We do this by reaching in and pulling out + # the wrappedFactory, and then re-wrapping it. + i.factory = TLSMemoryBIOFactory( + hs.tls_server_context_factory, + False, + i.factory.wrappedFactory + ) + logger.info("Context factories updated.") + + +def start(hs, listeners=None): + """ + Start a Synapse server or worker. + + Args: + hs (synapse.server.HomeServer) + listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml) + """ + try: + # Set up the SIGHUP machinery. + if hasattr(signal, "SIGHUP"): + def handle_sighup(*args, **kwargs): + for i in _sighup_callbacks: + i(hs) + + signal.signal(signal.SIGHUP, handle_sighup) + + register_sighup(refresh_certificate) + + # Load the certificate from disk. + refresh_certificate(hs) + + # It is now safe to start your Synapse. + hs.start_listening(listeners) + hs.get_datastore().start_profiling() + except Exception: + traceback.print_exc(file=sys.stderr) + reactor = hs.get_reactor() + if reactor.running: + reactor.stop() + sys.exit(1) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 8559e141af..33107f56d1 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -168,12 +168,7 @@ def start(config_options): ) ps.setup() - ps.start_listening(config.worker_listeners) - - def start(): - ps.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ps, config.worker_listeners) _base.start_worker_reactor("synapse-appservice", config) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index f8a417cb60..a9d2147022 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -25,7 +25,6 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy @@ -173,17 +172,7 @@ def start(config_options): ) ss.setup() - - def start(): - ss.config.read_certificate_from_disk() - ss.tls_server_context_factory = context_factory.ServerContextFactory(config) - ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) - ss.start_listening(config.worker_listeners) - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) _base.start_worker_reactor("synapse-client-reader", config) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 656e0edc0f..b8e5196152 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -25,7 +25,6 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy @@ -194,17 +193,7 @@ def start(config_options): ) ss.setup() - - def start(): - ss.config.read_certificate_from_disk() - ss.tls_server_context_factory = context_factory.ServerContextFactory(config) - ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) - ss.start_listening(config.worker_listeners) - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) _base.start_worker_reactor("synapse-event-creator", config) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 3de2715132..6ee2b76dcd 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -26,7 +26,6 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.crypto import context_factory from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy @@ -87,6 +86,16 @@ class FederationReaderServer(HomeServer): resources.update({ FEDERATION_PREFIX: TransportLayerServer(self), }) + if name == "openid" and "federation" not in res["names"]: + # Only load the openid resource separately if federation resource + # is not specified since federation resource includes openid + # resource. + resources.update({ + FEDERATION_PREFIX: TransportLayerServer( + self, + servlet_groups=["openid"], + ), + }) root_resource = create_resource_tree(resources, NoResource()) @@ -99,7 +108,8 @@ class FederationReaderServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), + reactor=self.get_reactor() ) logger.info("Synapse federation reader now listening on port %d", port) @@ -160,17 +170,7 @@ def start(config_options): ) ss.setup() - - def start(): - ss.config.read_certificate_from_disk() - ss.tls_server_context_factory = context_factory.ServerContextFactory(config) - ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) - ss.start_listening(config.worker_listeners) - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) _base.start_worker_reactor("synapse-federation-reader", config) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index d944e0517f..a461442fdc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -25,7 +25,6 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.crypto import context_factory from synapse.federation import send_queue from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy @@ -192,17 +191,8 @@ def start(config_options): ) ss.setup() + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) - def start(): - ss.config.read_certificate_from_disk() - ss.tls_server_context_factory = context_factory.ServerContextFactory(config) - ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) - ss.start_listening(config.worker_listeners) - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) _base.start_worker_reactor("synapse-federation-sender", config) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index d9ef6edc3c..d5b954361d 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -26,7 +26,6 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite @@ -250,17 +249,7 @@ def start(config_options): ) ss.setup() - - def start(): - ss.config.read_certificate_from_disk() - ss.tls_server_context_factory = context_factory.ServerContextFactory(config) - ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) - ss.start_listening(config.worker_listeners) - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) _base.start_worker_reactor("synapse-frontend-proxy", config) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 250a17cef8..dbd9a83877 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -17,7 +17,6 @@ import gc import logging import os -import signal import sys import traceback @@ -28,7 +27,6 @@ from prometheus_client import Gauge from twisted.application import service from twisted.internet import defer, reactor -from twisted.protocols.tls import TLSMemoryBIOFactory from twisted.web.resource import EncodingResourceWrapper, NoResource from twisted.web.server import GzipEncoderFactory from twisted.web.static import File @@ -49,7 +47,6 @@ from synapse.app import _base from synapse.app._base import listen_ssl, listen_tcp, quit_with_error from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig -from synapse.crypto import context_factory from synapse.federation.transport.server import TransportLayerServer from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect @@ -86,7 +83,6 @@ def gz_wrap(r): class SynapseHomeServer(HomeServer): DATASTORE_CLASS = DataStore - _listening_services = [] def _listener_http(self, config, listener_config): port = listener_config["port"] @@ -94,14 +90,13 @@ class SynapseHomeServer(HomeServer): tls = listener_config.get("tls", False) site_tag = listener_config.get("tag", port) - if tls and config.no_tls: - raise ConfigError( - "Listener on port %i has TLS enabled, but no_tls is set" % (port,), - ) - resources = {} for res in listener_config["resources"]: for name in res["names"]: + if name == "openid" and "federation" in res["names"]: + # Skip loading openid resource if federation is defined + # since federation resource will include openid + continue resources.update(self._configure_named_resource( name, res.get("compress", False), )) @@ -126,7 +121,7 @@ class SynapseHomeServer(HomeServer): root_resource = create_resource_tree(resources, root_resource) if tls: - return listen_ssl( + ports = listen_ssl( bind_addresses, port, SynapseSite( @@ -137,10 +132,12 @@ class SynapseHomeServer(HomeServer): self.version_string, ), self.tls_server_context_factory, + reactor=self.get_reactor(), ) + logger.info("Synapse now listening on TCP port %d (TLS)", port) else: - return listen_tcp( + ports = listen_tcp( bind_addresses, port, SynapseSite( @@ -149,8 +146,12 @@ class SynapseHomeServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), + reactor=self.get_reactor(), ) + logger.info("Synapse now listening on TCP port %d", port) + + return ports def _configure_named_resource(self, name, compress=False): """Build a resource map for a named resource @@ -196,6 +197,11 @@ class SynapseHomeServer(HomeServer): FEDERATION_PREFIX: TransportLayerServer(self), }) + if name == "openid": + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self, servlet_groups=["openid"]), + }) + if name in ["static", "client"]: resources.update({ STATIC_PREFIX: File( @@ -241,10 +247,10 @@ class SynapseHomeServer(HomeServer): return resources - def start_listening(self): + def start_listening(self, listeners): config = self.get_config() - for listener in config.listeners: + for listener in listeners: if listener["type"] == "http": self._listening_services.extend( self._listener_http(config, listener) @@ -260,14 +266,14 @@ class SynapseHomeServer(HomeServer): ) ) elif listener["type"] == "replication": - bind_addresses = listener["bind_addresses"] - for address in bind_addresses: - factory = ReplicationStreamProtocolFactory(self) - server_listener = reactor.listenTCP( - listener["port"], factory, interface=address - ) + services = listen_tcp( + listener["bind_addresses"], + listener["port"], + ReplicationStreamProtocolFactory(self), + ) + for s in services: reactor.addSystemEventTrigger( - "before", "shutdown", server_listener.stopListening, + "before", "shutdown", s.stopListening, ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: @@ -328,20 +334,11 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - sighup_callbacks = [] synapse.config.logger.setup_logging( config, - use_worker_options=False, - register_sighup=sighup_callbacks.append + use_worker_options=False ) - def handle_sighup(*args, **kwargs): - for i in sighup_callbacks: - i(*args, **kwargs) - - if hasattr(signal, "SIGHUP"): - signal.signal(signal.SIGHUP, handle_sighup) - events.USE_FROZEN_DICTS = config.use_frozen_dicts database_engine = create_engine(config.database_config) @@ -377,76 +374,73 @@ def setup(config_options): hs.setup() - def refresh_certificate(*args): + @defer.inlineCallbacks + def do_acme(): """ - Refresh the TLS certificates that Synapse is using by re-reading them - from disk and updating the TLS context factories to use them. + Reprovision an ACME certificate, if it's required. + + Returns: + Deferred[bool]: Whether the cert has been updated. """ - logging.info("Reloading certificate from disk...") - hs.config.read_certificate_from_disk() - hs.tls_server_context_factory = context_factory.ServerContextFactory(config) - hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config + acme = hs.get_acme_handler() + + # Check how long the certificate is active for. + cert_days_remaining = hs.config.is_disk_cert_valid( + allow_self_signed=False ) - logging.info("Certificate reloaded.") - - logging.info("Updating context factories...") - for i in hs._listening_services: - if isinstance(i.factory, TLSMemoryBIOFactory): - i.factory = TLSMemoryBIOFactory( - hs.tls_server_context_factory, - False, - i.factory.wrappedFactory - ) - logging.info("Context factories updated.") - sighup_callbacks.append(refresh_certificate) + # We want to reprovision if cert_days_remaining is None (meaning no + # certificate exists), or the days remaining number it returns + # is less than our re-registration threshold. + provision = False + + if (cert_days_remaining is None): + provision = True + + if cert_days_remaining > hs.config.acme_reprovision_threshold: + provision = True + + if provision: + yield acme.provision_certificate() + + defer.returnValue(provision) + + @defer.inlineCallbacks + def reprovision_acme(): + """ + Provision a certificate from ACME, if required, and reload the TLS + certificate if it's renewed. + """ + reprovisioned = yield do_acme() + if reprovisioned: + _base.refresh_certificate(hs) @defer.inlineCallbacks def start(): try: - # Check if the certificate is still valid. - cert_days_remaining = hs.config.is_disk_cert_valid() - + # Run the ACME provisioning code, if it's enabled. if hs.config.acme_enabled: - # If ACME is enabled, we might need to provision a certificate - # before starting. acme = hs.get_acme_handler() - # Start up the webservices which we will respond to ACME - # challenges with. + # challenges with, and then provision. yield acme.start_listening() + yield do_acme() - # We want to reprovision if cert_days_remaining is None (meaning no - # certificate exists), or the days remaining number it returns - # is less than our re-registration threshold. - if (cert_days_remaining is None) or ( - not cert_days_remaining > hs.config.acme_reprovision_threshold - ): - yield acme.provision_certificate() - - # Read the certificate from disk and build the context factories for - # TLS. - hs.config.read_certificate_from_disk() - hs.tls_server_context_factory = context_factory.ServerContextFactory(config) - hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) + # Check if it needs to be reprovisioned every day. + hs.get_clock().looping_call( + reprovision_acme, + 24 * 60 * 60 * 1000 + ) + + _base.start(hs, config.listeners) - # It is now safe to start your Synapse. - hs.start_listening() hs.get_pusherpool().start() - hs.get_datastore().start_profiling() hs.get_datastore().start_doing_background_updates() - except Exception as e: - # If a DeferredList failed (like in listening on the ACME listener), - # we need to print the subfailure explicitly. - if isinstance(e, defer.FirstError): - e.subFailure.printTraceback(sys.stderr) - sys.exit(1) - - # Something else went wrong when starting. Print it and bail out. + except Exception: + # Print the exception and bail out. traceback.print_exc(file=sys.stderr) + if reactor.running: + reactor.stop() sys.exit(1) reactor.callWhenRunning(start) @@ -455,7 +449,8 @@ def setup(config_options): class SynapseService(service.Service): - """A twisted Service class that will start synapse. Used to run synapse + """ + A twisted Service class that will start synapse. Used to run synapse via twistd and a .tac. """ def __init__(self, config): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 4ecf64031b..d4cc4e9443 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -26,7 +26,6 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.crypto import context_factory from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource @@ -160,17 +159,7 @@ def start(config_options): ) ss.setup() - - def start(): - ss.config.read_certificate_from_disk() - ss.tls_server_context_factory = context_factory.ServerContextFactory(config) - ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) - ss.start_listening(config.worker_listeners) - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) _base.start_worker_reactor("synapse-media-repository", config) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 83b0863f00..cbf0d67f51 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -224,11 +224,10 @@ def start(config_options): ) ps.setup() - ps.start_listening(config.worker_listeners) def start(): + _base.start(ps, config.worker_listeners) ps.get_pusherpool().start() - ps.get_datastore().start_profiling() reactor.callWhenRunning(start) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 0354e82bf8..9163b56d86 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -445,12 +445,7 @@ def start(config_options): ) ss.setup() - ss.start_listening(config.worker_listeners) - - def start(): - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) _base.start_worker_reactor("synapse-synchrotron", config) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 176d55a783..d1ab9512cd 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -26,7 +26,6 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.crypto import context_factory from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy @@ -220,17 +219,7 @@ def start(config_options): ) ss.setup() - - def start(): - ss.config.read_certificate_from_disk() - ss.tls_server_context_factory = context_factory.ServerContextFactory(config) - ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory( - config - ) - ss.start_listening(config.worker_listeners) - ss.get_datastore().start_profiling() - - reactor.callWhenRunning(start) + reactor.callWhenRunning(_base.start, ss, config.worker_listeners) _base.start_worker_reactor("synapse-user-dir", config) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 5858fb92b4..5aec43b702 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -257,7 +257,7 @@ class Config(object): "--keys-directory", metavar="DIRECTORY", help="Used with 'generate-*' options to specify where files such as" - " certs and signing keys should be stored in, unless explicitly" + " signing keys should be stored, unless explicitly" " specified in the config.", ) config_parser.add_argument( @@ -313,16 +313,11 @@ class Config(object): print( ( "A config file has been generated in %r for server name" - " %r with corresponding SSL keys and self-signed" - " certificates. Please review this file and customise it" + " %r. Please review this file and customise it" " to your needs." ) % (config_path, server_name) ) - print( - "If this server name is incorrect, you will need to" - " regenerate the SSL certificates" - ) return else: print( diff --git a/synapse/config/api.py b/synapse/config/api.py index 403d96ba76..9f25bbc5cb 100644 --- a/synapse/config/api.py +++ b/synapse/config/api.py @@ -24,6 +24,7 @@ class ApiConfig(Config): EventTypes.JoinRules, EventTypes.CanonicalAlias, EventTypes.RoomAvatar, + EventTypes.RoomEncryption, EventTypes.Name, ]) @@ -36,5 +37,6 @@ class ApiConfig(Config): - "{JoinRules}" - "{CanonicalAlias}" - "{RoomAvatar}" + - "{RoomEncryption}" - "{Name}" """.format(**vars(EventTypes)) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 5aad062c36..727fdc54d8 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -42,7 +42,7 @@ from .voip import VoipConfig from .workers import WorkerConfig -class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, +class HomeServerConfig(ServerConfig, TlsConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, diff --git a/synapse/config/logger.py b/synapse/config/logger.py index a795e39b1a..4b938053fb 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -15,7 +15,6 @@ import logging import logging.config import os -import signal import sys from string import Template @@ -24,6 +23,7 @@ import yaml from twisted.logger import STDLibLogObserver, globalLogBeginner import synapse +from synapse.app import _base as appbase from synapse.util.logcontext import LoggingContextFilter from synapse.util.versionstring import get_version_string @@ -127,7 +127,7 @@ class LoggingConfig(Config): ) -def setup_logging(config, use_worker_options=False, register_sighup=None): +def setup_logging(config, use_worker_options=False): """ Set up python logging Args: @@ -140,12 +140,6 @@ def setup_logging(config, use_worker_options=False, register_sighup=None): register_sighup (func | None): Function to call to register a sighup handler. """ - if not register_sighup: - if getattr(signal, "SIGHUP"): - register_sighup = lambda x: signal.signal(signal.SIGHUP, x) - else: - register_sighup = lambda x: None - log_config = (config.worker_log_config if use_worker_options else config.log_config) log_file = (config.worker_log_file if use_worker_options @@ -187,7 +181,7 @@ def setup_logging(config, use_worker_options=False, register_sighup=None): else: handler = logging.StreamHandler() - def sighup(signum, stack): + def sighup(*args): pass handler.setFormatter(formatter) @@ -200,14 +194,14 @@ def setup_logging(config, use_worker_options=False, register_sighup=None): with open(log_config, 'r') as f: logging.config.dictConfig(yaml.load(f)) - def sighup(signum, stack): + def sighup(*args): # it might be better to use a file watcher or something for this. load_log_config() logging.info("Reloaded log config from %s due to SIGHUP", log_config) load_log_config() - register_sighup(sighup) + appbase.register_sighup(sighup) # make sure that the first thing we log is a thing we can grep backwards # for diff --git a/synapse/config/server.py b/synapse/config/server.py index 268a43ff00..93a30e4cfa 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -24,6 +24,14 @@ from ._base import Config, ConfigError logger = logging.Logger(__name__) +# by default, we attempt to listen on both '::' *and* '0.0.0.0' because some OSes +# (Windows, macOS, other BSD/Linux where net.ipv6.bindv6only is set) will only listen +# on IPv6 when '::' is set. +# +# We later check for errors when binding to 0.0.0.0 and ignore them if :: is also in +# in the list. +DEFAULT_BIND_ADDRESSES = ['::', '0.0.0.0'] + class ServerConfig(Config): @@ -118,16 +126,38 @@ class ServerConfig(Config): self.public_baseurl += '/' self.start_pushers = config.get("start_pushers", True) - self.listeners = config.get("listeners", []) + self.listeners = [] + for listener in config.get("listeners", []): + if not isinstance(listener.get("port", None), int): + raise ConfigError( + "Listener configuration is lacking a valid 'port' option" + ) + + if listener.setdefault("tls", False): + # no_tls is not really supported any more, but let's grandfather it in + # here. + if config.get("no_tls", False): + logger.info( + "Ignoring TLS-enabled listener on port %i due to no_tls" + ) + continue - for listener in self.listeners: bind_address = listener.pop("bind_address", None) bind_addresses = listener.setdefault("bind_addresses", []) + # if bind_address was specified, add it to the list of addresses if bind_address: bind_addresses.append(bind_address) - elif not bind_addresses: - bind_addresses.append('') + + # if we still have an empty list of addresses, use the default list + if not bind_addresses: + if listener['type'] == 'metrics': + # the metrics listener doesn't support IPv6 + bind_addresses.append('0.0.0.0') + else: + bind_addresses.extend(DEFAULT_BIND_ADDRESSES) + + self.listeners.append(listener) if not self.web_client_location: _warn_if_webclient_configured(self.listeners) @@ -136,6 +166,9 @@ class ServerConfig(Config): bind_port = config.get("bind_port") if bind_port: + if config.get("no_tls", False): + raise ConfigError("no_tls is incompatible with bind_port") + self.listeners = [] bind_host = config.get("bind_host", "") gzip_responses = config.get("gzip_responses", True) @@ -182,6 +215,7 @@ class ServerConfig(Config): "port": manhole, "bind_addresses": ["127.0.0.1"], "type": "manhole", + "tls": False, }) metrics_port = config.get("metrics_port") @@ -207,6 +241,9 @@ class ServerConfig(Config): _check_resource_config(self.listeners) + def has_tls_listener(self): + return any(l["tls"] for l in self.listeners) + def default_config(self, server_name, data_dir_path, **kwargs): _, bind_port = parse_and_validate_server_name(server_name) if bind_port is not None: @@ -295,75 +332,106 @@ class ServerConfig(Config): # List of ports that Synapse should listen on, their purpose and their # configuration. + # + # Options for each listener include: + # + # port: the TCP port to bind to + # + # bind_addresses: a list of local addresses to listen on. The default is + # 'all local interfaces'. + # + # type: the type of listener. Normally 'http', but other valid options are: + # 'manhole' (see docs/manhole.md), + # 'metrics' (see docs/metrics-howto.rst), + # 'replication' (see docs/workers.rst). + # + # tls: set to true to enable TLS for this listener. Will use the TLS + # key/cert specified in tls_private_key_path / tls_certificate_path. + # + # x_forwarded: Only valid for an 'http' listener. Set to true to use the + # X-Forwarded-For header as the client IP. Useful when Synapse is + # behind a reverse-proxy. + # + # resources: Only valid for an 'http' listener. A list of resources to host + # on this port. Options for each resource are: + # + # names: a list of names of HTTP resources. See below for a list of + # valid resource names. + # + # compress: set to true to enable HTTP comression for this resource. + # + # additional_resources: Only valid for an 'http' listener. A map of + # additional endpoints which should be loaded via dynamic modules. + # + # Valid resource names are: + # + # client: the client-server API (/_matrix/client). Also implies 'media' and + # 'static'. + # + # consent: user consent forms (/_matrix/consent). See + # docs/consent_tracking.md. + # + # federation: the server-server API (/_matrix/federation). Also implies + # 'media', 'keys', 'openid' + # + # keys: the key discovery API (/_matrix/keys). + # + # media: the media API (/_matrix/media). + # + # metrics: the metrics interface. See docs/metrics-howto.rst. + # + # openid: OpenID authentication. + # + # replication: the HTTP replication API (/_synapse/replication). See + # docs/workers.rst. + # + # static: static resources under synapse/static (/_matrix/static). (Mostly + # useful for 'fallback authentication'.) + # + # webclient: A web client. Requires web_client_location to be set. + # listeners: - # Main HTTPS listener - # For when matrix traffic is sent directly to synapse. - - - # The port to listen for HTTPS requests on. - port: %(bind_port)s - - # Local addresses to listen on. - # On Linux and Mac OS, `::` will listen on all IPv4 and IPv6 - # addresses by default. For most other OSes, this will only listen - # on IPv6. - bind_addresses: - - '::' - - '0.0.0.0' - - # This is a 'http' listener, allows us to specify 'resources'. + # TLS-enabled listener: for when matrix traffic is sent directly to synapse. + # + # Disabled by default. To enable it, uncomment the following. (Note that you + # will also need to give Synapse a TLS key and certificate: see the TLS section + # below.) + # + # - port: %(bind_port)s + # type: http + # tls: true + # resources: + # - names: [client, federation] + + # Unsecure HTTP listener: for when matrix traffic passes through a reverse proxy + # that unwraps TLS. + # + # If you plan to use a reverse proxy, please see + # https://github.com/matrix-org/synapse/blob/master/docs/reverse_proxy.rst. + # + - port: %(unsecure_port)s + tls: false + bind_addresses: ['::1', '127.0.0.1'] type: http + x_forwarded: true - tls: true - - # Use the X-Forwarded-For (XFF) header as the client IP and not the - # actual client IP. - x_forwarded: false - - # List of HTTP resources to serve on this listener. resources: - - - # List of resources to host on this listener. - names: - - client # The client-server APIs, both v1 and v2 - # - webclient # A web client. Requires web_client_location to be set. - - # Should synapse compress HTTP responses to clients that support it? - # This should be disabled if running synapse behind a load balancer - # that can do automatic compression. - compress: true - - - names: [federation] # Federation APIs + - names: [client, federation] compress: false - # optional list of additional endpoints which can be loaded via - # dynamic modules + # example additonal_resources: + # # additional_resources: # "/_matrix/my/custom/endpoint": # module: my_module.CustomRequestHandler # config: {} - # Unsecure HTTP listener, - # For when matrix traffic passes through loadbalancer that unwraps TLS. - - port: %(unsecure_port)s - tls: false - bind_addresses: ['::', '0.0.0.0'] - type: http - - x_forwarded: false - - resources: - - names: [client] - compress: true - - names: [federation] - compress: false - # Turn on the twisted ssh manhole service on localhost on the given # port. # - port: 9000 # bind_addresses: ['::1', '127.0.0.1'] # type: manhole - # Homeserver blocking # # How to reach the server admin, used in ResourceLimitError @@ -480,6 +548,7 @@ KNOWN_RESOURCES = ( 'keys', 'media', 'metrics', + 'openid', 'replication', 'static', 'webclient', diff --git a/synapse/config/tls.py b/synapse/config/tls.py index b5f2cfd9b7..5fb3486db1 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -23,9 +23,9 @@ from unpaddedbase64 import encode_base64 from OpenSSL import crypto -from synapse.config._base import Config +from synapse.config._base import Config, ConfigError -logger = logging.getLogger() +logger = logging.getLogger(__name__) class TlsConfig(Config): @@ -45,9 +45,25 @@ class TlsConfig(Config): self.tls_certificate_file = self.abspath(config.get("tls_certificate_path")) self.tls_private_key_file = self.abspath(config.get("tls_private_key_path")) - self._original_tls_fingerprints = config["tls_fingerprints"] + + if self.has_tls_listener(): + if not self.tls_certificate_file: + raise ConfigError( + "tls_certificate_path must be specified if TLS-enabled listeners are " + "configured." + ) + if not self.tls_private_key_file: + raise ConfigError( + "tls_certificate_path must be specified if TLS-enabled listeners are " + "configured." + ) + + self._original_tls_fingerprints = config.get("tls_fingerprints", []) + + if self._original_tls_fingerprints is None: + self._original_tls_fingerprints = [] + self.tls_fingerprints = list(self._original_tls_fingerprints) - self.no_tls = config.get("no_tls", False) # This config option applies to non-federation HTTP clients # (e.g. for talking to recaptcha, identity servers, and such) @@ -60,10 +76,14 @@ class TlsConfig(Config): self.tls_certificate = None self.tls_private_key = None - def is_disk_cert_valid(self): + def is_disk_cert_valid(self, allow_self_signed=True): """ Is the certificate we have on disk valid, and if so, for how long? + Args: + allow_self_signed (bool): Should we allow the certificate we + read to be self signed? + Returns: int: Days remaining of certificate validity. None: No certificate exists. @@ -84,6 +104,12 @@ class TlsConfig(Config): logger.exception("Failed to parse existing certificate off disk!") raise + if not allow_self_signed: + if tls_certificate.get_subject() == tls_certificate.get_issuer(): + raise ValueError( + "TLS Certificate is self signed, and this is not permitted" + ) + # YYYYMMDDhhmmssZ -- in UTC expires_on = datetime.strptime( tls_certificate.get_notAfter().decode('ascii'), "%Y%m%d%H%M%SZ" @@ -92,36 +118,40 @@ class TlsConfig(Config): days_remaining = (expires_on - now).days return days_remaining - def read_certificate_from_disk(self): + def read_certificate_from_disk(self, require_cert_and_key): """ - Read the certificates from disk. - """ - self.tls_certificate = self.read_tls_certificate(self.tls_certificate_file) + Read the certificates and private key from disk. - # Check if it is self-signed, and issue a warning if so. - if self.tls_certificate.get_issuer() == self.tls_certificate.get_subject(): - warnings.warn( - ( - "Self-signed TLS certificates will not be accepted by Synapse 1.0. " - "Please either provide a valid certificate, or use Synapse's ACME " - "support to provision one." + Args: + require_cert_and_key (bool): set to True to throw an error if the certificate + and key file are not given + """ + if require_cert_and_key: + self.tls_private_key = self.read_tls_private_key() + self.tls_certificate = self.read_tls_certificate() + elif self.tls_certificate_file: + # we only need the certificate for the tls_fingerprints. Reload it if we + # can, but it's not a fatal error if we can't. + try: + self.tls_certificate = self.read_tls_certificate() + except Exception as e: + logger.info( + "Unable to read TLS certificate (%s). Ignoring as no " + "tls listeners enabled.", e, ) - ) - - if not self.no_tls: - self.tls_private_key = self.read_tls_private_key(self.tls_private_key_file) self.tls_fingerprints = list(self._original_tls_fingerprints) - # Check that our own certificate is included in the list of fingerprints - # and include it if it is not. - x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, self.tls_certificate - ) - sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest()) - sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints) - if sha256_fingerprint not in sha256_fingerprints: - self.tls_fingerprints.append({u"sha256": sha256_fingerprint}) + if self.tls_certificate: + # Check that our own certificate is included in the list of fingerprints + # and include it if it is not. + x509_certificate_bytes = crypto.dump_certificate( + crypto.FILETYPE_ASN1, self.tls_certificate + ) + sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest()) + sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints) + if sha256_fingerprint not in sha256_fingerprints: + self.tls_fingerprints.append({u"sha256": sha256_fingerprint}) def default_config(self, config_dir_path, server_name, **kwargs): base_key_name = os.path.join(config_dir_path, server_name) @@ -137,6 +167,8 @@ class TlsConfig(Config): return ( """\ + ## TLS ## + # PEM-encoded X509 certificate for TLS. # This certificate, as of Synapse 1.0, will need to be a valid and verifiable # certificate, signed by a recognised Certificate Authority. @@ -144,10 +176,10 @@ class TlsConfig(Config): # See 'ACME support' below to enable auto-provisioning this certificate via # Let's Encrypt. # - tls_certificate_path: "%(tls_certificate_path)s" + # tls_certificate_path: "%(tls_certificate_path)s" # PEM-encoded private key for TLS - tls_private_key_path: "%(tls_private_key_path)s" + # tls_private_key_path: "%(tls_private_key_path)s" # ACME support: This will configure Synapse to request a valid TLS certificate # for your configured `server_name` via Let's Encrypt. @@ -172,7 +204,7 @@ class TlsConfig(Config): # acme: # ACME support is disabled by default. Uncomment the following line - # to enable it. + # (and tls_certificate_path and tls_private_key_path above) to enable it. # # enabled: true @@ -197,13 +229,6 @@ class TlsConfig(Config): # # reprovision_threshold: 30 - # If your server runs behind a reverse-proxy which terminates TLS connections - # (for both client and federation connections), it may be useful to disable - # All TLS support for incoming connections. Setting no_tls to True will - # do so (and avoid the need to give synapse a TLS private key). - # - # no_tls: True - # List of allowed TLS fingerprints for this server to publish along # with the signing keys for this server. Other matrix servers that # make HTTPS requests to this server will check that the TLS @@ -236,10 +261,38 @@ class TlsConfig(Config): % locals() ) - def read_tls_certificate(self, cert_path): - cert_pem = self.read_file(cert_path, "tls_certificate") - return crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + def read_tls_certificate(self): + """Reads the TLS certificate from the configured file, and returns it + + Also checks if it is self-signed, and warns if so + + Returns: + OpenSSL.crypto.X509: the certificate + """ + cert_path = self.tls_certificate_file + logger.info("Loading TLS certificate from %s", cert_path) + cert_pem = self.read_file(cert_path, "tls_certificate_path") + cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + + # Check if it is self-signed, and issue a warning if so. + if cert.get_issuer() == cert.get_subject(): + warnings.warn( + ( + "Self-signed TLS certificates will not be accepted by Synapse 1.0. " + "Please either provide a valid certificate, or use Synapse's ACME " + "support to provision one." + ) + ) + + return cert + + def read_tls_private_key(self): + """Reads the TLS private key from the configured file, and returns it - def read_tls_private_key(self, private_key_path): - private_key_pem = self.read_file(private_key_path, "tls_private_key") + Returns: + OpenSSL.crypto.PKey: the private key + """ + private_key_path = self.tls_private_key_file + logger.info("Loading TLS key from %s", private_key_path) + private_key_pem = self.read_file(private_key_path, "tls_private_key_path") return crypto.load_privatekey(crypto.FILETYPE_PEM, private_key_pem) diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py index 286ad80100..85f2848fb1 100644 --- a/synapse/crypto/context_factory.py +++ b/synapse/crypto/context_factory.py @@ -43,9 +43,7 @@ class ServerContextFactory(ContextFactory): logger.exception("Failed to enable elliptic curve for TLS") context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3) context.use_certificate_chain_file(config.tls_certificate_file) - - if not config.no_tls: - context.use_privatekey(config.tls_private_key) + context.use_privatekey(config.tls_private_key) # https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/ context.set_cipher_list( diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index aeadc9c564..3da86d4ba6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -148,6 +148,22 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) + # Reject if PDU count > 50 and EDU count > 100 + if (len(transaction.pdus) > 50 + or (hasattr(transaction, "edus") and len(transaction.edus) > 100)): + + logger.info( + "Transaction PDU or EDU count too large. Returning 400", + ) + + response = {} + yield self.transaction_actions.set_response( + origin, + transaction, + 400, response + ) + defer.returnValue((400, response)) + received_pdus_counter.inc(len(transaction.pdus)) origin_host, _ = parse_server_name(origin) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 67ae0212c3..a2396ab466 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -43,9 +43,20 @@ logger = logging.getLogger(__name__) class TransportLayerServer(JsonResource): """Handles incoming federation HTTP requests""" - def __init__(self, hs): + def __init__(self, hs, servlet_groups=None): + """Initialize the TransportLayerServer + + Will by default register all servlets. For custom behaviour, pass in + a list of servlet_groups to register. + + Args: + hs (synapse.server.HomeServer): homeserver + servlet_groups (list[str], optional): List of servlet groups to register. + Defaults to ``DEFAULT_SERVLET_GROUPS``. + """ self.hs = hs self.clock = hs.get_clock() + self.servlet_groups = servlet_groups super(TransportLayerServer, self).__init__(hs, canonical_json=False) @@ -67,6 +78,7 @@ class TransportLayerServer(JsonResource): resource=self, ratelimiter=self.ratelimiter, authenticator=self.authenticator, + servlet_groups=self.servlet_groups, ) @@ -1308,10 +1320,12 @@ FEDERATION_SERVLET_CLASSES = ( FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, - OpenIdUserInfo, FederationVersionServlet, ) +OPENID_SERVLET_CLASSES = ( + OpenIdUserInfo, +) ROOM_LIST_CLASSES = ( PublicRoomList, @@ -1350,44 +1364,83 @@ GROUP_ATTESTATION_SERVLET_CLASSES = ( FederationGroupsRenewAttestaionServlet, ) +DEFAULT_SERVLET_GROUPS = ( + "federation", + "room_list", + "group_server", + "group_local", + "group_attestation", + "openid", +) + + +def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=None): + """Initialize and register servlet classes. -def register_servlets(hs, resource, authenticator, ratelimiter): - for servletclass in FEDERATION_SERVLET_CLASSES: - servletclass( - handler=hs.get_federation_server(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in ROOM_LIST_CLASSES: - servletclass( - handler=hs.get_room_list_handler(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in GROUP_SERVER_SERVLET_CLASSES: - servletclass( - handler=hs.get_groups_server_handler(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in GROUP_LOCAL_SERVLET_CLASSES: - servletclass( - handler=hs.get_groups_local_handler(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) - - for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: - servletclass( - handler=hs.get_groups_attestation_renewer(), - authenticator=authenticator, - ratelimiter=ratelimiter, - server_name=hs.hostname, - ).register(resource) + Will by default register all servlets. For custom behaviour, pass in + a list of servlet_groups to register. + + Args: + hs (synapse.server.HomeServer): homeserver + resource (TransportLayerServer): resource class to register to + authenticator (Authenticator): authenticator to use + ratelimiter (util.ratelimitutils.FederationRateLimiter): ratelimiter to use + servlet_groups (list[str], optional): List of servlet groups to register. + Defaults to ``DEFAULT_SERVLET_GROUPS``. + """ + if not servlet_groups: + servlet_groups = DEFAULT_SERVLET_GROUPS + + if "federation" in servlet_groups: + for servletclass in FEDERATION_SERVLET_CLASSES: + servletclass( + handler=hs.get_federation_server(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "openid" in servlet_groups: + for servletclass in OPENID_SERVLET_CLASSES: + servletclass( + handler=hs.get_federation_server(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "room_list" in servlet_groups: + for servletclass in ROOM_LIST_CLASSES: + servletclass( + handler=hs.get_room_list_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "group_server" in servlet_groups: + for servletclass in GROUP_SERVER_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_server_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "group_local" in servlet_groups: + for servletclass in GROUP_LOCAL_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_local_handler(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) + + if "group_attestation" in servlet_groups: + for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES: + servletclass( + handler=hs.get_groups_attestation_renewer(), + authenticator=authenticator, + ratelimiter=ratelimiter, + server_name=hs.hostname, + ).register(resource) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 42b040375f..7bc174070e 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -19,7 +19,13 @@ from six import iteritems from twisted.internet import defer -from synapse.api.errors import NotFoundError, RoomKeysVersionError, StoreError +from synapse.api.errors import ( + Codes, + NotFoundError, + RoomKeysVersionError, + StoreError, + SynapseError, +) from synapse.util.async_helpers import Linearizer logger = logging.getLogger(__name__) @@ -267,7 +273,7 @@ class E2eRoomKeysHandler(object): version(str): Optional; if None gives the most recent version otherwise a historical one. Raises: - StoreError: code 404 if the requested backup version doesn't exist + NotFoundError: if the requested backup version doesn't exist Returns: A deferred of a info dict that gives the info about the new version. @@ -279,7 +285,13 @@ class E2eRoomKeysHandler(object): """ with (yield self._upload_linearizer.queue(user_id)): - res = yield self.store.get_e2e_room_keys_version_info(user_id, version) + try: + res = yield self.store.get_e2e_room_keys_version_info(user_id, version) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown backup version") + else: + raise defer.returnValue(res) @defer.inlineCallbacks @@ -290,8 +302,60 @@ class E2eRoomKeysHandler(object): user_id(str): the user whose current backup version we're deleting version(str): the version id of the backup being deleted Raises: - StoreError: code 404 if this backup version doesn't exist + NotFoundError: if this backup version doesn't exist """ with (yield self._upload_linearizer.queue(user_id)): - yield self.store.delete_e2e_room_keys_version(user_id, version) + try: + yield self.store.delete_e2e_room_keys_version(user_id, version) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown backup version") + else: + raise + + @defer.inlineCallbacks + def update_version(self, user_id, version, version_info): + """Update the info about a given version of the user's backup + + Args: + user_id(str): the user whose current backup version we're updating + version(str): the backup version we're updating + version_info(dict): the new information about the backup + Raises: + NotFoundError: if the requested backup version doesn't exist + Returns: + A deferred of an empty dict. + """ + if "version" not in version_info: + raise SynapseError( + 400, + "Missing version in body", + Codes.MISSING_PARAM + ) + if version_info["version"] != version: + raise SynapseError( + 400, + "Version in body does not match", + Codes.INVALID_PARAM + ) + with (yield self._upload_linearizer.queue(user_id)): + try: + old_info = yield self.store.get_e2e_room_keys_version_info( + user_id, version + ) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown backup version") + else: + raise + if old_info["algorithm"] != version_info["algorithm"]: + raise SynapseError( + 400, + "Algorithm does not match", + Codes.INVALID_PARAM + ) + + yield self.store.update_e2e_room_keys_version(user_id, version, version_info) + + defer.returnValue({}) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 13ba9291b0..f9af1f0046 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -263,6 +263,16 @@ class RoomCreationHandler(BaseHandler): } } + # Check if old room was non-federatable + + # Get old room's create event + old_room_create_event = yield self.store.get_create_event_for_room(old_room_id) + + # Check if the create event specified a non-federatable room + if not old_room_create_event.content.get("m.federate", True): + # If so, mark the new room as non-federatable as well + creation_content["m.federate"] = False + initial_state = dict() # Replicate relevant room events @@ -274,6 +284,7 @@ class RoomCreationHandler(BaseHandler): (EventTypes.GuestAccess, ""), (EventTypes.RoomAvatar, ""), (EventTypes.Encryption, ""), + (EventTypes.ServerACL, ""), ) old_room_state_ids = yield self.store.get_filtered_current_state_ids( diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index bb2e64ed80..3c24bf3805 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -28,7 +28,7 @@ from canonicaljson import encode_canonical_json from prometheus_client import Counter from signedjson.sign import sign_json -from twisted.internet import defer, protocol +from twisted.internet import defer, protocol, task from twisted.internet.error import DNSLookupError from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone @@ -168,7 +168,7 @@ class MatrixFederationHttpClient(object): requests. """ - def __init__(self, hs): + def __init__(self, hs, tls_client_options_factory): self.hs = hs self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname @@ -176,7 +176,7 @@ class MatrixFederationHttpClient(object): self.agent = MatrixFederationAgent( hs.get_reactor(), - hs.tls_client_options_factory, + tls_client_options_factory, ) self.clock = hs.get_clock() self._store = hs.get_datastore() @@ -286,7 +286,7 @@ class MatrixFederationHttpClient(object): json, ) data = encode_canonical_json(json) - producer = FileBodyProducer( + producer = QuieterFileBodyProducer( BytesIO(data), cooperator=self._cooperator, ) @@ -839,3 +839,16 @@ def encode_query_args(args): query_bytes = urllib.parse.urlencode(encoded_args, True) return query_bytes.encode('utf8') + + +class QuieterFileBodyProducer(FileBodyProducer): + """Wrapper for FileBodyProducer that avoids CRITICAL errors when the connection drops. + + Workaround for https://github.com/matrix-org/synapse/issues/4003 / + https://twistedmatrix.com/trac/ticket/6528 + """ + def stopProducing(self): + try: + FileBodyProducer.stopProducing(self) + except task.TaskStopped: + pass diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 756721e304..5d087ee26b 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -85,7 +85,7 @@ CONDITIONAL_REQUIREMENTS = { "saml2": ["pysaml2>=4.5.0"], "url_preview": ["lxml>=3.5.0"], - "test": ["mock>=2.0"], + "test": ["mock>=2.0", "parameterized"], } diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index ab3f1bd21a..220a0de30b 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -380,6 +380,40 @@ class RoomKeysVersionServlet(RestServlet): ) defer.returnValue((200, {})) + @defer.inlineCallbacks + def on_PUT(self, request, version): + """ + Update the information about a given version of the user's room_keys backup. + + POST /room_keys/version/12345 HTTP/1.1 + Content-Type: application/json + { + "algorithm": "m.megolm_backup.v1", + "auth_data": { + "public_key": "abcdefg", + "signatures": { + "ed25519:something": "hijklmnop" + } + }, + "version": "42" + } + + HTTP/1.1 200 OK + Content-Type: application/json + {} + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + info = parse_json_object_from_request(request) + + if version is None: + raise SynapseError(400, "No version specified to update", Codes.MISSING_PARAM) + + yield self.e2e_room_keys_handler.update_version( + user_id, version, info + ) + defer.returnValue((200, {})) + def register_servlets(hs, http_server): RoomKeysServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 0251146722..39d157a44b 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -75,7 +75,7 @@ class SyncRestServlet(RestServlet): """ PATTERNS = client_v2_patterns("/sync$") - ALLOWED_PRESENCE = set(["online", "offline"]) + ALLOWED_PRESENCE = set(["online", "offline", "unavailable"]) def __init__(self, hs): super(SyncRestServlet, self).__init__() diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 29e62bfcdd..27e7cbf3cc 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -38,6 +38,7 @@ class VersionsRestServlet(RestServlet): "r0.1.0", "r0.2.0", "r0.3.0", + "r0.4.0", ], # as per MSC1497: "unstable_features": { diff --git a/synapse/server.py b/synapse/server.py index 6c52101616..8615b67ad4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -31,6 +31,7 @@ from synapse.api.filtering import Filtering from synapse.api.ratelimiting import Ratelimiter from synapse.appservice.api import ApplicationServiceApi from synapse.appservice.scheduler import ApplicationServiceScheduler +from synapse.crypto import context_factory from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory from synapse.events.spamcheck import SpamChecker @@ -112,6 +113,8 @@ class HomeServer(object): Attributes: config (synapse.config.homeserver.HomeserverConfig): + _listening_services (list[twisted.internet.tcp.Port]): TCP ports that + we are listening on to provide HTTP services. """ __metaclass__ = abc.ABCMeta @@ -196,6 +199,7 @@ class HomeServer(object): self._reactor = reactor self.hostname = hostname self._building = {} + self._listening_services = [] self.clock = Clock(reactor) self.distributor = Distributor() @@ -364,7 +368,10 @@ class HomeServer(object): return PusherPool(self) def build_http_client(self): - return MatrixFederationHttpClient(self) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory( + self.config + ) + return MatrixFederationHttpClient(self, tls_client_options_factory) def build_db_pool(self): name = self.db_config["name"] diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4872ff55b6..e124161845 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -50,6 +50,21 @@ sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"]) sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"]) +# Unique indexes which have been added in background updates. Maps from table name +# to the name of the background update which added the unique index to that table. +# +# This is used by the upsert logic to figure out which tables are safe to do a proper +# UPSERT on: until the relevant background update has completed, we +# have to emulate an upsert by locking the table. +# +UNIQUE_INDEX_BACKGROUND_UPDATES = { + "user_ips": "user_ips_device_unique_index", + "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx", + "device_lists_remote_cache": "device_lists_remote_cache_unique_idx", + "event_search": "event_search_event_id_idx", +} + + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() @@ -194,7 +209,7 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine # A set of tables that are not safe to use native upserts in. - self._unsafe_to_upsert_tables = {"user_ips"} + self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys()) # We add the user_directory_search table to the blacklist on SQLite # because the existing search table does not have an index, making it @@ -230,12 +245,12 @@ class SQLBaseStore(object): ) updates = [x["update_name"] for x in updates] - # The User IPs table in schema #53 was missing a unique index, which we - # run as a background update. - if "user_ips_device_unique_index" not in updates: - self._unsafe_to_upsert_tables.discard("user_ips") + for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items(): + if update_name not in updates: + logger.debug("Now safe to upsert in %s", table) + self._unsafe_to_upsert_tables.discard(table) - # If there's any tables left to check, reschedule to run. + # If there's any updates still running, reschedule to run. if updates: self._clock.call_later( 15.0, diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 091d7116c5..9c21362226 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -66,6 +66,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) self.register_background_update_handler( + "user_ips_analyze", + self._analyze_user_ip, + ) + + self.register_background_update_handler( "user_ips_remove_dupes", self._remove_user_ip_dupes, ) @@ -109,6 +114,25 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): defer.returnValue(1) @defer.inlineCallbacks + def _analyze_user_ip(self, progress, batch_size): + # Background update to analyze user_ips table before we run the + # deduplication background update. The table may not have been analyzed + # for ages due to the table locks. + # + # This will lock out the naive upserts to user_ips while it happens, but + # the analyze should be quick (28GB table takes ~10s) + def user_ips_analyze(txn): + txn.execute("ANALYZE user_ips") + + yield self.runInteraction( + "user_ips_analyze", user_ips_analyze + ) + + yield self._end_background_update("user_ips_analyze") + + defer.returnValue(1) + + @defer.inlineCallbacks def _remove_user_ip_dupes(self, progress, batch_size): # This works function works by scanning the user_ips table in batches # based on `last_seen`. For each row in a batch it searches the rest of @@ -167,12 +191,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): clause = "? <= last_seen AND last_seen < ?" args = (begin_last_seen, end_last_seen) + # (Note: The DISTINCT in the inner query is important to ensure that + # the COUNT(*) is accurate, otherwise double counting may happen due + # to the join effectively being a cross product) txn.execute( """ SELECT user_id, access_token, ip, - MAX(device_id), MAX(user_agent), MAX(last_seen) + MAX(device_id), MAX(user_agent), MAX(last_seen), + COUNT(*) FROM ( - SELECT user_id, access_token, ip + SELECT DISTINCT user_id, access_token, ip FROM user_ips WHERE {} ) c @@ -186,7 +214,60 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): # We've got some duplicates for i in res: - user_id, access_token, ip, device_id, user_agent, last_seen = i + user_id, access_token, ip, device_id, user_agent, last_seen, count = i + + # We want to delete the duplicates so we end up with only a + # single row. + # + # The naive way of doing this would be just to delete all rows + # and reinsert a constructed row. However, if there are a lot of + # duplicate rows this can cause the table to grow a lot, which + # can be problematic in two ways: + # 1. If user_ips is already large then this can cause the + # table to rapidly grow, potentially filling the disk. + # 2. Reinserting a lot of rows can confuse the table + # statistics for postgres, causing it to not use the + # correct indices for the query above, resulting in a full + # table scan. This is incredibly slow for large tables and + # can kill database performance. (This seems to mainly + # happen for the last query where the clause is simply `? < + # last_seen`) + # + # So instead we want to delete all but *one* of the duplicate + # rows. That is hard to do reliably, so we cheat and do a two + # step process: + # 1. Delete all rows with a last_seen strictly less than the + # max last_seen. This hopefully results in deleting all but + # one row the majority of the time, but there may be + # duplicate last_seen + # 2. If multiple rows remain, we fall back to the naive method + # and simply delete all rows and reinsert. + # + # Note that this relies on no new duplicate rows being inserted, + # but if that is happening then this entire process is futile + # anyway. + + # Do step 1: + + txn.execute( + """ + DELETE FROM user_ips + WHERE user_id = ? AND access_token = ? AND ip = ? AND last_seen < ? + """, + (user_id, access_token, ip, last_seen) + ) + if txn.rowcount == count - 1: + # We deleted all but one of the duplicate rows, i.e. there + # is exactly one remaining and so there is nothing left to + # do. + continue + elif txn.rowcount >= count: + raise Exception( + "We deleted more duplicate rows from 'user_ips' than expected", + ) + + # The previous step didn't delete enough rows, so we fallback to + # step 2: # Drop all the duplicates txn.execute( diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py index 45cebe61d1..9a3aec759e 100644 --- a/synapse/storage/e2e_room_keys.py +++ b/synapse/storage/e2e_room_keys.py @@ -298,6 +298,27 @@ class EndToEndRoomKeyStore(SQLBaseStore): "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn ) + def update_e2e_room_keys_version(self, user_id, version, info): + """Update a given backup version + + Args: + user_id(str): the user whose backup version we're updating + version(str): the version ID of the backup version we're updating + info(dict): the new backup version info to store + """ + + return self._simple_update( + table="e2e_room_keys_versions", + keyvalues={ + "user_id": user_id, + "version": version, + }, + updatevalues={ + "auth_data": json.dumps(info["auth_data"]), + }, + desc="update_e2e_room_keys_version" + ) + def delete_e2e_room_keys_version(self, user_id, version=None): """Delete a given backup version of the user's room keys. Doesn't delete their actual key data. diff --git a/synapse/storage/schema/delta/53/user_ips_index.sql b/synapse/storage/schema/delta/53/user_ips_index.sql index 4ca346c111..b812c5794f 100644 --- a/synapse/storage/schema/delta/53/user_ips_index.sql +++ b/synapse/storage/schema/delta/53/user_ips_index.sql @@ -13,9 +13,13 @@ * limitations under the License. */ --- delete duplicates + -- analyze user_ips, to help ensure the correct indices are used INSERT INTO background_updates (update_name, progress_json) VALUES - ('user_ips_remove_dupes', '{}'); + ('user_ips_analyze', '{}'); + +-- delete duplicates +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('user_ips_remove_dupes', '{}', 'user_ips_analyze'); -- add a new unique index to user_ips table INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES @@ -23,4 +27,4 @@ INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES -- drop the old original index INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('user_ips_drop_nonunique_index', '{}', 'user_ips_device_unique_index'); \ No newline at end of file + ('user_ips_drop_nonunique_index', '{}', 'user_ips_device_unique_index'); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c3ab7db7ae..d14a7b2538 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -428,13 +428,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): """ # for now we do this by looking at the create event. We may want to cache this # more intelligently in future. - state_ids = yield self.get_current_state_ids(room_id) - create_id = state_ids.get((EventTypes.Create, "")) - - if not create_id: - raise NotFoundError("Unknown room %s" % (room_id)) - create_event = yield self.get_event(create_id) + # Retrieve the room's create event + create_event = yield self.get_create_event_for_room(room_id) defer.returnValue(create_event.content.get("room_version", "1")) @defer.inlineCallbacks @@ -447,19 +443,39 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: Deferred[unicode|None]: predecessor room id + + Raises: + NotFoundError if the room is unknown + """ + # Retrieve the room's create event + create_event = yield self.get_create_event_for_room(room_id) + + # Return predecessor if present + defer.returnValue(create_event.content.get("predecessor", None)) + + @defer.inlineCallbacks + def get_create_event_for_room(self, room_id): + """Get the create state event for a room. + + Args: + room_id (str) + + Returns: + Deferred[EventBase]: The room creation event. + + Raises: + NotFoundError if the room is unknown """ state_ids = yield self.get_current_state_ids(room_id) create_id = state_ids.get((EventTypes.Create, "")) # If we can't find the create event, assume we've hit a dead end if not create_id: - defer.returnValue(None) + raise NotFoundError("Unknown room %s" % (room_id)) - # Retrieve the room's create event + # Retrieve the room's create event and return create_event = yield self.get_event(create_id) - - # Return predecessor if present - defer.returnValue(create_event.content.get("predecessor", None)) + defer.returnValue(create_event) @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index ce48212265..e8b574ee5e 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -22,6 +22,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.storage.state import StateFilter from synapse.types import get_domain_from_id, get_localpart_from_id from synapse.util.caches.descriptors import cached, cachedInlineCallbacks @@ -31,12 +32,19 @@ logger = logging.getLogger(__name__) class UserDirectoryStore(SQLBaseStore): - @cachedInlineCallbacks(cache_context=True) - def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): + @defer.inlineCallbacks + def is_room_world_readable_or_publicly_joinable(self, room_id): """Check if the room is either world_readable or publically joinable """ - current_state_ids = yield self.get_current_state_ids( - room_id, on_invalidate=cache_context.invalidate + + # Create a state filter that only queries join and history state event + types_to_filter = ( + (EventTypes.JoinRules, ""), + (EventTypes.RoomHistoryVisibility, ""), + ) + + current_state_ids = yield self.get_filtered_current_state_ids( + room_id, StateFilter.from_types(types_to_filter), ) join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) |