diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index bdb44543b8..beb6d017ab 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -47,6 +47,7 @@
- [Workers](workers.md)
- [Using `synctl` with Workers](synctl_workers.md)
- [Systemd](systemd-with-workers/README.md)
+ - [State Compressor](state_compressor.md)
- [Administration](usage/administration/README.md)
- [Admin API](usage/administration/admin_api/README.md)
- [Account Validity](admin_api/account_validity.md)
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 166cec38d3..646019db55 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -2648,3 +2648,38 @@ redis:
# Optional password if configured on the Redis instance
#
#password: <secret_password>
+
+
+## State compressor ##
+
+# The state compressor is an experimental tool which attempts to
+# reduce the number of rows in the state_groups_state table
+# of postgres databases.
+#
+# For more information please see
+# https://matrix-org.github.io/synapse/latest/state_compressor.html
+#
+state_compressor:
+ # Whether the state compressor should run (defaults to false)
+ # Uncomment to enable it - Note, this requires the 'auto-compressor'
+ # library to be installed
+ #
+ #enabled: true
+
+ # The (rough) number of state groups to load at one time. Defaults
+ # to 500.
+ #
+ #chunk_size: 1000
+
+  # The number of chunks to compress on each run. Defaults to 100.
+ #
+ #number_of_chunks: 1
+
+ # The default level sizes for the compressor to use. Defaults to
+ # 100,50,25.
+ #
+ #default_levels: 128,64,32.
+
+ # How frequently to run the state compressor. Defaults to 1d
+ #
+ #time_between_runs: 1w
diff --git a/docs/state_compressor.md b/docs/state_compressor.md
new file mode 100644
index 0000000000..56f21a03cd
--- /dev/null
+++ b/docs/state_compressor.md
@@ -0,0 +1,47 @@
+# State compressor
+
+The state compressor is an **experimental** tool that attempts to reduce the number of rows
+in the `state_groups_state` table of a PostgreSQL database. Documentation on how it works
+can be found in [its GitHub repository](https://github.com/matrix-org/rust-synapse-compress-state).
+
+## Enabling the state compressor
+
+The state compressor requires the python library for the `auto_compressor` tool to be
+installed. Instructions for this can be found in [the `python.md` file in the source
+repo](https://github.com/matrix-org/rust-synapse-compress-state/blob/main/docs/python.md).
+
+The compressor is turned on with `enabled: true`. The following options tune how it runs:
+
+- `chunk_size`
+The number of state groups to work on at once. All of the entries from
+`state_groups_state` are requested from the database for state groups that are
+worked on. Therefore small chunk sizes may be needed on machines with low memory.
+Note: if the compressor fails to find space savings on the chunk as a whole
+(which may well happen in rooms with lots of backfilled events) then the entire chunk
+is skipped. This defaults to 500.
+
+- `number_of_chunks`
+The compressor will stop once it has finished compressing this many chunks. Defaults to 100.
+
+- `default_levels`
+Sizes of each new level in the compression algorithm, as a comma separated list.
+The first entry in the list is for the lowest, most granular level, with each
+subsequent entry being for the next highest level. The number of entries in the
+list determines the number of levels that will be used. The sum of the sizes of
+the levels affects the performance of fetching the state from the database, as the
+sum of the sizes is the upper bound on the number of iterations needed to fetch a
+given set of state. This defaults to "100,50,25".
+
+- `time_between_runs`
+This controls how often the state compressor is run. This defaults to once every
+day.
+
+An example configuration:
+```yaml
+state_compressor:
+ enabled: true
+ chunk_size: 500
+ number_of_chunks: 5
+ default_levels: 100,50,25
+ time_between_runs: 1d
+```
\ No newline at end of file
diff --git a/mypy.ini b/mypy.ini
index 437d0a46a5..0343d2006b 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -257,3 +257,7 @@ ignore_missing_imports = True
[mypy-ijson.*]
ignore_missing_imports = True
+
+
+[mypy-psycopg2.*]
+ignore_missing_imports = True
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 548f6dcde9..c99b3b7603 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -48,6 +48,7 @@ from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
+from synapse.util.state_compressor import setup_state_compressor
from synapse.util.versionstring import get_version_string
if TYPE_CHECKING:
@@ -383,6 +384,9 @@ async def start(hs: "HomeServer"):
# If we've configured an expiry time for caches, start the background job now.
setup_expire_lru_cache_entries(hs)
+ # Schedule the state compressor to run
+ setup_state_compressor(hs)
+
# It is now safe to start your Synapse.
hs.start_listening()
hs.get_datastore().db_pool.start_profiling()
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 06fbd1166b..b925f692b0 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -32,6 +32,7 @@ from synapse.config import (
server_notices,
spam_checker,
sso,
+ state_compressor,
stats,
third_party_event_rules,
tls,
@@ -91,6 +92,7 @@ class RootConfig:
modules: modules.ModulesConfig
caches: cache.CacheConfig
federation: federation.FederationConfig
+ statecompressor: state_compressor.StateCompressorConfig
config_classes: List = ...
def __init__(self) -> None: ...
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 442f1b9ac0..003cffdab9 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -45,6 +45,7 @@ from .server import ServerConfig
from .server_notices import ServerNoticesConfig
from .spam_checker import SpamCheckerConfig
from .sso import SSOConfig
+from .state_compressor import StateCompressorConfig
from .stats import StatsConfig
from .third_party_event_rules import ThirdPartyRulesConfig
from .tls import TlsConfig
@@ -97,4 +98,5 @@ class HomeServerConfig(RootConfig):
WorkerConfig,
RedisConfig,
ExperimentalConfig,
+ StateCompressorConfig,
]
diff --git a/synapse/config/state_compressor.py b/synapse/config/state_compressor.py
new file mode 100644
index 0000000000..92a0b7e533
--- /dev/null
+++ b/synapse/config/state_compressor.py
@@ -0,0 +1,94 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.config._base import Config, ConfigError
+from synapse.config._util import validate_config
+from synapse.python_dependencies import DependencyException, check_requirements
+
+
+class StateCompressorConfig(Config):
+    section = "statecompressor"  # read back as `hs.config.statecompressor`
+
+    def read_config(self, config, **kwargs):
+        compressor_config = config.get("state_compressor") or {}  # section is optional
+        validate_config(
+            _STATE_COMPRESSOR_SCHEMA, compressor_config, ("state_compressor",)
+        )
+        self.compressor_enabled = compressor_config.get("enabled") or False
+
+        if not self.compressor_enabled:
+            return  # the remaining options are only parsed when enabled
+
+        try:
+            check_requirements("auto_compressor")
+        except DependencyException as e:
+            raise ConfigError(e.message) from e  # ConfigError requires a message
+
+        self.compressor_chunk_size = compressor_config.get("chunk_size") or 500
+        self.compressor_number_of_chunks = compressor_config.get("number_of_chunks") or 100
+        self.compressor_default_levels = (
+            compressor_config.get("default_levels") or "100,50,25"
+        )
+        self.time_between_compressor_runs = self.parse_duration(
+            compressor_config.get("time_between_runs") or "1d"
+        )
+
+    def generate_config_section(self, **kwargs):
+        return """\
+        ## State compressor ##
+
+        # The state compressor is an experimental tool which attempts to
+        # reduce the number of rows in the state_groups_state table
+        # of postgres databases.
+        #
+        # For more information please see
+        # https://matrix-org.github.io/synapse/latest/state_compressor.html
+        #
+        state_compressor:
+          # Whether the state compressor should run (defaults to false)
+          # Uncomment to enable it - Note, this requires the 'auto-compressor'
+          # library to be installed
+          #
+          #enabled: true
+
+          # The (rough) number of state groups to load at one time. Defaults
+          # to 500.
+          #
+          #chunk_size: 1000
+
+          # The number of chunks to compress on each run. Defaults to 100.
+          #
+          #number_of_chunks: 1
+
+          # The default level sizes for the compressor to use. Defaults to
+          # 100,50,25.
+          #
+          #default_levels: 128,64,32.
+
+          # How frequently to run the state compressor. Defaults to 1d
+          #
+          #time_between_runs: 1w
+        """
+
+
+_STATE_COMPRESSOR_SCHEMA = {
+    "type": "object",
+    "properties": {
+        "enabled": {"type": "boolean"},
+        "chunk_size": {"type": "integer"},  # a count of state groups
+        "number_of_chunks": {"type": "integer"},  # a count of chunks
+        "default_levels": {"type": "string"},
+        "time_between_runs": {"type": "string"},
+    },
+}
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 154e5b7028..7c5570976e 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -114,6 +114,8 @@ CONDITIONAL_REQUIREMENTS = {
"redis": ["txredisapi>=1.4.7", "hiredis"],
# Required to use experimental `caches.track_memory_usage` config option.
"cache_memory": ["pympler"],
+    # Required to use the `state_compressor` config option. Needs to be manually installed.
+ "auto_compressor": ["auto_compressor"],
}
ALL_OPTIONAL_REQUIREMENTS: Set[str] = set()
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index f5a8f90a0f..7f94cb00d6 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -395,6 +395,7 @@ class DatabasePool:
hs,
database_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
+ db_conn: LoggingDatabaseConnection,
):
self.hs = hs
self._clock = hs.get_clock()
@@ -427,6 +428,13 @@ class DatabasePool:
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")
+ # We store the connection info for later use when using postgres
+ # (primarily to allow things like the state auto compressor to connect
+ # to the DB).
+ self.postgres_connection_info_parameters: Optional[Dict] = None
+ if isinstance(self.engine, PostgresEngine):
+ self.postgres_connection_info_parameters = db_conn.info.dsn_parameters
+
if self.engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 20b755056b..6f2d9a062e 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -61,7 +61,7 @@ class Databases:
databases=database_config.databases,
)
- database = DatabasePool(hs, database_config, engine)
+ database = DatabasePool(hs, database_config, engine, db_conn)
if "main" in database_config.databases:
logger.info(
diff --git a/synapse/util/state_compressor.py b/synapse/util/state_compressor.py
new file mode 100644
index 0000000000..6ec58dc9c6
--- /dev/null
+++ b/synapse/util/state_compressor.py
@@ -0,0 +1,119 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING
+
+from synapse.logging.context import defer_to_thread
+from synapse.metrics.background_process_metrics import run_as_background_process
+
+try:
+ import auto_compressor as state_compressor
+except ImportError:
+ state_compressor = None
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+# Postgres connection keys that the rust (tokio-postgres) library understands. See
+# https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html#keys
+_VALID_POSTGRES_CONN_ARGS = {
+    "user",
+    "password",
+    "dbname",
+    "options",
+    "application_name",
+    "sslmode",
+    "host",
+    "port",
+    "connect_timeout",
+    "keepalives",
+    "keepalives_idle",
+    "target_session_attrs",
+    "channel_binding",
+}
+
+
+def setup_state_compressor(hs: "HomeServer"):
+    """Schedules the state compressor to run regularly (postgres only, if enabled)"""
+
+    # Return if auto_compressor is missing or this process runs no background tasks
+    if not state_compressor or not hs.config.worker.run_background_tasks:
+        return
+
+    # Return if compressor isn't enabled
+    compressor_config = hs.config.statecompressor
+    if not compressor_config.compressor_enabled:
+        return
+
+    # Find the config of the database that holds the state tables
+    db_config = None
+    for conf in hs.config.database.databases:
+        if "state" in conf.databases:
+            db_config = conf.config
+            break
+
+    # One of the databases should have the state tables in
+    assert db_config is not None
+
+    if db_config["name"] != "psycopg2":
+        return  # the compressor only works against postgres
+
+    password = (db_config.get("args") or {}).get("password")  # `args` may be absent
+
+    # Construct the database URL from the database config.
+    #
+    # This is a bit convoluted as the rust postgres library doesn't have a
+    # default host/user, so we use the existing Synapse connections to look up
+    # what parameters were used there. On the flip side, psycopg2 has some
+    # parameters that rust doesn't understand, so we need to filter them out.
+    #
+    # Note: we need to connect to the *state* database.
+    conn_info_params = hs.get_datastores().state.db_pool.postgres_connection_info_parameters
+    assert conn_info_params is not None
+
+    effective_db_args = {}
+    for key, value in conn_info_params.items():
+        if key in _VALID_POSTGRES_CONN_ARGS:
+            effective_db_args[key] = value
+
+    # We cannot extract the password from the connection info, so use the value extracted
+    # from synapse's config
+    if password is not None:
+        effective_db_args["password"] = password
+
+    # psycopg2 has a handy util function for going from a dictionary to a DSN
+    # (postgres connection string.)
+    from psycopg2.extensions import make_dsn
+
+    db_url = make_dsn("", **effective_db_args)
+
+    # The method to be called periodically; the compression runs in a thread
+    def run_state_compressor():
+        run_as_background_process(
+            desc="State Compressor",
+            func=defer_to_thread,
+            reactor=hs.get_reactor(),
+            f=state_compressor.compress_state_events_table,
+            db_url=db_url,
+            chunk_size=compressor_config.compressor_chunk_size,
+            default_levels=compressor_config.compressor_default_levels,
+            number_of_chunks=compressor_config.compressor_number_of_chunks,
+        )
+
+    # Call the compressor every `time_between_runs` milliseconds
+    clock = hs.get_clock()
+    clock.looping_call(
+        run_state_compressor,
+        compressor_config.time_between_compressor_runs,
+    )
|