summary refs log tree commit diff
path: root/synapse/config/cache.py
blob: d2f55534d7d10c8701d06b0b0bf01a1e4912236d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# Copyright 2019-2021 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import re
import threading
from typing import Any, Callable, Dict, Optional

import attr

from synapse.types import JsonDict
from synapse.util.check_dependencies import DependencyException, check_requirements

from ._base import Config, ConfigError

logger = logging.getLogger(__name__)

# The prefix for all cache factor-related environment variables. On its own it
# names the variable supplying the initial default factor; followed by "_" and
# a cache name it names the variable supplying that cache's individual factor.
_CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"

# Map from canonicalised cache name to the callback that resizes that cache.
# Each callback is called with the cache factor that should be applied.
_CACHES: Dict[str, Callable[[float], None]] = {}

# a lock on the contents of _CACHES
_CACHES_LOCK = threading.Lock()

# Fallbacks: the cache factor used when none is supplied, and the event cache
# size (a size string, run through `parse_size`) used when `event_cache_size`
# is absent from the config.
_DEFAULT_FACTOR_SIZE = 0.5
_DEFAULT_EVENT_CACHE_SIZE = "10K"


@attr.s(slots=True, auto_attribs=True)
class CacheProperties:
    """Mutable, module-wide settings shared by every resizable cache."""

    # The default factor size for all caches. The initial value is computed
    # once, when this class is defined, from the `_CACHE_PREFIX` environment
    # variable (falling back to `_DEFAULT_FACTOR_SIZE`); it is later
    # overwritten by `CacheConfig.resize_all_caches`.
    default_factor_size: float = float(
        os.environ.get(_CACHE_PREFIX, _DEFAULT_FACTOR_SIZE)
    )
    # Callable that resizes every registered cache, or None until
    # `CacheConfig.resize_all_caches` has installed itself here.
    resize_all_caches_func: Optional[Callable[[], None]] = None


# The single shared instance read and updated by the rest of this module.
properties = CacheProperties()


def _canonicalise_cache_name(cache_name: str) -> str:
    """Gets the canonical form of the cache name.

    Since we specify cache names in config and environment variables we need to
    ignore case and special characters. For example, some caches have asterisks
    in their name to denote that they're not attached to a particular database
    function, and these asterisks need to be stripped out

    Args:
        cache_name: The cache name as written in code, config or an
            environment variable.

    Returns:
        The lower-cased name with every character other than ASCII letters,
        digits and underscores removed.
    """
    # The digit range must be 0-9: with 1-9 every "0" would be silently
    # dropped, so that e.g. "cache_10" and "cache_1" would collide.
    cache_name = re.sub(r"[^A-Za-z_0-9]", "", cache_name)

    return cache_name.lower()


def add_resizable_cache(
    cache_name: str, cache_resize_callback: Callable[[float], None]
) -> None:
    """Record a cache so that its size can be adjusted at runtime.

    Args:
        cache_name: The name of the cache; it is canonicalised before being
            used as the registry key
        cache_resize_callback: Called with a cache factor each time the
            cache has to be resized
    """
    # Canonicalising drops special characters such as '*' from the name.
    canonical_name = _canonicalise_cache_name(cache_name)

    # Caches can be created on background threads, so hold the lock while
    # touching the registry to avoid racing with an in-progress resize.
    with _CACHES_LOCK:
        _CACHES[canonical_name] = cache_resize_callback

    # Bring every registered cache to its configured size.
    #
    # The resize function relies on values read from the config, so it is
    # only present (and only called) once the config has been applied.
    resize_all = properties.resize_all_caches_func
    if resize_all:
        resize_all()


class CacheConfig(Config):
    section = "caches"
    _environ = os.environ

    event_cache_size: int
    cache_factors: Dict[str, float]
    global_factor: float
    track_memory_usage: bool
    expiry_time_msec: Optional[int]
    sync_response_cache_duration: int

    @staticmethod
    def reset() -> None:
        """Restore the module-level cache state to its defaults. Used for tests.

        The default factor is re-read from the environment, the stored
        resize function is dropped and the registry of caches is emptied.
        """
        initial_factor = os.environ.get(_CACHE_PREFIX, _DEFAULT_FACTOR_SIZE)
        properties.default_factor_size = float(initial_factor)
        properties.resize_all_caches_func = None

        # Take the lock so that we do not clear the registry underneath
        # another thread that is registering or resizing caches.
        with _CACHES_LOCK:
            _CACHES.clear()

    def generate_config_section(self, **kwargs: Any) -> str:
        """Return the commented sample YAML describing the caching options.

        Args:
            **kwargs: Accepted but not used by this section.

        Returns:
            The sample configuration text for the `event_cache_size` option
            and the `caches` section.
        """
        # NOTE: the autotuning commentary must name the options exactly as
        # `read_config` reads them (`max_cache_memory_usage`,
        # `target_cache_memory_usage`, `min_cache_ttl`).
        return """\
        ## Caching ##

        # Caching can be configured through the following options.
        #
        # A cache 'factor' is a multiplier that can be applied to each of
        # Synapse's caches in order to increase or decrease the maximum
        # number of entries that can be stored.
        #
        # The configuration for cache factors (caches.global_factor and
        # caches.per_cache_factors) can be reloaded while the application is running,
        # by sending a SIGHUP signal to the Synapse process. Changes to other parts of
        # the caching config will NOT be applied after a SIGHUP is received; a restart
        # is necessary.

        # The number of events to cache in memory. Not affected by
        # caches.global_factor.
        #
        #event_cache_size: 10K

        caches:
          # Controls the global cache factor, which is the default cache factor
          # for all caches if a specific factor for that cache is not otherwise
          # set.
          #
          # This can also be set by the "SYNAPSE_CACHE_FACTOR" environment
          # variable. Setting by environment variable takes priority over
          # setting through the config file.
          #
          # Defaults to 0.5, which will halve the size of all caches.
          #
          #global_factor: 1.0

          # A dictionary of cache name to cache factor for that individual
          # cache. Overrides the global cache factor for a given cache.
          #
          # These can also be set through environment variables comprised
          # of "SYNAPSE_CACHE_FACTOR_" + the name of the cache in capital
          # letters and underscores. Setting by environment variable
          # takes priority over setting through the config file.
          # Ex. SYNAPSE_CACHE_FACTOR_GET_USERS_WHO_SHARE_ROOM_WITH_USER=2.0
          #
          # Some caches have '*' and other characters that are not
          # alphanumeric or underscores. These caches can be named with or
          # without the special characters stripped. For example, to specify
          # the cache factor for `*stateGroupCache*` via an environment
          # variable would be `SYNAPSE_CACHE_FACTOR_STATEGROUPCACHE=2.0`.
          #
          per_cache_factors:
            #get_users_who_share_room_with_user: 2.0

          # Controls whether cache entries are evicted after a specified time
          # period. Defaults to true. Uncomment to disable this feature.
          #
          #expire_caches: false

          # If expire_caches is enabled, this flag controls how long an entry can
          # be in a cache without having been accessed before being evicted.
          # Defaults to 30m. Uncomment to set a different time to live for cache entries.
          #
          #cache_entry_ttl: 30m

          # This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`,
          # `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain
          # a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize
          # this option, and all three of the options must be specified for this feature to work.
          #cache_autotuning:
            # This flag sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted.
            # They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in
            # the flag below, or until the `min_cache_ttl` is hit.
            #max_cache_memory_usage: 1024M

            # This flag sets a rough target for the desired memory usage of the caches.
            #target_cache_memory_usage: 758M

            # `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when
            # caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches
            # from being emptied while Synapse is evicting due to memory.
            #min_cache_ttl: 5m

          # Controls how long the results of a /sync request are cached for after
          # a successful response is returned. A higher duration can help clients with
          # intermittent connections, at the cost of higher memory usage.
          #
          # By default, this is zero, which means that sync responses are not cached
          # at all.
          #
          #sync_response_cache_duration: 2m
        """

    def read_config(self, config: JsonDict, **kwargs: Any) -> None:
        """Populate this config object with values from `config`.

        This method does NOT resize existing or future caches: use `resize_all_caches`.
        We use two separate methods so that we can reject bad config before applying it.

        Cache factors given through environment variables (read from
        `self._environ`) take priority over those in `config`, both for the
        global factor and for individual caches.

        Args:
            config: The parsed configuration; the `event_cache_size` and
                `caches` keys are read.
            **kwargs: Accepted but not used by this section.

        Raises:
            ConfigError: if a cache factor (from the config or the environment)
                is not a number, if `caches`, `caches.per_cache_factors` or
                `caches.cache_autotuning` is not a dictionary, if an autotuning
                sub-option is missing, or if `track_memory_usage` is enabled
                without its dependencies being installed.
        """

        def is_number(value: object) -> bool:
            # bool is a subclass of int, but true/false is not a usable factor.
            return isinstance(value, (int, float)) and not isinstance(value, bool)

        def factor_from_environ(name: str, raw_value: str) -> float:
            # Report a malformed environment variable as a config problem
            # rather than letting a bare ValueError escape.
            try:
                return float(raw_value)
            except ValueError:
                raise ConfigError(
                    "Environment variable %s must be a number, not %r"
                    % (name, raw_value)
                ) from None

        self.event_cache_size = self.parse_size(
            config.get("event_cache_size", _DEFAULT_EVENT_CACHE_SIZE)
        )
        self.cache_factors = {}

        cache_config = config.get("caches") or {}
        if not isinstance(cache_config, dict):
            raise ConfigError("caches must be a dictionary")

        global_factor = cache_config.get("global_factor", _DEFAULT_FACTOR_SIZE)
        if not is_number(global_factor):
            raise ConfigError("caches.global_factor must be a number.")

        # The environment variable takes priority over the config file. It is
        # read afresh here (rather than from `properties`) so that reloading
        # the config never picks up a value left behind by an earlier resize.
        env_global_factor = self._environ.get(_CACHE_PREFIX)
        if env_global_factor is not None:
            global_factor = factor_from_environ(_CACHE_PREFIX, env_global_factor)
        self.global_factor = global_factor

        # Load cache factors from the config
        individual_factors = cache_config.get("per_cache_factors") or {}
        if not isinstance(individual_factors, dict):
            raise ConfigError("caches.per_cache_factors must be a dictionary")

        # Canonicalise the cache names *before* updating with the environment
        # variables.
        individual_factors = {
            _canonicalise_cache_name(key): val
            for key, val in individual_factors.items()
        }

        # Override factors from environment if necessary
        env_prefix = _CACHE_PREFIX + "_"
        for key, val in self._environ.items():
            if not key.startswith(env_prefix):
                continue
            canonical_name = _canonicalise_cache_name(key[len(env_prefix) :])
            individual_factors[canonical_name] = factor_from_environ(key, val)

        for cache, factor in individual_factors.items():
            if not is_number(factor):
                raise ConfigError(
                    "caches.per_cache_factors.%s must be a number" % (cache,)
                )
            self.cache_factors[cache] = factor

        self.track_memory_usage = cache_config.get("track_memory_usage", False)
        if self.track_memory_usage:
            try:
                check_requirements("cache_memory")
            except DependencyException as e:
                raise ConfigError(
                    e.message  # noqa: B306, DependencyException.message is a property
                ) from e

        expire_caches = cache_config.get("expire_caches", True)
        cache_entry_ttl = cache_config.get("cache_entry_ttl", "30m")

        if expire_caches:
            self.expiry_time_msec = self.parse_duration(cache_entry_ttl)
        else:
            self.expiry_time_msec = None

        # Backwards compatibility support for the now-removed "expiry_time" config flag.
        expiry_time = cache_config.get("expiry_time")

        if expiry_time and expire_caches:
            logger.warning(
                "You have set two incompatible options, expiry_time and expire_caches. Please only use the "
                "expire_caches and cache_entry_ttl options and delete the expiry_time option as it is "
                "deprecated."
            )
        if expiry_time:
            logger.warning(
                "Expiry_time is a deprecated option, please use the expire_caches and cache_entry_ttl options "
                "instead."
            )
            # When present, the deprecated option overrides cache_entry_ttl.
            self.expiry_time_msec = self.parse_duration(expiry_time)

        self.cache_autotuning = cache_config.get("cache_autotuning")
        if self.cache_autotuning:
            if not isinstance(self.cache_autotuning, dict):
                raise ConfigError("caches.cache_autotuning must be a dictionary")

            # All three sub-options are required. Check for each explicitly so
            # that a missing one is reported as a config error instead of
            # handing None to the parser. The parsed values replace the raw
            # ones in the dictionary in place.
            autotuning_parsers = (
                ("max_cache_memory_usage", self.parse_size),
                ("target_cache_memory_usage", self.parse_size),
                ("min_cache_ttl", self.parse_duration),
            )
            for option, parse in autotuning_parsers:
                raw_value = self.cache_autotuning.get(option)
                if raw_value is None:
                    raise ConfigError(
                        "caches.cache_autotuning.%s must be set when "
                        "cache_autotuning is enabled" % (option,)
                    )
                self.cache_autotuning[option] = parse(raw_value)

        self.sync_response_cache_duration = self.parse_duration(
            cache_config.get("sync_response_cache_duration", 0)
        )

    def resize_all_caches(self) -> None:
        """Bring the size of every registered cache up to date.

        Each registered resize callback is invoked with the factor configured
        for that specific cache or, when there is none, with the global factor.
        """
        global_factor = self.global_factor
        per_cache_factors = self.cache_factors

        # Publish the global factor so that caches created from now on are
        # sized appropriately.
        properties.default_factor_size = global_factor

        # Publish this bound method too, so that other code can trigger a
        # resize without holding a CacheConfig instance.
        properties.resize_all_caches_func = self.resize_all_caches

        # Hold the lock for the whole iteration so that no other thread can
        # modify _CACHES underneath us. The callbacks run with the lock held.
        with _CACHES_LOCK:
            for name, resize in _CACHES.items():
                resize(per_cache_factors.get(name, global_factor))