diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index fe412355d8..6b0d2deea0 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -42,6 +42,8 @@ try:
def get_thread_resource_usage():
return resource.getrusage(RUSAGE_THREAD)
+
+
except Exception:
# If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
# won't track resource usage by returning None.
@@ -64,8 +66,11 @@ class ContextResourceUsage(object):
"""
__slots__ = [
- "ru_stime", "ru_utime",
- "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+ "ru_stime",
+ "ru_utime",
+ "db_txn_count",
+ "db_txn_duration_sec",
+ "db_sched_duration_sec",
"evt_db_fetch_count",
]
@@ -91,8 +96,8 @@ class ContextResourceUsage(object):
return ContextResourceUsage(copy_from=self)
def reset(self):
- self.ru_stime = 0.
- self.ru_utime = 0.
+ self.ru_stime = 0.0
+ self.ru_utime = 0.0
self.db_txn_count = 0
self.db_txn_duration_sec = 0
@@ -100,15 +105,18 @@ class ContextResourceUsage(object):
self.evt_db_fetch_count = 0
def __repr__(self):
- return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
- "db_txn_count='%r', db_txn_duration_sec='%r', "
- "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
- self.ru_stime,
- self.ru_utime,
- self.db_txn_count,
- self.db_txn_duration_sec,
- self.db_sched_duration_sec,
- self.evt_db_fetch_count,)
+ return (
+ "<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+ "db_txn_count='%r', db_txn_duration_sec='%r', "
+ "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>"
+ ) % (
+ self.ru_stime,
+ self.ru_utime,
+ self.db_txn_count,
+ self.db_txn_duration_sec,
+ self.db_sched_duration_sec,
+ self.evt_db_fetch_count,
+ )
def __iadd__(self, other):
"""Add another ContextResourceUsage's stats to this one's.
@@ -159,11 +167,15 @@ class LoggingContext(object):
"""
__slots__ = [
- "previous_context", "name", "parent_context",
+ "previous_context",
+ "name",
+ "parent_context",
"_resource_usage",
"usage_start",
- "main_thread", "alive",
- "request", "tag",
+ "main_thread",
+ "alive",
+ "request",
+ "tag",
]
thread_local = threading.local()
@@ -196,6 +208,7 @@ class LoggingContext(object):
def __nonzero__(self):
return False
+
__bool__ = __nonzero__ # python3
sentinel = Sentinel()
@@ -261,7 +274,8 @@ class LoggingContext(object):
if self.previous_context != old_context:
logger.warn(
"Expected previous context %r, found %r",
- self.previous_context, old_context
+ self.previous_context,
+ old_context,
)
self.alive = True
@@ -285,9 +299,8 @@ class LoggingContext(object):
self.alive = False
# if we have a parent, pass our CPU usage stats on
- if (
- self.parent_context is not None
- and hasattr(self.parent_context, '_resource_usage')
+ if self.parent_context is not None and hasattr(
+ self.parent_context, "_resource_usage"
):
self.parent_context._resource_usage += self._resource_usage
@@ -320,15 +333,12 @@ class LoggingContext(object):
# When we stop, let's record the cpu used since we started
if not self.usage_start:
- logger.warning(
- "Called stop on logcontext %s without calling start", self,
- )
+ logger.warning("Called stop on logcontext %s without calling start", self)
return
- usage_end = get_thread_resource_usage()
-
- self._resource_usage.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
- self._resource_usage.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
+ utime_delta, stime_delta = self._get_cputime()
+ self._resource_usage.ru_utime += utime_delta
+ self._resource_usage.ru_stime += stime_delta
self.usage_start = None
@@ -346,13 +356,44 @@ class LoggingContext(object):
# can include resource usage so far.
is_main_thread = threading.current_thread() is self.main_thread
if self.alive and self.usage_start and is_main_thread:
- current = get_thread_resource_usage()
- res.ru_utime += current.ru_utime - self.usage_start.ru_utime
- res.ru_stime += current.ru_stime - self.usage_start.ru_stime
+ utime_delta, stime_delta = self._get_cputime()
+ res.ru_utime += utime_delta
+ res.ru_stime += stime_delta
return res
+ def _get_cputime(self):
+ """Get the cpu usage time so far
+
+ Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
+ """
+ current = get_thread_resource_usage()
+
+ utime_delta = current.ru_utime - self.usage_start.ru_utime
+ stime_delta = current.ru_stime - self.usage_start.ru_stime
+
+ # sanity check
+ if utime_delta < 0:
+ logger.error(
+ "utime went backwards! %f < %f",
+ current.ru_utime,
+ self.usage_start.ru_utime,
+ )
+ utime_delta = 0
+
+ if stime_delta < 0:
+ logger.error(
+ "stime went backwards! %f < %f",
+ current.ru_stime,
+ self.usage_start.ru_stime,
+ )
+ stime_delta = 0
+
+ return utime_delta, stime_delta
+
def add_database_transaction(self, duration_sec):
+ if duration_sec < 0:
+ raise ValueError("DB txn time can only be non-negative")
self._resource_usage.db_txn_count += 1
self._resource_usage.db_txn_duration_sec += duration_sec
@@ -363,6 +404,8 @@ class LoggingContext(object):
sched_sec (float): number of seconds it took us to get a
connection
"""
+ if sched_sec < 0:
+ raise ValueError("DB scheduling time can only be non-negative")
self._resource_usage.db_sched_duration_sec += sched_sec
def record_event_fetch(self, event_count):
@@ -381,6 +424,7 @@ class LoggingContextFilter(logging.Filter):
**defaults: Default values to avoid formatters complaining about
missing fields
"""
+
def __init__(self, **defaults):
self.defaults = defaults
@@ -416,17 +460,12 @@ class PreserveLoggingContext(object):
def __enter__(self):
"""Captures the current logging context"""
- self.current_context = LoggingContext.set_current_context(
- self.new_context
- )
+ self.current_context = LoggingContext.set_current_context(self.new_context)
if self.current_context:
self.has_parent = self.current_context.previous_context is not None
if not self.current_context.alive:
- logger.debug(
- "Entering dead context: %s",
- self.current_context,
- )
+ logger.debug("Entering dead context: %s", self.current_context)
def __exit__(self, type, value, traceback):
"""Restores the current logging context"""
@@ -444,10 +483,7 @@ class PreserveLoggingContext(object):
if self.current_context is not LoggingContext.sentinel:
if not self.current_context.alive:
- logger.debug(
- "Restoring dead context: %s",
- self.current_context,
- )
+ logger.debug("Restoring dead context: %s", self.current_context)
def nested_logging_context(suffix, parent_context=None):
@@ -474,15 +510,16 @@ def nested_logging_context(suffix, parent_context=None):
if parent_context is None:
parent_context = LoggingContext.current_context()
return LoggingContext(
- parent_context=parent_context,
- request=parent_context.request + "-" + suffix,
+ parent_context=parent_context, request=parent_context.request + "-" + suffix
)
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
+
def g(*args, **kwargs):
return run_in_background(f, *args, **kwargs)
+
return g
@@ -502,7 +539,7 @@ def run_in_background(f, *args, **kwargs):
current = LoggingContext.current_context()
try:
res = f(*args, **kwargs)
- except: # noqa: E722
+ except: # noqa: E722
# the assumption here is that the caller doesn't want to be disturbed
# by synchronous exceptions, so let's turn them into Failures.
return defer.fail()
@@ -639,6 +676,4 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
with LoggingContext(parent_context=logcontext):
return f(*args, **kwargs)
- return make_deferred_yieldable(
- threads.deferToThreadPool(reactor, threadpool, g)
- )
+ return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
|