From 9267741a5f7732d7d16f8445edc68bc68b730601 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Sep 2019 11:58:36 +0100 Subject: Fix `devices_last_seen` background update. Fixes #6134. --- synapse/storage/engines/postgres.py | 7 +++++++ synapse/storage/engines/sqlite.py | 8 ++++++++ 2 files changed, 15 insertions(+) (limited to 'synapse/storage/engines') diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 289b6bc281..601617b21e 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -72,6 +72,13 @@ class PostgresEngine(object): """ return True + @property + def supports_tuple_comparison(self): + """ + Do we support comparing tuples, i.e. `(a, b) > (c, d)`? + """ + return True + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index e9b9caa49a..ac92109366 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -38,6 +38,14 @@ class Sqlite3Engine(object): """ return self.module.sqlite_version_info >= (3, 24, 0) + @property + def supports_tuple_comparison(self): + """ + Do we support comparing tuples, i.e. `(a, b) > (c, d)`? This requires + SQLite 3.15+. + """ + return self.module.sqlite_version_info >= (3, 15, 0) + def check_database(self, txn): pass -- cgit 1.5.1 From 1d3858371e9577faf3382d1feee97154e5085cd4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Oct 2019 16:21:17 +0100 Subject: Disable bytes usage with postgres More often than not passing bytes to `txn.execute` is a bug (where we meant to pass a string) that just happens to work if `BYTEA_OUTPUT` is set to `ESCAPE`. However, this is a bit of a footgun so we want to instead error when this happens, and force using `bytearray` if we actually want to use bytes. --- synapse/storage/engines/postgres.py | 7 +++++++ synapse/storage/filtering.py | 4 ++-- synapse/storage/pusher.py | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) (limited to 'synapse/storage/engines') diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 601617b21e..d670286fa5 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -22,6 +22,13 @@ class PostgresEngine(object): def __init__(self, database_module, database_config): self.module = database_module self.module.extensions.register_type(self.module.extensions.UNICODE) + + # Disables passing `bytes` to txn.execute, c.f. #6186. If you do + # actually want to use bytes than wrap it in `bytearray`. + def _disable_bytes_adapter(_): + raise Exception("Passing bytes to DB is disabled.") + + self.module.extensions.register_adapter(bytes, _disable_bytes_adapter) self.synchronous_commit = database_config.get("synchronous_commit", True) self._version = None # unknown as yet diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 23b48f6cea..7c2a7da836 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -51,7 +51,7 @@ class FilteringStore(SQLBaseStore): "SELECT filter_id FROM user_filters " "WHERE user_id = ? AND filter_json = ?" ) - txn.execute(sql, (user_localpart, def_json)) + txn.execute(sql, (user_localpart, bytearray(def_json))) filter_id_response = txn.fetchone() if filter_id_response is not None: return filter_id_response[0] @@ -68,7 +68,7 @@ class FilteringStore(SQLBaseStore): "INSERT INTO user_filters (user_id, filter_id, filter_json)" "VALUES(?, ?, ?)" ) - txn.execute(sql, (user_localpart, filter_id, def_json)) + txn.execute(sql, (user_localpart, filter_id, bytearray(def_json))) return filter_id diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 3e0e834a62..b12e80440a 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -241,7 +241,7 @@ class PusherStore(PusherWorkerStore): "device_display_name": device_display_name, "ts": pushkey_ts, "lang": lang, - "data": encode_canonical_json(data), + "data": bytearray(encode_canonical_json(data)), "last_stream_ordering": last_stream_ordering, "profile_tag": profile_tag, "id": stream_id, -- cgit 1.5.1 From 3bc687508fa6c4cf82b5ddb22ce6f3674433d0ff Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 15:35:46 +0100 Subject: Remove add_in_list_sql_clause --- synapse/storage/_base.py | 35 ++++++----------------------------- synapse/storage/engines/postgres.py | 6 ++++++ synapse/storage/engines/sqlite.py | 6 ++++++ synapse/storage/search.py | 12 +++++++----- 4 files changed, 25 insertions(+), 34 deletions(-) (limited to 'synapse/storage/engines') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 085b8ae871..6176838aa6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -20,7 +20,7 @@ import random import sys import threading import time -from typing import Iterable, List, Tuple +from typing import Iterable, Tuple from six import PY2, iteritems, iterkeys, itervalues from six.moves import builtins, intern, range @@ -1164,10 +1164,8 @@ class SQLBaseStore(object): if not iterable: return [] - clauses = [] - values = [] - - add_in_list_sql_clause(txn.database_engine, column, iterable, clauses, values) + clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) + clauses = [clause] for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) @@ -1326,10 +1324,8 @@ class SQLBaseStore(object): sql = "DELETE FROM %s" % table - clauses = [] - values = [] - - add_in_list_sql_clause(txn.database_engine, column, iterable, clauses, values) + clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) + clauses = [clause] for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) @@ -1698,25 +1694,6 @@ def db_to_json(db_content): raise -def add_in_list_sql_clause( - database_engine, column: str, iterable: Iterable, clauses: List[str], args: List -): - """Adds an SQL clause to the given list of clauses/args that checks the - given column is in the iterable. c.f. `make_in_list_sql_clause` - - Args: - database_engine - column: Name of the column - iterable: The values to check the column against. - clauses: A list to add the expanded clause to - args: A list of arguments that we append the args to. - """ - - clause, new_args = make_in_list_sql_clause(database_engine, column, iterable) - clauses.append(clause) - args.extend(new_args) - - def make_in_list_sql_clause( database_engine, column: str, iterable: Iterable ) -> Tuple[str, Iterable]: @@ -1736,7 +1713,7 @@ def make_in_list_sql_clause( A tuple of SQL query and the args """ - if isinstance(database_engine, PostgresEngine): + if database_engine.supports_using_any_list: # This should hopefully be faster, but also makes postgres query # stats easier to understand. return "%s = ANY(?)" % (column,), [list(iterable)] diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 601617b21e..f36600b4bb 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -79,6 +79,12 @@ class PostgresEngine(object): """ return True + @property + def supports_using_any_list(self): + """Do we support using `a = ANY(?)` and passing a list + """ + return True + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ac92109366..2526258060 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -46,6 +46,12 @@ class Sqlite3Engine(object): """ return self.module.sqlite_version_info >= (3, 15, 0) + @property + def supports_any_list(self): + """Do we support using `a = ANY(?)` and passing a list + """ + return False + def check_database(self, txn): pass diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 4be6e56dfa..7695bf09fc 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -24,7 +24,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import SynapseError -from synapse.storage._base import add_in_list_sql_clause +from synapse.storage._base import make_in_list_sql_clause from synapse.storage.engines import PostgresEngine, Sqlite3Engine from .background_updates import BackgroundUpdateStore @@ -386,9 +386,10 @@ class SearchStore(SearchBackgroundUpdateStore): # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. if len(room_ids) < 500: - add_in_list_sql_clause( - self.database_engine, "room_id", room_ids, clauses, args + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids ) + clauses = [clause] local_clauses = [] for key in keys: @@ -494,9 +495,10 @@ class SearchStore(SearchBackgroundUpdateStore): # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. if len(room_ids) < 500: - add_in_list_sql_clause( - self.database_engine, "room_id", room_ids, clauses, args + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids ) + clauses = [clause] local_clauses = [] for key in keys: -- cgit 1.5.1 From afb6d9d53b417ff3b651767ab88bf63606e7225e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 10 Oct 2019 15:55:41 +0100 Subject: Fix SQLite --- synapse/storage/engines/sqlite.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/engines') diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 2526258060..ddad17dc5a 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -47,7 +47,7 @@ class Sqlite3Engine(object): return self.module.sqlite_version_info >= (3, 15, 0) @property - def supports_any_list(self): + def supports_using_any_list(self): """Do we support using `a = ANY(?)` and passing a list """ return False -- cgit 1.5.1 From 2284eb3a533a2df04784df08da28e67d6588a5ea Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Dec 2019 10:45:12 +0000 Subject: Add database config class (#6513) This encapsulates config for a given database and is the way to get new connections. --- changelog.d/6513.misc | 1 + scripts-dev/update_database | 9 +-- scripts/synapse_port_db | 58 ++++++++----------- synapse/config/database.py | 78 ++++++++++++++++++++------ synapse/handlers/presence.py | 2 +- synapse/server.py | 41 ++------------ synapse/storage/_base.py | 2 +- synapse/storage/data_stores/__init__.py | 40 ++++++++++--- synapse/storage/data_stores/main/client_ips.py | 2 +- synapse/storage/database.py | 45 ++++++++++++++- synapse/storage/engines/sqlite.py | 16 +++++- synapse/storage/prepare_database.py | 7 +-- tests/handlers/test_typing.py | 39 ++++++------- tests/replication/slave/storage/_base.py | 6 +- tests/server.py | 55 +++++++++--------- tests/storage/test_appservice.py | 37 ++++++++---- tests/storage/test_base.py | 14 +++-- tests/storage/test_registration.py | 1 - tests/utils.py | 43 +++++--------- 19 files changed, 287 insertions(+), 209 deletions(-) create mode 100644 changelog.d/6513.misc (limited to 'synapse/storage/engines') diff --git a/changelog.d/6513.misc b/changelog.d/6513.misc new file mode 100644 index 0000000000..36700f5657 --- /dev/null +++ b/changelog.d/6513.misc @@ -0,0 +1 @@ +Remove all assumptions of there being a single phyiscal DB apart from the `synapse.config`. diff --git a/scripts-dev/update_database b/scripts-dev/update_database index 23017c21f8..1d62f0403a 100755 --- a/scripts-dev/update_database +++ b/scripts-dev/update_database @@ -26,7 +26,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.metrics.background_process_metrics import run_as_background_process from synapse.server import HomeServer from synapse.storage import DataStore -from synapse.storage.prepare_database import prepare_database logger = logging.getLogger("update_database") @@ -77,12 +76,8 @@ if __name__ == "__main__": # Instantiate and initialise the homeserver object. hs = MockHomeserver(config) - db_conn = hs.get_db_conn() - # Update the database to the latest schema. - prepare_database(db_conn, hs.database_engine, config=config) - db_conn.commit() - - # setup instantiates the store within the homeserver object. + # Setup instantiates the store within the homeserver object and updates the + # DB. hs.setup() store = hs.get_datastore() diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index e393a9b2f7..5b5368988c 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -30,6 +30,7 @@ import yaml from twisted.enterprise import adbapi from twisted.internet import defer, reactor +from synapse.config.database import DatabaseConnectionConfig from synapse.config.homeserver import HomeServerConfig from synapse.logging.context import PreserveLoggingContext from synapse.storage._base import LoggingTransaction @@ -55,7 +56,7 @@ from synapse.storage.data_stores.main.stats import StatsStore from synapse.storage.data_stores.main.user_directory import ( UserDirectoryBackgroundUpdateStore, ) -from synapse.storage.database import Database +from synapse.storage.database import Database, make_conn from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.util import Clock @@ -165,23 +166,17 @@ class Store( class MockHomeserver: - def __init__(self, config, database_engine, db_conn, db_pool): - self.database_engine = database_engine - self.db_conn = db_conn - self.db_pool = db_pool + def __init__(self, config): self.clock = Clock(reactor) self.config = config self.hostname = config.server_name - def get_db_conn(self): - return self.db_conn - - def get_db_pool(self): - return self.db_pool - def get_clock(self): return self.clock + def get_reactor(self): + return reactor + class Porter(object): def __init__(self, **kwargs): @@ -445,45 +440,36 @@ class Porter(object): else: return - def setup_db(self, db_config, database_engine): - db_conn = database_engine.module.connect( - **{ - k: v - for k, v in db_config.get("args", {}).items() - if not k.startswith("cp_") - } - ) - - prepare_database(db_conn, database_engine, config=None) + def setup_db(self, db_config: DatabaseConnectionConfig, engine): + db_conn = make_conn(db_config, engine) + prepare_database(db_conn, engine, config=None) db_conn.commit() return db_conn @defer.inlineCallbacks - def build_db_store(self, config): + def build_db_store(self, db_config: DatabaseConnectionConfig): """Builds and returns a database store using the provided configuration. Args: - config: The database configuration, i.e. a dict following the structure of - the "database" section of Synapse's configuration file. + config: The database configuration Returns: The built Store object. """ - engine = create_engine(config) - - self.progress.set_state("Preparing %s" % config["name"]) - conn = self.setup_db(config, engine) + self.progress.set_state("Preparing %s" % db_config.config["name"]) - db_pool = adbapi.ConnectionPool(config["name"], **config["args"]) + engine = create_engine(db_config.config) + conn = self.setup_db(db_config, engine) - hs = MockHomeserver(self.hs_config, engine, conn, db_pool) + hs = MockHomeserver(self.hs_config) - store = Store(Database(hs), conn, hs) + store = Store(Database(hs, db_config, engine), conn, hs) yield store.db.runInteraction( - "%s_engine.check_database" % config["name"], engine.check_database, + "%s_engine.check_database" % db_config.config["name"], + engine.check_database, ) return store @@ -509,7 +495,11 @@ class Porter(object): @defer.inlineCallbacks def run(self): try: - self.sqlite_store = yield self.build_db_store(self.sqlite_config) + self.sqlite_store = yield self.build_db_store( + DatabaseConnectionConfig( + "master", self.sqlite_config, data_stores=["main"] + ) + ) # Check if all background updates are done, abort if not. updates_complete = ( @@ -524,7 +514,7 @@ class Porter(object): defer.returnValue(None) self.postgres_store = yield self.build_db_store( - self.hs_config.database_config + self.hs_config.get_single_database() ) yield self.run_background_updates_on_postgres() diff --git a/synapse/config/database.py b/synapse/config/database.py index 0e2509f0b1..5f2f3c7cfd 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -12,12 +12,43 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging import os from textwrap import indent +from typing import List import yaml -from ._base import Config +from synapse.config._base import Config, ConfigError + +logger = logging.getLogger(__name__) + + +class DatabaseConnectionConfig: + """Contains the connection config for a particular database. + + Args: + name: A label for the database, used for logging. + db_config: The config for a particular database, as per `database` + section of main config. Has two fields: `name` for database + module name, and `args` for the args to give to the database + connector. + data_stores: The list of data stores that should be provisioned on the + database. + """ + + def __init__(self, name: str, db_config: dict, data_stores: List[str]): + if db_config["name"] not in ("sqlite3", "psycopg2"): + raise ConfigError("Unsupported database type %r" % (db_config["name"],)) + + if db_config["name"] == "sqlite3": + db_config.setdefault("args", {}).update( + {"cp_min": 1, "cp_max": 1, "check_same_thread": False} + ) + + self.name = name + self.config = db_config + self.data_stores = data_stores class DatabaseConfig(Config): @@ -26,20 +57,14 @@ class DatabaseConfig(Config): def read_config(self, config, **kwargs): self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K")) - self.database_config = config.get("database") + database_config = config.get("database") - if self.database_config is None: - self.database_config = {"name": "sqlite3", "args": {}} + if database_config is None: + database_config = {"name": "sqlite3", "args": {}} - name = self.database_config.get("name", None) - if name == "psycopg2": - pass - elif name == "sqlite3": - self.database_config.setdefault("args", {}).update( - {"cp_min": 1, "cp_max": 1, "check_same_thread": False} - ) - else: - raise RuntimeError("Unsupported database type '%s'" % (name,)) + self.databases = [ + DatabaseConnectionConfig("master", database_config, data_stores=["main"]) + ] self.set_databasepath(config.get("database_path")) @@ -76,11 +101,24 @@ class DatabaseConfig(Config): self.set_databasepath(args.database_path) def set_databasepath(self, database_path): + if database_path is None: + return + if database_path != ":memory:": database_path = self.abspath(database_path) - if self.database_config.get("name", None) == "sqlite3": - if database_path is not None: - self.database_config["args"]["database"] = database_path + + # We only support setting a database path if we have a single sqlite3 + # database. + if len(self.databases) != 1: + raise ConfigError("Cannot specify 'database_path' with multiple databases") + + database = self.get_single_database() + if database.config["name"] != "sqlite3": + # We don't raise here as we haven't done so before for this case. + logger.warn("Ignoring 'database_path' for non-sqlite3 database") + return + + database.config["args"]["database"] = database_path @staticmethod def add_arguments(parser): @@ -91,3 +129,11 @@ class DatabaseConfig(Config): metavar="SQLITE_DATABASE_PATH", help="The path to a sqlite database to use.", ) + + def get_single_database(self) -> DatabaseConnectionConfig: + """Returns the database if there is only one, useful for e.g. tests + """ + if len(self.databases) != 1: + raise Exception("More than one database exists") + + return self.databases[0] diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index eda15bc623..240c4add12 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -230,7 +230,7 @@ class PresenceHandler(object): is some spurious presence changes that will self-correct. """ # If the DB pool has already terminated, don't try updating - if not self.hs.get_db_pool().running: + if not self.store.database.is_running(): return logger.info( diff --git a/synapse/server.py b/synapse/server.py index 5021068ce0..7926867b77 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -25,7 +25,6 @@ import abc import logging import os -from twisted.enterprise import adbapi from twisted.mail.smtp import sendmail from twisted.web.client import BrowserLikePolicyForHTTPS @@ -98,7 +97,6 @@ from synapse.server_notices.worker_server_notices_sender import ( ) from synapse.state import StateHandler, StateResolutionHandler from synapse.storage import DataStores, Storage -from synapse.storage.engines import create_engine from synapse.streams.events import EventSources from synapse.util import Clock from synapse.util.distributor import Distributor @@ -134,7 +132,6 @@ class HomeServer(object): DEPENDENCIES = [ "http_client", - "db_pool", "federation_client", "federation_server", "handlers", @@ -233,12 +230,6 @@ class HomeServer(object): self.admin_redaction_ratelimiter = Ratelimiter() self.registration_ratelimiter = Ratelimiter() - self.database_engine = create_engine(config.database_config) - config.database_config.setdefault("args", {})[ - "cp_openfun" - ] = self.database_engine.on_new_connection - self.db_config = config.database_config - self.datastores = None # Other kwargs are explicit dependencies @@ -247,10 +238,8 @@ class HomeServer(object): def setup(self): logger.info("Setting up.") - with self.get_db_conn() as conn: - self.datastores = DataStores(self.DATASTORE_CLASS, conn, self) - conn.commit() self.start_time = int(self.get_clock().time()) + self.datastores = DataStores(self.DATASTORE_CLASS, self) logger.info("Finished setting up.") def setup_master(self): @@ -284,6 +273,9 @@ class HomeServer(object): def get_datastore(self): return self.datastores.main + def get_datastores(self): + return self.datastores + def get_config(self): return self.config @@ -433,31 +425,6 @@ class HomeServer(object): ) return MatrixFederationHttpClient(self, tls_client_options_factory) - def build_db_pool(self): - name = self.db_config["name"] - - return adbapi.ConnectionPool( - name, cp_reactor=self.get_reactor(), **self.db_config.get("args", {}) - ) - - def get_db_conn(self, run_new_connection=True): - """Makes a new connection to the database, skipping the db pool - - Returns: - Connection: a connection object implementing the PEP-249 spec - """ - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v - for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def build_media_repository_resource(self): # build the media repo resource. This indirects through the HomeServer # to ensure that we only have a single instance of diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b7637b5dc0..88546ad614 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -40,7 +40,7 @@ class SQLBaseStore(object): def __init__(self, database: Database, db_conn, hs): self.hs = hs self._clock = hs.get_clock() - self.database_engine = hs.database_engine + self.database_engine = database.engine self.db = database self.rand = random.SystemRandom() diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index cafedd5c0d..0983e059c0 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -13,24 +13,50 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage.database import Database +import logging + +from synapse.storage.database import Database, make_conn +from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database +logger = logging.getLogger(__name__) + class DataStores(object): """The various data stores. These are low level interfaces to physical databases. + + Attributes: + main (DataStore) """ - def __init__(self, main_store_class, db_conn, hs): + def __init__(self, main_store_class, hs): # Note we pass in the main store class here as workers use a different main # store. - database = Database(hs) - # Check that db is correctly configured. - database.engine.check_database(db_conn.cursor()) + self.databases = [] + + for database_config in hs.config.database.databases: + db_name = database_config.name + engine = create_engine(database_config.config) + + with make_conn(database_config, engine) as db_conn: + logger.info("Preparing database %r...", db_name) + + engine.check_database(db_conn.cursor()) + prepare_database( + db_conn, engine, hs.config, data_stores=database_config.data_stores, + ) + + database = Database(hs, database_config, engine) + + if "main" in database_config.data_stores: + logger.info("Starting 'main' data store") + self.main = main_store_class(database, db_conn, hs) + + db_conn.commit() - prepare_database(db_conn, database.engine, config=hs.config) + self.databases.append(database) - self.main = main_store_class(database, db_conn, hs) + logger.info("Database %r prepared", db_name) diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index add3037b69..13f4c9c72e 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -412,7 +412,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): def _update_client_ips_batch(self): # If the DB pool has already terminated, don't try updating - if not self.hs.get_db_pool().running: + if not self.db.is_running(): return to_update = self._batch_row_update diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ec19ae1d9d..1003dd84a5 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -24,9 +24,11 @@ from six.moves import intern, range from prometheus_client import Histogram +from twisted.enterprise import adbapi from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.config.database import DatabaseConnectionConfig from synapse.logging.context import LoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater @@ -74,6 +76,37 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { } +def make_pool( + reactor, db_config: DatabaseConnectionConfig, engine +) -> adbapi.ConnectionPool: + """Get the connection pool for the database. + """ + + return adbapi.ConnectionPool( + db_config.config["name"], + cp_reactor=reactor, + cp_openfun=engine.on_new_connection, + **db_config.config.get("args", {}) + ) + + +def make_conn(db_config: DatabaseConnectionConfig, engine): + """Make a new connection to the database and return it. + + Returns: + Connection + """ + + db_params = { + k: v + for k, v in db_config.config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = engine.module.connect(**db_params) + engine.on_new_connection(db_conn) + return db_conn + + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() @@ -218,10 +251,11 @@ class Database(object): _TXN_ID = 0 - def __init__(self, hs): + def __init__(self, hs, database_config: DatabaseConnectionConfig, engine): self.hs = hs self._clock = hs.get_clock() - self._db_pool = hs.get_db_pool() + self._database_config = database_config + self._db_pool = make_pool(hs.get_reactor(), database_config, engine) self.updates = BackgroundUpdater(hs, self) @@ -234,7 +268,7 @@ class Database(object): # to watch it self._txn_perf_counters = PerformanceCounters() - self.engine = hs.database_engine + self.engine = engine # A set of tables that are not safe to use native upserts in. self._unsafe_to_upsert_tables = set(UNIQUE_INDEX_BACKGROUND_UPDATES.keys()) @@ -255,6 +289,11 @@ class Database(object): self._check_safe_to_upsert, ) + def is_running(self): + """Is the database pool currently running + """ + return self._db_pool.running + @defer.inlineCallbacks def _check_safe_to_upsert(self): """ diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ddad17dc5a..df039a072d 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -16,8 +16,6 @@ import struct import threading -from synapse.storage.prepare_database import prepare_database - class Sqlite3Engine(object): single_threaded = True @@ -25,6 +23,9 @@ class Sqlite3Engine(object): def __init__(self, database_module, database_config): self.module = database_module + database = database_config.get("args", {}).get("database") + self._is_in_memory = database in (None, ":memory:",) + # The current max state_group, or None if we haven't looked # in the DB yet. self._current_state_group_id = None @@ -59,7 +60,16 @@ class Sqlite3Engine(object): return sql def on_new_connection(self, db_conn): - prepare_database(db_conn, self, config=None) + + # We need to import here to avoid an import loop. + from synapse.storage.prepare_database import prepare_database + + if self._is_in_memory: + # In memory databases need to be rebuilt each time. Ideally we'd + # reuse the same connection as we do when starting up, but that + # would involve using adbapi before we have started the reactor. + prepare_database(db_conn, self, config=None) + db_conn.create_function("rank", 1, _rank) def is_deadlock(self, error): diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 731e1c9d9c..b4194b44ee 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -41,7 +41,7 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn, database_engine, config): +def prepare_database(db_conn, database_engine, config, data_stores=["main"]): """Prepares a database for usage. Will either create all necessary tables or upgrade from an older schema version. @@ -54,11 +54,10 @@ def prepare_database(db_conn, database_engine, config): config (synapse.config.homeserver.HomeServerConfig|None): application config, or None if we are connecting to an existing database which we expect to be configured already + data_stores (list[str]): The name of the data stores that will be used + with this database. Defaults to all data stores. """ - # For now we only have the one datastore. - data_stores = ["main"] - try: cur = db_conn.cursor() version_info = _get_or_create_schema_state(cur, database_engine) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 92b8726093..596ddc6970 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -64,28 +64,29 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): mock_federation_client = Mock(spec=["put_json"]) mock_federation_client.put_json.return_value = defer.succeed((200, "OK")) + datastores = Mock() + datastores.main = Mock( + spec=[ + # Bits that Federation needs + "prep_send_transaction", + "delivered_txn", + "get_received_txn_response", + "set_received_txn_response", + "get_destination_retry_timings", + "get_devices_by_remote", + # Bits that user_directory needs + "get_user_directory_stream_pos", + "get_current_state_deltas", + "get_device_updates_by_remote", + ] + ) + hs = self.setup_test_homeserver( - datastore=( - Mock( - spec=[ - # Bits that Federation needs - "prep_send_transaction", - "delivered_txn", - "get_received_txn_response", - "set_received_txn_response", - "get_destination_retry_timings", - "get_device_updates_by_remote", - # Bits that user_directory needs - "get_user_directory_stream_pos", - "get_current_state_deltas", - ] - ) - ), - notifier=Mock(), - http_client=mock_federation_client, - keyring=mock_keyring, + notifier=Mock(), http_client=mock_federation_client, keyring=mock_keyring ) + hs.datastores = datastores + return hs def prepare(self, reactor, clock, hs): diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 3dae83c543..2a1e7c7166 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -20,7 +20,7 @@ from synapse.replication.tcp.client import ( ReplicationClientHandler, ) from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory -from synapse.storage.database import Database +from synapse.storage.database import make_conn from tests import unittest from tests.server import FakeTransport @@ -41,10 +41,12 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): + db_config = hs.config.database.get_single_database() self.master_store = self.hs.get_datastore() self.storage = hs.get_storage() + database = hs.get_datastores().databases[0] self.slaved_store = self.STORE_TYPE( - Database(hs), self.hs.get_db_conn(), self.hs + database, make_conn(db_config, database.engine), self.hs ) self.event_id = 0 diff --git a/tests/server.py b/tests/server.py index 2b7cf4242e..a554dfdd57 100644 --- a/tests/server.py +++ b/tests/server.py @@ -302,41 +302,42 @@ def setup_test_homeserver(cleanup_func, *args, **kwargs): Set up a synchronous test server, driven by the reactor used by the homeserver. """ - d = _sth(cleanup_func, *args, **kwargs).result + server = _sth(cleanup_func, *args, **kwargs) - if isinstance(d, Failure): - d.raiseException() + database = server.config.database.get_single_database() # Make the thread pool synchronous. - clock = d.get_clock() - pool = d.get_db_pool() - - def runWithConnection(func, *args, **kwargs): - return threads.deferToThreadPool( - pool._reactor, - pool.threadpool, - pool._runWithConnection, - func, - *args, - **kwargs - ) - - def runInteraction(interaction, *args, **kwargs): - return threads.deferToThreadPool( - pool._reactor, - pool.threadpool, - pool._runInteraction, - interaction, - *args, - **kwargs - ) + clock = server.get_clock() + + for database in server.get_datastores().databases: + pool = database._db_pool + + def runWithConnection(func, *args, **kwargs): + return threads.deferToThreadPool( + pool._reactor, + pool.threadpool, + pool._runWithConnection, + func, + *args, + **kwargs + ) + + def runInteraction(interaction, *args, **kwargs): + return threads.deferToThreadPool( + pool._reactor, + pool.threadpool, + pool._runInteraction, + interaction, + *args, + **kwargs + ) - if pool: pool.runWithConnection = runWithConnection pool.runInteraction = runInteraction pool.threadpool = ThreadPool(clock._reactor) pool.running = True - return d + + return server def get_clock(): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 2e521e9ab7..fd52512696 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -28,7 +28,7 @@ from synapse.storage.data_stores.main.appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore, ) -from synapse.storage.database import Database +from synapse.storage.database import Database, make_conn from tests import unittest from tests.utils import setup_test_homeserver @@ -55,8 +55,10 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self._add_appservice("token2", "as2", "some_url", "some_hs_token", "bob") self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob") # must be done after inserts - database = Database(hs) - self.store = ApplicationServiceStore(database, hs.get_db_conn(), hs) + database = hs.get_datastores().databases[0] + self.store = ApplicationServiceStore( + database, make_conn(database._database_config, database.engine), hs + ) def tearDown(self): # TODO: suboptimal that we need to create files for tests! @@ -111,9 +113,6 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): hs.config.event_cache_size = 1 hs.config.password_providers = [] - self.db_pool = hs.get_db_pool() - self.engine = hs.database_engine - self.as_list = [ {"token": "token1", "url": "https://matrix-as.org", "id": "id_1"}, {"token": "alpha_tok", "url": "https://alpha.com", "id": "id_alpha"}, @@ -125,8 +124,15 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.as_yaml_files = [] - database = Database(hs) - self.store = TestTransactionStore(database, hs.get_db_conn(), hs) + # We assume there is only one database in these tests + database = hs.get_datastores().databases[0] + self.db_pool = database._db_pool + self.engine = database.engine + + db_config = hs.config.get_single_database() + self.store = TestTransactionStore( + database, make_conn(db_config, self.engine), hs + ) def _add_service(self, url, as_token, id): as_yaml = dict( @@ -419,7 +425,10 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs.config.event_cache_size = 1 hs.config.password_providers = [] - ApplicationServiceStore(Database(hs), hs.get_db_conn(), hs) + database = hs.get_datastores().databases[0] + ApplicationServiceStore( + database, make_conn(database._database_config, database.engine), hs + ) @defer.inlineCallbacks def test_duplicate_ids(self): @@ -435,7 +444,10 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs.config.password_providers = [] with self.assertRaises(ConfigError) as cm: - ApplicationServiceStore(Database(hs), hs.get_db_conn(), hs) + database = hs.get_datastores().databases[0] + ApplicationServiceStore( + database, make_conn(database._database_config, database.engine), hs + ) e = cm.exception self.assertIn(f1, str(e)) @@ -456,7 +468,10 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): hs.config.password_providers = [] with self.assertRaises(ConfigError) as cm: - ApplicationServiceStore(Database(hs), hs.get_db_conn(), hs) + database = hs.get_datastores().databases[0] + ApplicationServiceStore( + database, make_conn(database._database_config, database.engine), hs + ) e = cm.exception self.assertIn(f1, str(e)) diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 537cfe9f64..cdee0a9e60 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -52,15 +52,17 @@ class SQLBaseStoreTestCase(unittest.TestCase): config = Mock() config._disable_native_upserts = True config.event_cache_size = 1 - config.database_config = {"name": "sqlite3"} - engine = create_engine(config.database_config) + hs = TestHomeServer("test", config=config) + + sqlite_config = {"name": "sqlite3"} + engine = create_engine(sqlite_config) fake_engine = Mock(wraps=engine) fake_engine.can_native_upsert = False - hs = TestHomeServer( - "test", db_pool=self.db_pool, config=config, database_engine=fake_engine - ) - self.datastore = SQLBaseStore(Database(hs), None, hs) + db = Database(Mock(), Mock(config=sqlite_config), fake_engine) + db._db_pool = self.db_pool + + self.datastore = SQLBaseStore(db, None, hs) @defer.inlineCallbacks def test_insert_1col(self): diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 4578cc3b60..ed5786865a 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -26,7 +26,6 @@ class RegistrationStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): hs = yield setup_test_homeserver(self.addCleanup) - self.db_pool = hs.get_db_pool() self.store = hs.get_datastore() diff --git a/tests/utils.py b/tests/utils.py index 585f305b9a..9f5bf40b4b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -30,6 +30,7 @@ from twisted.internet import defer, reactor from synapse.api.constants import EventTypes from synapse.api.errors import CodeMessageException, cs_error from synapse.api.room_versions import RoomVersions +from synapse.config.database import DatabaseConnectionConfig from synapse.config.homeserver import HomeServerConfig from synapse.config.server import DEFAULT_ROOM_VERSION from synapse.federation.transport import server as federation_server @@ -177,7 +178,6 @@ class TestHomeServer(HomeServer): DATASTORE_CLASS = DataStore -@defer.inlineCallbacks def setup_test_homeserver( cleanup_func, name="test", @@ -214,7 +214,7 @@ def setup_test_homeserver( if USE_POSTGRES_FOR_TESTS: test_db = "synapse_test_%s" % uuid.uuid4().hex - config.database_config = { + database_config = { "name": "psycopg2", "args": { "database": test_db, @@ -226,12 +226,15 @@ def setup_test_homeserver( }, } else: - config.database_config = { + database_config = { "name": "sqlite3", "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1}, } - db_engine = create_engine(config.database_config) + database = DatabaseConnectionConfig("master", database_config, ["main"]) + config.database.databases = [database] + + db_engine = create_engine(database.config) # Create the database before we actually try and connect to it, based off # the template database we generate in setupdb() @@ -251,11 +254,6 @@ def setup_test_homeserver( cur.close() db_conn.close() - # we need to configure the connection pool to run the on_new_connection - # function, so that we can test code that uses custom sqlite functions - # (like rank). - config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection - if datastore is None: hs = homeserverToUse( name, @@ -267,21 +265,19 @@ def setup_test_homeserver( **kargs ) - # Prepare the DB on SQLite -- PostgreSQL is a copy of an already up to - # date db - if not isinstance(db_engine, PostgresEngine): - db_conn = hs.get_db_conn() - yield prepare_database(db_conn, db_engine, config) - db_conn.commit() - db_conn.close() + hs.setup() + if homeserverToUse.__name__ == "TestHomeServer": + hs.setup_master() + + if isinstance(db_engine, PostgresEngine): + database = hs.get_datastores().databases[0] - else: # We need to do cleanup on PostgreSQL def cleanup(): import psycopg2 # Close all the db pools - hs.get_db_pool().close() + database._db_pool.close() dropped = False @@ -320,23 +316,12 @@ def setup_test_homeserver( # Register the cleanup hook cleanup_func(cleanup) - hs.setup() - if homeserverToUse.__name__ == "TestHomeServer": - hs.setup_master() else: - # If we have been given an explicit datastore we probably want to mock - # out the DataStores somehow too. This all feels a bit wrong, but then - # mocking the stores feels wrong too. - datastores = Mock(datastore=datastore) - hs = homeserverToUse( name, - db_pool=None, datastore=datastore, - datastores=datastores, config=config, version_string="Synapse/tests", - database_engine=db_engine, tls_server_context_factory=Mock(), tls_client_options_factory=Mock(), reactor=reactor, -- cgit 1.5.1 From e97d1cf0014668b9d4883d4175b783088444b24b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Jan 2020 17:21:30 +0000 Subject: Modify check_database to take a connection rather than a cursor We might not need the cursor at all. --- scripts/synapse_port_db | 25 +++++++------------------ synapse/storage/data_stores/__init__.py | 2 +- synapse/storage/engines/postgres.py | 17 +++++++++-------- synapse/storage/engines/sqlite.py | 2 +- 4 files changed, 18 insertions(+), 28 deletions(-) (limited to 'synapse/storage/engines') diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index cb77314f1e..a3dafaffc9 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -447,15 +447,6 @@ class Porter(object): else: return - def setup_db(self, db_config: DatabaseConnectionConfig, engine): - db_conn = make_conn(db_config, engine) - prepare_database(db_conn, engine, config=None) - - db_conn.commit() - - return db_conn - - @defer.inlineCallbacks def build_db_store(self, db_config: DatabaseConnectionConfig): """Builds and returns a database store using the provided configuration. @@ -468,16 +459,14 @@ class Porter(object): self.progress.set_state("Preparing %s" % db_config.config["name"]) engine = create_engine(db_config.config) - conn = self.setup_db(db_config, engine) hs = MockHomeserver(self.hs_config) - store = Store(Database(hs, db_config, engine), conn, hs) - - yield store.db.runInteraction( - "%s_engine.check_database" % db_config.config["name"], - engine.check_database, - ) + with make_conn(db_config, engine) as db_conn: + engine.check_database(db_conn) + prepare_database(db_conn, engine, config=None) + store = Store(Database(hs, db_config, engine), db_conn, hs) + db_conn.commit() return store @@ -502,7 +491,7 @@ class Porter(object): @defer.inlineCallbacks def run(self): try: - self.sqlite_store = yield self.build_db_store( + self.sqlite_store = self.build_db_store( DatabaseConnectionConfig("master-sqlite", self.sqlite_config) ) @@ -518,7 +507,7 @@ class Porter(object): ) defer.returnValue(None) - self.postgres_store = yield self.build_db_store( + self.postgres_store = self.build_db_store( self.hs_config.get_single_database() ) diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index 092e803799..e1d03429ca 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -47,7 +47,7 @@ class DataStores(object): with make_conn(database_config, engine) as db_conn: logger.info("Preparing database %r...", db_name) - engine.check_database(db_conn.cursor()) + engine.check_database(db_conn) prepare_database( db_conn, engine, hs.config, data_stores=database_config.data_stores, ) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index b7c4eda338..ba19785fd7 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -32,14 +32,15 @@ class PostgresEngine(object): self.synchronous_commit = database_config.get("synchronous_commit", True) self._version = None # unknown as yet - def check_database(self, txn): - txn.execute("SHOW SERVER_ENCODING") - rows = txn.fetchall() - if rows and rows[0][0] != "UTF8": - raise IncorrectDatabaseSetup( - "Database has incorrect encoding: '%s' instead of 'UTF8'\n" - "See docs/postgres.rst for more information." % (rows[0][0],) - ) + def check_database(self, db_conn): + with db_conn.cursor() as txn: + txn.execute("SHOW SERVER_ENCODING") + rows = txn.fetchall() + if rows and rows[0][0] != "UTF8": + raise IncorrectDatabaseSetup( + "Database has incorrect encoding: '%s' instead of 'UTF8'\n" + "See docs/postgres.rst for more information." % (rows[0][0],) + ) def convert_param_style(self, sql): return sql.replace("?", "%s") diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index df039a072d..3b3c13360b 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -53,7 +53,7 @@ class Sqlite3Engine(object): """ return False - def check_database(self, txn): + def check_database(self, db_conn): pass def convert_param_style(self, sql): -- cgit 1.5.1 From e48ba84e0bfe081814941b74e610ddcd168a3ce8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Jan 2020 17:33:41 +0000 Subject: Check postgres version in check_database this saves doing it on each connection, and will allow us to pass extra options in. --- synapse/storage/engines/postgres.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) (limited to 'synapse/storage/engines') diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index ba19785fd7..2a285e018c 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -33,6 +33,16 @@ class PostgresEngine(object): self._version = None # unknown as yet def check_database(self, db_conn): + # Get the version of PostgreSQL that we're using. As per the psycopg2 + # docs: The number is formed by converting the major, minor, and + # revision numbers into two-decimal-digit numbers and appending them + # together. For example, version 8.1.5 will be returned as 80105 + self._version = db_conn.server_version + + # Are we on a supported PostgreSQL version? + if self._version < 90500: + raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.") + with db_conn.cursor() as txn: txn.execute("SHOW SERVER_ENCODING") rows = txn.fetchall() @@ -46,17 +56,6 @@ class PostgresEngine(object): return sql.replace("?", "%s") def on_new_connection(self, db_conn): - - # Get the version of PostgreSQL that we're using. As per the psycopg2 - # docs: The number is formed by converting the major, minor, and - # revision numbers into two-decimal-digit numbers and appending them - # together. For example, version 8.1.5 will be returned as 80105 - self._version = db_conn.server_version - - # Are we on a supported PostgreSQL version? - if self._version < 90500: - raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.") - db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) @@ -120,8 +119,8 @@ class PostgresEngine(object): Returns: string """ - # note that this is a bit of a hack because it relies on on_new_connection - # having been called at least once. Still, that should be a safe bet here. + # note that this is a bit of a hack because it relies on check_database + # having been called. Still, that should be a safe bet here. numver = self._version assert numver is not None -- cgit 1.5.1 From bf468211805900e767b6b07a2bfa6046f70efb7a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Jan 2020 17:46:52 +0000 Subject: Refuse to start if sqlite is older than 3.11.0 --- scripts/synapse_port_db | 16 ++++++++++++---- synapse/storage/engines/postgres.py | 4 ++-- synapse/storage/engines/sqlite.py | 7 +++++-- 3 files changed, 19 insertions(+), 8 deletions(-) (limited to 'synapse/storage/engines') diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index a3dafaffc9..f135c8bc54 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -447,11 +447,15 @@ class Porter(object): else: return - def build_db_store(self, db_config: DatabaseConnectionConfig): + def build_db_store( + self, db_config: DatabaseConnectionConfig, allow_outdated_version: bool = False, + ): """Builds and returns a database store using the provided configuration. Args: - config: The database configuration + db_config: The database configuration + allow_outdated_version: True to suppress errors about the database server + version being too old to run a complete synapse Returns: The built Store object. @@ -463,7 +467,9 @@ class Porter(object): hs = MockHomeserver(self.hs_config) with make_conn(db_config, engine) as db_conn: - engine.check_database(db_conn) + engine.check_database( + db_conn, allow_outdated_version=allow_outdated_version + ) prepare_database(db_conn, engine, config=None) store = Store(Database(hs, db_config, engine), db_conn, hs) db_conn.commit() @@ -491,8 +497,10 @@ class Porter(object): @defer.inlineCallbacks def run(self): try: + # we allow people to port away from outdated versions of sqlite. self.sqlite_store = self.build_db_store( - DatabaseConnectionConfig("master-sqlite", self.sqlite_config) + DatabaseConnectionConfig("master-sqlite", self.sqlite_config), + allow_outdated_version=True, ) # Check if all background updates are done, abort if not. diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 2a285e018c..c84cb452b0 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -32,7 +32,7 @@ class PostgresEngine(object): self.synchronous_commit = database_config.get("synchronous_commit", True) self._version = None # unknown as yet - def check_database(self, db_conn): + def check_database(self, db_conn, allow_outdated_version: bool = False): # Get the version of PostgreSQL that we're using. As per the psycopg2 # docs: The number is formed by converting the major, minor, and # revision numbers into two-decimal-digit numbers and appending them @@ -40,7 +40,7 @@ class PostgresEngine(object): self._version = db_conn.server_version # Are we on a supported PostgreSQL version? - if self._version < 90500: + if not allow_outdated_version and self._version < 90500: raise RuntimeError("Synapse requires PostgreSQL 9.5+ or above.") with db_conn.cursor() as txn: diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 3b3c13360b..cbf52f5191 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -53,8 +53,11 @@ class Sqlite3Engine(object): """ return False - def check_database(self, db_conn): - pass + def check_database(self, db_conn, allow_outdated_version: bool = False): + if not allow_outdated_version: + version = self.module.sqlite_version_info + if version < (3, 11, 0): + raise RuntimeError("Synapse requires sqlite 3.11 or above.") def convert_param_style(self, sql): return sql -- cgit 1.5.1 From 02b44db922f01a35787d2535a834c9774b68020b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jan 2020 13:44:21 +0000 Subject: Warn if postgres database has non-C locale. (#6734) As using non-C locale can cause issues on upgrading OS. --- UPGRADE.rst | 9 ++++++++ changelog.d/6734.bugfix | 1 + docs/postgres.md | 20 +++++++++++++++++- synapse/storage/engines/postgres.py | 42 +++++++++++++++++++++++++++++++++++++ synapse/storage/engines/sqlite.py | 5 +++++ synapse/storage/prepare_database.py | 5 +++++ 6 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 changelog.d/6734.bugfix (limited to 'synapse/storage/engines') diff --git a/UPGRADE.rst b/UPGRADE.rst index a0202932b1..470246f128 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -76,6 +76,15 @@ for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb +Upgrading to **** +=============================== + +Synapse will now log a warning on start up if used with a PostgreSQL database +that has a non-recommended locale set. + +See [docs/postgres.md](docs/postgres.md) for details. + + Upgrading to v1.8.0 =================== diff --git a/changelog.d/6734.bugfix b/changelog.d/6734.bugfix new file mode 100644 index 0000000000..79c6bab4d1 --- /dev/null +++ b/changelog.d/6734.bugfix @@ -0,0 +1 @@ +Warn if postgres database has a non-C locale, as that can cause issues when upgrading locales (e.g. due to upgrading OS). diff --git a/docs/postgres.md b/docs/postgres.md index 7cb1ad18d4..e0793ecee8 100644 --- a/docs/postgres.md +++ b/docs/postgres.md @@ -32,7 +32,7 @@ Assuming your PostgreSQL database user is called `postgres`, first authenticate su - postgres # Or, if your system uses sudo to get administrative rights sudo -u postgres bash - + Then, create a user ``synapse_user`` with: createuser --pwprompt synapse_user @@ -63,6 +63,24 @@ You may need to enable password authentication so `synapse_user` can connect to the database. See . +### Fixing incorrect `COLLATE` or `CTYPE` + +Synapse will refuse to set up a new database if it has the wrong values of +`COLLATE` and `CTYPE` set, and will log warnings on existing databases. Using +different locales can cause issues if the locale library is updated from +underneath the database, or if a different version of the locale is used on any +replicas. + +The safest way to fix the issue is to take a dump and recreate the database with +the correct `COLLATE` and `CTYPE` parameters (as per +[docs/postgres.md](docs/postgres.md)). It is also possible to change the +parameters on a live database and run a `REINDEX` on the entire database, +however extreme care must be taken to avoid database corruption. + +Note that the above may fail with an error about duplicate rows if corruption +has already occurred, and such duplicate rows will need to be manually removed. + + ## Tuning Postgres The default settings should be fine for most deployments. For larger diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index c84cb452b0..a077345960 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -13,8 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from ._base import IncorrectDatabaseSetup +logger = logging.getLogger(__name__) + class PostgresEngine(object): single_threaded = False @@ -52,6 +56,44 @@ class PostgresEngine(object): "See docs/postgres.rst for more information." % (rows[0][0],) ) + txn.execute( + "SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()" + ) + collation, ctype = txn.fetchone() + if collation != "C": + logger.warning( + "Database has incorrect collation of %r. Should be 'C'", collation + ) + + if ctype != "C": + logger.warning( + "Database has incorrect ctype of %r. Should be 'C'", ctype + ) + + def check_new_database(self, txn): + """Gets called when setting up a brand new database. This allows us to + apply stricter checks on new databases versus existing database. + """ + + txn.execute( + "SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()" + ) + collation, ctype = txn.fetchone() + + errors = [] + + if collation != "C": + errors.append(" - 'COLLATE' is set to %r. Should be 'C'" % (collation,)) + + if ctype != "C": + errors.append(" - 'CTYPE' is set to %r. Should be 'C'" % (collation,)) + + if errors: + raise IncorrectDatabaseSetup( + "Database is incorrectly configured:\n\n%s\n\n" + "See docs/postgres.md for more information." % ("\n".join(errors)) + ) + def convert_param_style(self, sql): return sql.replace("?", "%s") diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index cbf52f5191..641e490697 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -59,6 +59,11 @@ class Sqlite3Engine(object): if version < (3, 11, 0): raise RuntimeError("Synapse requires sqlite 3.11 or above.") + def check_new_database(self, txn): + """Gets called when setting up a brand new database. This allows us to + apply stricter checks on new databases versus existing database. + """ + def convert_param_style(self, sql): return sql diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index e86984cd50..c285ef52a0 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -136,6 +136,11 @@ def _setup_new_database(cur, database_engine, data_stores): data_stores (list[str]): The names of the data stores to instantiate on the given database. """ + + # We're about to set up a brand new database so we check that its + # configured to our liking. + database_engine.check_new_database(cur) + current_dir = os.path.join(dir_path, "schema", "full_schemas") directory_entries = os.listdir(current_dir) -- cgit 1.5.1 From 7728d87fd7e1af17dd6b0c619cbfecb1fadb624f Mon Sep 17 00:00:00 2001 From: Uday Bansal <43824981+udaybansal19@users.noreply.github.com> Date: Wed, 26 Feb 2020 20:47:03 +0530 Subject: Updated warning for incorrect database collation/ctype (#6985) Signed-off-by: Uday Bansal <43824981+udaybansal19@users.noreply.github.com> --- changelog.d/6985.misc | 1 + synapse/storage/engines/postgres.py | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) create mode 100644 changelog.d/6985.misc (limited to 'synapse/storage/engines') diff --git a/changelog.d/6985.misc b/changelog.d/6985.misc new file mode 100644 index 0000000000..ba367fa9af --- /dev/null +++ b/changelog.d/6985.misc @@ -0,0 +1 @@ +Update warning for incorrect database collation/ctype to include link to documentation. diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index a077345960..53b3f372b0 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -53,7 +53,7 @@ class PostgresEngine(object): if rows and rows[0][0] != "UTF8": raise IncorrectDatabaseSetup( "Database has incorrect encoding: '%s' instead of 'UTF8'\n" - "See docs/postgres.rst for more information." % (rows[0][0],) + "See docs/postgres.md for more information." % (rows[0][0],) ) txn.execute( @@ -62,12 +62,16 @@ class PostgresEngine(object): collation, ctype = txn.fetchone() if collation != "C": logger.warning( - "Database has incorrect collation of %r. Should be 'C'", collation + "Database has incorrect collation of %r. Should be 'C'\n" + "See docs/postgres.md for more information.", + collation, ) if ctype != "C": logger.warning( - "Database has incorrect ctype of %r. Should be 'C'", ctype + "Database has incorrect ctype of %r. Should be 'C'\n" + "See docs/postgres.md for more information.", + ctype, ) def check_new_database(self, txn): -- cgit 1.5.1 From 132b673dbefa42eb7669a11522426f26e225ac05 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 27 Feb 2020 11:53:40 +0000 Subject: Add some type annotations in `synapse.storage` (#6987) I cracked, and added some type definitions in synapse.storage. --- changelog.d/6987.misc | 1 + synapse/storage/database.py | 143 +++++++++++++++++++++--------------- synapse/storage/engines/__init__.py | 28 +++---- synapse/storage/engines/_base.py | 87 ++++++++++++++++++++++ synapse/storage/engines/postgres.py | 12 +-- synapse/storage/engines/sqlite.py | 13 ++-- synapse/storage/types.py | 65 ++++++++++++++++ tox.ini | 5 +- 8 files changed, 270 insertions(+), 84 deletions(-) create mode 100644 changelog.d/6987.misc create mode 100644 synapse/storage/types.py (limited to 'synapse/storage/engines') diff --git a/changelog.d/6987.misc b/changelog.d/6987.misc new file mode 100644 index 0000000000..7ff74cda55 --- /dev/null +++ b/changelog.d/6987.misc @@ -0,0 +1 @@ +Add some type annotations to the database storage classes. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 1953614401..609db40616 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -15,9 +15,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import sys import time -from typing import Iterable, Tuple +from time import monotonic as monotonic_time +from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple from six import iteritems, iterkeys, itervalues from six.moves import intern, range @@ -32,24 +32,14 @@ from synapse.config.database import DatabaseConnectionConfig from synapse.logging.context import LoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater -from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine +from synapse.storage.types import Connection, Cursor from synapse.util.stringutils import exception_to_unicode -# import a function which will return a monotonic time, in seconds -try: - # on python 3, use time.monotonic, since time.clock can go backwards - from time import monotonic as monotonic_time -except ImportError: - # ... but python 2 doesn't have it - from time import clock as monotonic_time - logger = logging.getLogger(__name__) -try: - MAX_TXN_ID = sys.maxint - 1 -except AttributeError: - # python 3 does not have a maximum int value - MAX_TXN_ID = 2 ** 63 - 1 +# python 3 does not have a maximum int value +MAX_TXN_ID = 2 ** 63 - 1 sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") @@ -77,7 +67,7 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { def make_pool( - reactor, db_config: DatabaseConnectionConfig, engine + reactor, db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine ) -> adbapi.ConnectionPool: """Get the connection pool for the database. """ @@ -90,7 +80,9 @@ def make_pool( ) -def make_conn(db_config: DatabaseConnectionConfig, engine): +def make_conn( + db_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine +) -> Connection: """Make a new connection to the database and return it. Returns: @@ -107,20 +99,27 @@ def make_conn(db_config: DatabaseConnectionConfig, engine): return db_conn -class LoggingTransaction(object): +# The type of entry which goes on our after_callbacks and exception_callbacks lists. +# +# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so +# that mypy sees the type but the runtime python doesn't. +_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]] + + +class LoggingTransaction: """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() method. Args: txn: The database transcation object to wrap. - name (str): The name of this transactions for logging. - database_engine (Sqlite3Engine|PostgresEngine) - after_callbacks(list|None): A list that callbacks will be appended to + name: The name of this transactions for logging. + database_engine + after_callbacks: A list that callbacks will be appended to that have been added by `call_after` which should be run on successful completion of the transaction. None indicates that no callbacks should be allowed to be scheduled to run. - exception_callbacks(list|None): A list that callbacks will be appended + exception_callbacks: A list that callbacks will be appended to that have been added by `call_on_exception` which should be run if transaction ends with an error. None indicates that no callbacks should be allowed to be scheduled to run. @@ -135,46 +134,67 @@ class LoggingTransaction(object): ] def __init__( - self, txn, name, database_engine, after_callbacks=None, exception_callbacks=None + self, + txn: Cursor, + name: str, + database_engine: BaseDatabaseEngine, + after_callbacks: Optional[List[_CallbackListEntry]] = None, + exception_callbacks: Optional[List[_CallbackListEntry]] = None, ): - object.__setattr__(self, "txn", txn) - object.__setattr__(self, "name", name) - object.__setattr__(self, "database_engine", database_engine) - object.__setattr__(self, "after_callbacks", after_callbacks) - object.__setattr__(self, "exception_callbacks", exception_callbacks) + self.txn = txn + self.name = name + self.database_engine = database_engine + self.after_callbacks = after_callbacks + self.exception_callbacks = exception_callbacks - def call_after(self, callback, *args, **kwargs): + def call_after(self, callback: "Callable[..., None]", *args, **kwargs): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. """ + # if self.after_callbacks is None, that means that whatever constructed the + # LoggingTransaction isn't expecting there to be any callbacks; assert that + # is not the case. + assert self.after_callbacks is not None self.after_callbacks.append((callback, args, kwargs)) - def call_on_exception(self, callback, *args, **kwargs): + def call_on_exception(self, callback: "Callable[..., None]", *args, **kwargs): + # if self.exception_callbacks is None, that means that whatever constructed the + # LoggingTransaction isn't expecting there to be any callbacks; assert that + # is not the case. + assert self.exception_callbacks is not None self.exception_callbacks.append((callback, args, kwargs)) - def __getattr__(self, name): - return getattr(self.txn, name) + def fetchall(self) -> List[Tuple]: + return self.txn.fetchall() - def __setattr__(self, name, value): - setattr(self.txn, name, value) + def fetchone(self) -> Tuple: + return self.txn.fetchone() - def __iter__(self): + def __iter__(self) -> Iterator[Tuple]: return self.txn.__iter__() + @property + def rowcount(self) -> int: + return self.txn.rowcount + + @property + def description(self) -> Any: + return self.txn.description + def execute_batch(self, sql, args): if isinstance(self.database_engine, PostgresEngine): - from psycopg2.extras import execute_batch + from psycopg2.extras import execute_batch # type: ignore self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args) else: for val in args: self.execute(sql, val) - def execute(self, sql, *args): + def execute(self, sql: str, *args: Any): self._do_execute(self.txn.execute, sql, *args) - def executemany(self, sql, *args): + def executemany(self, sql: str, *args: Any): self._do_execute(self.txn.executemany, sql, *args) def _make_sql_one_line(self, sql): @@ -207,6 +227,9 @@ class LoggingTransaction(object): sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs) sql_query_timer.labels(sql.split()[0]).observe(secs) + def close(self): + self.txn.close() + class PerformanceCounters(object): def __init__(self): @@ -251,7 +274,9 @@ class Database(object): _TXN_ID = 0 - def __init__(self, hs, database_config: DatabaseConnectionConfig, engine): + def __init__( + self, hs, database_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine + ): self.hs = hs self._clock = hs.get_clock() self._database_config = database_config @@ -259,9 +284,9 @@ class Database(object): self.updates = BackgroundUpdater(hs, self) - self._previous_txn_total_time = 0 - self._current_txn_total_time = 0 - self._previous_loop_ts = 0 + self._previous_txn_total_time = 0.0 + self._current_txn_total_time = 0.0 + self._previous_loop_ts = 0.0 # TODO(paul): These can eventually be removed once the metrics code # is running in mainline, and we have some nice monitoring frontends @@ -463,23 +488,23 @@ class Database(object): sql_txn_timer.labels(desc).observe(duration) @defer.inlineCallbacks - def runInteraction(self, desc, func, *args, **kwargs): + def runInteraction(self, desc: str, func: Callable, *args: Any, **kwargs: Any): """Starts a transaction on the database and runs a given function Arguments: - desc (str): description of the transaction, for logging and metrics - func (func): callback function, which will be called with a + desc: description of the transaction, for logging and metrics + func: callback function, which will be called with a database transaction (twisted.enterprise.adbapi.Transaction) as its first argument, followed by `args` and `kwargs`. - args (list): positional args to pass to `func` - kwargs (dict): named args to pass to `func` + args: positional args to pass to `func` + kwargs: named args to pass to `func` Returns: Deferred: The result of func """ - after_callbacks = [] - exception_callbacks = [] + after_callbacks = [] # type: List[_CallbackListEntry] + exception_callbacks = [] # type: List[_CallbackListEntry] if LoggingContext.current_context() == LoggingContext.sentinel: logger.warning("Starting db txn '%s' from sentinel context", desc) @@ -505,15 +530,15 @@ class Database(object): return result @defer.inlineCallbacks - def runWithConnection(self, func, *args, **kwargs): + def runWithConnection(self, func: Callable, *args: Any, **kwargs: Any): """Wraps the .runWithConnection() method on the underlying db_pool. Arguments: - func (func): callback function, which will be called with a + func: callback function, which will be called with a database connection (twisted.enterprise.adbapi.Connection) as its first argument, followed by `args` and `kwargs`. - args (list): positional args to pass to `func` - kwargs (dict): named args to pass to `func` + args: positional args to pass to `func` + kwargs: named args to pass to `func` Returns: Deferred: The result of func @@ -800,7 +825,7 @@ class Database(object): return False # We didn't find any existing rows, so insert a new one - allvalues = {} + allvalues = {} # type: Dict[str, Any] allvalues.update(keyvalues) allvalues.update(values) allvalues.update(insertion_values) @@ -829,7 +854,7 @@ class Database(object): Returns: None """ - allvalues = {} + allvalues = {} # type: Dict[str, Any] allvalues.update(keyvalues) allvalues.update(insertion_values) @@ -916,7 +941,7 @@ class Database(object): Returns: None """ - allnames = [] + allnames = [] # type: List[str] allnames.extend(key_names) allnames.extend(value_names) @@ -1100,7 +1125,7 @@ class Database(object): keyvalues : dict of column names and values to select the rows with retcols : list of strings giving the names of the columns to return """ - results = [] + results = [] # type: List[Dict[str, Any]] if not iterable: return results @@ -1439,7 +1464,7 @@ class Database(object): raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") where_clause = "WHERE " if filters or keyvalues else "" - arg_list = [] + arg_list = [] # type: List[Any] if filters: where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters) arg_list += list(filters.values()) diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 9d2d519922..035f9ea6e9 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -12,29 +12,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import importlib import platform -from ._base import IncorrectDatabaseSetup +from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup from .postgres import PostgresEngine from .sqlite import Sqlite3Engine -SUPPORTED_MODULE = {"sqlite3": Sqlite3Engine, "psycopg2": PostgresEngine} - -def create_engine(database_config): +def create_engine(database_config) -> BaseDatabaseEngine: name = database_config["name"] - engine_class = SUPPORTED_MODULE.get(name, None) - if engine_class: + if name == "sqlite3": + import sqlite3 + + return Sqlite3Engine(sqlite3, database_config) + + if name == "psycopg2": # pypy requires psycopg2cffi rather than psycopg2 - if name == "psycopg2" and platform.python_implementation() == "PyPy": - name = "psycopg2cffi" - module = importlib.import_module(name) - return engine_class(module, database_config) + if platform.python_implementation() == "PyPy": + import psycopg2cffi as psycopg2 # type: ignore + else: + import psycopg2 # type: ignore + + return PostgresEngine(psycopg2, database_config) raise RuntimeError("Unsupported database engine '%s'" % (name,)) -__all__ = ["create_engine", "IncorrectDatabaseSetup"] +__all__ = ["create_engine", "BaseDatabaseEngine", "IncorrectDatabaseSetup"] diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index ec5a4d198b..ab0bbe4bd3 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -12,7 +12,94 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import abc +from typing import Generic, TypeVar + +from synapse.storage.types import Connection class IncorrectDatabaseSetup(RuntimeError): pass + + +ConnectionType = TypeVar("ConnectionType", bound=Connection) + + +class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): + def __init__(self, module, database_config: dict): + self.module = module + + @property + @abc.abstractmethod + def single_threaded(self) -> bool: + ... + + @property + @abc.abstractmethod + def can_native_upsert(self) -> bool: + """ + Do we support native UPSERTs? + """ + ... + + @property + @abc.abstractmethod + def supports_tuple_comparison(self) -> bool: + """ + Do we support comparing tuples, i.e. `(a, b) > (c, d)`? + """ + ... + + @property + @abc.abstractmethod + def supports_using_any_list(self) -> bool: + """ + Do we support using `a = ANY(?)` and passing a list + """ + ... + + @abc.abstractmethod + def check_database( + self, db_conn: ConnectionType, allow_outdated_version: bool = False + ) -> None: + ... + + @abc.abstractmethod + def check_new_database(self, txn) -> None: + """Gets called when setting up a brand new database. This allows us to + apply stricter checks on new databases versus existing database. + """ + ... + + @abc.abstractmethod + def convert_param_style(self, sql: str) -> str: + ... + + @abc.abstractmethod + def on_new_connection(self, db_conn: ConnectionType) -> None: + ... + + @abc.abstractmethod + def is_deadlock(self, error: Exception) -> bool: + ... + + @abc.abstractmethod + def is_connection_closed(self, conn: ConnectionType) -> bool: + ... + + @abc.abstractmethod + def lock_table(self, txn, table: str) -> None: + ... + + @abc.abstractmethod + def get_next_state_group_id(self, txn) -> int: + """Returns an int that can be used as a new state_group ID + """ + ... + + @property + @abc.abstractmethod + def server_version(self) -> str: + """Gets a string giving the server version. For example: '3.22.0' + """ + ... diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 53b3f372b0..6c7d08a6f2 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -15,16 +15,14 @@ import logging -from ._base import IncorrectDatabaseSetup +from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup logger = logging.getLogger(__name__) -class PostgresEngine(object): - single_threaded = False - +class PostgresEngine(BaseDatabaseEngine): def __init__(self, database_module, database_config): - self.module = database_module + super().__init__(database_module, database_config) self.module.extensions.register_type(self.module.extensions.UNICODE) # Disables passing `bytes` to txn.execute, c.f. #6186. If you do @@ -36,6 +34,10 @@ class PostgresEngine(object): self.synchronous_commit = database_config.get("synchronous_commit", True) self._version = None # unknown as yet + @property + def single_threaded(self) -> bool: + return False + def check_database(self, db_conn, allow_outdated_version: bool = False): # Get the version of PostgreSQL that we're using. As per the psycopg2 # docs: The number is formed by converting the major, minor, and diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 641e490697..2bfeefd54e 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -12,16 +12,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import sqlite3 import struct import threading +from synapse.storage.engines import BaseDatabaseEngine -class Sqlite3Engine(object): - single_threaded = True +class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]): def __init__(self, database_module, database_config): - self.module = database_module + super().__init__(database_module, database_config) database = database_config.get("args", {}).get("database") self._is_in_memory = database in (None, ":memory:",) @@ -31,6 +31,10 @@ class Sqlite3Engine(object): self._current_state_group_id = None self._current_state_group_id_lock = threading.Lock() + @property + def single_threaded(self) -> bool: + return True + @property def can_native_upsert(self): """ @@ -68,7 +72,6 @@ class Sqlite3Engine(object): return sql def on_new_connection(self, db_conn): - # We need to import here to avoid an import loop. from synapse.storage.prepare_database import prepare_database diff --git a/synapse/storage/types.py b/synapse/storage/types.py new file mode 100644 index 0000000000..daff81c5ee --- /dev/null +++ b/synapse/storage/types.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Iterable, Iterator, List, Tuple + +from typing_extensions import Protocol + + +""" +Some very basic protocol definitions for the DB-API2 classes specified in PEP-249 +""" + + +class Cursor(Protocol): + def execute(self, sql: str, parameters: Iterable[Any] = ...) -> Any: + ... + + def executemany(self, sql: str, parameters: Iterable[Iterable[Any]]) -> Any: + ... + + def fetchall(self) -> List[Tuple]: + ... + + def fetchone(self) -> Tuple: + ... + + @property + def description(self) -> Any: + return None + + @property + def rowcount(self) -> int: + return 0 + + def __iter__(self) -> Iterator[Tuple]: + ... + + def close(self) -> None: + ... + + +class Connection(Protocol): + def cursor(self) -> Cursor: + ... + + def close(self) -> None: + ... + + def commit(self) -> None: + ... + + def rollback(self, *args, **kwargs) -> None: + ... diff --git a/tox.ini b/tox.ini index 4ccfde01b5..6521535137 100644 --- a/tox.ini +++ b/tox.ini @@ -168,7 +168,6 @@ commands= coverage html [testenv:mypy] -basepython = python3.7 skip_install = True deps = {[base]deps} @@ -179,7 +178,8 @@ env = extras = all commands = mypy \ synapse/api \ - synapse/config/ \ + synapse/appservice \ + synapse/config \ synapse/events/spamcheck.py \ synapse/federation/sender \ synapse/federation/transport \ @@ -192,6 +192,7 @@ commands = mypy \ synapse/rest \ synapse/spam_checker_api \ synapse/storage/engines \ + synapse/storage/database.py \ synapse/streams # To find all folders that pass mypy you run: -- cgit 1.5.1 From fbf0782c63bd2aba3c504dabd04abdf10d269a22 Mon Sep 17 00:00:00 2001 From: David Vo Date: Sat, 28 Mar 2020 00:20:00 +1100 Subject: Only import sqlite3 when type checking (#7155) Fixes: #7127 Signed-off-by: David Vo --- changelog.d/7155.bugfix | 1 + synapse/storage/engines/sqlite.py | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) create mode 100644 changelog.d/7155.bugfix (limited to 'synapse/storage/engines') diff --git a/changelog.d/7155.bugfix b/changelog.d/7155.bugfix new file mode 100644 index 0000000000..0bf51e7aba --- /dev/null +++ b/changelog.d/7155.bugfix @@ -0,0 +1 @@ +Avoid importing `sqlite3` when using the postgres backend. Contributed by David Vo. diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 2bfeefd54e..3bc2e8b986 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -12,14 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import sqlite3 import struct import threading +import typing from synapse.storage.engines import BaseDatabaseEngine +if typing.TYPE_CHECKING: + import sqlite3 # noqa: F401 -class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]): + +class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): def __init__(self, database_module, database_config): super().__init__(database_module, database_config) -- cgit 1.5.1 From 627b0f5f2753e6910adb7a877541d50f5936b8a5 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 30 Apr 2020 13:47:49 -0400 Subject: Persist user interactive authentication sessions (#7302) By persisting the user interactive authentication sessions to the database, this fixes situations where a user hits different works throughout their auth session and also allows sessions to persist through restarts of Synapse. --- changelog.d/7302.bugfix | 1 + synapse/app/generic_worker.py | 2 + synapse/handlers/auth.py | 175 +++++-------- synapse/handlers/cas_handler.py | 2 +- synapse/handlers/saml_handler.py | 2 +- synapse/rest/client/v2_alpha/auth.py | 4 +- synapse/rest/client/v2_alpha/register.py | 4 +- synapse/storage/data_stores/main/__init__.py | 2 + .../main/schema/delta/58/03persist_ui_auth.sql | 36 +++ synapse/storage/data_stores/main/ui_auth.py | 279 +++++++++++++++++++++ synapse/storage/engines/sqlite.py | 1 + tests/rest/client/v2_alpha/test_auth.py | 40 +++ tests/utils.py | 8 +- tox.ini | 3 +- 14 files changed, 434 insertions(+), 125 deletions(-) create mode 100644 changelog.d/7302.bugfix create mode 100644 synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql create mode 100644 synapse/storage/data_stores/main/ui_auth.py (limited to 'synapse/storage/engines') diff --git a/changelog.d/7302.bugfix b/changelog.d/7302.bugfix new file mode 100644 index 0000000000..820646d1f9 --- /dev/null +++ b/changelog.d/7302.bugfix @@ -0,0 +1 @@ +Persist user interactive authentication sessions across workers and Synapse restarts. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d125327f08..0ace7b787d 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -127,6 +127,7 @@ from synapse.storage.data_stores.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, ) from synapse.storage.data_stores.main.presence import UserPresenceState +from synapse.storage.data_stores.main.ui_auth import UIAuthWorkerStore from synapse.storage.data_stores.main.user_directory import UserDirectoryStore from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer @@ -439,6 +440,7 @@ class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. UserDirectoryStore, + UIAuthWorkerStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedReceiptsStore, diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index dbe165ce1e..7613e5b6ab 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -41,10 +41,10 @@ from synapse.handlers.ui_auth.checkers import UserInteractiveAuthChecker from synapse.http.server import finish_request from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import ModuleApi from synapse.push.mailer import load_jinja2_templates from synapse.types import Requester, UserID -from synapse.util.caches.expiringcache import ExpiringCache from ._base import BaseHandler @@ -69,15 +69,6 @@ class AuthHandler(BaseHandler): self.bcrypt_rounds = hs.config.bcrypt_rounds - # This is not a cache per se, but a store of all current sessions that - # expire after N hours - self.sessions = ExpiringCache( - cache_name="register_sessions", - clock=hs.get_clock(), - expiry_ms=self.SESSION_EXPIRE_MS, - reset_expiry_on_get=True, - ) - account_handler = ModuleApi(hs, self) self.password_providers = [ module(config=config, account_handler=account_handler) @@ -119,6 +110,15 @@ class AuthHandler(BaseHandler): self._clock = self.hs.get_clock() + # Expire old UI auth sessions after a period of time. + if hs.config.worker_app is None: + self._clock.looping_call( + run_as_background_process, + 5 * 60 * 1000, + "expire_old_sessions", + self._expire_old_sessions, + ) + # Load the SSO HTML templates. # The following template is shown to the user during a client login via SSO, @@ -301,16 +301,21 @@ class AuthHandler(BaseHandler): if "session" in authdict: sid = authdict["session"] + # Convert the URI and method to strings. + uri = request.uri.decode("utf-8") + method = request.uri.decode("utf-8") + # If there's no session ID, create a new session. if not sid: - session = self._create_session( - clientdict, (request.uri, request.method, clientdict), description + session = await self.store.create_ui_auth_session( + clientdict, uri, method, description ) - session_id = session["id"] else: - session = self._get_session_info(sid) - session_id = sid + try: + session = await self.store.get_ui_auth_session(sid) + except StoreError: + raise SynapseError(400, "Unknown session ID: %s" % (sid,)) if not clientdict: # This was designed to allow the client to omit the parameters @@ -322,15 +327,15 @@ class AuthHandler(BaseHandler): # on a homeserver. # Revisit: Assuming the REST APIs do sensible validation, the data # isn't arbitrary. - clientdict = session["clientdict"] + clientdict = session.clientdict # Ensure that the queried operation does not vary between stages of # the UI authentication session. This is done by generating a stable # comparator based on the URI, method, and body (minus the auth dict) # and storing it during the initial query. Subsequent queries ensure # that this comparator has not changed. - comparator = (request.uri, request.method, clientdict) - if session["ui_auth"] != comparator: + comparator = (uri, method, clientdict) + if (session.uri, session.method, session.clientdict) != comparator: raise SynapseError( 403, "Requested operation has changed during the UI authentication session.", @@ -338,11 +343,9 @@ class AuthHandler(BaseHandler): if not authdict: raise InteractiveAuthIncompleteError( - self._auth_dict_for_flows(flows, session_id) + self._auth_dict_for_flows(flows, session.session_id) ) - creds = session["creds"] - # check auth type currently being presented errordict = {} # type: Dict[str, Any] if "type" in authdict: @@ -350,8 +353,9 @@ class AuthHandler(BaseHandler): try: result = await self._check_auth_dict(authdict, clientip) if result: - creds[login_type] = result - self._save_session(session) + await self.store.mark_ui_auth_stage_complete( + session.session_id, login_type, result + ) except LoginError as e: if login_type == LoginType.EMAIL_IDENTITY: # riot used to have a bug where it would request a new @@ -367,6 +371,7 @@ class AuthHandler(BaseHandler): # so that the client can have another go. errordict = e.error_dict() + creds = await self.store.get_completed_ui_auth_stages(session.session_id) for f in flows: if len(set(f) - set(creds)) == 0: # it's very useful to know what args are stored, but this can @@ -380,9 +385,9 @@ class AuthHandler(BaseHandler): list(clientdict), ) - return creds, clientdict, session_id + return creds, clientdict, session.session_id - ret = self._auth_dict_for_flows(flows, session_id) + ret = self._auth_dict_for_flows(flows, session.session_id) ret["completed"] = list(creds) ret.update(errordict) raise InteractiveAuthIncompleteError(ret) @@ -399,13 +404,11 @@ class AuthHandler(BaseHandler): if "session" not in authdict: raise LoginError(400, "", Codes.MISSING_PARAM) - sess = self._get_session_info(authdict["session"]) - creds = sess["creds"] - result = await self.checkers[stagetype].check_auth(authdict, clientip) if result: - creds[stagetype] = result - self._save_session(sess) + await self.store.mark_ui_auth_stage_complete( + authdict["session"], stagetype, result + ) return True return False @@ -427,7 +430,7 @@ class AuthHandler(BaseHandler): sid = authdict["session"] return sid - def set_session_data(self, session_id: str, key: str, value: Any) -> None: + async def set_session_data(self, session_id: str, key: str, value: Any) -> None: """ Store a key-value pair into the sessions data associated with this request. This data is stored server-side and cannot be modified by @@ -438,11 +441,12 @@ class AuthHandler(BaseHandler): key: The key to store the data under value: The data to store """ - sess = self._get_session_info(session_id) - sess["serverdict"][key] = value - self._save_session(sess) + try: + await self.store.set_ui_auth_session_data(session_id, key, value) + except StoreError: + raise SynapseError(400, "Unknown session ID: %s" % (session_id,)) - def get_session_data( + async def get_session_data( self, session_id: str, key: str, default: Optional[Any] = None ) -> Any: """ @@ -453,8 +457,18 @@ class AuthHandler(BaseHandler): key: The key to store the data under default: Value to return if the key has not been set """ - sess = self._get_session_info(session_id) - return sess["serverdict"].get(key, default) + try: + return await self.store.get_ui_auth_session_data(session_id, key, default) + except StoreError: + raise SynapseError(400, "Unknown session ID: %s" % (session_id,)) + + async def _expire_old_sessions(self): + """ + Invalidate any user interactive authentication sessions that have expired. + """ + now = self._clock.time_msec() + expiration_time = now - self.SESSION_EXPIRE_MS + await self.store.delete_old_ui_auth_sessions(expiration_time) async def _check_auth_dict( self, authdict: Dict[str, Any], clientip: str @@ -534,67 +548,6 @@ class AuthHandler(BaseHandler): "params": params, } - def _create_session( - self, - clientdict: Dict[str, Any], - ui_auth: Tuple[bytes, bytes, Dict[str, Any]], - description: str, - ) -> dict: - """ - Creates a new user interactive authentication session. - - The session can be used to track data across multiple requests, e.g. for - interactive authentication. - - Each session has the following keys: - - id: - A unique identifier for this session. Passed back to the client - and returned for each stage. - clientdict: - The dictionary from the client root level, not the 'auth' key. - ui_auth: - A tuple which is checked at each stage of the authentication to - ensure that the asked for operation has not changed. - creds: - A map, which maps each auth-type (str) to the relevant identity - authenticated by that auth-type (mostly str, but for captcha, bool). - serverdict: - A map of data that is stored server-side and cannot be modified - by the client. - description: - A string description of the operation that the current - authentication is authorising. - Returns: - The newly created session. - """ - session_id = None - while session_id is None or session_id in self.sessions: - session_id = stringutils.random_string(24) - - self.sessions[session_id] = { - "id": session_id, - "clientdict": clientdict, - "ui_auth": ui_auth, - "creds": {}, - "serverdict": {}, - "description": description, - } - - return self.sessions[session_id] - - def _get_session_info(self, session_id: str) -> dict: - """ - Gets a session given a session ID. - - The session can be used to track data across multiple requests, e.g. for - interactive authentication. - """ - try: - return self.sessions[session_id] - except KeyError: - raise SynapseError(400, "Unknown session ID: %s" % (session_id,)) - async def get_access_token_for_user_id( self, user_id: str, device_id: Optional[str], valid_until_ms: Optional[int] ): @@ -994,13 +947,6 @@ class AuthHandler(BaseHandler): await self.store.user_delete_threepid(user_id, medium, address) return result - def _save_session(self, session: Dict[str, Any]) -> None: - """Update the last used time on the session to now and add it back to the session store.""" - # TODO: Persistent storage - logger.debug("Saving session %s", session) - session["last_used"] = self.hs.get_clock().time_msec() - self.sessions[session["id"]] = session - async def hash(self, password: str) -> str: """Computes a secure hash of password. @@ -1052,7 +998,7 @@ class AuthHandler(BaseHandler): else: return False - def start_sso_ui_auth(self, redirect_url: str, session_id: str) -> str: + async def start_sso_ui_auth(self, redirect_url: str, session_id: str) -> str: """ Get the HTML for the SSO redirect confirmation page. @@ -1063,12 +1009,15 @@ class AuthHandler(BaseHandler): Returns: The HTML to render. """ - session = self._get_session_info(session_id) + try: + session = await self.store.get_ui_auth_session(session_id) + except StoreError: + raise SynapseError(400, "Unknown session ID: %s" % (session_id,)) return self._sso_auth_confirm_template.render( - description=session["description"], redirect_url=redirect_url, + description=session.description, redirect_url=redirect_url, ) - def complete_sso_ui_auth( + async def complete_sso_ui_auth( self, registered_user_id: str, session_id: str, request: SynapseRequest, ): """Having figured out a mxid for this user, complete the HTTP request @@ -1080,13 +1029,11 @@ class AuthHandler(BaseHandler): process. """ # Mark the stage of the authentication as successful. - sess = self._get_session_info(session_id) - creds = sess["creds"] - # Save the user who authenticated with SSO, this will be used to ensure # that the account be modified is also the person who logged in. - creds[LoginType.SSO] = registered_user_id - self._save_session(sess) + await self.store.mark_ui_auth_stage_complete( + session_id, LoginType.SSO, registered_user_id + ) # Render the HTML and return. html_bytes = self._sso_auth_success_template.encode("utf-8") diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py index 5cb3f9d133..64aaa1335c 100644 --- a/synapse/handlers/cas_handler.py +++ b/synapse/handlers/cas_handler.py @@ -206,7 +206,7 @@ class CasHandler: registered_user_id = await self._auth_handler.check_user_exists(user_id) if session: - self._auth_handler.complete_sso_ui_auth( + await self._auth_handler.complete_sso_ui_auth( registered_user_id, session, request, ) diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py index 7c9454b504..96f2dd36ad 100644 --- a/synapse/handlers/saml_handler.py +++ b/synapse/handlers/saml_handler.py @@ -149,7 +149,7 @@ class SamlHandler: # Complete the interactive auth session or the login. if current_session and current_session.ui_auth_session_id: - self._auth_handler.complete_sso_ui_auth( + await self._auth_handler.complete_sso_ui_auth( user_id, current_session.ui_auth_session_id, request ) diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index 11599f5005..24dd3d3e96 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -140,7 +140,7 @@ class AuthRestServlet(RestServlet): self._cas_server_url = hs.config.cas_server_url self._cas_service_url = hs.config.cas_service_url - def on_GET(self, request, stagetype): + async def on_GET(self, request, stagetype): session = parse_string(request, "session") if not session: raise SynapseError(400, "No session supplied") @@ -180,7 +180,7 @@ class AuthRestServlet(RestServlet): else: raise SynapseError(400, "Homeserver not configured for SSO.") - html = self.auth_handler.start_sso_ui_auth(sso_redirect_url, session) + html = await self.auth_handler.start_sso_ui_auth(sso_redirect_url, session) else: raise SynapseError(404, "Unknown auth stage type") diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index d1b5c49989..af08cc6cce 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -499,7 +499,7 @@ class RegisterRestServlet(RestServlet): # registered a user for this session, so we could just return the # user here. We carry on and go through the auth checks though, # for paranoia. - registered_user_id = self.auth_handler.get_session_data( + registered_user_id = await self.auth_handler.get_session_data( session_id, "registered_user_id", None ) @@ -598,7 +598,7 @@ class RegisterRestServlet(RestServlet): # remember that we've now registered that user account, and with # what user ID (since the user may not have specified) - self.auth_handler.set_session_data( + await self.auth_handler.set_session_data( session_id, "registered_user_id", registered_user_id ) diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index bd7c3a00ea..ceba10882c 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -66,6 +66,7 @@ from .stats import StatsStore from .stream import StreamStore from .tags import TagsStore from .transactions import TransactionStore +from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore from .user_erasure_store import UserErasureStore @@ -112,6 +113,7 @@ class DataStore( StatsStore, RelationsStore, CacheInvalidationStore, + UIAuthStore, ): def __init__(self, database: Database, db_conn, hs): self.hs = hs diff --git a/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql b/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql new file mode 100644 index 0000000000..dcb593fc2d --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql @@ -0,0 +1,36 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS ui_auth_sessions( + session_id TEXT NOT NULL, -- The session ID passed to the client. + creation_time BIGINT NOT NULL, -- The time this session was created (epoch time in milliseconds). + serverdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data added by Synapse. + clientdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data from the client. + uri TEXT NOT NULL, -- The URI the UI authentication session is using. + method TEXT NOT NULL, -- The HTTP method the UI authentication session is using. + -- The clientdict, uri, and method make up an tuple that must be immutable + -- throughout the lifetime of the UI Auth session. + description TEXT NOT NULL, -- A human readable description of the operation which caused the UI Auth flow to occur. + UNIQUE (session_id) +); + +CREATE TABLE IF NOT EXISTS ui_auth_sessions_credentials( + session_id TEXT NOT NULL, -- The corresponding UI Auth session. + stage_type TEXT NOT NULL, -- The stage type. + result TEXT NOT NULL, -- The result of the stage verification, stored as JSON. + UNIQUE (session_id, stage_type), + FOREIGN KEY (session_id) + REFERENCES ui_auth_sessions (session_id) +); diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py new file mode 100644 index 0000000000..c8eebc9378 --- /dev/null +++ b/synapse/storage/data_stores/main/ui_auth.py @@ -0,0 +1,279 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from typing import Any, Dict, Optional, Union + +import attr + +import synapse.util.stringutils as stringutils +from synapse.api.errors import StoreError +from synapse.storage._base import SQLBaseStore +from synapse.types import JsonDict + + +@attr.s +class UIAuthSessionData: + session_id = attr.ib(type=str) + # The dictionary from the client root level, not the 'auth' key. + clientdict = attr.ib(type=JsonDict) + # The URI and method the session was intiatied with. These are checked at + # each stage of the authentication to ensure that the asked for operation + # has not changed. + uri = attr.ib(type=str) + method = attr.ib(type=str) + # A string description of the operation that the current authentication is + # authorising. + description = attr.ib(type=str) + + +class UIAuthWorkerStore(SQLBaseStore): + """ + Manage user interactive authentication sessions. + """ + + async def create_ui_auth_session( + self, clientdict: JsonDict, uri: str, method: str, description: str, + ) -> UIAuthSessionData: + """ + Creates a new user interactive authentication session. + + The session can be used to track the stages necessary to authenticate a + user across multiple HTTP requests. + + Args: + clientdict: + The dictionary from the client root level, not the 'auth' key. + uri: + The URI this session was initiated with, this is checked at each + stage of the authentication to ensure that the asked for + operation has not changed. + method: + The method this session was initiated with, this is checked at each + stage of the authentication to ensure that the asked for + operation has not changed. + description: + A string description of the operation that the current + authentication is authorising. + Returns: + The newly created session. + Raises: + StoreError if a unique session ID cannot be generated. + """ + # The clientdict gets stored as JSON. + clientdict_json = json.dumps(clientdict) + + # autogen a session ID and try to create it. We may clash, so just + # try a few times till one goes through, giving up eventually. + attempts = 0 + while attempts < 5: + session_id = stringutils.random_string(24) + + try: + await self.db.simple_insert( + table="ui_auth_sessions", + values={ + "session_id": session_id, + "clientdict": clientdict_json, + "uri": uri, + "method": method, + "description": description, + "serverdict": "{}", + "creation_time": self.hs.get_clock().time_msec(), + }, + desc="create_ui_auth_session", + ) + return UIAuthSessionData( + session_id, clientdict, uri, method, description + ) + except self.db.engine.module.IntegrityError: + attempts += 1 + raise StoreError(500, "Couldn't generate a session ID.") + + async def get_ui_auth_session(self, session_id: str) -> UIAuthSessionData: + """Retrieve a UI auth session. + + Args: + session_id: The ID of the session. + Returns: + A dict containing the device information. + Raises: + StoreError if the session is not found. + """ + result = await self.db.simple_select_one( + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + retcols=("clientdict", "uri", "method", "description"), + desc="get_ui_auth_session", + ) + + result["clientdict"] = json.loads(result["clientdict"]) + + return UIAuthSessionData(session_id, **result) + + async def mark_ui_auth_stage_complete( + self, session_id: str, stage_type: str, result: Union[str, bool, JsonDict], + ): + """ + Mark a session stage as completed. + + Args: + session_id: The ID of the corresponding session. + stage_type: The completed stage type. + result: The result of the stage verification. + Raises: + StoreError if the session cannot be found. + """ + # Add (or update) the results of the current stage to the database. + # + # Note that we need to allow for the same stage to complete multiple + # times here so that registration is idempotent. + try: + await self.db.simple_upsert( + table="ui_auth_sessions_credentials", + keyvalues={"session_id": session_id, "stage_type": stage_type}, + values={"result": json.dumps(result)}, + desc="mark_ui_auth_stage_complete", + ) + except self.db.engine.module.IntegrityError: + raise StoreError(400, "Unknown session ID: %s" % (session_id,)) + + async def get_completed_ui_auth_stages( + self, session_id: str + ) -> Dict[str, Union[str, bool, JsonDict]]: + """ + Retrieve the completed stages of a UI authentication session. + + Args: + session_id: The ID of the session. + Returns: + The completed stages mapped to the result of the verification of + that auth-type. + """ + results = {} + for row in await self.db.simple_select_list( + table="ui_auth_sessions_credentials", + keyvalues={"session_id": session_id}, + retcols=("stage_type", "result"), + desc="get_completed_ui_auth_stages", + ): + results[row["stage_type"]] = json.loads(row["result"]) + + return results + + async def set_ui_auth_session_data(self, session_id: str, key: str, value: Any): + """ + Store a key-value pair into the sessions data associated with this + request. This data is stored server-side and cannot be modified by + the client. + + Args: + session_id: The ID of this session as returned from check_auth + key: The key to store the data under + value: The data to store + Raises: + StoreError if the session cannot be found. + """ + await self.db.runInteraction( + "set_ui_auth_session_data", + self._set_ui_auth_session_data_txn, + session_id, + key, + value, + ) + + def _set_ui_auth_session_data_txn(self, txn, session_id: str, key: str, value: Any): + # Get the current value. + result = self.db.simple_select_one_txn( + txn, + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + retcols=("serverdict",), + ) + + # Update it and add it back to the database. + serverdict = json.loads(result["serverdict"]) + serverdict[key] = value + + self.db.simple_update_one_txn( + txn, + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + updatevalues={"serverdict": json.dumps(serverdict)}, + ) + + async def get_ui_auth_session_data( + self, session_id: str, key: str, default: Optional[Any] = None + ) -> Any: + """ + Retrieve data stored with set_session_data + + Args: + session_id: The ID of this session as returned from check_auth + key: The key to store the data under + default: Value to return if the key has not been set + Raises: + StoreError if the session cannot be found. + """ + result = await self.db.simple_select_one( + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + retcols=("serverdict",), + desc="get_ui_auth_session_data", + ) + + serverdict = json.loads(result["serverdict"]) + + return serverdict.get(key, default) + + +class UIAuthStore(UIAuthWorkerStore): + def delete_old_ui_auth_sessions(self, expiration_time: int): + """ + Remove sessions which were last used earlier than the expiration time. + + Args: + expiration_time: The latest time that is still considered valid. + This is an epoch time in milliseconds. + + """ + return self.db.runInteraction( + "delete_old_ui_auth_sessions", + self._delete_old_ui_auth_sessions_txn, + expiration_time, + ) + + def _delete_old_ui_auth_sessions_txn(self, txn, expiration_time: int): + # Get the expired sessions. + sql = "SELECT session_id FROM ui_auth_sessions WHERE creation_time <= ?" + txn.execute(sql, [expiration_time]) + session_ids = [r[0] for r in txn.fetchall()] + + # Delete the corresponding completed credentials. + self.db.simple_delete_many_txn( + txn, + table="ui_auth_sessions_credentials", + column="session_id", + iterable=session_ids, + keyvalues={}, + ) + + # Finally, delete the sessions. + self.db.simple_delete_many_txn( + txn, + table="ui_auth_sessions", + column="session_id", + iterable=session_ids, + keyvalues={}, + ) diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 3bc2e8b986..215a949442 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -85,6 +85,7 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): prepare_database(db_conn, self, config=None) db_conn.create_function("rank", 1, _rank) + db_conn.execute("PRAGMA foreign_keys = ON;") def is_deadlock(self, error): return False diff --git a/tests/rest/client/v2_alpha/test_auth.py b/tests/rest/client/v2_alpha/test_auth.py index 624bf5ada2..587be7b2e7 100644 --- a/tests/rest/client/v2_alpha/test_auth.py +++ b/tests/rest/client/v2_alpha/test_auth.py @@ -181,3 +181,43 @@ class FallbackAuthTests(unittest.HomeserverTestCase): ) self.render(request) self.assertEqual(channel.code, 403) + + def test_complete_operation_unknown_session(self): + """ + Attempting to mark an invalid session as complete should error. + """ + + # Make the initial request to register. (Later on a different password + # will be used.) + request, channel = self.make_request( + "POST", + "register", + {"username": "user", "type": "m.login.password", "password": "bar"}, + ) + self.render(request) + + # Returns a 401 as per the spec + self.assertEqual(request.code, 401) + # Grab the session + session = channel.json_body["session"] + # Assert our configured public key is being given + self.assertEqual( + channel.json_body["params"]["m.login.recaptcha"]["public_key"], "brokencake" + ) + + request, channel = self.make_request( + "GET", "auth/m.login.recaptcha/fallback/web?session=" + session + ) + self.render(request) + self.assertEqual(request.code, 200) + + # Attempt to complete an unknown session, which should return an error. + unknown_session = session + "unknown" + request, channel = self.make_request( + "POST", + "auth/m.login.recaptcha/fallback/web?session=" + + unknown_session + + "&g-recaptcha-response=a", + ) + self.render(request) + self.assertEqual(request.code, 400) diff --git a/tests/utils.py b/tests/utils.py index 037cb134f0..f9be62b499 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -512,8 +512,8 @@ class MockClock(object): return t - def looping_call(self, function, interval): - self.loopers.append([function, interval / 1000.0, self.now]) + def looping_call(self, function, interval, *args, **kwargs): + self.loopers.append([function, interval / 1000.0, self.now, args, kwargs]) def cancel_call_later(self, timer, ignore_errs=False): if timer[2]: @@ -543,9 +543,9 @@ class MockClock(object): self.timers.append(t) for looped in self.loopers: - func, interval, last = looped + func, interval, last, args, kwargs = looped if last + interval < self.now: - func() + func(*args, **kwargs) looped[2] = self.now def advance_time_msec(self, ms): diff --git a/tox.ini b/tox.ini index 2630857436..eccc44e436 100644 --- a/tox.ini +++ b/tox.ini @@ -200,8 +200,9 @@ commands = mypy \ synapse/replication \ synapse/rest \ synapse/spam_checker_api \ - synapse/storage/engines \ + synapse/storage/data_stores/main/ui_auth.py \ synapse/storage/database.py \ + synapse/storage/engines \ synapse/streams \ synapse/util/caches/stream_change_cache.py \ tests/replication/tcp/streams \ -- cgit 1.5.1