summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--docs/SUMMARY.md1
-rw-r--r--docs/sample_config.yaml35
-rw-r--r--docs/state_compressor.md47
-rw-r--r--mypy.ini4
-rw-r--r--synapse/app/_base.py4
-rw-r--r--synapse/config/_base.pyi2
-rw-r--r--synapse/config/homeserver.py2
-rw-r--r--synapse/config/state_compressor.py94
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/storage/database.py8
-rw-r--r--synapse/storage/databases/__init__.py2
-rw-r--r--synapse/util/state_compressor.py119
12 files changed, 319 insertions, 1 deletions
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md

index bdb44543b8..beb6d017ab 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md
@@ -47,6 +47,7 @@ - [Workers](workers.md) - [Using `synctl` with Workers](synctl_workers.md) - [Systemd](systemd-with-workers/README.md) + - [State Compressor](state_compressor.md) - [Administration](usage/administration/README.md) - [Admin API](usage/administration/admin_api/README.md) - [Account Validity](admin_api/account_validity.md) diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 166cec38d3..646019db55 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml
@@ -2648,3 +2648,38 @@ redis: # Optional password if configured on the Redis instance # #password: <secret_password> + + +## State compressor ## + +# The state compressor is an experimental tool which attempts to +# reduce the number of rows in the state_groups_state table +# of postgres databases. +# +# For more information please see +# https://matrix-org.github.io/synapse/latest/state_compressor.html +# +state_compressor: + # Whether the state compressor should run (defaults to false) + # Uncomment to enable it - Note, this requires the 'auto-compressor' + # library to be installed + # + #enabled: true + + # The (rough) number of state groups to load at one time. Defaults + # to 500. + # + #chunk_size: 1000 + + # The number of chunks to compress on each run. Defaults to 100. + # + #number_of_chunks: 1 + + # The default level sizes for the compressor to use. Defaults to + # 100,50,25. + # + #default_levels: 128,64,32. + + # How frequently to run the state compressor. Defaults to 1d + # + #time_between_runs: 1w diff --git a/docs/state_compressor.md b/docs/state_compressor.md new file mode 100644
index 0000000000..56f21a03cd --- /dev/null +++ b/docs/state_compressor.md
@@ -0,0 +1,47 @@ +# State compressor + +The state compressor is an **experimental** tool that attempts to reduce the number of rows +in the `state_groups_state` table inside of a postgres database. Documentation on how it works +can be found on [its github repository](https://github.com/matrix-org/rust-synapse-compress-state). + +## Enabling the state compressor + +The state compressor requires the python library for the `auto_compressor` tool to be +installed. Instructions for this can be found in [the `python.md` file in the source +repo](https://github.com/matrix-org/rust-synapse-compress-state/blob/main/docs/python.md). + +The following configuration options are provided: + +- `chunk_size` +The number of state groups to work on at once. All of the entries from +`state_groups_state` are requested from the database for state groups that are +worked on. Therefore small chunk sizes may be needed on machines with low memory. +Note: if the compressor fails to find space savings on the chunk as a whole +(which may well happen in rooms with lots of backfill) then the entire chunk +is skipped. This defaults to 500. + +- `number_of_chunks` +The compressor will stop once it has finished compressing this many chunks. Defaults to 100. + +- `default_levels` +Sizes of each new level in the compression algorithm, as a comma-separated list. +The first entry in the list is for the lowest, most granular level, with each +subsequent entry being for the next highest level. The number of entries in the +list determines the number of levels that will be used. The sum of the sizes of +the levels affects the performance of fetching the state from the database, as the +sum of the sizes is the upper bound on the number of iterations needed to fetch a +given set of state. This defaults to "100,50,25". + +- `time_between_runs` +This controls how often the state compressor is run. This defaults to once every +day. 
+ +An example configuration: +```yaml +state_compressor: + enabled: true + chunk_size: 500 + number_of_chunks: 5 + default_levels: 100,50,25 + time_between_runs: 1d +``` \ No newline at end of file diff --git a/mypy.ini b/mypy.ini
index 437d0a46a5..0343d2006b 100644 --- a/mypy.ini +++ b/mypy.ini
@@ -257,3 +257,7 @@ ignore_missing_imports = True [mypy-ijson.*] ignore_missing_imports = True + + +[mypy-psycopg2.*] +ignore_missing_imports = True diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 548f6dcde9..c99b3b7603 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py
@@ -48,6 +48,7 @@ from synapse.metrics.jemalloc import setup_jemalloc_stats from synapse.util.caches.lrucache import setup_expire_lru_cache_entries from synapse.util.daemonize import daemonize_process from synapse.util.rlimit import change_resource_limit +from synapse.util.state_compressor import setup_state_compressor from synapse.util.versionstring import get_version_string if TYPE_CHECKING: @@ -383,6 +384,9 @@ async def start(hs: "HomeServer"): # If we've configured an expiry time for caches, start the background job now. setup_expire_lru_cache_entries(hs) + # Schedule the state compressor to run + setup_state_compressor(hs) + # It is now safe to start your Synapse. hs.start_listening() hs.get_datastore().db_pool.start_profiling() diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 06fbd1166b..b925f692b0 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi
@@ -32,6 +32,7 @@ from synapse.config import ( server_notices, spam_checker, sso, + state_compressor, stats, third_party_event_rules, tls, @@ -91,6 +92,7 @@ class RootConfig: modules: modules.ModulesConfig caches: cache.CacheConfig federation: federation.FederationConfig + statecompressor: state_compressor.StateCompressorConfig config_classes: List = ... def __init__(self) -> None: ... diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 442f1b9ac0..003cffdab9 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py
@@ -45,6 +45,7 @@ from .server import ServerConfig from .server_notices import ServerNoticesConfig from .spam_checker import SpamCheckerConfig from .sso import SSOConfig +from .state_compressor import StateCompressorConfig from .stats import StatsConfig from .third_party_event_rules import ThirdPartyRulesConfig from .tls import TlsConfig @@ -97,4 +98,5 @@ class HomeServerConfig(RootConfig): WorkerConfig, RedisConfig, ExperimentalConfig, + StateCompressorConfig, ] diff --git a/synapse/config/state_compressor.py b/synapse/config/state_compressor.py new file mode 100644
index 0000000000..92a0b7e533 --- /dev/null +++ b/synapse/config/state_compressor.py
@@ -0,0 +1,94 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.config._base import Config, ConfigError +from synapse.config._util import validate_config +from synapse.python_dependencies import DependencyException, check_requirements + + +class StateCompressorConfig(Config): + section = "statecompressor" + + def read_config(self, config, **kwargs): + compressor_config = config.get("state_compressor") or {} + validate_config( + _STATE_COMPRESSOR_SCHEMA, compressor_config, ("state_compressor",) + ) + self.compressor_enabled = compressor_config.get("enabled") or False + + if not self.compressor_enabled: + return + + try: + check_requirements("auto_compressor") + except DependencyException as e: + raise ConfigError from e + + self.compressor_chunk_size = compressor_config.get("chunk_size") or 500 + self.compressor_number_of_chunks = compressor_config.get("number_of_chunks") or 100 + self.compressor_default_levels = ( + compressor_config.get("default_levels") or "100,50,25" + ) + self.time_between_compressor_runs = self.parse_duration( + compressor_config.get("time_between_runs") or "1d" + ) + + def generate_config_section(self, **kwargs): + return """\ + ## State compressor ## + + # The state compressor is an experimental tool which attempts to + # reduce the number of rows in the state_groups_state table + # of postgres databases. 
+ # + # For more information please see + # https://matrix-org.github.io/synapse/latest/state_compressor.html + # + state_compressor: + # Whether the state compressor should run (defaults to false) + # Uncomment to enable it - Note, this requires the 'auto-compressor' + # library to be installed + # + #enabled: true + + # The (rough) number of state groups to load at one time. Defaults + # to 500. + # + #chunk_size: 1000 + + # The number of chunks to compress on each run. Defaults to 100. + # + #number_of_chunks: 1 + + # The default level sizes for the compressor to use. Defaults to + # 100,50,25. + # + #default_levels: 128,64,32. + + # How frequently to run the state compressor. Defaults to 1d + # + #time_between_runs: 1w + """ + + +_STATE_COMPRESSOR_SCHEMA = { + "type": "object", + "properties": { + "enabled": {"type": "boolean"}, + "chunk_size": {"type": "number"}, + "number_of_chunks": {"type": "number"}, + "default_levels": {"type": "string"}, + "time_between_runs": {"type": "string"}, + }, +} diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 154e5b7028..7c5570976e 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py
@@ -114,6 +114,8 @@ CONDITIONAL_REQUIREMENTS = { "redis": ["txredisapi>=1.4.7", "hiredis"], # Required to use experimental `caches.track_memory_usage` config option. "cache_memory": ["pympler"], + # Needs to be manually installed to use + "auto_compressor": ["auto_compressor"], } ALL_OPTIONAL_REQUIREMENTS: Set[str] = set() diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index f5a8f90a0f..7f94cb00d6 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -395,6 +395,7 @@ class DatabasePool: hs, database_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine, + db_conn: LoggingDatabaseConnection, ): self.hs = hs self._clock = hs.get_clock() @@ -427,6 +428,13 @@ class DatabasePool: if isinstance(self.engine, Sqlite3Engine): self._unsafe_to_upsert_tables.add("user_directory_search") + # We store the connection info for later use when using postgres + # (primarily to allow things like the state auto compressor to connect + # to the DB). + self.postgres_connection_info_parameters: Optional[Dict] = None + if isinstance(self.engine, PostgresEngine): + self.postgres_connection_info_parameters = db_conn.info.dsn_parameters + if self.engine.can_native_upsert: # Check ASAP (and then later, every 1s) to see if we have finished # background updates of tables that aren't safe to update. diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 20b755056b..6f2d9a062e 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py
@@ -61,7 +61,7 @@ class Databases: databases=database_config.databases, ) - database = DatabasePool(hs, database_config, engine) + database = DatabasePool(hs, database_config, engine, db_conn) if "main" in database_config.databases: logger.info( diff --git a/synapse/util/state_compressor.py b/synapse/util/state_compressor.py new file mode 100644
index 0000000000..6ec58dc9c6 --- /dev/null +++ b/synapse/util/state_compressor.py
@@ -0,0 +1,119 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING + +from synapse.logging.context import defer_to_thread +from synapse.metrics.background_process_metrics import run_as_background_process + +try: + import auto_compressor as state_compressor +except ImportError: + state_compressor = None + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +# The postgres connection options that the rust library understands. 
See +# https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html#keys +_VALID_POSTGRES_CONN_ARGS = { + "user", + "password", + "dbname", + "options", + "application_name", + "sslmode", + "host", + "port", + "connect_timeout", + "keepalives", + "keepalives_idle", + "target_session_attrs", + "channel_binding", +} + + +def setup_state_compressor(hs: "HomeServer"): + """Schedules the state compressor to run regularly""" + + # Return if cannot import auto_compressor + if not state_compressor or not hs.config.worker.run_background_tasks: + return + + # Return if compressor isn't enabled + compressor_config = hs.config.statecompressor + if not compressor_config.compressor_enabled: + return + + # Check that the database being used is postgres + db_config = None + for conf in hs.config.database.databases: + if "state" in conf.databases: + db_config = conf.config + break + + # One of the databases should have the state tables in + assert db_config is not None + + if db_config["name"] != "psycopg2": + return + + password = db_config.get("args").get("password") + + # Construct the database URL from the database config. + # + # This is a bit convoluted as the rust postgres library doesn't have a + # default host/user, so we use the existing Synapse connections to look up + # what parameters were used there. On the flip side, psycopg2 has some + # parameters that rust doesn't understand, so we need to filter them out. + # + # Note: we need to connect to the *state* database. 
+ conn_info_params = hs.get_datastores().state.db_pool.postgres_connection_info_parameters + assert conn_info_params is not None + + effective_db_args = {} + for key, value in conn_info_params.items(): + if key in _VALID_POSTGRES_CONN_ARGS: + effective_db_args[key] = value + + # We cannot extract the password from the connection info, so use the value extracted + # from synapse's config + if password is not None: + effective_db_args["password"] = password + + # psycopg2 has a handy util function from going from dictionary to a DSN + # (postgres connection string.) + from psycopg2.extensions import make_dsn + + db_url = make_dsn("", **effective_db_args) + + # The method to be called periodically + def run_state_compressor(): + run_as_background_process( + desc="State Compressor", + func=defer_to_thread, + reactor=hs.get_reactor(), + f=state_compressor.compress_state_events_table, + db_url=db_url, + chunk_size=compressor_config.compressor_chunk_size, + default_levels=compressor_config.compressor_default_levels, + number_of_chunks=compressor_config.compressor_number_of_chunks, + ) + + # Call the compressor every `time_between_runs` milliseconds + clock = hs.get_clock() + clock.looping_call( + run_state_compressor, + compressor_config.time_between_compressor_runs, + )