diff options
Diffstat (limited to 'synapse/util/logcontext.py')
-rw-r--r-- | synapse/util/logcontext.py | 129 |
1 files changed, 82 insertions, 47 deletions
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index fe412355d8..6b0d2deea0 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -42,6 +42,8 @@ try: def get_thread_resource_usage(): return resource.getrusage(RUSAGE_THREAD) + + except Exception: # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we # won't track resource usage by returning None. @@ -64,8 +66,11 @@ class ContextResourceUsage(object): """ __slots__ = [ - "ru_stime", "ru_utime", - "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", + "ru_stime", + "ru_utime", + "db_txn_count", + "db_txn_duration_sec", + "db_sched_duration_sec", "evt_db_fetch_count", ] @@ -91,8 +96,8 @@ class ContextResourceUsage(object): return ContextResourceUsage(copy_from=self) def reset(self): - self.ru_stime = 0. - self.ru_utime = 0. + self.ru_stime = 0.0 + self.ru_utime = 0.0 self.db_txn_count = 0 self.db_txn_duration_sec = 0 @@ -100,15 +105,18 @@ class ContextResourceUsage(object): self.evt_db_fetch_count = 0 def __repr__(self): - return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', " - "db_txn_count='%r', db_txn_duration_sec='%r', " - "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % ( - self.ru_stime, - self.ru_utime, - self.db_txn_count, - self.db_txn_duration_sec, - self.db_sched_duration_sec, - self.evt_db_fetch_count,) + return ( + "<ContextResourceUsage ru_stime='%r', ru_utime='%r', " + "db_txn_count='%r', db_txn_duration_sec='%r', " + "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>" + ) % ( + self.ru_stime, + self.ru_utime, + self.db_txn_count, + self.db_txn_duration_sec, + self.db_sched_duration_sec, + self.evt_db_fetch_count, + ) def __iadd__(self, other): """Add another ContextResourceUsage's stats to this one's. @@ -159,11 +167,15 @@ class LoggingContext(object): """ __slots__ = [ - "previous_context", "name", "parent_context", + "previous_context", + "name", + "parent_context", "_resource_usage", "usage_start", - "main_thread", "alive", - "request", "tag", + "main_thread", + "alive", + "request", + "tag", ] thread_local = threading.local() @@ -196,6 +208,7 @@ class LoggingContext(object): def __nonzero__(self): return False + __bool__ = __nonzero__ # python3 sentinel = Sentinel() @@ -261,7 +274,8 @@ class LoggingContext(object): if self.previous_context != old_context: logger.warn( "Expected previous context %r, found %r", - self.previous_context, old_context + self.previous_context, + old_context, ) self.alive = True @@ -285,9 +299,8 @@ class LoggingContext(object): self.alive = False # if we have a parent, pass our CPU usage stats on - if ( - self.parent_context is not None - and hasattr(self.parent_context, '_resource_usage') + if self.parent_context is not None and hasattr( + self.parent_context, "_resource_usage" ): self.parent_context._resource_usage += self._resource_usage @@ -320,15 +333,12 @@ class LoggingContext(object): # When we stop, let's record the cpu used since we started if not self.usage_start: - logger.warning( - "Called stop on logcontext %s without calling start", self, - ) + logger.warning("Called stop on logcontext %s without calling start", self) return - usage_end = get_thread_resource_usage() - - self._resource_usage.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime - self._resource_usage.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime + utime_delta, stime_delta = self._get_cputime() + self._resource_usage.ru_utime += utime_delta + self._resource_usage.ru_stime += stime_delta self.usage_start = None @@ -346,13 +356,44 @@ class LoggingContext(object): # can include resource usage so far. is_main_thread = threading.current_thread() is self.main_thread if self.alive and self.usage_start and is_main_thread: - current = get_thread_resource_usage() - res.ru_utime += current.ru_utime - self.usage_start.ru_utime - res.ru_stime += current.ru_stime - self.usage_start.ru_stime + utime_delta, stime_delta = self._get_cputime() + res.ru_utime += utime_delta + res.ru_stime += stime_delta return res + def _get_cputime(self): + """Get the cpu usage time so far + + Returns: Tuple[float, float]: seconds in user mode, seconds in system mode + """ + current = get_thread_resource_usage() + + utime_delta = current.ru_utime - self.usage_start.ru_utime + stime_delta = current.ru_stime - self.usage_start.ru_stime + + # sanity check + if utime_delta < 0: + logger.error( + "utime went backwards! %f < %f", + current.ru_utime, + self.usage_start.ru_utime, + ) + utime_delta = 0 + + if stime_delta < 0: + logger.error( + "stime went backwards! %f < %f", + current.ru_stime, + self.usage_start.ru_stime, + ) + stime_delta = 0 + + return utime_delta, stime_delta + def add_database_transaction(self, duration_sec): + if duration_sec < 0: + raise ValueError("DB txn time can only be non-negative") self._resource_usage.db_txn_count += 1 self._resource_usage.db_txn_duration_sec += duration_sec @@ -363,6 +404,8 @@ class LoggingContext(object): sched_sec (float): number of seconds it took us to get a connection """ + if sched_sec < 0: + raise ValueError("DB scheduling time can only be non-negative") self._resource_usage.db_sched_duration_sec += sched_sec def record_event_fetch(self, event_count): @@ -381,6 +424,7 @@ class LoggingContextFilter(logging.Filter): **defaults: Default values to avoid formatters complaining about missing fields """ + def __init__(self, **defaults): self.defaults = defaults @@ -416,17 +460,12 @@ class PreserveLoggingContext(object): def __enter__(self): """Captures the current logging context""" - self.current_context = LoggingContext.set_current_context( - self.new_context - ) + self.current_context = LoggingContext.set_current_context(self.new_context) if self.current_context: self.has_parent = self.current_context.previous_context is not None if not self.current_context.alive: - logger.debug( - "Entering dead context: %s", - self.current_context, - ) + logger.debug("Entering dead context: %s", self.current_context) def __exit__(self, type, value, traceback): """Restores the current logging context""" @@ -444,10 +483,7 @@ class PreserveLoggingContext(object): if self.current_context is not LoggingContext.sentinel: if not self.current_context.alive: - logger.debug( - "Restoring dead context: %s", - self.current_context, - ) + logger.debug("Restoring dead context: %s", self.current_context) def nested_logging_context(suffix, parent_context=None): @@ -474,15 +510,16 @@ def nested_logging_context(suffix, parent_context=None): if parent_context is None: parent_context = LoggingContext.current_context() return LoggingContext( - parent_context=parent_context, - request=parent_context.request + "-" + suffix, + parent_context=parent_context, request=parent_context.request + "-" + suffix ) def preserve_fn(f): """Function decorator which wraps the function with run_in_background""" + def g(*args, **kwargs): return run_in_background(f, *args, **kwargs) + return g @@ -502,7 +539,7 @@ def run_in_background(f, *args, **kwargs): current = LoggingContext.current_context() try: res = f(*args, **kwargs) - except: # noqa: E722 + except: # noqa: E722 # the assumption here is that the caller doesn't want to be disturbed # by synchronous exceptions, so let's turn them into Failures. return defer.fail() @@ -639,6 +676,4 @@ def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): with LoggingContext(parent_context=logcontext): return f(*args, **kwargs) - return make_deferred_yieldable( - threads.deferToThreadPool(reactor, threadpool, g) - ) + return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g)) |