From 7db2622d30466700909e03f6e2d4fd12b6af0611 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 11 Jan 2021 10:24:22 +0000 Subject: Remove unused SynapseService (#9058) --- synapse/app/homeserver.py | 22 ---------------------- 1 file changed, 22 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b1d9817a6a..42b5dc53d7 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -15,13 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gc import logging import os import sys from typing import Iterable, Iterator -from twisted.application import service from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.web.resource import EncodingResourceWrapper, IResource @@ -73,7 +71,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.module_loader import load_module -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.homeserver") @@ -487,25 +484,6 @@ def format_config_error(e: ConfigError) -> Iterator[str]: e = e.__cause__ -class SynapseService(service.Service): - """ - A twisted Service class that will start synapse. Used to run synapse - via twistd and a .tac. - """ - - def __init__(self, config): - self.config = config - - def startService(self): - hs = setup(self.config) - change_resource_limit(hs.config.soft_file_limit) - if hs.config.gc_thresholds: - gc.set_threshold(*hs.config.gc_thresholds) - - def stopService(self): - return self._port.stopListening() - - def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: -- cgit 1.5.1 From 671138f6585d77f7577c7809a220555f54b09536 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 11 Jan 2021 15:55:05 +0000 Subject: Clean up exception handling in the startup code (#9059) Factor out the exception handling in the startup code to a utility function, and fix the some logging and exit code stuff. --- changelog.d/9059.bugfix | 1 + synapse/app/_base.py | 150 +++++++++++++++++++++++++----------------- synapse/app/generic_worker.py | 7 +- synapse/app/homeserver.py | 62 +++++++---------- 4 files changed, 120 insertions(+), 100 deletions(-) create mode 100644 changelog.d/9059.bugfix (limited to 'synapse/app') diff --git a/changelog.d/9059.bugfix b/changelog.d/9059.bugfix new file mode 100644 index 0000000000..2933703ffa --- /dev/null +++ b/changelog.d/9059.bugfix @@ -0,0 +1 @@ +Fix incorrect exit code when there is an error at startup. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 37ecdbe3d8..395e202b89 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2017 New Vector Ltd +# Copyright 2019-2021 The Matrix.org Foundation C.I.C # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +20,7 @@ import signal import socket import sys import traceback -from typing import Iterable +from typing import Awaitable, Callable, Iterable from typing_extensions import NoReturn @@ -143,6 +144,45 @@ def quit_with_error(error_string: str) -> NoReturn: sys.exit(1) +def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None: + """Register a callback with the reactor, to be called once it is running + + This can be used to initialise parts of the system which require an asynchronous + setup. + + Any exception raised by the callback will be printed and logged, and the process + will exit. + """ + + async def wrapper(): + try: + await cb(*args, **kwargs) + except Exception: + # previously, we used Failure().printTraceback() here, in the hope that + # would give better tracebacks than traceback.print_exc(). However, that + # doesn't handle chained exceptions (with a __cause__ or __context__) well, + # and I *think* the need for Failure() is reduced now that we mostly use + # async/await. + + # Write the exception to both the logs *and* the unredirected stderr, + # because people tend to get confused if it only goes to one or the other. + # + # One problem with this is that if people are using a logging config that + # logs to the console (as is common eg under docker), they will get two + # copies of the exception. We could maybe try to detect that, but it's + # probably a cost we can bear. + logger.fatal("Error during startup", exc_info=True) + print("Error during startup:", file=sys.__stderr__) + traceback.print_exc(file=sys.__stderr__) + + # it's no use calling sys.exit here, since that just raises a SystemExit + # exception which is then caught by the reactor, and everything carries + # on as normal. + os._exit(1) + + reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper())) + + def listen_metrics(bind_addresses, port): """ Start Prometheus metrics server. @@ -227,7 +267,7 @@ def refresh_certificate(hs): logger.info("Context factories updated.") -def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): +async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): """ Start a Synapse server or worker. @@ -241,75 +281,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): hs: homeserver instance listeners: Listener configuration ('listeners' in homeserver.yaml) """ - try: - # Set up the SIGHUP machinery. - if hasattr(signal, "SIGHUP"): + # Set up the SIGHUP machinery. + if hasattr(signal, "SIGHUP"): + reactor = hs.get_reactor() - reactor = hs.get_reactor() + @wrap_as_background_process("sighup") + def handle_sighup(*args, **kwargs): + # Tell systemd our state, if we're using it. This will silently fail if + # we're not using systemd. + sdnotify(b"RELOADING=1") - @wrap_as_background_process("sighup") - def handle_sighup(*args, **kwargs): - # Tell systemd our state, if we're using it. This will silently fail if - # we're not using systemd. - sdnotify(b"RELOADING=1") + for i, args, kwargs in _sighup_callbacks: + i(*args, **kwargs) - for i, args, kwargs in _sighup_callbacks: - i(*args, **kwargs) + sdnotify(b"READY=1") - sdnotify(b"READY=1") + # We defer running the sighup handlers until next reactor tick. This + # is so that we're in a sane state, e.g. flushing the logs may fail + # if the sighup happens in the middle of writing a log entry. + def run_sighup(*args, **kwargs): + # `callFromThread` should be "signal safe" as well as thread + # safe. + reactor.callFromThread(handle_sighup, *args, **kwargs) - # We defer running the sighup handlers until next reactor tick. This - # is so that we're in a sane state, e.g. flushing the logs may fail - # if the sighup happens in the middle of writing a log entry. - def run_sighup(*args, **kwargs): - # `callFromThread` should be "signal safe" as well as thread - # safe. - reactor.callFromThread(handle_sighup, *args, **kwargs) + signal.signal(signal.SIGHUP, run_sighup) - signal.signal(signal.SIGHUP, run_sighup) + register_sighup(refresh_certificate, hs) - register_sighup(refresh_certificate, hs) + # Load the certificate from disk. + refresh_certificate(hs) - # Load the certificate from disk. - refresh_certificate(hs) + # Start the tracer + synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa + hs + ) - # Start the tracer - synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa - hs - ) + # It is now safe to start your Synapse. + hs.start_listening(listeners) + hs.get_datastore().db_pool.start_profiling() + hs.get_pusherpool().start() + + # Log when we start the shut down process. + hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", logger.info, "Shutting down..." + ) - # It is now safe to start your Synapse. - hs.start_listening(listeners) - hs.get_datastore().db_pool.start_profiling() - hs.get_pusherpool().start() + setup_sentry(hs) + setup_sdnotify(hs) - # Log when we start the shut down process. - hs.get_reactor().addSystemEventTrigger( - "before", "shutdown", logger.info, "Shutting down..." - ) + # If background tasks are running on the main process, start collecting the + # phone home stats. + if hs.config.run_background_tasks: + start_phone_stats_home(hs) - setup_sentry(hs) - setup_sdnotify(hs) - - # If background tasks are running on the main process, start collecting the - # phone home stats. - if hs.config.run_background_tasks: - start_phone_stats_home(hs) - - # We now freeze all allocated objects in the hopes that (almost) - # everything currently allocated are things that will be used for the - # rest of time. Doing so means less work each GC (hopefully). - # - # This only works on Python 3.7 - if sys.version_info >= (3, 7): - gc.collect() - gc.freeze() - except Exception: - traceback.print_exc(file=sys.stderr) - reactor = hs.get_reactor() - if reactor.running: - reactor.stop() - sys.exit(1) + # We now freeze all allocated objects in the hopes that (almost) + # everything currently allocated are things that will be used for the + # rest of time. Doing so means less work each GC (hopefully). + # + # This only works on Python 3.7 + if sys.version_info >= (3, 7): + gc.collect() + gc.freeze() def setup_sentry(hs): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 4428472707..a57535989a 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set from typing_extensions import ContextManager -from twisted.internet import address, reactor +from twisted.internet import address import synapse import synapse.events @@ -34,6 +34,7 @@ from synapse.api.urls import ( SERVER_KEY_V2_PREFIX, ) from synapse.app import _base +from synapse.app._base import register_start from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging @@ -960,9 +961,7 @@ def start(config_options): # streams. Will no-op if no streams can be written to by this worker. hs.get_replication_streamer() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, hs, config.worker_listeners - ) + register_start(_base.start, hs, config.worker_listeners) _base.start_worker_reactor("synapse-generic-worker", config) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 42b5dc53d7..cbecf23be6 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,8 +20,7 @@ import os import sys from typing import Iterable, Iterator -from twisted.internet import defer, reactor -from twisted.python.failure import Failure +from twisted.internet import reactor from twisted.web.resource import EncodingResourceWrapper, IResource from twisted.web.server import GzipEncoderFactory from twisted.web.static import File @@ -38,7 +37,7 @@ from synapse.api.urls import ( WEB_CLIENT_PREFIX, ) from synapse.app import _base -from synapse.app._base import listen_ssl, listen_tcp, quit_with_error +from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start from synapse.config._base import ConfigError from synapse.config.emailconfig import ThreepidBehaviour from synapse.config.homeserver import HomeServerConfig @@ -414,40 +413,29 @@ def setup(config_options): _base.refresh_certificate(hs) async def start(): - try: - # Run the ACME provisioning code, if it's enabled. - if hs.config.acme_enabled: - acme = hs.get_acme_handler() - # Start up the webservices which we will respond to ACME - # challenges with, and then provision. - await acme.start_listening() - await do_acme() - - # Check if it needs to be reprovisioned every day. - hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000) - - # Load the OIDC provider metadatas, if OIDC is enabled. - if hs.config.oidc_enabled: - oidc = hs.get_oidc_handler() - # Loading the provider metadata also ensures the provider config is valid. - await oidc.load_metadata() - await oidc.load_jwks() - - _base.start(hs, config.listeners) - - hs.get_datastore().db_pool.updates.start_doing_background_updates() - except Exception: - # Print the exception and bail out. - print("Error during startup:", file=sys.stderr) - - # this gives better tracebacks than traceback.print_exc() - Failure().printTraceback(file=sys.stderr) - - if reactor.running: - reactor.stop() - sys.exit(1) - - reactor.callWhenRunning(lambda: defer.ensureDeferred(start())) + # Run the ACME provisioning code, if it's enabled. + if hs.config.acme_enabled: + acme = hs.get_acme_handler() + # Start up the webservices which we will respond to ACME + # challenges with, and then provision. + await acme.start_listening() + await do_acme() + + # Check if it needs to be reprovisioned every day. + hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000) + + # Load the OIDC provider metadatas, if OIDC is enabled. + if hs.config.oidc_enabled: + oidc = hs.get_oidc_handler() + # Loading the provider metadata also ensures the provider config is valid. + await oidc.load_metadata() + await oidc.load_jwks() + + await _base.start(hs, config.listeners) + + hs.get_datastore().db_pool.updates.start_doing_background_updates() + + register_start(start) return hs -- cgit 1.5.1 From c9195744a4c8196f5900a467d63327ad3a9c9bbc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 Jan 2021 18:01:27 +0000 Subject: Move more encryption endpoints off master (#9068) --- changelog.d/9068.feature | 1 + synapse/app/generic_worker.py | 12 +++- synapse/storage/databases/main/end_to_end_keys.py | 88 +++++++++++------------ 3 files changed, 55 insertions(+), 46 deletions(-) create mode 100644 changelog.d/9068.feature (limited to 'synapse/app') diff --git a/changelog.d/9068.feature b/changelog.d/9068.feature new file mode 100644 index 0000000000..cdf1844fa7 --- /dev/null +++ b/changelog.d/9068.feature @@ -0,0 +1 @@ +Add experimental support for handling `/keys/claim` and `/room_keys` APIs on worker processes. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index a57535989a..f24c648ac7 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -100,14 +100,18 @@ from synapse.rest.client.v1.profile import ( ) from synapse.rest.client.v1.push_rule import PushRuleRestServlet from synapse.rest.client.v1.voip import VoipRestServlet -from synapse.rest.client.v2_alpha import groups, sync, user_directory +from synapse.rest.client.v2_alpha import groups, room_keys, sync, user_directory from synapse.rest.client.v2_alpha._base import client_patterns from synapse.rest.client.v2_alpha.account import ThreepidRestServlet from synapse.rest.client.v2_alpha.account_data import ( AccountDataServlet, RoomAccountDataServlet, ) -from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet +from synapse.rest.client.v2_alpha.keys import ( + KeyChangesServlet, + KeyQueryServlet, + OneTimeKeyServlet, +) from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet from synapse.rest.client.versions import VersionsRestServlet @@ -116,6 +120,7 @@ from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer, cache_in_self from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.client_ips import ClientIpWorkerStore +from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( @@ -447,6 +452,7 @@ class GenericWorkerSlavedStore( UserDirectoryStore, StatsStore, UIAuthWorkerStore, + EndToEndRoomKeyStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedReceiptsStore, @@ -504,6 +510,7 @@ class GenericWorkerServer(HomeServer): LoginRestServlet(self).register(resource) ThreepidRestServlet(self).register(resource) KeyQueryServlet(self).register(resource) + OneTimeKeyServlet(self).register(resource) KeyChangesServlet(self).register(resource) VoipRestServlet(self).register(resource) PushRuleRestServlet(self).register(resource) @@ -521,6 +528,7 @@ class GenericWorkerServer(HomeServer): room.register_servlets(self, resource, True) room.register_deprecated_servlets(self, resource) InitialSyncRestServlet(self).register(resource) + room_keys.register_servlets(self, resource) SendToDeviceRestServlet(self).register(resource) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 4d1b92d1aa..1b6ccd51c8 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -707,50 +707,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore): """Get the current stream id from the _device_list_id_gen""" ... - -class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): - async def set_e2e_device_keys( - self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict - ) -> bool: - """Stores device keys for a device. Returns whether there was a change - or the keys were already in the database. - """ - - def _set_e2e_device_keys_txn(txn): - set_tag("user_id", user_id) - set_tag("device_id", device_id) - set_tag("time_now", time_now) - set_tag("device_keys", device_keys) - - old_key_json = self.db_pool.simple_select_one_onecol_txn( - txn, - table="e2e_device_keys_json", - keyvalues={"user_id": user_id, "device_id": device_id}, - retcol="key_json", - allow_none=True, - ) - - # In py3 we need old_key_json to match new_key_json type. The DB - # returns unicode while encode_canonical_json returns bytes. - new_key_json = encode_canonical_json(device_keys).decode("utf-8") - - if old_key_json == new_key_json: - log_kv({"Message": "Device key already stored."}) - return False - - self.db_pool.simple_upsert_txn( - txn, - table="e2e_device_keys_json", - keyvalues={"user_id": user_id, "device_id": device_id}, - values={"ts_added_ms": time_now, "key_json": new_key_json}, - ) - log_kv({"message": "Device keys stored."}) - return True - - return await self.db_pool.runInteraction( - "set_e2e_device_keys", _set_e2e_device_keys_txn - ) - async def claim_e2e_one_time_keys( self, query_list: Iterable[Tuple[str, str, str]] ) -> Dict[str, Dict[str, Dict[str, bytes]]]: @@ -840,6 +796,50 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): "claim_e2e_one_time_keys", _claim_e2e_one_time_keys ) + +class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): + async def set_e2e_device_keys( + self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict + ) -> bool: + """Stores device keys for a device. Returns whether there was a change + or the keys were already in the database. + """ + + def _set_e2e_device_keys_txn(txn): + set_tag("user_id", user_id) + set_tag("device_id", device_id) + set_tag("time_now", time_now) + set_tag("device_keys", device_keys) + + old_key_json = self.db_pool.simple_select_one_onecol_txn( + txn, + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + retcol="key_json", + allow_none=True, + ) + + # In py3 we need old_key_json to match new_key_json type. The DB + # returns unicode while encode_canonical_json returns bytes. + new_key_json = encode_canonical_json(device_keys).decode("utf-8") + + if old_key_json == new_key_json: + log_kv({"Message": "Device key already stored."}) + return False + + self.db_pool.simple_upsert_txn( + txn, + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + values={"ts_added_ms": time_now, "key_json": new_key_json}, + ) + log_kv({"message": "Device keys stored."}) + return True + + return await self.db_pool.runInteraction( + "set_e2e_device_keys", _set_e2e_device_keys_txn + ) + async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None: def delete_e2e_keys_by_device_txn(txn): log_kv( -- cgit 1.5.1 From d1eb1b96e8e7968a4e7d3d56a4b2b9ef61a5d7f4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 13 Jan 2021 12:35:40 -0500 Subject: Register the /devices endpoint on workers. (#9092) --- changelog.d/9092.feature | 1 + docs/workers.md | 1 + synapse/app/generic_worker.py | 2 ++ synapse/storage/databases/main/client_ips.py | 41 ++++++++++++++++++++-------- 4 files changed, 34 insertions(+), 11 deletions(-) create mode 100644 changelog.d/9092.feature (limited to 'synapse/app') diff --git a/changelog.d/9092.feature b/changelog.d/9092.feature new file mode 100644 index 0000000000..64843a6a95 --- /dev/null +++ b/changelog.d/9092.feature @@ -0,0 +1 @@ + Add experimental support for handling `/devices` API on worker processes. diff --git a/docs/workers.md b/docs/workers.md index 298adf8695..7fb651bba4 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -214,6 +214,7 @@ expressions: ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$ ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$ + ^/_matrix/client/(api/v1|r0|unstable)/devices$ ^/_matrix/client/(api/v1|r0|unstable)/keys/query$ ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ ^/_matrix/client/versions$ diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index f24c648ac7..cb202bda44 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -107,6 +107,7 @@ from synapse.rest.client.v2_alpha.account_data import ( AccountDataServlet, RoomAccountDataServlet, ) +from synapse.rest.client.v2_alpha.devices import DevicesRestServlet from synapse.rest.client.v2_alpha.keys import ( KeyChangesServlet, KeyQueryServlet, @@ -509,6 +510,7 @@ class GenericWorkerServer(HomeServer): RegisterRestServlet(self).register(resource) LoginRestServlet(self).register(resource) ThreepidRestServlet(self).register(resource) + DevicesRestServlet(self).register(resource) KeyQueryServlet(self).register(resource) OneTimeKeyServlet(self).register(resource) KeyChangesServlet(self).register(resource) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index c53c836337..ea1e8fb580 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -407,6 +407,34 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore): "_prune_old_user_ips", _prune_old_user_ips_txn ) + async def get_last_client_ip_by_device( + self, user_id: str, device_id: Optional[str] + ) -> Dict[Tuple[str, str], dict]: + """For each device_id listed, give the user_ip it was last seen on. + + The result might be slightly out of date as client IPs are inserted in batches. + + Args: + user_id: The user to fetch devices for. + device_id: If None fetches all devices for the user + + Returns: + A dictionary mapping a tuple of (user_id, device_id) to dicts, with + keys giving the column names from the devices table. + """ + + keyvalues = {"user_id": user_id} + if device_id is not None: + keyvalues["device_id"] = device_id + + res = await self.db_pool.simple_select_list( + table="devices", + keyvalues=keyvalues, + retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), + ) + + return {(d["user_id"], d["device_id"]): d for d in res} + class ClientIpStore(ClientIpWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): @@ -512,18 +540,9 @@ class ClientIpStore(ClientIpWorkerStore): A dictionary mapping a tuple of (user_id, device_id) to dicts, with keys giving the column names from the devices table. """ + ret = await super().get_last_client_ip_by_device(user_id, device_id) - keyvalues = {"user_id": user_id} - if device_id is not None: - keyvalues["device_id"] = device_id - - res = await self.db_pool.simple_select_list( - table="devices", - keyvalues=keyvalues, - retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), - ) - - ret = {(d["user_id"], d["device_id"]): d for d in res} + # Update what is retrieved from the database with data which is pending insertion. for key in self._batch_row_update: uid, access_token, ip = key if uid == user_id: -- cgit 1.5.1 From 21a296cd5ac9c450a6e8896e25f0a4afad1c3774 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 14 Jan 2021 13:29:17 +0000 Subject: Split OidcProvider out of OidcHandler (#9107) The idea here is that we will have an instance of OidcProvider for each configured IdP, with OidcHandler just doing the marshalling of them. For now it's still hardcoded with a single provider. --- changelog.d/9107.feature | 1 + synapse/app/homeserver.py | 1 - synapse/handlers/oidc_handler.py | 246 +++++++++++++++++++++++---------------- tests/handlers/test_oidc.py | 93 ++++++++------- 4 files changed, 197 insertions(+), 144 deletions(-) create mode 100644 changelog.d/9107.feature (limited to 'synapse/app') diff --git a/changelog.d/9107.feature b/changelog.d/9107.feature new file mode 100644 index 0000000000..01a24dcf49 --- /dev/null +++ b/changelog.d/9107.feature @@ -0,0 +1 @@ +Add support for multiple SSO Identity Providers. diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index cbecf23be6..57a2f5237c 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -429,7 +429,6 @@ def setup(config_options): oidc = hs.get_oidc_handler() # Loading the provider metadata also ensures the provider config is valid. await oidc.load_metadata() - await oidc.load_jwks() await _base.start(hs, config.listeners) diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py index 84754e5c9c..d6347bb1b8 100644 --- a/synapse/handlers/oidc_handler.py +++ b/synapse/handlers/oidc_handler.py @@ -35,6 +35,7 @@ from typing_extensions import TypedDict from twisted.web.client import readBody from synapse.config import ConfigError +from synapse.config.oidc_config import OidcProviderConfig from synapse.handlers.sso import MappingException, UserAttributes from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable @@ -70,6 +71,131 @@ JWK = Dict[str, str] JWKS = TypedDict("JWKS", {"keys": List[JWK]}) +class OidcHandler: + """Handles requests related to the OpenID Connect login flow. + """ + + def __init__(self, hs: "HomeServer"): + self._sso_handler = hs.get_sso_handler() + + provider_conf = hs.config.oidc.oidc_provider + # we should not have been instantiated if there is no configured provider. + assert provider_conf is not None + + self._token_generator = OidcSessionTokenGenerator(hs) + + self._provider = OidcProvider(hs, self._token_generator, provider_conf) + + async def load_metadata(self) -> None: + """Validate the config and load the metadata from the remote endpoint. + + Called at startup to ensure we have everything we need. + """ + await self._provider.load_metadata() + await self._provider.load_jwks() + + async def handle_oidc_callback(self, request: SynapseRequest) -> None: + """Handle an incoming request to /_synapse/oidc/callback + + Since we might want to display OIDC-related errors in a user-friendly + way, we don't raise SynapseError from here. Instead, we call + ``self._sso_handler.render_error`` which displays an HTML page for the error. + + Most of the OpenID Connect logic happens here: + + - first, we check if there was any error returned by the provider and + display it + - then we fetch the session cookie, decode and verify it + - the ``state`` query parameter should match with the one stored in the + session cookie + + Once we know the session is legit, we then delegate to the OIDC Provider + implementation, which will exchange the code with the provider and complete the + login/authentication. + + Args: + request: the incoming request from the browser. + """ + + # The provider might redirect with an error. + # In that case, just display it as-is. + if b"error" in request.args: + # error response from the auth server. see: + # https://tools.ietf.org/html/rfc6749#section-4.1.2.1 + # https://openid.net/specs/openid-connect-core-1_0.html#AuthError + error = request.args[b"error"][0].decode() + description = request.args.get(b"error_description", [b""])[0].decode() + + # Most of the errors returned by the provider could be due by + # either the provider misbehaving or Synapse being misconfigured. + # The only exception of that is "access_denied", where the user + # probably cancelled the login flow. In other cases, log those errors. + if error != "access_denied": + logger.error("Error from the OIDC provider: %s %s", error, description) + + self._sso_handler.render_error(request, error, description) + return + + # otherwise, it is presumably a successful response. see: + # https://tools.ietf.org/html/rfc6749#section-4.1.2 + + # Fetch the session cookie + session = request.getCookie(SESSION_COOKIE_NAME) # type: Optional[bytes] + if session is None: + logger.info("No session cookie found") + self._sso_handler.render_error( + request, "missing_session", "No session cookie found" + ) + return + + # Remove the cookie. There is a good chance that if the callback failed + # once, it will fail next time and the code will already be exchanged. + # Removing it early avoids spamming the provider with token requests. + request.addCookie( + SESSION_COOKIE_NAME, + b"", + path="/_synapse/oidc", + expires="Thu, Jan 01 1970 00:00:00 UTC", + httpOnly=True, + sameSite="lax", + ) + + # Check for the state query parameter + if b"state" not in request.args: + logger.info("State parameter is missing") + self._sso_handler.render_error( + request, "invalid_request", "State parameter is missing" + ) + return + + state = request.args[b"state"][0].decode() + + # Deserialize the session token and verify it. + try: + session_data = self._token_generator.verify_oidc_session_token( + session, state + ) + except MacaroonDeserializationException as e: + logger.exception("Invalid session") + self._sso_handler.render_error(request, "invalid_session", str(e)) + return + except MacaroonInvalidSignatureException as e: + logger.exception("Could not verify session") + self._sso_handler.render_error(request, "mismatching_session", str(e)) + return + + if b"code" not in request.args: + logger.info("Code parameter is missing") + self._sso_handler.render_error( + request, "invalid_request", "Code parameter is missing" + ) + return + + code = request.args[b"code"][0].decode() + + await self._provider.handle_oidc_callback(request, session_data, code) + + class OidcError(Exception): """Used to catch errors when calling the token_endpoint """ @@ -84,21 +210,25 @@ class OidcError(Exception): return self.error -class OidcHandler: - """Handles requests related to the OpenID Connect login flow. +class OidcProvider: + """Wraps the config for a single OIDC IdentityProvider + + Provides methods for handling redirect requests and callbacks via that particular + IdP. """ - def __init__(self, hs: "HomeServer"): + def __init__( + self, + hs: "HomeServer", + token_generator: "OidcSessionTokenGenerator", + provider: OidcProviderConfig, + ): self._store = hs.get_datastore() - self._token_generator = OidcSessionTokenGenerator(hs) + self._token_generator = token_generator self._callback_url = hs.config.oidc_callback_url # type: str - provider = hs.config.oidc.oidc_provider - # we should not have been instantiated if there is no configured provider. - assert provider is not None - self._scopes = provider.scopes self._user_profile_method = provider.user_profile_method self._client_auth = ClientAuth( @@ -552,22 +682,16 @@ class OidcHandler: nonce=nonce, ) - async def handle_oidc_callback(self, request: SynapseRequest) -> None: + async def handle_oidc_callback( + self, request: SynapseRequest, session_data: "OidcSessionData", code: str + ) -> None: """Handle an incoming request to /_synapse/oidc/callback - Since we might want to display OIDC-related errors in a user-friendly - way, we don't raise SynapseError from here. Instead, we call - ``self._sso_handler.render_error`` which displays an HTML page for the error. + By this time we have already validated the session on the synapse side, and + now need to do the provider-specific operations. This includes: - Most of the OpenID Connect logic happens here: - - - first, we check if there was any error returned by the provider and - display it - - then we fetch the session cookie, decode and verify it - - the ``state`` query parameter should match with the one stored in the - session cookie - - once we known this session is legit, exchange the code with the - provider using the ``token_endpoint`` (see ``_exchange_code``) + - exchange the code with the provider using the ``token_endpoint`` (see + ``_exchange_code``) - once we have the token, use it to either extract the UserInfo from the ``id_token`` (``_parse_id_token``), or use the ``access_token`` to fetch UserInfo from the ``userinfo_endpoint`` @@ -577,86 +701,12 @@ class OidcHandler: Args: request: the incoming request from the browser. + session_data: the session data, extracted from our cookie + code: The authorization code we got from the callback. """ - - # The provider might redirect with an error. - # In that case, just display it as-is. - if b"error" in request.args: - # error response from the auth server. see: - # https://tools.ietf.org/html/rfc6749#section-4.1.2.1 - # https://openid.net/specs/openid-connect-core-1_0.html#AuthError - error = request.args[b"error"][0].decode() - description = request.args.get(b"error_description", [b""])[0].decode() - - # Most of the errors returned by the provider could be due by - # either the provider misbehaving or Synapse being misconfigured. - # The only exception of that is "access_denied", where the user - # probably cancelled the login flow. In other cases, log those errors. - if error != "access_denied": - logger.error("Error from the OIDC provider: %s %s", error, description) - - self._sso_handler.render_error(request, error, description) - return - - # otherwise, it is presumably a successful response. see: - # https://tools.ietf.org/html/rfc6749#section-4.1.2 - - # Fetch the session cookie - session = request.getCookie(SESSION_COOKIE_NAME) # type: Optional[bytes] - if session is None: - logger.info("No session cookie found") - self._sso_handler.render_error( - request, "missing_session", "No session cookie found" - ) - return - - # Remove the cookie. There is a good chance that if the callback failed - # once, it will fail next time and the code will already be exchanged. - # Removing it early avoids spamming the provider with token requests. - request.addCookie( - SESSION_COOKIE_NAME, - b"", - path="/_synapse/oidc", - expires="Thu, Jan 01 1970 00:00:00 UTC", - httpOnly=True, - sameSite="lax", - ) - - # Check for the state query parameter - if b"state" not in request.args: - logger.info("State parameter is missing") - self._sso_handler.render_error( - request, "invalid_request", "State parameter is missing" - ) - return - - state = request.args[b"state"][0].decode() - - # Deserialize the session token and verify it. - try: - session_data = self._token_generator.verify_oidc_session_token( - session, state - ) - except MacaroonDeserializationException as e: - logger.exception("Invalid session") - self._sso_handler.render_error(request, "invalid_session", str(e)) - return - except MacaroonInvalidSignatureException as e: - logger.exception("Could not verify session") - self._sso_handler.render_error(request, "mismatching_session", str(e)) - return - # Exchange the code with the provider - if b"code" not in request.args: - logger.info("Code parameter is missing") - self._sso_handler.render_error( - request, "invalid_request", "Code parameter is missing" - ) - return - - logger.debug("Exchanging code") - code = request.args[b"code"][0].decode() try: + logger.debug("Exchanging code") token = await self._exchange_code(code) except OidcError as e: logger.exception("Could not exchange code") diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py index 2abd7a83b5..5d338bea87 100644 --- a/tests/handlers/test_oidc.py +++ b/tests/handlers/test_oidc.py @@ -151,6 +151,7 @@ class OidcHandlerTestCase(HomeserverTestCase): hs = self.setup_test_homeserver(proxied_http_client=self.http_client) self.handler = hs.get_oidc_handler() + self.provider = self.handler._provider sso_handler = hs.get_sso_handler() # Mock the render error method. self.render_error = Mock(return_value=None) @@ -162,9 +163,10 @@ class OidcHandlerTestCase(HomeserverTestCase): return hs def metadata_edit(self, values): - return patch.dict(self.handler._provider_metadata, values) + return patch.dict(self.provider._provider_metadata, values) def assertRenderedError(self, error, error_description=None): + self.render_error.assert_called_once() args = self.render_error.call_args[0] self.assertEqual(args[1], error) if error_description is not None: @@ -175,15 +177,15 @@ class OidcHandlerTestCase(HomeserverTestCase): def test_config(self): """Basic config correctly sets up the callback URL and client auth correctly.""" - self.assertEqual(self.handler._callback_url, CALLBACK_URL) - self.assertEqual(self.handler._client_auth.client_id, CLIENT_ID) - self.assertEqual(self.handler._client_auth.client_secret, CLIENT_SECRET) + self.assertEqual(self.provider._callback_url, CALLBACK_URL) + self.assertEqual(self.provider._client_auth.client_id, CLIENT_ID) + self.assertEqual(self.provider._client_auth.client_secret, CLIENT_SECRET) @override_config({"oidc_config": {"discover": True}}) def test_discovery(self): """The handler should discover the endpoints from OIDC discovery document.""" # This would throw if some metadata were invalid - metadata = self.get_success(self.handler.load_metadata()) + metadata = self.get_success(self.provider.load_metadata()) self.http_client.get_json.assert_called_once_with(WELL_KNOWN) self.assertEqual(metadata.issuer, ISSUER) @@ -195,47 +197,47 @@ class OidcHandlerTestCase(HomeserverTestCase): # subsequent calls should be cached self.http_client.reset_mock() - self.get_success(self.handler.load_metadata()) + self.get_success(self.provider.load_metadata()) self.http_client.get_json.assert_not_called() @override_config({"oidc_config": COMMON_CONFIG}) def test_no_discovery(self): """When discovery is disabled, it should not try to load from discovery document.""" - self.get_success(self.handler.load_metadata()) + self.get_success(self.provider.load_metadata()) self.http_client.get_json.assert_not_called() @override_config({"oidc_config": COMMON_CONFIG}) def test_load_jwks(self): """JWKS loading is done once (then cached) if used.""" - jwks = self.get_success(self.handler.load_jwks()) + jwks = self.get_success(self.provider.load_jwks()) self.http_client.get_json.assert_called_once_with(JWKS_URI) self.assertEqual(jwks, {"keys": []}) # subsequent calls should be cached… self.http_client.reset_mock() - self.get_success(self.handler.load_jwks()) + self.get_success(self.provider.load_jwks()) self.http_client.get_json.assert_not_called() # …unless forced self.http_client.reset_mock() - self.get_success(self.handler.load_jwks(force=True)) + self.get_success(self.provider.load_jwks(force=True)) self.http_client.get_json.assert_called_once_with(JWKS_URI) # Throw if the JWKS uri is missing with self.metadata_edit({"jwks_uri": None}): - self.get_failure(self.handler.load_jwks(force=True), RuntimeError) + self.get_failure(self.provider.load_jwks(force=True), RuntimeError) # Return empty key set if JWKS are not used - self.handler._scopes = [] # not asking the openid scope + self.provider._scopes = [] # not asking the openid scope self.http_client.get_json.reset_mock() - jwks = self.get_success(self.handler.load_jwks(force=True)) + jwks = self.get_success(self.provider.load_jwks(force=True)) self.http_client.get_json.assert_not_called() self.assertEqual(jwks, {"keys": []}) @override_config({"oidc_config": COMMON_CONFIG}) def test_validate_config(self): """Provider metadatas are extensively validated.""" - h = self.handler + h = self.provider # Default test config does not throw h._validate_metadata() @@ -314,13 +316,13 @@ class OidcHandlerTestCase(HomeserverTestCase): """Provider metadata validation can be disabled by config.""" with self.metadata_edit({"issuer": "http://insecure"}): # This should not throw - self.handler._validate_metadata() + self.provider._validate_metadata() def test_redirect_request(self): """The redirect request has the right arguments & generates a valid session cookie.""" req = Mock(spec=["addCookie"]) url = self.get_success( - self.handler.handle_redirect_request(req, b"http://client/redirect") + self.provider.handle_redirect_request(req, b"http://client/redirect") ) url = urlparse(url) auth_endpoint = urlparse(AUTHORIZATION_ENDPOINT) @@ -388,7 +390,7 @@ class OidcHandlerTestCase(HomeserverTestCase): # ensure that we are correctly testing the fallback when "get_extra_attributes" # is not implemented. - mapping_provider = self.handler._user_mapping_provider + mapping_provider = self.provider._user_mapping_provider with self.assertRaises(AttributeError): _ = mapping_provider.get_extra_attributes @@ -403,9 +405,9 @@ class OidcHandlerTestCase(HomeserverTestCase): "username": username, } expected_user_id = "@%s:%s" % (username, self.hs.hostname) - self.handler._exchange_code = simple_async_mock(return_value=token) - self.handler._parse_id_token = simple_async_mock(return_value=userinfo) - self.handler._fetch_userinfo = simple_async_mock(return_value=userinfo) + self.provider._exchange_code = simple_async_mock(return_value=token) + self.provider._parse_id_token = simple_async_mock(return_value=userinfo) + self.provider._fetch_userinfo = simple_async_mock(return_value=userinfo) auth_handler = self.hs.get_auth_handler() auth_handler.complete_sso_login = simple_async_mock() @@ -425,14 +427,14 @@ class OidcHandlerTestCase(HomeserverTestCase): auth_handler.complete_sso_login.assert_called_once_with( expected_user_id, request, client_redirect_url, None, ) - self.handler._exchange_code.assert_called_once_with(code) - self.handler._parse_id_token.assert_called_once_with(token, nonce=nonce) - self.handler._fetch_userinfo.assert_not_called() + self.provider._exchange_code.assert_called_once_with(code) + self.provider._parse_id_token.assert_called_once_with(token, nonce=nonce) + self.provider._fetch_userinfo.assert_not_called() self.render_error.assert_not_called() # Handle mapping errors with patch.object( - self.handler, + self.provider, "_remote_id_from_userinfo", new=Mock(side_effect=MappingException()), ): @@ -440,36 +442,36 @@ class OidcHandlerTestCase(HomeserverTestCase): self.assertRenderedError("mapping_error") # Handle ID token errors - self.handler._parse_id_token = simple_async_mock(raises=Exception()) + self.provider._parse_id_token = simple_async_mock(raises=Exception()) self.get_success(self.handler.handle_oidc_callback(request)) self.assertRenderedError("invalid_token") auth_handler.complete_sso_login.reset_mock() - self.handler._exchange_code.reset_mock() - self.handler._parse_id_token.reset_mock() - self.handler._fetch_userinfo.reset_mock() + self.provider._exchange_code.reset_mock() + self.provider._parse_id_token.reset_mock() + self.provider._fetch_userinfo.reset_mock() # With userinfo fetching - self.handler._scopes = [] # do not ask the "openid" scope + self.provider._scopes = [] # do not ask the "openid" scope self.get_success(self.handler.handle_oidc_callback(request)) auth_handler.complete_sso_login.assert_called_once_with( expected_user_id, request, client_redirect_url, None, ) - self.handler._exchange_code.assert_called_once_with(code) - self.handler._parse_id_token.assert_not_called() - self.handler._fetch_userinfo.assert_called_once_with(token) + self.provider._exchange_code.assert_called_once_with(code) + self.provider._parse_id_token.assert_not_called() + self.provider._fetch_userinfo.assert_called_once_with(token) self.render_error.assert_not_called() # Handle userinfo fetching error - self.handler._fetch_userinfo = simple_async_mock(raises=Exception()) + self.provider._fetch_userinfo = simple_async_mock(raises=Exception()) self.get_success(self.handler.handle_oidc_callback(request)) self.assertRenderedError("fetch_error") # Handle code exchange failure from synapse.handlers.oidc_handler import OidcError - self.handler._exchange_code = simple_async_mock( + self.provider._exchange_code = simple_async_mock( raises=OidcError("invalid_request") ) self.get_success(self.handler.handle_oidc_callback(request)) @@ -524,7 +526,7 @@ class OidcHandlerTestCase(HomeserverTestCase): return_value=FakeResponse(code=200, phrase=b"OK", body=token_json) ) code = "code" - ret = self.get_success(self.handler._exchange_code(code)) + ret = self.get_success(self.provider._exchange_code(code)) kwargs = self.http_client.request.call_args[1] self.assertEqual(ret, token) @@ -548,7 +550,7 @@ class OidcHandlerTestCase(HomeserverTestCase): ) from synapse.handlers.oidc_handler import OidcError - exc = self.get_failure(self.handler._exchange_code(code), OidcError) + exc = self.get_failure(self.provider._exchange_code(code), OidcError) self.assertEqual(exc.value.error, "foo") self.assertEqual(exc.value.error_description, "bar") @@ -558,7 +560,7 @@ class OidcHandlerTestCase(HomeserverTestCase): code=500, phrase=b"Internal Server Error", body=b"Not JSON", ) ) - exc = self.get_failure(self.handler._exchange_code(code), OidcError) + exc = self.get_failure(self.provider._exchange_code(code), OidcError) self.assertEqual(exc.value.error, "server_error") # Internal server error with JSON body @@ -570,14 +572,14 @@ class OidcHandlerTestCase(HomeserverTestCase): ) ) - exc = self.get_failure(self.handler._exchange_code(code), OidcError) + exc = self.get_failure(self.provider._exchange_code(code), OidcError) self.assertEqual(exc.value.error, "internal_server_error") # 4xx error without "error" field self.http_client.request = simple_async_mock( return_value=FakeResponse(code=400, phrase=b"Bad request", body=b"{}",) ) - exc = self.get_failure(self.handler._exchange_code(code), OidcError) + exc = self.get_failure(self.provider._exchange_code(code), OidcError) self.assertEqual(exc.value.error, "server_error") # 2xx error with "error" field @@ -586,7 +588,7 @@ class OidcHandlerTestCase(HomeserverTestCase): code=200, phrase=b"OK", body=b'{"error": "some_error"}', ) ) - exc = self.get_failure(self.handler._exchange_code(code), OidcError) + exc = self.get_failure(self.provider._exchange_code(code), OidcError) self.assertEqual(exc.value.error, "some_error") @override_config( @@ -612,8 +614,8 @@ class OidcHandlerTestCase(HomeserverTestCase): "username": "foo", "phone": "1234567", } - self.handler._exchange_code = simple_async_mock(return_value=token) - self.handler._parse_id_token = simple_async_mock(return_value=userinfo) + self.provider._exchange_code = simple_async_mock(return_value=token) + self.provider._parse_id_token = simple_async_mock(return_value=userinfo) auth_handler = self.hs.get_auth_handler() auth_handler.complete_sso_login = simple_async_mock() @@ -979,9 +981,10 @@ async def _make_callback_with_userinfo( from synapse.handlers.oidc_handler import OidcSessionData handler = hs.get_oidc_handler() - handler._exchange_code = simple_async_mock(return_value={}) - handler._parse_id_token = simple_async_mock(return_value=userinfo) - handler._fetch_userinfo = simple_async_mock(return_value=userinfo) + provider = handler._provider + provider._exchange_code = simple_async_mock(return_value={}) + provider._parse_id_token = simple_async_mock(return_value=userinfo) + provider._fetch_userinfo = simple_async_mock(return_value=userinfo) state = "state" session = handler._token_generator.generate_oidc_session_token( -- cgit 1.5.1