diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 25bf996d38..849693243c 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -24,7 +24,6 @@ import traceback
import warnings
from typing import TYPE_CHECKING, Awaitable, Callable, Iterable
-import auto_compressor
from cryptography.utils import CryptographyDeprecationWarning
from typing_extensions import NoReturn
@@ -43,15 +42,13 @@ from synapse.crypto import context_factory
from synapse.events.presence_router import load_legacy_presence_router
from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.events.third_party_rules import load_legacy_third_party_event_rules
-from synapse.logging.context import PreserveLoggingContext, defer_to_thread
-from synapse.metrics.background_process_metrics import (
- run_as_background_process,
- wrap_as_background_process,
-)
+from synapse.logging.context import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
+from synapse.util.state_compressor import setup_state_compressor
from synapse.util.versionstring import get_version_string
if TYPE_CHECKING:
@@ -424,49 +421,6 @@ async def start(hs: "HomeServer"):
atexit.register(gc.freeze)
-def setup_state_compressor(hs):
- """Schedules the state compressor to run regularly"""
- compressor_config = hs.config.statecompressor
- # Check that compressor is enabled
- if not compressor_config.enabled:
- return
-
- # Check that the database being used is postgres
- db_config = hs.config.database.get_single_database().config
- if db_config["name"] != "psycopg2":
- return
-
- # construct the database URL from the database config
- db_args = db_config["args"]
- db_url = "postgresql://{username}:{password}@{host}:{port}/{database}".format(
- username=db_args["user"],
- password=db_args["password"],
- host=db_args["host"],
- port=db_args["port"],
- database=db_args["database"],
- )
-
- # The method to be called periodically
- def run_state_compressor():
- run_as_background_process(
- desc="State Compressor",
- func=defer_to_thread,
- reactor=hs.get_reactor(),
- f=auto_compressor.compress_largest_rooms,
- db_url=db_url,
- chunk_size=compressor_config.compressor_chunk_size,
- default_levels=compressor_config.compressor_default_levels,
- number_of_rooms=compressor_config.compressor_number_of_rooms,
- )
-
- # Call the compressor every `time_between_runs` milliseconds
- clock = hs.get_clock()
- clock.looping_call(
- run_state_compressor,
- compressor_config.time_between_compressor_runs,
- )
-
-
def setup_sentry(hs):
"""Enable sentry integration, if enabled in configuration
diff --git a/synapse/config/state_compressor.py b/synapse/config/state_compressor.py
index d90474766f..434125b038 100644
--- a/synapse/config/state_compressor.py
+++ b/synapse/config/state_compressor.py
@@ -42,13 +42,13 @@ class StateCompressorConfig(Config):
def generate_config_section(self, **kwargs):
return """\
- # The state compressor is an experimental tool which attempts to
+ # The state compressor is an experimental tool which attempts to
# reduce the number of rows in the state_groups_state table
# of postgres databases.
#
# For more information please see
# https://matrix-org.github.io/synapse/latest/state_compressor.html
- #
+ #
state_compressor:
# enabled: true
# # The (rough) number of state groups to load at one time
@@ -58,5 +58,5 @@ class StateCompressorConfig(Config):
# # The default level sizes for the compressor to use
# default_levels: 100,50,25
# # How frequently to run the state compressor
- # time_between_runs: 1d
+ # time_between_runs: 1d
"""
diff --git a/synapse/util/state_compressor.py b/synapse/util/state_compressor.py
new file mode 100644
index 0000000000..eb4a895b34
--- /dev/null
+++ b/synapse/util/state_compressor.py
@@ -0,0 +1,68 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.logging.context import defer_to_thread
+from synapse.metrics.background_process_metrics import run_as_background_process
+
+try:
+ import auto_compressor as state_compressor
+except ImportError:
+ state_compressor = None
+
+
+def setup_state_compressor(hs):
+ """Schedules the state compressor to run regularly"""
+
+ # Return if cannot import auto_compressor
+ if not state_compressor:
+ return
+
+ # Return if compressor isn't enabled
+ compressor_config = hs.config.statecompressor
+ if not compressor_config.compressor_enabled:
+ return
+
+ # Check that the database being used is postgres
+ db_config = hs.config.database.get_single_database().config
+ if db_config["name"] != "psycopg2":
+ return
+
+ # construct the database URL from the database config
+ db_args = db_config["args"]
+ db_url = "postgresql://{username}:{password}@{host}:{port}/{database}".format(
+ username=db_args["user"],
+ password=db_args["password"],
+ host=db_args["host"],
+ port=db_args["port"],
+ database=db_args["database"],
+ )
+
+ # The method to be called periodically
+ def run_state_compressor():
+ run_as_background_process(
+ desc="State Compressor",
+ func=defer_to_thread,
+ reactor=hs.get_reactor(),
+ f=state_compressor.compress_largest_rooms,
+ db_url=db_url,
+ chunk_size=compressor_config.compressor_chunk_size,
+ default_levels=compressor_config.compressor_default_levels,
+ number_of_rooms=compressor_config.compressor_number_of_rooms,
+ )
+
+ # Call the compressor every `time_between_runs` milliseconds
+ clock = hs.get_clock()
+ clock.looping_call(
+ run_state_compressor,
+ compressor_config.time_between_compressor_runs,
+ )
|