summary refs log tree commit diff
path: root/synapse/util/state_compressor.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/state_compressor.py')
-rw-r--r--synapse/util/state_compressor.py119
1 files changed, 119 insertions, 0 deletions
diff --git a/synapse/util/state_compressor.py b/synapse/util/state_compressor.py
new file mode 100644

index 0000000000..6ec58dc9c6 --- /dev/null +++ b/synapse/util/state_compressor.py
@@ -0,0 +1,119 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import TYPE_CHECKING + +from synapse.logging.context import defer_to_thread +from synapse.metrics.background_process_metrics import run_as_background_process + +try: + import auto_compressor as state_compressor +except ImportError: + state_compressor = None + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +# The postgres connection options that the rust library understands. 
# See
# https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html#keys
_VALID_POSTGRES_CONN_ARGS = {
    "user",
    "password",
    "dbname",
    "options",
    "application_name",
    "sslmode",
    "host",
    "port",
    "connect_timeout",
    "keepalives",
    "keepalives_idle",
    "target_session_attrs",
    "channel_binding",
}


def setup_state_compressor(hs: "HomeServer") -> None:
    """Schedule the state compressor to run regularly.

    This is a no-op (returns without scheduling anything) when any of the
    following holds:

    * the `auto_compressor` library could not be imported,
    * this process is not configured to run background tasks,
    * the compressor is not enabled in the config, or
    * the database holding the state tables is not using psycopg2.

    Args:
        hs: The homeserver whose state database should be compressed.
    """

    # Return if cannot import auto_compressor, or if background tasks are run
    # by a different process.
    if not state_compressor or not hs.config.worker.run_background_tasks:
        return

    # Return if compressor isn't enabled
    compressor_config = hs.config.statecompressor
    if not compressor_config.compressor_enabled:
        return

    # Find the config of the database that holds the state tables, so that we
    # can check that it is postgres.
    db_config = None
    for conf in hs.config.database.databases:
        if "state" in conf.databases:
            db_config = conf.config
            break

    # One of the databases should have the state tables in
    assert db_config is not None

    if db_config["name"] != "psycopg2":
        return

    # The `args` section may be missing or null, in which case there is simply
    # no password configured (rather than this being an error).
    password = (db_config.get("args") or {}).get("password")

    # Construct the database URL from the database config.
    #
    # This is a bit convoluted as the rust postgres library doesn't have a
    # default host/user, so we use the existing Synapse connections to look up
    # what parameters were used there. On the flip side, psycopg2 has some
    # parameters that rust doesn't understand, so we need to filter them out.
    #
    # Note: we need to connect to the *state* database.
    conn_info_params = (
        hs.get_datastores().state.db_pool.postgres_connection_info_parameters
    )
    assert conn_info_params is not None

    effective_db_args = {
        key: value
        for key, value in conn_info_params.items()
        if key in _VALID_POSTGRES_CONN_ARGS
    }

    # We cannot extract the password from the connection info, so use the value
    # extracted from synapse's config
    if password is not None:
        effective_db_args["password"] = password

    # psycopg2 has a handy util function for going from a dictionary to a DSN
    # (postgres connection string.)
    from psycopg2.extensions import make_dsn

    db_url = make_dsn("", **effective_db_args)

    # The method to be called periodically
    def run_state_compressor():
        # Hand the result of `run_as_background_process` back to the caller
        # rather than discarding it, so the looping call can wait for one
        # compression run to finish before starting the next.
        # NOTE(review): this relies on `clock.looping_call` following Twisted's
        # `LoopingCall` semantics for returned Deferreds - confirm.
        return run_as_background_process(
            desc="State Compressor",
            func=defer_to_thread,
            reactor=hs.get_reactor(),
            f=state_compressor.compress_state_events_table,
            db_url=db_url,
            chunk_size=compressor_config.compressor_chunk_size,
            default_levels=compressor_config.compressor_default_levels,
            number_of_chunks=compressor_config.compressor_number_of_chunks,
        )

    # Call the compressor every `time_between_runs` milliseconds
    clock = hs.get_clock()
    clock.looping_call(
        run_state_compressor,
        compressor_config.time_between_compressor_runs,
    )