From 1f1dee94f6025ce0a6e414cd6098cb766567bdd8 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 9 May 2016 10:13:25 +0100
Subject: Manually run GC on reactor tick.

This also adds a metric for the amount of time spent in GC.
---
 synapse/metrics/__init__.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 5664d5a381..17be491b93 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -22,6 +22,7 @@ import functools
 import os
 import stat
 import time
+import gc
 
 from twisted.internet import reactor
 
@@ -155,6 +156,7 @@ get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"
 reactor_metrics = get_metrics_for("reactor")
 tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+gc_time = reactor_metrics.register_distribution("gc_time")
 
 
 def runUntilCurrentTimer(func):
@@ -182,6 +184,18 @@ def runUntilCurrentTimer(func):
         end = time.time() * 1000
         tick_time.inc_by(end - start)
         pending_calls_metric.inc_by(num_pending)
+
+        threshold = gc.get_threshold()
+        counts = gc.get_count()
+
+        start = time.time() * 1000
+        for i in [2, 1, 0]:
+            if threshold[i] < counts[i]:
+                logger.info("Collecting gc %d", i)
+                gc.collect(i)
+        end = time.time() * 1000
+        gc_time.inc_by(end - start)
+
         return ret
 
     return f
@@ -196,5 +210,6 @@ try:
     # runUntilCurrent is called when we have pending calls. It is called once
     # per iteration after fd polling.
     reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
+    gc.disable()
 except AttributeError:
     pass
-- cgit 1.4.1


From 7d6e89ed22aab4bae9ce033c2e4757a595257942 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 13 May 2016 16:31:08 +0100
Subject: Add a comment

---
 synapse/metrics/__init__.py | 5 +++++
 1 file changed, 5 insertions(+)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 17be491b93..c82685a524 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -185,6 +185,8 @@ def runUntilCurrentTimer(func):
         tick_time.inc_by(end - start)
         pending_calls_metric.inc_by(num_pending)
 
+        # Check if we need to do a manual GC (since it's been disabled), and do
+        # one if necessary.
         threshold = gc.get_threshold()
         counts = gc.get_count()
 
@@ -210,6 +212,9 @@ try:
     # runUntilCurrent is called when we have pending calls. It is called once
     # per iteration after fd polling.
     reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
+
+    # We manually run the GC each reactor tick so that we can get some metrics
+    # about time spent doing GC.
     gc.disable()
 except AttributeError:
     pass
-- cgit 1.4.1


From 60d53f9e9596520472954831ce8fea251a462d46 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 16 May 2016 09:32:29 +0100
Subject: Count number of GC collects

---
 synapse/metrics/__init__.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index c82685a524..bba2657075 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -156,7 +156,12 @@ get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"
 reactor_metrics = get_metrics_for("reactor")
 tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
-gc_time = reactor_metrics.register_distribution("gc_time")
+
+gc_time = (
+    reactor_metrics.register_distribution("gc_time_gen0"),
+    reactor_metrics.register_distribution("gc_time_gen1"),
+    reactor_metrics.register_distribution("gc_time_gen2"),
+)
 
 
 def runUntilCurrentTimer(func):
@@ -189,14 +194,15 @@ def runUntilCurrentTimer(func):
         # one if necessary.
         threshold = gc.get_threshold()
         counts = gc.get_count()
-
-        start = time.time() * 1000
         for i in [2, 1, 0]:
             if threshold[i] < counts[i]:
                 logger.info("Collecting gc %d", i)
+
+                start = time.time() * 1000
                 gc.collect(i)
-        end = time.time() * 1000
-        gc_time.inc_by(end - start)
+                end = time.time() * 1000
+
+                gc_time[i].inc_by(end - start)
 
         return ret
-- cgit 1.4.1


From 73c711243382a48b9b67fddf5ed9df2d1ee1be43 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 2 Jun 2016 11:29:44 +0100
Subject: Change CacheMetrics to be quicker

We change it so that each cache has an individual CacheMetric, instead of
having one global CacheMetric. This means that when a cache tries to
increment a counter it does not need to go through as many layers of
indirection.
---
 synapse/metrics/__init__.py                | 16 ++++------
 synapse/metrics/metric.py                  | 44 +++++++++++++++---------------
 synapse/util/caches/__init__.py            | 20 ++++++++++----
 synapse/util/caches/descriptors.py         | 17 +++++++++---
 synapse/util/caches/dictionary_cache.py    |  8 +++---
 synapse/util/caches/expiringcache.py       |  8 +++---
 synapse/util/caches/stream_change_cache.py | 16 +++++------
 tests/metrics/test_metric.py               | 23 +++++++---------
 8 files changed, 82 insertions(+), 70 deletions(-)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 5664d5a381..c38f24485a 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -33,11 +33,7 @@ from .metric import (
 logger = logging.getLogger(__name__)
 
 
-# We'll keep all the available metrics in a single toplevel dict, one shared
-# for the entire process. We don't currently support per-HomeServer instances
-# of metrics, because in practice any one python VM will host only one
-# HomeServer anyway.
This makes a lot of implementation neater -all_metrics = {} +all_metrics = [] class Metrics(object): @@ -53,7 +49,7 @@ class Metrics(object): metric = metric_class(full_name, *args, **kwargs) - all_metrics[full_name] = metric + all_metrics.append(metric) return metric def register_counter(self, *args, **kwargs): @@ -84,12 +80,12 @@ def render_all(): # TODO(paul): Internal hack update_resource_metrics() - for name in sorted(all_metrics.keys()): + for metric in all_metrics: try: - strs += all_metrics[name].render() + strs += metric.render() except Exception: - strs += ["# FAILED to render %s" % name] - logger.exception("Failed to render %s metric", name) + strs += ["# FAILED to render"] + logger.exception("Failed to render metric") strs.append("") # to generate a final CRLF diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 368fc24984..341043952a 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -47,9 +47,6 @@ class BaseMetric(object): for k, v in zip(self.labels, values)]) ) - def render(self): - return map_concat(self.render_item, sorted(self.counts.keys())) - class CounterMetric(BaseMetric): """The simplest kind of metric; one that stores a monotonically-increasing @@ -83,6 +80,9 @@ class CounterMetric(BaseMetric): def render_item(self, k): return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])] + def render(self): + return map_concat(self.render_item, sorted(self.counts.keys())) + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever @@ -126,30 +126,30 @@ class DistributionMetric(object): class CacheMetric(object): - """A combination of two CounterMetrics, one to count cache hits and one to - count a total, and a callback metric to yield the current size. 
+ __slots__ = ("name", "cache_name", "hits", "misses", "size_callback") - This metric generates standard metric name pairs, so that monitoring rules - can easily be applied to measure hit ratio.""" - - def __init__(self, name, size_callback, labels=[]): + def __init__(self, name, size_callback, cache_name): self.name = name + self.cache_name = cache_name - self.hits = CounterMetric(name + ":hits", labels=labels) - self.total = CounterMetric(name + ":total", labels=labels) + self.hits = 0 + self.misses = 0 - self.size = CallbackMetric( - name + ":size", - callback=size_callback, - labels=labels, - ) + self.size_callback = size_callback - def inc_hits(self, *values): - self.hits.inc(*values) - self.total.inc(*values) + def inc_hits(self): + self.hits += 1 - def inc_misses(self, *values): - self.total.inc(*values) + def inc_misses(self): + self.misses += 1 def render(self): - return self.hits.render() + self.total.render() + self.size.render() + size = self.size_callback() + hits = self.hits + total = self.misses + self.hits + + return [ + """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits), + """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total), + """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size), + ] diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index d53569ca49..ebd715c5dc 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -24,11 +24,21 @@ DEBUG_CACHES = False metrics = synapse.metrics.get_metrics_for("synapse.util.caches") caches_by_name = {} -cache_counter = metrics.register_cache( - "cache", - lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, - labels=["name"], -) +# cache_counter = metrics.register_cache( +# "cache", +# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, +# labels=["name"], +# ) + + +def register_cache(name, cache): + caches_by_name[name] = cache + return metrics.register_cache( + "cache", + lambda: len(cache), + name, + ) + _string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 758f5982b0..5d25c9e762 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -22,7 +22,7 @@ from synapse.util.logcontext import ( PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn ) -from . import caches_by_name, DEBUG_CACHES, cache_counter +from . 
import DEBUG_CACHES, register_cache from twisted.internet import defer @@ -43,6 +43,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) class Cache(object): + __slots__ = ( + "cache", + "max_entries", + "name", + "keylen", + "sequence", + "thread", + "metrics", + ) def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): if lru: @@ -59,7 +68,7 @@ class Cache(object): self.keylen = keylen self.sequence = 0 self.thread = None - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -74,10 +83,10 @@ class Cache(object): def get(self, key, default=_CacheSentinel): val = self.cache.get(key, _CacheSentinel) if val is not _CacheSentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return val - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() if default is _CacheSentinel: raise KeyError() diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index f92d80542b..b0ca1bb79d 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -15,7 +15,7 @@ from synapse.util.caches.lrucache import LruCache from collections import namedtuple -from . import caches_by_name, cache_counter +from . import register_cache import threading import logging @@ -43,7 +43,7 @@ class DictionaryCache(object): __slots__ = [] self.sentinel = Sentinel() - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -58,7 +58,7 @@ class DictionaryCache(object): def get(self, key, dict_keys=None): entry = self.cache.get(key, self.sentinel) if entry is not self.sentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() if dict_keys is None: return DictionaryEntry(entry.full, dict(entry.value)) @@ -69,7 +69,7 @@ class DictionaryCache(object): if k in entry.value }) - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return DictionaryEntry(False, {}) def invalidate(self, key): diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 2b68c1ac93..080388958f 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches import register_cache import logging @@ -49,7 +49,7 @@ class ExpiringCache(object): self._cache = {} - caches_by_name[cache_name] = self._cache + self.metrics = register_cache(cache_name, self._cache) def start(self): if not self._expiry_ms: @@ -78,9 +78,9 @@ class ExpiringCache(object): def __getitem__(self, key): try: entry = self._cache[key] - cache_counter.inc_hits(self._cache_name) + self.metrics.inc_hits() except KeyError: - cache_counter.inc_misses(self._cache_name) + self.metrics.inc_misses() raise if self._reset_expiry_on_get: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index ea8a74ca69..3c051dabc4 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
 
 from blist import sorteddict
 
@@ -42,7 +42,7 @@ class StreamChangeCache(object):
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        caches_by_name[self.name] = self._cache
+        self.metrics = register_cache(self.name, self._cache)
 
         for entity, stream_pos in prefilled_cache.items():
             self.entity_has_changed(entity, stream_pos)
@@ -53,19 +53,19 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos < self._earliest_known_stream_pos:
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
             return True
 
         latest_entity_change_pos = self._entity_to_key.get(entity, None)
         if latest_entity_change_pos is None:
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
            return False
 
         if stream_pos < latest_entity_change_pos:
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
            return True
 
-        cache_counter.inc_hits(self.name)
+        self.metrics.inc_hits()
         return False
 
     def get_entities_changed(self, entities, stream_pos):
@@ -82,10 +82,10 @@ class StreamChangeCache(object):
                 self._cache[k] for k in keys[i:]
             ).intersection(entities)
 
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
         else:
             result = entities
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
 
         return result
 
diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py
index f3c1927ce1..f85455a5af 100644
--- a/tests/metrics/test_metric.py
+++ b/tests/metrics/test_metric.py
@@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase):
             'vector{method="PUT"} 1',
         ])
 
-        # Check that passing too few values errors
-        self.assertRaises(ValueError, counter.inc)
-
 
 class CallbackMetricTestCase(unittest.TestCase):
 
@@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase):
     def test_cache(self):
         d = dict()
 
-        metric = CacheMetric("cache", lambda: len(d))
+        metric = CacheMetric("cache", lambda: len(d), "cache_name")
 
         self.assertEquals(metric.render(), [
-            'cache:hits 0',
-            'cache:total 0',
-            'cache:size 0',
+            'cache:hits{name="cache_name"} 0',
+            'cache:total{name="cache_name"} 0',
+            'cache:size{name="cache_name"} 0',
         ])
 
         metric.inc_misses()
         d["key"] = "value"
 
         self.assertEquals(metric.render(), [
-            'cache:hits 0',
-            'cache:total 1',
-            'cache:size 1',
+            'cache:hits{name="cache_name"} 0',
+            'cache:total{name="cache_name"} 1',
+            'cache:size{name="cache_name"} 1',
         ])
 
         metric.inc_hits()
 
         self.assertEquals(metric.render(), [
-            'cache:hits 1',
-            'cache:total 2',
-            'cache:size 1',
+            'cache:hits{name="cache_name"} 1',
+            'cache:total{name="cache_name"} 2',
+            'cache:size{name="cache_name"} 1',
         ])
-- cgit 1.4.1


From 75331c5fca6d2207094b8cbf0b3bb34cc52a4ec4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 13:33:13 +0100
Subject: Change the way we do stats

---
 synapse/metrics/__init__.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index f317034b8f..ef14bcd840 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -153,11 +153,7 @@ reactor_metrics = get_metrics_for("reactor")
 tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 
-gc_time = (
-    reactor_metrics.register_distribution("gc_time_gen0"),
-    reactor_metrics.register_distribution("gc_time_gen1"),
-    reactor_metrics.register_distribution("gc_time_gen2"),
-)
+gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
 
 
 def runUntilCurrentTimer(func):
@@ -190,7 +186,7 @@ def runUntilCurrentTimer(func):
         # one if necessary.
         threshold = gc.get_threshold()
         counts = gc.get_count()
-        for i in [2, 1, 0]:
+        for i in (0, 1, 2):
             if threshold[i] < counts[i]:
                 logger.info("Collecting gc %d", i)
 
@@ -198,7 +194,7 @@ def runUntilCurrentTimer(func):
                 gc.collect(i)
                 end = time.time() * 1000
 
-                gc_time[i].inc_by(end - start)
+                gc_time.inc_by(end - start, i)
 
         return ret
-- cgit 1.4.1


From 48e65099b52383743a47844b6369e173b9a96f90 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 13:40:22 +0100
Subject: Also record number of unreachable objects

---
 synapse/metrics/__init__.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef14bcd840..b29cec3de1 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -154,6 +154,7 @@ tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 
 gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
+gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
 
 
 def runUntilCurrentTimer(func):
@@ -186,15 +187,16 @@ def runUntilCurrentTimer(func):
         # one if necessary.
         threshold = gc.get_threshold()
         counts = gc.get_count()
-        for i in (0, 1, 2):
+        for i in (2, 1, 0):
             if threshold[i] < counts[i]:
                 logger.info("Collecting gc %d", i)
 
                 start = time.time() * 1000
-                gc.collect(i)
+                unreachable = gc.collect(i)
                 end = time.time() * 1000
 
                 gc_time.inc_by(end - start, i)
+                gc_unreachable.inc_by(unreachable, i)
 
         return ret
-- cgit 1.4.1


From 18f0cc7d993408a754e7ff26e9474a969adf762a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 16:51:01 +0100
Subject: Record some more GC metrics

---
 synapse/metrics/__init__.py | 5 +++++
 1 file changed, 5 insertions(+)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index b29cec3de1..8f69aa1ff3 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -156,6 +156,11 @@ pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
 gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
 
+reactor_metrics.register_callback("gc_total_objects", lambda: len(gc.get_objects()))
+reactor_metrics.register_callback(
+    "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
+)
+
 
 def runUntilCurrentTimer(func):
-- cgit 1.4.1


From 0f2165ccf4fd0ae6636018cea7e1b91141179e88 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 7 Jun 2016 17:00:45 +0100
Subject: Don't track total objects as it's too expensive to calculate

---
 synapse/metrics/__init__.py | 1 -
 1 file changed, 1 deletion(-)

(limited to 'synapse/metrics/__init__.py')

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 8f69aa1ff3..bdd7292a30 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -156,7 +156,6 @@ pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"])
 gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"])
 
-reactor_metrics.register_callback("gc_total_objects", lambda: len(gc.get_objects()))
 reactor_metrics.register_callback(
     "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
)
-- cgit 1.4.1
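
Taken together, the series above converges on a simple shape: automatic garbage collection is disabled, and on every reactor tick the counts from gc.get_count() are compared against gc.get_threshold(), any over-threshold generation is collected (oldest first), and the time taken plus the number of unreachable objects found are recorded per generation. Below is a minimal standalone sketch of that pattern using only the Python standard library; the gc_time_ms and gc_unreachable_total dicts and the maybe_collect() hook are illustrative stand-ins for Synapse's metric classes and reactor wrapping, not Synapse APIs.

import gc
import logging
import time

logger = logging.getLogger(__name__)

# Stand-ins for the gc_time / gc_unreachable metrics registered in the
# patches above (illustrative only; Synapse uses its own metric classes).
gc_time_ms = {0: 0.0, 1: 0.0, 2: 0.0}
gc_unreachable_total = {0: 0, 1: 0, 2: 0}


def maybe_collect():
    # Run once per event-loop tick, after pending calls have been processed.
    threshold = gc.get_threshold()
    counts = gc.get_count()

    # Walk from the oldest generation down, as the final commits do: a
    # collection of generation i also collects every younger generation.
    for i in (2, 1, 0):
        if threshold[i] < counts[i]:
            logger.info("Collecting gc %d", i)

            start = time.time() * 1000
            unreachable = gc.collect(i)
            end = time.time() * 1000

            gc_time_ms[i] += end - start
            gc_unreachable_total[i] += unreachable


if __name__ == "__main__":
    # The tick hook is now responsible for all collections; with Twisted the
    # patches achieve this by wrapping reactor.runUntilCurrent.
    gc.disable()

    for _ in range(100000):
        junk = [[] for _ in range(10)]  # allocate containers to grow the gen0 count
        maybe_collect()

    print("gc_time_ms=%r gc_unreachable=%r" % (gc_time_ms, gc_unreachable_total))

Checking the oldest generation first means a full collection happens before any younger ones are considered, so any follow-up passes over the younger generations in the same tick are near no-ops over freshly emptied generation lists.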
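The CacheMetric rewrite in commit 73c71124 is self-contained enough to condense into a runnable sketch: hits and misses become plain integer attributes, so recording a cache hit is a single integer add rather than a label-keyed lookup through a CounterMetric, and render() emits the Prometheus-style text lines the updated test_metric.py expects. This is a condensed reading of the patch, not a drop-in replacement for synapse.metrics.metric.CacheMetric.

class CacheMetric(object):
    __slots__ = ("name", "cache_name", "hits", "misses", "size_callback")

    def __init__(self, name, size_callback, cache_name):
        self.name = name              # metric name, e.g. "cache"
        self.cache_name = cache_name  # rendered as the {name="..."} label
        self.hits = 0
        self.misses = 0
        self.size_callback = size_callback

    def inc_hits(self):
        self.hits += 1

    def inc_misses(self):
        self.misses += 1

    def render(self):
        size = self.size_callback()
        total = self.hits + self.misses
        return [
            '%s:hits{name="%s"} %d' % (self.name, self.cache_name, self.hits),
            '%s:total{name="%s"} %d' % (self.name, self.cache_name, total),
            '%s:size{name="%s"} %d' % (self.name, self.cache_name, size),
        ]


# Exercising it the way the updated test does:
d = {}
metric = CacheMetric("cache", lambda: len(d), "cache_name")
metric.inc_misses()
d["key"] = "value"
assert metric.render() == [
    'cache:hits{name="cache_name"} 0',
    'cache:total{name="cache_name"} 1',
    'cache:size{name="cache_name"} 1',
]

Keeping the counters as instance attributes behind __slots__ is what makes the per-cache metric cheap: the hot path of every cache get reduces to self.metrics.inc_hits(), one attribute load and one integer add.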