diff options
author | Neil Johnson <neil@matrix.org> | 2018-06-06 12:27:33 +0100 |
---|---|---|
committer | Neil Johnson <neil@matrix.org> | 2018-06-06 12:27:33 +0100 |
commit | 752b7b32ed1c33651c0c64fbbb4289c3b62ac89b (patch) | |
tree | e120f6fffa36a2d4f6eae37f555be078940b7854 /synapse/util | |
parent | Merge pull request #3290 from rubo77/patch-7 (diff) | |
parent | 7 char sha in changelog (diff) | |
download | synapse-752b7b32ed1c33651c0c64fbbb4289c3b62ac89b.tar.xz |
Merge tag 'v0.31.0'
Changes in synapse v0.31.0 (2018-06-06) ====================================== Most notable change from v0.30.0 is to switch to python prometheus library to improve system stats reporting. WARNING this changes a number of prometheus metrics in a backwards-incompatible manner. For more details, see `docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_. Bug Fixes: * Fix metric documentation tables (PR #3341) * Fix LaterGuage error handling (694968f) * Fix replication metrics (b7e7fd2) Changes in synapse v0.31.0-rc1 (2018-06-04) ========================================== Features: * Switch to the Python Prometheus library (PR #3256, #3274) * Let users leave the server notice room after joining (PR #3287) Changes: * daily user type phone home stats (PR #3264) * Use iter* methods for _filter_events_for_server (PR #3267) * Docs on consent bits (PR #3268) * Remove users from user directory on deactivate (PR #3277) * Avoid sending consent notice to guest users (PR #3288) * disable CPUMetrics if no /proc/self/stat (PR #3299) * Add local and loopback IPv6 addresses to url_preview_ip_range_blacklist (PR #3312) Thanks to @thegcat! * Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307) * Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat! * Reduce stuck read-receipts: ignore depth when updating (PR #3318) * Put python's logs into Trial when running unit tests (PR #3319) Changes, python 3 migration: * Replace some more comparisons with six (PR #3243) Thanks to @NotAFile! * replace some iteritems with six (PR #3244) Thanks to @NotAFile! * Add batch_iter to utils (PR #3245) Thanks to @NotAFile! * use repr, not str (PR #3246) Thanks to @NotAFile! * Misc Python3 fixes (PR #3247) Thanks to @NotAFile! * Py3 storage/_base.py (PR #3278) Thanks to @NotAFile! * more six iteritems (PR #3279) Thanks to @NotAFile! * More Misc. py3 fixes (PR #3280) Thanks to @NotAFile! * remaining isintance fixes (PR #3281) Thanks to @NotAFile! * py3-ize state.py (PR #3283) Thanks to @NotAFile! * extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel! * use memoryview in py3 (PR #3303) Thanks to @NotAFile! Bugs: * Fix federation backfill bugs (PR #3261) * federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/__init__.py | 18 | ||||
-rw-r--r-- | synapse/util/caches/__init__.py | 87 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 16 | ||||
-rw-r--r-- | synapse/util/caches/dictionary_cache.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/expiringcache.py | 4 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 11 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/treecache.py | 6 | ||||
-rw-r--r-- | synapse/util/frozenutils.py | 14 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 24 | ||||
-rw-r--r-- | synapse/util/logutils.py | 6 | ||||
-rw-r--r-- | synapse/util/metrics.py | 106 |
12 files changed, 163 insertions, 133 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 814a7bf71b..fc11e26623 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -20,6 +20,8 @@ from twisted.internet import defer, reactor, task import time import logging +from itertools import islice + logger = logging.getLogger(__name__) @@ -79,3 +81,19 @@ class Clock(object): except Exception: if not ignore_errs: raise + + +def batch_iter(iterable, size): + """batch an iterable up into tuples with a maximum size + + Args: + iterable (iterable): the iterable to slice + size (int): the maximum batch size + + Returns: + an iterator over the chunks + """ + # make sure we can deal with iterables like lists too + sourceiter = iter(iterable) + # call islice until it returns an empty tuple + return iter(lambda: tuple(islice(sourceiter, size)), ()) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 4adae96681..183faf75a1 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -13,28 +13,77 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse.metrics +from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily + import os -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) +from six.moves import intern +import six -metrics = synapse.metrics.get_metrics_for("synapse.util.caches") +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) caches_by_name = {} -# cache_counter = metrics.register_cache( -# "cache", -# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, -# labels=["name"], -# ) - - -def register_cache(name, cache): - caches_by_name[name] = cache - return metrics.register_cache( - "cache", - lambda: len(cache), - name, - ) +collectors_by_name = {} + +cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) +cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) +cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"]) +cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"]) + +response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"]) +response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"]) +response_cache_evicted = Gauge( + "synapse_util_caches_response_cache:evicted_size", "", ["name"] +) +response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"]) + + +def register_cache(cache_type, cache_name, cache): + + # Check if the metric is already registered. Unregister it, if so. + # This usually happens during tests, as at runtime these caches are + # effectively singletons. + metric_name = "cache_%s_%s" % (cache_type, cache_name) + if metric_name in collectors_by_name.keys(): + REGISTRY.unregister(collectors_by_name[metric_name]) + + class CacheMetric(object): + + hits = 0 + misses = 0 + evicted_size = 0 + + def inc_hits(self): + self.hits += 1 + + def inc_misses(self): + self.misses += 1 + + def inc_evictions(self, size=1): + self.evicted_size += size + + def describe(self): + return [] + + def collect(self): + if cache_type == "response_cache": + response_cache_size.labels(cache_name).set(len(cache)) + response_cache_hits.labels(cache_name).set(self.hits) + response_cache_evicted.labels(cache_name).set(self.evicted_size) + response_cache_total.labels(cache_name).set(self.hits + self.misses) + else: + cache_size.labels(cache_name).set(len(cache)) + cache_hits.labels(cache_name).set(self.hits) + cache_evicted.labels(cache_name).set(self.evicted_size) + cache_total.labels(cache_name).set(self.hits + self.misses) + + yield GaugeMetricFamily("__unused", "") + + metric = CacheMetric() + REGISTRY.register(metric) + caches_by_name[cache_name] = cache + collectors_by_name[metric_name] = metric + return metric KNOWN_KEYS = { @@ -66,7 +115,9 @@ def intern_string(string): return None try: - string = string.encode("ascii") + if six.PY2: + string = string.encode("ascii") + return intern(string) except UnicodeEncodeError: return string diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 68285a7594..fc1874b65b 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -31,6 +31,9 @@ import functools import inspect import threading +from six import string_types, itervalues +import six + logger = logging.getLogger(__name__) @@ -80,7 +83,7 @@ class Cache(object): self.name = name self.keylen = keylen self.thread = None - self.metrics = register_cache(name, self.cache) + self.metrics = register_cache("cache", name, self.cache) def _on_evicted(self, evicted_count): self.metrics.inc_evictions(evicted_count) @@ -205,7 +208,7 @@ class Cache(object): def invalidate_all(self): self.check_thread() self.cache.clear() - for entry in self._pending_deferred_cache.itervalues(): + for entry in itervalues(self._pending_deferred_cache): entry.invalidate() self._pending_deferred_cache.clear() @@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase): ret.addErrback(onErr) - # If our cache_key is a string, try to convert to ascii to save - # a bit of space in large caches - if isinstance(cache_key, basestring): + # If our cache_key is a string on py2, try to convert to ascii + # to save a bit of space in large caches. Py3 does this + # internally automatically. + if six.PY2 and isinstance(cache_key, string_types): cache_key = to_ascii(cache_key) result_d = ObservableDeferred(ret, consumeErrors=True) @@ -565,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase): return results return logcontext.make_deferred_yieldable(defer.gatherResults( - cached_defers.values(), + list(cached_defers.values()), consumeErrors=True, ).addCallback(update_results_dict).addErrback( unwrapFirstError diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 1709e8b429..bdc21e348f 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -55,7 +55,7 @@ class DictionaryCache(object): __slots__ = [] self.sentinel = Sentinel() - self.metrics = register_cache(name, self.cache) + self.metrics = register_cache("dictionary", name, self.cache) def check_thread(self): expected_thread = self.thread diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 0aa103eecb..ff04c91955 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -52,12 +52,12 @@ class ExpiringCache(object): self._cache = OrderedDict() - self.metrics = register_cache(cache_name, self) - self.iterable = iterable self._size_estimate = 0 + self.metrics = register_cache("expiring", cache_name, self) + def start(self): if not self._expiry_ms: # Don't bother starting the loop if things never expire diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 7f79333e96..a8491b42d5 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -17,7 +17,7 @@ import logging from twisted.internet import defer from synapse.util.async import ObservableDeferred -from synapse.util.caches import metrics as cache_metrics +from synapse.util.caches import register_cache from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -38,15 +38,16 @@ class ResponseCache(object): self.timeout_sec = timeout_ms / 1000. self._name = name - self._metrics = cache_metrics.register_cache( - "response_cache", - size_callback=lambda: self.size(), - cache_name=name, + self._metrics = register_cache( + "response_cache", name, self ) def size(self): return len(self.pending_result_cache) + def __len__(self): + return self.size() + def get(self, key): """Look up the given key. diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 941d873ab8..a7fe0397fa 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -38,7 +38,7 @@ class StreamChangeCache(object): self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos self.name = name - self.metrics = register_cache(self.name, self._cache) + self.metrics = register_cache("cache", self.name, self._cache) for entity, stream_pos in prefilled_cache.items(): self.entity_has_changed(entity, stream_pos) diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index fcc341a6b7..dd4c9e6067 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -1,3 +1,5 @@ +from six import itervalues + SENTINEL = object() @@ -49,7 +51,7 @@ class TreeCache(object): if popped is SENTINEL: return default - node_and_keys = zip(nodes, key) + node_and_keys = list(zip(nodes, key)) node_and_keys.reverse() node_and_keys.append((self.root, None)) @@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d): can contain dicts. """ if isinstance(d, dict): - for value_d in d.itervalues(): + for value_d in itervalues(d): for value in iterate_tree_cache_entry(value_d): yield value else: diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index f497b51f4a..15f0a7ba9e 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -16,16 +16,17 @@ from frozendict import frozendict import simplejson as json +from six import string_types + def freeze(o): - t = type(o) - if t is dict: + if isinstance(o, dict): return frozendict({k: freeze(v) for k, v in o.items()}) - if t is frozendict: + if isinstance(o, frozendict): return o - if t is str or t is unicode: + if isinstance(o, string_types): return o try: @@ -37,11 +38,10 @@ def freeze(o): def unfreeze(o): - t = type(o) - if t is dict or t is frozendict: + if isinstance(o, (dict, frozendict)): return dict({k: unfreeze(v) for k, v in o.items()}) - if t is str or t is unicode: + if isinstance(o, string_types): return o try: diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 914f616312..a58c723403 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -59,7 +59,7 @@ class LoggingContext(object): __slots__ = [ "previous_context", "name", "ru_stime", "ru_utime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", "usage_start", "main_thread", "alive", "request", "tag", @@ -84,10 +84,10 @@ class LoggingContext(object): def stop(self): pass - def add_database_transaction(self, duration_ms): + def add_database_transaction(self, duration_sec): pass - def add_database_scheduled(self, sched_ms): + def add_database_scheduled(self, sched_sec): pass def __nonzero__(self): @@ -103,11 +103,11 @@ class LoggingContext(object): self.ru_utime = 0. self.db_txn_count = 0 - # ms spent waiting for db txns, excluding scheduling time - self.db_txn_duration_ms = 0 + # sec spent waiting for db txns, excluding scheduling time + self.db_txn_duration_sec = 0 - # ms spent waiting for db txns to be scheduled - self.db_sched_duration_ms = 0 + # sec spent waiting for db txns to be scheduled + self.db_sched_duration_sec = 0 # If alive has the thread resource usage when the logcontext last # became active. @@ -230,18 +230,18 @@ class LoggingContext(object): return ru_utime, ru_stime - def add_database_transaction(self, duration_ms): + def add_database_transaction(self, duration_sec): self.db_txn_count += 1 - self.db_txn_duration_ms += duration_ms + self.db_txn_duration_sec += duration_sec - def add_database_scheduled(self, sched_ms): + def add_database_scheduled(self, sched_sec): """Record a use of the database pool Args: - sched_ms (int): number of milliseconds it took us to get a + sched_sec (float): number of seconds it took us to get a connection """ - self.db_sched_duration_ms += sched_ms + self.db_sched_duration_sec += sched_sec class LoggingContextFilter(logging.Filter): diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index 3a83828d25..03249c5dc8 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -96,7 +96,7 @@ def time_function(f): id = _TIME_FUNC_ID _TIME_FUNC_ID += 1 - start = time.clock() * 1000 + start = time.clock() try: _log_debug_as_f( @@ -107,10 +107,10 @@ def time_function(f): r = f(*args, **kwargs) finally: - end = time.clock() * 1000 + end = time.clock() _log_debug_as_f( f, - "[FUNC END] {%s-%d} %f", + "[FUNC END] {%s-%d} %.3f sec", (func_name, id, end - start,), ) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index e4b5687a4b..1ba7d65c7c 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -15,8 +15,8 @@ from twisted.internet import defer +from prometheus_client import Counter from synapse.util.logcontext import LoggingContext -import synapse.metrics from functools import wraps import logging @@ -24,66 +24,26 @@ import logging logger = logging.getLogger(__name__) +block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"]) -metrics = synapse.metrics.get_metrics_for(__name__) - -# total number of times we have hit this block -block_counter = metrics.register_counter( - "block_count", - labels=["block_name"], - alternative_names=( - # the following are all deprecated aliases for the same metric - metrics.name_prefix + x for x in ( - "_block_timer:count", - "_block_ru_utime:count", - "_block_ru_stime:count", - "_block_db_txn_count:count", - "_block_db_txn_duration:count", - ) - ) -) - -block_timer = metrics.register_counter( - "block_time_seconds", - labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_timer:total", - ), -) - -block_ru_utime = metrics.register_counter( - "block_ru_utime_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_ru_utime:total", - ), -) - -block_ru_stime = metrics.register_counter( - "block_ru_stime_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_ru_stime:total", - ), -) - -block_db_txn_count = metrics.register_counter( - "block_db_txn_count", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_db_txn_count:total", - ), -) +block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"]) + +block_ru_utime = Counter( + "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"]) + +block_ru_stime = Counter( + "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"]) + +block_db_txn_count = Counter( + "synapse_util_metrics_block_db_txn_count", "", ["block_name"]) # seconds spent waiting for db txns, excluding scheduling time, in this block -block_db_txn_duration = metrics.register_counter( - "block_db_txn_duration_seconds", labels=["block_name"], - alternative_names=( - metrics.name_prefix + "_block_db_txn_duration:total", - ), -) +block_db_txn_duration = Counter( + "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"]) # seconds spent waiting for a db connection, in this block -block_db_sched_duration = metrics.register_counter( - "block_db_sched_duration_seconds", labels=["block_name"], -) +block_db_sched_duration = Counter( + "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) def measure_func(name): @@ -102,7 +62,7 @@ class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", "ru_stime", - "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", "created_context", ] @@ -114,7 +74,7 @@ class Measure(object): self.created_context = False def __enter__(self): - self.start = self.clock.time_msec() + self.start = self.clock.time() self.start_context = LoggingContext.current_context() if not self.start_context: self.start_context = LoggingContext("Measure") @@ -123,17 +83,17 @@ class Measure(object): self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration_ms = self.start_context.db_txn_duration_ms - self.db_sched_duration_ms = self.start_context.db_sched_duration_ms + self.db_txn_duration_sec = self.start_context.db_txn_duration_sec + self.db_sched_duration_sec = self.start_context.db_sched_duration_sec def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: return - duration = self.clock.time_msec() - self.start + duration = self.clock.time() - self.start - block_counter.inc(self.name) - block_timer.inc_by(duration, self.name) + block_counter.labels(self.name).inc() + block_timer.labels(self.name).inc(duration) context = LoggingContext.current_context() @@ -150,19 +110,13 @@ class Measure(object): ru_utime, ru_stime = context.get_resource_usage() - block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name) - block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name) - block_db_txn_count.inc_by( - context.db_txn_count - self.db_txn_count, self.name - ) - block_db_txn_duration.inc_by( - (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000., - self.name - ) - block_db_sched_duration.inc_by( - (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000., - self.name - ) + block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime) + block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime) + block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count) + block_db_txn_duration.labels(self.name).inc( + context.db_txn_duration_sec - self.db_txn_duration_sec) + block_db_sched_duration.labels(self.name).inc( + context.db_sched_duration_sec - self.db_sched_duration_sec) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) |