summary refs log tree commit diff
path: root/synapse/util/logcontext.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/logcontext.py')
-rw-r--r--synapse/util/logcontext.py129
1 files changed, 82 insertions, 47 deletions
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index fe412355d8..6b0d2deea0 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -42,6 +42,8 @@ try:
 
     def get_thread_resource_usage():
         return resource.getrusage(RUSAGE_THREAD)
+
+
 except Exception:
     # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
     # won't track resource usage by returning None.
@@ -64,8 +66,11 @@ class ContextResourceUsage(object):
     """
 
     __slots__ = [
-        "ru_stime", "ru_utime",
-        "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+        "ru_stime",
+        "ru_utime",
+        "db_txn_count",
+        "db_txn_duration_sec",
+        "db_sched_duration_sec",
         "evt_db_fetch_count",
     ]
 
@@ -91,8 +96,8 @@ class ContextResourceUsage(object):
         return ContextResourceUsage(copy_from=self)
 
     def reset(self):
-        self.ru_stime = 0.
-        self.ru_utime = 0.
+        self.ru_stime = 0.0
+        self.ru_utime = 0.0
         self.db_txn_count = 0
 
         self.db_txn_duration_sec = 0
@@ -100,15 +105,18 @@ class ContextResourceUsage(object):
         self.evt_db_fetch_count = 0
 
     def __repr__(self):
-        return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
-                "db_txn_count='%r', db_txn_duration_sec='%r', "
-                "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
-                    self.ru_stime,
-                    self.ru_utime,
-                    self.db_txn_count,
-                    self.db_txn_duration_sec,
-                    self.db_sched_duration_sec,
-                    self.evt_db_fetch_count,)
+        return (
+            "<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+            "db_txn_count='%r', db_txn_duration_sec='%r', "
+            "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>"
+        ) % (
+            self.ru_stime,
+            self.ru_utime,
+            self.db_txn_count,
+            self.db_txn_duration_sec,
+            self.db_sched_duration_sec,
+            self.evt_db_fetch_count,
+        )
 
     def __iadd__(self, other):
         """Add another ContextResourceUsage's stats to this one's.
@@ -159,11 +167,15 @@ class LoggingContext(object):
     """
 
     __slots__ = [
-        "previous_context", "name", "parent_context",
+        "previous_context",
+        "name",
+        "parent_context",
         "_resource_usage",
         "usage_start",
-        "main_thread", "alive",
-        "request", "tag",
+        "main_thread",
+        "alive",
+        "request",
+        "tag",
     ]
 
     thread_local = threading.local()
@@ -196,6 +208,7 @@ class LoggingContext(object):
 
         def __nonzero__(self):
             return False
+
         __bool__ = __nonzero__  # python3
 
     sentinel = Sentinel()
@@ -261,7 +274,8 @@ class LoggingContext(object):
         if self.previous_context != old_context:
             logger.warn(
                 "Expected previous context %r, found %r",
-                self.previous_context, old_context
+                self.previous_context,
+                old_context,
             )
         self.alive = True
 
@@ -285,9 +299,8 @@ class LoggingContext(object):
         self.alive = False
 
         # if we have a parent, pass our CPU usage stats on
-        if (
-            self.parent_context is not None
-            and hasattr(self.parent_context, '_resource_usage')
+        if self.parent_context is not None and hasattr(
+            self.parent_context, "_resource_usage"
         ):
             self.parent_context._resource_usage += self._resource_usage
 
@@ -320,15 +333,12 @@ class LoggingContext(object):
 
         # When we stop, let's record the cpu used since we started
         if not self.usage_start:
-            logger.warning(
-                "Called stop on logcontext %s without calling start", self,
-            )
+            logger.warning("Called stop on logcontext %s without calling start", self)
             return
 
-        usage_end = get_thread_resource_usage()
-
-        self._resource_usage.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
-        self._resource_usage.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
+        utime_delta, stime_delta = self._get_cputime()
+        self._resource_usage.ru_utime += utime_delta
+        self._resource_usage.ru_stime += stime_delta
 
         self.usage_start = None
 
@@ -346,13 +356,44 @@ class LoggingContext(object):
         # can include resource usage so far.
         is_main_thread = threading.current_thread() is self.main_thread
         if self.alive and self.usage_start and is_main_thread:
-            current = get_thread_resource_usage()
-            res.ru_utime += current.ru_utime - self.usage_start.ru_utime
-            res.ru_stime += current.ru_stime - self.usage_start.ru_stime
+            utime_delta, stime_delta = self._get_cputime()
+            res.ru_utime += utime_delta
+            res.ru_stime += stime_delta
 
         return res
 
+    def _get_cputime(self):
+        """Get the cpu usage time so far
+
+        Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
+        """
+        current = get_thread_resource_usage()
+
+        utime_delta = current.ru_utime - self.usage_start.ru_utime
+        stime_delta = current.ru_stime - self.usage_start.ru_stime
+
+        # sanity check
+        if utime_delta < 0:
+            logger.error(
+                "utime went backwards! %f < %f",
+                current.ru_utime,
+                self.usage_start.ru_utime,
+            )
+            utime_delta = 0
+
+        if stime_delta < 0:
+            logger.error(
+                "stime went backwards! %f < %f",
+                current.ru_stime,
+                self.usage_start.ru_stime,
+            )
+            stime_delta = 0
+
+        return utime_delta, stime_delta
+
     def add_database_transaction(self, duration_sec):
+        if duration_sec < 0:
+            raise ValueError("DB txn time can only be non-negative")
         self._resource_usage.db_txn_count += 1
         self._resource_usage.db_txn_duration_sec += duration_sec
 
@@ -363,6 +404,8 @@ class LoggingContext(object):
             sched_sec (float): number of seconds it took us to get a
                 connection
         """
+        if sched_sec < 0:
+            raise ValueError("DB scheduling time can only be non-negative")
         self._resource_usage.db_sched_duration_sec += sched_sec
 
     def record_event_fetch(self, event_count):
@@ -381,6 +424,7 @@ class LoggingContextFilter(logging.Filter):
         **defaults: Default values to avoid formatters complaining about
             missing fields
     """
+
     def __init__(self, **defaults):
         self.defaults = defaults
 
@@ -416,17 +460,12 @@ class PreserveLoggingContext(object):
 
     def __enter__(self):
         """Captures the current logging context"""
-        self.current_context = LoggingContext.set_current_context(
-            self.new_context
-        )
+        self.current_context = LoggingContext.set_current_context(self.new_context)
 
         if self.current_context:
             self.has_parent = self.current_context.previous_context is not None
             if not self.current_context.alive:
-                logger.debug(
-                    "Entering dead context: %s",
-                    self.current_context,
-                )
+                logger.debug("Entering dead context: %s", self.current_context)
 
     def __exit__(self, type, value, traceback):
         """Restores the current logging context"""
@@ -444,10 +483,7 @@ class PreserveLoggingContext(object):
 
         if self.current_context is not LoggingContext.sentinel:
             if not self.current_context.alive:
-                logger.debug(
-                    "Restoring dead context: %s",
-                    self.current_context,
-                )
+                logger.debug("Restoring dead context: %s", self.current_context)
 
 
 def nested_logging_context(suffix, parent_context=None):
@@ -474,15 +510,16 @@ def nested_logging_context(suffix, parent_context=None):
     if parent_context is None:
         parent_context = LoggingContext.current_context()
     return LoggingContext(
-        parent_context=parent_context,
-        request=parent_context.request + "-" + suffix,
+        parent_context=parent_context, request=parent_context.request + "-" + suffix
     )
 
 
 def preserve_fn(f):
     """Function decorator which wraps the function with run_in_background"""
+
     def g(*args, **kwargs):
         return run_in_background(f, *args, **kwargs)
+
     return g
 
 
@@ -502,7 +539,7 @@ def run_in_background(f, *args, **kwargs):
     current = LoggingContext.current_context()
     try:
         res = f(*args, **kwargs)
-    except:   # noqa: E722
+    except:  # noqa: E722
         # the assumption here is that the caller doesn't want to be disturbed
         # by synchronous exceptions, so let's turn them into Failures.
         return defer.fail()
@@ -639,6 +676,4 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
         with LoggingContext(parent_context=logcontext):
             return f(*args, **kwargs)
 
-    return make_deferred_yieldable(
-        threads.deferToThreadPool(reactor, threadpool, g)
-    )
+    return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))