1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TYPE_CHECKING
from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process
try:
import auto_compressor as state_compressor
except ImportError:
state_compressor = None
if TYPE_CHECKING:
from synapse.server import HomeServer
# The postgres connection options that the rust library understands. See
# https://docs.rs/tokio-postgres/0.7.2/tokio_postgres/config/struct.Config.html#keys
_VALID_POSTGRES_CONN_ARGS = {
"user",
"password",
"dbname",
"options",
"application_name",
"sslmode",
"host",
"port",
"connect_timeout",
"keepalives",
"keepalives_idle",
"target_session_attrs",
"channel_binding",
}
def setup_state_compressor(hs: "HomeServer"):
"""Schedules the state compressor to run regularly"""
# Return if cannot import auto_compressor
if not state_compressor or not hs.config.worker.run_background_tasks:
return
# Return if compressor isn't enabled
compressor_config = hs.config.statecompressor
if not compressor_config.compressor_enabled:
return
# Check that the database being used is postgres
if hs.get_datastores().state is not None:
for conf in hs.config.database.databases:
if conf.name == "state":
db_config = conf.config
break
else:
for conf in hs.config.database.databases:
if conf.name == "master":
db_config = conf.config
break
if db_config["name"] != "psycopg2":
return
password = db_config.get("args").get("password")
# Construct the database URL from the database config.
#
# This is a bit convoluted as the rust postgres library doesn't have a
# default host/user, so we use the existing Synapse connections to look up
# what parameters were used there. On the flip side, psycopg2 has some
# parameters that rust doesn't understand, so we need to filter them out.
#
# Note: we need to connect to the *state* database.
conn_info_params = hs.get_datastores().state.db_pool.postgres_connection_info_parameters
assert conn_info_params is not None
effective_db_args = {}
for key, value in conn_info_params.items():
if key in _VALID_POSTGRES_CONN_ARGS:
effective_db_args[key] = value
# We cannot extract the password from the connection info, so use the value extracted
# from synapse's config
if password is not None:
effective_db_args["password"] = password
# psycopg2 has a handy util function from going from dictionary to a DSN
# (postgres connection string.)
from psycopg2.extensions import make_dsn
db_url = make_dsn("", **effective_db_args)
# The method to be called periodically
def run_state_compressor():
run_as_background_process(
desc="State Compressor",
func=defer_to_thread,
reactor=hs.get_reactor(),
f=state_compressor.compress_state_events_table,
db_url=db_url,
chunk_size=compressor_config.compressor_chunk_size,
default_levels=compressor_config.compressor_default_levels,
number_of_chunks=compressor_config.compressor_number_of_chunks,
)
# Call the compressor every `time_between_runs` milliseconds
clock = hs.get_clock()
clock.looping_call(
run_state_compressor,
compressor_config.time_between_compressor_runs,
)
|