summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/_base.py52
-rw-r--r--synapse/config/state_compressor.py6
-rw-r--r--synapse/util/state_compressor.py68
3 files changed, 74 insertions, 52 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py

index 25bf996d38..849693243c 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py
@@ -24,7 +24,6 @@ import traceback import warnings from typing import TYPE_CHECKING, Awaitable, Callable, Iterable -import auto_compressor from cryptography.utils import CryptographyDeprecationWarning from typing_extensions import NoReturn @@ -43,15 +42,13 @@ from synapse.crypto import context_factory from synapse.events.presence_router import load_legacy_presence_router from synapse.events.spamcheck import load_legacy_spam_checkers from synapse.events.third_party_rules import load_legacy_third_party_event_rules -from synapse.logging.context import PreserveLoggingContext, defer_to_thread -from synapse.metrics.background_process_metrics import ( - run_as_background_process, - wrap_as_background_process, -) +from synapse.logging.context import PreserveLoggingContext +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import setup_jemalloc_stats from synapse.util.caches.lrucache import setup_expire_lru_cache_entries from synapse.util.daemonize import daemonize_process from synapse.util.rlimit import change_resource_limit +from synapse.util.state_compressor import setup_state_compressor from synapse.util.versionstring import get_version_string if TYPE_CHECKING: @@ -424,49 +421,6 @@ async def start(hs: "HomeServer"): atexit.register(gc.freeze) -def setup_state_compressor(hs): - """Schedules the state compressor to run regularly""" - compressor_config = hs.config.statecompressor - # Check that compressor is enabled - if not compressor_config.enabled: - return - - # Check that the database being used is postgres - db_config = hs.config.database.get_single_database().config - if db_config["name"] != "psycopg2": - return - - # construct the database URL from the database config - db_args = db_config["args"] - db_url = "postgresql://{username}:{password}@{host}:{port}/{database}".format( - username=db_args["user"], - password=db_args["password"], - host=db_args["host"], - port=db_args["port"], - database=db_args["database"], - ) - - # The method to be called periodically - def run_state_compressor(): - run_as_background_process( - desc="State Compressor", - func=defer_to_thread, - reactor=hs.get_reactor(), - f=auto_compressor.compress_largest_rooms, - db_url=db_url, - chunk_size=compressor_config.compressor_chunk_size, - default_levels=compressor_config.compressor_default_levels, - number_of_rooms=compressor_config.compressor_number_of_rooms, - ) - - # Call the compressor every `time_between_runs` milliseconds - clock = hs.get_clock() - clock.looping_call( - run_state_compressor, - compressor_config.time_between_compressor_runs, - ) - - def setup_sentry(hs): """Enable sentry integration, if enabled in configuration diff --git a/synapse/config/state_compressor.py b/synapse/config/state_compressor.py
index d90474766f..434125b038 100644 --- a/synapse/config/state_compressor.py +++ b/synapse/config/state_compressor.py
@@ -42,13 +42,13 @@ class StateCompressorConfig(Config): def generate_config_section(self, **kwargs): return """\ - # The state compressor is an experimental tool which attempts to + # The state compressor is an experimental tool which attempts to # reduce the number of rows in the state_groups_state table # of postgres databases. # # For more information please see # https://matrix-org.github.io/synapse/latest/state_compressor.html - # + # state_compressor: # enabled: true # # The (rough) number of state groups to load at one time @@ -58,5 +58,5 @@ class StateCompressorConfig(Config): # # The default level sizes for the compressor to use # default_levels: 100,50,25 # # How frequently to run the state compressor - # time_between_runs: 1d + # time_between_runs: 1d """ diff --git a/synapse/util/state_compressor.py b/synapse/util/state_compressor.py new file mode 100644
index 0000000000..eb4a895b34 --- /dev/null +++ b/synapse/util/state_compressor.py
@@ -0,0 +1,68 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.logging.context import defer_to_thread +from synapse.metrics.background_process_metrics import run_as_background_process + +try: + import auto_compressor as state_compressor +except ImportError: + state_compressor = None + + +def setup_state_compressor(hs): + """Schedules the state compressor to run regularly""" + + # Return if cannot import auto_compressor + if not state_compressor: + return + + # Return if compressor isn't enabled + compressor_config = hs.config.statecompressor + if not compressor_config.compressor_enabled: + return + + # Check that the database being used is postgres + db_config = hs.config.database.get_single_database().config + if db_config["name"] != "psycopg2": + return + + # construct the database URL from the database config + db_args = db_config["args"] + db_url = "postgresql://{username}:{password}@{host}:{port}/{database}".format( + username=db_args["user"], + password=db_args["password"], + host=db_args["host"], + port=db_args["port"], + database=db_args["database"], + ) + + # The method to be called periodically + def run_state_compressor(): + run_as_background_process( + desc="State Compressor", + func=defer_to_thread, + reactor=hs.get_reactor(), + f=state_compressor.compress_largest_rooms, + db_url=db_url, + chunk_size=compressor_config.compressor_chunk_size, + default_levels=compressor_config.compressor_default_levels, + number_of_rooms=compressor_config.compressor_number_of_rooms, + ) + + # Call the compressor every `time_between_runs` milliseconds + clock = hs.get_clock() + clock.looping_call( + run_state_compressor, + compressor_config.time_between_compressor_runs, + )