summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/util/__init__.py18
-rw-r--r--synapse/util/caches/__init__.py87
-rw-r--r--synapse/util/caches/descriptors.py16
-rw-r--r--synapse/util/caches/dictionary_cache.py2
-rw-r--r--synapse/util/caches/expiringcache.py4
-rw-r--r--synapse/util/caches/response_cache.py11
-rw-r--r--synapse/util/caches/stream_change_cache.py2
-rw-r--r--synapse/util/caches/treecache.py6
-rw-r--r--synapse/util/frozenutils.py14
-rw-r--r--synapse/util/logcontext.py24
-rw-r--r--synapse/util/logutils.py6
-rw-r--r--synapse/util/metrics.py106
12 files changed, 163 insertions, 133 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 814a7bf71b..fc11e26623 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -20,6 +20,8 @@ from twisted.internet import defer, reactor, task
 import time
 import logging
 
+from itertools import islice
+
 logger = logging.getLogger(__name__)
 
 
@@ -79,3 +81,19 @@ class Clock(object):
         except Exception:
             if not ignore_errs:
                 raise
+
+
+def batch_iter(iterable, size):
+    """batch an iterable up into tuples with a maximum size
+
+    Args:
+        iterable (iterable): the iterable to slice
+        size (int): the maximum batch size
+
+    Returns:
+        an iterator over the chunks
+    """
+    # make sure we can deal with iterables like lists too
+    sourceiter = iter(iterable)
+    # call islice until it returns an empty tuple
+    return iter(lambda: tuple(islice(sourceiter, size)), ())
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 4adae96681..183faf75a1 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,28 +13,77 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import synapse.metrics
+from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
+
 import os
 
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
+from six.moves import intern
+import six
 
-metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
 caches_by_name = {}
-# cache_counter = metrics.register_cache(
-#     "cache",
-#     lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
-#     labels=["name"],
-# )
-
-
-def register_cache(name, cache):
-    caches_by_name[name] = cache
-    return metrics.register_cache(
-        "cache",
-        lambda: len(cache),
-        name,
-    )
+collectors_by_name = {}
+
+cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
+cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
+cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
+cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
+
+response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
+response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
+response_cache_evicted = Gauge(
+    "synapse_util_caches_response_cache:evicted_size", "", ["name"]
+)
+response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
+
+
+def register_cache(cache_type, cache_name, cache):
+
+    # Check if the metric is already registered. Unregister it, if so.
+    # This usually happens during tests, as at runtime these caches are
+    # effectively singletons.
+    metric_name = "cache_%s_%s" % (cache_type, cache_name)
+    if metric_name in collectors_by_name.keys():
+        REGISTRY.unregister(collectors_by_name[metric_name])
+
+    class CacheMetric(object):
+
+        hits = 0
+        misses = 0
+        evicted_size = 0
+
+        def inc_hits(self):
+            self.hits += 1
+
+        def inc_misses(self):
+            self.misses += 1
+
+        def inc_evictions(self, size=1):
+            self.evicted_size += size
+
+        def describe(self):
+            return []
+
+        def collect(self):
+            if cache_type == "response_cache":
+                response_cache_size.labels(cache_name).set(len(cache))
+                response_cache_hits.labels(cache_name).set(self.hits)
+                response_cache_evicted.labels(cache_name).set(self.evicted_size)
+                response_cache_total.labels(cache_name).set(self.hits + self.misses)
+            else:
+                cache_size.labels(cache_name).set(len(cache))
+                cache_hits.labels(cache_name).set(self.hits)
+                cache_evicted.labels(cache_name).set(self.evicted_size)
+                cache_total.labels(cache_name).set(self.hits + self.misses)
+
+            yield GaugeMetricFamily("__unused", "")
+
+    metric = CacheMetric()
+    REGISTRY.register(metric)
+    caches_by_name[cache_name] = cache
+    collectors_by_name[metric_name] = metric
+    return metric
 
 
 KNOWN_KEYS = {
@@ -66,7 +115,9 @@ def intern_string(string):
         return None
 
     try:
-        string = string.encode("ascii")
+        if six.PY2:
+            string = string.encode("ascii")
+
         return intern(string)
     except UnicodeEncodeError:
         return string
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 68285a7594..fc1874b65b 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -31,6 +31,9 @@ import functools
 import inspect
 import threading
 
+from six import string_types, itervalues
+import six
+
 
 logger = logging.getLogger(__name__)
 
@@ -80,7 +83,7 @@ class Cache(object):
         self.name = name
         self.keylen = keylen
         self.thread = None
-        self.metrics = register_cache(name, self.cache)
+        self.metrics = register_cache("cache", name, self.cache)
 
     def _on_evicted(self, evicted_count):
         self.metrics.inc_evictions(evicted_count)
@@ -205,7 +208,7 @@ class Cache(object):
     def invalidate_all(self):
         self.check_thread()
         self.cache.clear()
-        for entry in self._pending_deferred_cache.itervalues():
+        for entry in itervalues(self._pending_deferred_cache):
             entry.invalidate()
         self._pending_deferred_cache.clear()
 
@@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase):
 
                 ret.addErrback(onErr)
 
-                # If our cache_key is a string, try to convert to ascii to save
-                # a bit of space in large caches
-                if isinstance(cache_key, basestring):
+                # If our cache_key is a string on py2, try to convert to ascii
+                # to save a bit of space in large caches. Py3 does this
+                # internally automatically.
+                if six.PY2 and isinstance(cache_key, string_types):
                     cache_key = to_ascii(cache_key)
 
                 result_d = ObservableDeferred(ret, consumeErrors=True)
@@ -565,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
                     return results
 
                 return logcontext.make_deferred_yieldable(defer.gatherResults(
-                    cached_defers.values(),
+                    list(cached_defers.values()),
                     consumeErrors=True,
                 ).addCallback(update_results_dict).addErrback(
                     unwrapFirstError
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index 1709e8b429..bdc21e348f 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -55,7 +55,7 @@ class DictionaryCache(object):
             __slots__ = []
 
         self.sentinel = Sentinel()
-        self.metrics = register_cache(name, self.cache)
+        self.metrics = register_cache("dictionary", name, self.cache)
 
     def check_thread(self):
         expected_thread = self.thread
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 0aa103eecb..ff04c91955 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -52,12 +52,12 @@ class ExpiringCache(object):
 
         self._cache = OrderedDict()
 
-        self.metrics = register_cache(cache_name, self)
-
         self.iterable = iterable
 
         self._size_estimate = 0
 
+        self.metrics = register_cache("expiring", cache_name, self)
+
     def start(self):
         if not self._expiry_ms:
             # Don't bother starting the loop if things never expire
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 7f79333e96..a8491b42d5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -17,7 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
-from synapse.util.caches import metrics as cache_metrics
+from synapse.util.caches import register_cache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
@@ -38,15 +38,16 @@ class ResponseCache(object):
         self.timeout_sec = timeout_ms / 1000.
 
         self._name = name
-        self._metrics = cache_metrics.register_cache(
-            "response_cache",
-            size_callback=lambda: self.size(),
-            cache_name=name,
+        self._metrics = register_cache(
+            "response_cache", name, self
         )
 
     def size(self):
         return len(self.pending_result_cache)
 
+    def __len__(self):
+        return self.size()
+
     def get(self, key):
         """Look up the given key.
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 941d873ab8..a7fe0397fa 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -38,7 +38,7 @@ class StreamChangeCache(object):
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache(self.name, self._cache)
+        self.metrics = register_cache("cache", self.name, self._cache)
 
         for entity, stream_pos in prefilled_cache.items():
             self.entity_has_changed(entity, stream_pos)
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index fcc341a6b7..dd4c9e6067 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,3 +1,5 @@
+from six import itervalues
+
 SENTINEL = object()
 
 
@@ -49,7 +51,7 @@ class TreeCache(object):
         if popped is SENTINEL:
             return default
 
-        node_and_keys = zip(nodes, key)
+        node_and_keys = list(zip(nodes, key))
         node_and_keys.reverse()
         node_and_keys.append((self.root, None))
 
@@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d):
     can contain dicts.
     """
     if isinstance(d, dict):
-        for value_d in d.itervalues():
+        for value_d in itervalues(d):
             for value in iterate_tree_cache_entry(value_d):
                 yield value
     else:
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index f497b51f4a..15f0a7ba9e 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -16,16 +16,17 @@
 from frozendict import frozendict
 import simplejson as json
 
+from six import string_types
+
 
 def freeze(o):
-    t = type(o)
-    if t is dict:
+    if isinstance(o, dict):
         return frozendict({k: freeze(v) for k, v in o.items()})
 
-    if t is frozendict:
+    if isinstance(o, frozendict):
         return o
 
-    if t is str or t is unicode:
+    if isinstance(o, string_types):
         return o
 
     try:
@@ -37,11 +38,10 @@ def freeze(o):
 
 
 def unfreeze(o):
-    t = type(o)
-    if t is dict or t is frozendict:
+    if isinstance(o, (dict, frozendict)):
         return dict({k: unfreeze(v) for k, v in o.items()})
 
-    if t is str or t is unicode:
+    if isinstance(o, string_types):
         return o
 
     try:
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 914f616312..a58c723403 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -59,7 +59,7 @@ class LoggingContext(object):
 
     __slots__ = [
         "previous_context", "name", "ru_stime", "ru_utime",
-        "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+        "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
         "usage_start",
         "main_thread", "alive",
         "request", "tag",
@@ -84,10 +84,10 @@ class LoggingContext(object):
         def stop(self):
             pass
 
-        def add_database_transaction(self, duration_ms):
+        def add_database_transaction(self, duration_sec):
             pass
 
-        def add_database_scheduled(self, sched_ms):
+        def add_database_scheduled(self, sched_sec):
             pass
 
         def __nonzero__(self):
@@ -103,11 +103,11 @@ class LoggingContext(object):
         self.ru_utime = 0.
         self.db_txn_count = 0
 
-        # ms spent waiting for db txns, excluding scheduling time
-        self.db_txn_duration_ms = 0
+        # sec spent waiting for db txns, excluding scheduling time
+        self.db_txn_duration_sec = 0
 
-        # ms spent waiting for db txns to be scheduled
-        self.db_sched_duration_ms = 0
+        # sec spent waiting for db txns to be scheduled
+        self.db_sched_duration_sec = 0
 
         # If alive has the thread resource usage when the logcontext last
         # became active.
@@ -230,18 +230,18 @@ class LoggingContext(object):
 
         return ru_utime, ru_stime
 
-    def add_database_transaction(self, duration_ms):
+    def add_database_transaction(self, duration_sec):
         self.db_txn_count += 1
-        self.db_txn_duration_ms += duration_ms
+        self.db_txn_duration_sec += duration_sec
 
-    def add_database_scheduled(self, sched_ms):
+    def add_database_scheduled(self, sched_sec):
         """Record a use of the database pool
 
         Args:
-            sched_ms (int): number of milliseconds it took us to get a
+            sched_sec (float): number of seconds it took us to get a
                 connection
         """
-        self.db_sched_duration_ms += sched_ms
+        self.db_sched_duration_sec += sched_sec
 
 
 class LoggingContextFilter(logging.Filter):
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index 3a83828d25..03249c5dc8 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -96,7 +96,7 @@ def time_function(f):
         id = _TIME_FUNC_ID
         _TIME_FUNC_ID += 1
 
-        start = time.clock() * 1000
+        start = time.clock()
 
         try:
             _log_debug_as_f(
@@ -107,10 +107,10 @@ def time_function(f):
 
             r = f(*args, **kwargs)
         finally:
-            end = time.clock() * 1000
+            end = time.clock()
             _log_debug_as_f(
                 f,
-                "[FUNC END] {%s-%d} %f",
+                "[FUNC END] {%s-%d} %.3f sec",
                 (func_name, id, end - start,),
             )
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index e4b5687a4b..1ba7d65c7c 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -15,8 +15,8 @@
 
 from twisted.internet import defer
 
+from prometheus_client import Counter
 from synapse.util.logcontext import LoggingContext
-import synapse.metrics
 
 from functools import wraps
 import logging
@@ -24,66 +24,26 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"])
 
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-# total number of times we have hit this block
-block_counter = metrics.register_counter(
-    "block_count",
-    labels=["block_name"],
-    alternative_names=(
-        # the following are all deprecated aliases for the same metric
-        metrics.name_prefix + x for x in (
-            "_block_timer:count",
-            "_block_ru_utime:count",
-            "_block_ru_stime:count",
-            "_block_db_txn_count:count",
-            "_block_db_txn_duration:count",
-        )
-    )
-)
-
-block_timer = metrics.register_counter(
-    "block_time_seconds",
-    labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_timer:total",
-    ),
-)
-
-block_ru_utime = metrics.register_counter(
-    "block_ru_utime_seconds", labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_ru_utime:total",
-    ),
-)
-
-block_ru_stime = metrics.register_counter(
-    "block_ru_stime_seconds", labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_ru_stime:total",
-    ),
-)
-
-block_db_txn_count = metrics.register_counter(
-    "block_db_txn_count", labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_db_txn_count:total",
-    ),
-)
+block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"])
+
+block_ru_utime = Counter(
+    "synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"])
+
+block_ru_stime = Counter(
+    "synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"])
+
+block_db_txn_count = Counter(
+    "synapse_util_metrics_block_db_txn_count", "", ["block_name"])
 
 # seconds spent waiting for db txns, excluding scheduling time, in this block
-block_db_txn_duration = metrics.register_counter(
-    "block_db_txn_duration_seconds", labels=["block_name"],
-    alternative_names=(
-        metrics.name_prefix + "_block_db_txn_duration:total",
-    ),
-)
+block_db_txn_duration = Counter(
+    "synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"])
 
 # seconds spent waiting for a db connection, in this block
-block_db_sched_duration = metrics.register_counter(
-    "block_db_sched_duration_seconds", labels=["block_name"],
-)
+block_db_sched_duration = Counter(
+    "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
 
 
 def measure_func(name):
@@ -102,7 +62,7 @@ class Measure(object):
     __slots__ = [
         "clock", "name", "start_context", "start", "new_context", "ru_utime",
         "ru_stime",
-        "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+        "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
         "created_context",
     ]
 
@@ -114,7 +74,7 @@ class Measure(object):
         self.created_context = False
 
     def __enter__(self):
-        self.start = self.clock.time_msec()
+        self.start = self.clock.time()
         self.start_context = LoggingContext.current_context()
         if not self.start_context:
             self.start_context = LoggingContext("Measure")
@@ -123,17 +83,17 @@ class Measure(object):
 
         self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
         self.db_txn_count = self.start_context.db_txn_count
-        self.db_txn_duration_ms = self.start_context.db_txn_duration_ms
-        self.db_sched_duration_ms = self.start_context.db_sched_duration_ms
+        self.db_txn_duration_sec = self.start_context.db_txn_duration_sec
+        self.db_sched_duration_sec = self.start_context.db_sched_duration_sec
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         if isinstance(exc_type, Exception) or not self.start_context:
             return
 
-        duration = self.clock.time_msec() - self.start
+        duration = self.clock.time() - self.start
 
-        block_counter.inc(self.name)
-        block_timer.inc_by(duration, self.name)
+        block_counter.labels(self.name).inc()
+        block_timer.labels(self.name).inc(duration)
 
         context = LoggingContext.current_context()
 
@@ -150,19 +110,13 @@ class Measure(object):
 
         ru_utime, ru_stime = context.get_resource_usage()
 
-        block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name)
-        block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name)
-        block_db_txn_count.inc_by(
-            context.db_txn_count - self.db_txn_count, self.name
-        )
-        block_db_txn_duration.inc_by(
-            (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000.,
-            self.name
-        )
-        block_db_sched_duration.inc_by(
-            (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000.,
-            self.name
-        )
+        block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime)
+        block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime)
+        block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count)
+        block_db_txn_duration.labels(self.name).inc(
+            context.db_txn_duration_sec - self.db_txn_duration_sec)
+        block_db_sched_duration.labels(self.name).inc(
+            context.db_sched_duration_sec - self.db_sched_duration_sec)
 
         if self.created_context:
             self.start_context.__exit__(exc_type, exc_val, exc_tb)