From 55370331da54c46c04253b009865097fe9e95191 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Jul 2018 13:56:07 +0100 Subject: Refactor logcontext resource usage tracking (#3501) Factor out the resource usage tracking out to a separate object, which can be passed around and copied independently of the logcontext itself. --- synapse/util/logcontext.py | 144 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 111 insertions(+), 33 deletions(-) (limited to 'synapse/util/logcontext.py') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index fe9288b031..44a0a56818 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -49,6 +49,90 @@ except Exception: return None +class ContextResourceUsage(object): + """Object for tracking the resources used by a log context + + Attributes: + ru_utime (float): user CPU time (in seconds) + ru_stime (float): system CPU time (in seconds) + db_txn_count (int): number of database transactions done + db_sched_duration_sec (float): amount of time spent waiting for a + database connection + db_txn_duration_sec (float): amount of time spent doing database + transactions (excluding scheduling time) + evt_db_fetch_count (int): number of events requested from the database + """ + + __slots__ = [ + "ru_stime", "ru_utime", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", + "evt_db_fetch_count", + ] + + def __init__(self, copy_from=None): + """Create a new ContextResourceUsage + + Args: + copy_from (ContextResourceUsage|None): if not None, an object to + copy stats from + """ + if copy_from is None: + self.reset() + else: + self.ru_utime = copy_from.ru_utime + self.ru_stime = copy_from.ru_stime + self.db_txn_count = copy_from.db_txn_count + + self.db_txn_duration_sec = copy_from.db_txn_duration_sec + self.db_sched_duration_sec = copy_from.db_sched_duration_sec + self.evt_db_fetch_count = copy_from.evt_db_fetch_count + + def copy(self): + return ContextResourceUsage(copy_from=self) + + def reset(self): + self.ru_stime = 0. + self.ru_utime = 0. + self.db_txn_count = 0 + + self.db_txn_duration_sec = 0 + self.db_sched_duration_sec = 0 + self.evt_db_fetch_count = 0 + + def __iadd__(self, other): + """Add another ContextResourceUsage's stats to this one's. + + Args: + other (ContextResourceUsage): the other resource usage object + """ + self.ru_utime += other.ru_utime + self.ru_stime += other.ru_stime + self.db_txn_count += other.db_txn_count + self.db_txn_duration_sec += other.db_txn_duration_sec + self.db_sched_duration_sec += other.db_sched_duration_sec + self.evt_db_fetch_count += other.evt_db_fetch_count + return self + + def __isub__(self, other): + self.ru_utime -= other.ru_utime + self.ru_stime -= other.ru_stime + self.db_txn_count -= other.db_txn_count + self.db_txn_duration_sec -= other.db_txn_duration_sec + self.db_sched_duration_sec -= other.db_sched_duration_sec + self.evt_db_fetch_count -= other.evt_db_fetch_count + return self + + def __add__(self, other): + res = ContextResourceUsage(copy_from=self) + res += other + return res + + def __sub__(self, other): + res = ContextResourceUsage(copy_from=self) + res -= other + return res + + class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a "with" block. @@ -58,9 +142,8 @@ class LoggingContext(object): """ __slots__ = [ - "previous_context", "name", "ru_stime", "ru_utime", - "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", - "evt_db_fetch_count", + "previous_context", "name", + "_resource_usage", "usage_start", "main_thread", "alive", "request", "tag", @@ -103,18 +186,9 @@ class LoggingContext(object): def __init__(self, name=None): self.previous_context = LoggingContext.current_context() self.name = name - self.ru_stime = 0. - self.ru_utime = 0. - self.db_txn_count = 0 - # sec spent waiting for db txns, excluding scheduling time - self.db_txn_duration_sec = 0 - - # sec spent waiting for db txns to be scheduled - self.db_sched_duration_sec = 0 - - # number of events this thread has fetched from the db - self.evt_db_fetch_count = 0 + # track the resources used by this context so far + self._resource_usage = ContextResourceUsage() # If alive has the thread resource usage when the logcontext last # became active. @@ -207,39 +281,43 @@ class LoggingContext(object): logger.warning("Stopped logcontext %s on different thread", self) return - # When we stop, let's record the resource used since we started - if self.usage_start: - usage_end = get_thread_resource_usage() + # When we stop, let's record the cpu used since we started + if not self.usage_start: + logger.warning( + "Called stop on logcontext %s without calling start", self, + ) + return + + usage_end = get_thread_resource_usage() - self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime - self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime + self._resource_usage.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime + self._resource_usage.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime - self.usage_start = None - else: - logger.warning("Called stop on logcontext %s without calling start", self) + self.usage_start = None def get_resource_usage(self): - """Get CPU time used by this logcontext so far. + """Get resources used by this logcontext so far. Returns: - tuple[float, float]: The user and system CPU usage in seconds + ContextResourceUsage: a *copy* of the object tracking resource + usage so far """ - ru_utime = self.ru_utime - ru_stime = self.ru_stime + # we always return a copy, for consistency + res = self._resource_usage.copy() # If we are on the correct thread and we're currently running then we # can include resource usage so far. is_main_thread = threading.current_thread() is self.main_thread if self.alive and self.usage_start and is_main_thread: current = get_thread_resource_usage() - ru_utime += current.ru_utime - self.usage_start.ru_utime - ru_stime += current.ru_stime - self.usage_start.ru_stime + res.ru_utime += current.ru_utime - self.usage_start.ru_utime + res.ru_stime += current.ru_stime - self.usage_start.ru_stime - return ru_utime, ru_stime + return res def add_database_transaction(self, duration_sec): - self.db_txn_count += 1 - self.db_txn_duration_sec += duration_sec + self._resource_usage.db_txn_count += 1 + self._resource_usage.db_txn_duration_sec += duration_sec def add_database_scheduled(self, sched_sec): """Record a use of the database pool @@ -248,7 +326,7 @@ class LoggingContext(object): sched_sec (float): number of seconds it took us to get a connection """ - self.db_sched_duration_sec += sched_sec + self._resource_usage.db_sched_duration_sec += sched_sec def record_event_fetch(self, event_count): """Record a number of events being fetched from the db @@ -256,7 +334,7 @@ class LoggingContext(object): Args: event_count (int): number of events being fetched """ - self.evt_db_fetch_count += event_count + self._resource_usage.evt_db_fetch_count += event_count class LoggingContextFilter(logging.Filter): -- cgit 1.5.1 From c3c29aa19625324ac9a7a8ebcdd58a9c0b457f74 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 10 Jul 2018 16:12:36 +0100 Subject: Attempt to include db threads in cpu usage stats (#3496) Let's try to include time spent in the DB threads in the per-request/block cpu usage metrics. --- changelog.d/3496.feature | 1 + synapse/storage/_base.py | 32 +++++++++++++++----------------- synapse/storage/events_worker.py | 3 ++- synapse/util/logcontext.py | 23 +++++++++++++++++++++-- 4 files changed, 39 insertions(+), 20 deletions(-) create mode 100644 changelog.d/3496.feature (limited to 'synapse/util/logcontext.py') diff --git a/changelog.d/3496.feature b/changelog.d/3496.feature new file mode 100644 index 0000000000..6a06a7e755 --- /dev/null +++ b/changelog.d/3496.feature @@ -0,0 +1 @@ +Include CPU time from database threads in request/block metrics. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1fd5d8f162..98dde77431 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -220,7 +220,7 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks, - logging_context, func, *args, **kwargs): + func, *args, **kwargs): start = time.time() txn_id = self._TXN_ID @@ -284,8 +284,7 @@ class SQLBaseStore(object): end = time.time() duration = end - start - if logging_context is not None: - logging_context.add_database_transaction(duration) + LoggingContext.current_context().add_database_transaction(duration) transaction_logger.debug("[TXN END] {%s} %f sec", name, duration) @@ -309,19 +308,15 @@ class SQLBaseStore(object): Returns: Deferred: The result of func """ - current_context = LoggingContext.current_context() - after_callbacks = [] exception_callbacks = [] - def inner_func(conn, *args, **kwargs): - return self._new_transaction( - conn, desc, after_callbacks, exception_callbacks, current_context, - func, *args, **kwargs - ) - try: - result = yield self.runWithConnection(inner_func, *args, **kwargs) + result = yield self.runWithConnection( + self._new_transaction, + desc, after_callbacks, exception_callbacks, func, + *args, **kwargs + ) for after_callback, after_args, after_kwargs in after_callbacks: after_callback(*after_args, **after_kwargs) @@ -346,22 +341,25 @@ class SQLBaseStore(object): Returns: Deferred: The result of func """ - current_context = LoggingContext.current_context() + parent_context = LoggingContext.current_context() + if parent_context == LoggingContext.sentinel: + logger.warn( + "Running db txn from sentinel context: metrics will be lost", + ) + parent_context = None start_time = time.time() def inner_func(conn, *args, **kwargs): - with LoggingContext("runWithConnection") as context: + with LoggingContext("runWithConnection", parent_context) as context: sched_duration_sec = time.time() - start_time sql_scheduling_timer.observe(sched_duration_sec) - current_context.add_database_scheduled(sched_duration_sec) + context.add_database_scheduled(sched_duration_sec) if self.database_engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") conn.reconnect() - current_context.copy_to(context) - return func(conn, *args, **kwargs) with PreserveLoggingContext(): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index fa2659403d..67433606c6 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -261,7 +261,8 @@ class EventsWorkerStore(SQLBaseStore): ] rows = self._new_transaction( - conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids + conn, "do_fetch", [], [], + self._fetch_event_rows, event_ids, ) row_dict = { diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 44a0a56818..f6c7175f74 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -137,12 +137,18 @@ class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a "with" block. + If a parent is given when creating a new context, then: + - logging fields are copied from the parent to the new context on entry + - when the new context exits, the cpu usage stats are copied from the + child to the parent + Args: name (str): Name for the context for debugging. + parent_context (LoggingContext|None): The parent of the new context """ __slots__ = [ - "previous_context", "name", + "previous_context", "name", "parent_context", "_resource_usage", "usage_start", "main_thread", "alive", @@ -183,7 +189,7 @@ class LoggingContext(object): sentinel = Sentinel() - def __init__(self, name=None): + def __init__(self, name=None, parent_context=None): self.previous_context = LoggingContext.current_context() self.name = name @@ -199,6 +205,8 @@ class LoggingContext(object): self.tag = "" self.alive = True + self.parent_context = parent_context + def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -236,6 +244,10 @@ class LoggingContext(object): self.previous_context, old_context ) self.alive = True + + if self.parent_context is not None: + self.parent_context.copy_to(self) + return self def __exit__(self, type, value, traceback): @@ -257,6 +269,13 @@ class LoggingContext(object): self.previous_context = None self.alive = False + # if we have a parent, pass our CPU usage stats on + if self.parent_context is not None: + self.parent_context._resource_usage += self._resource_usage + + # reset them in case we get entered again + self._resource_usage.reset() + def copy_to(self, record): """Copy logging fields from this context to a log record or another LoggingContext -- cgit 1.5.1 From 95ccb6e2ec57f2150a697ea9cde030e8f78d6db9 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 19 Jul 2018 20:58:18 +1000 Subject: Don't spew errors because we can't save metrics (#3563) --- changelog.d/3548.bugfix | 1 + synapse/util/logcontext.py | 11 +++++++++++ synapse/util/metrics.py | 19 +++++++++++++------ 3 files changed, 25 insertions(+), 6 deletions(-) create mode 100644 changelog.d/3548.bugfix (limited to 'synapse/util/logcontext.py') diff --git a/changelog.d/3548.bugfix b/changelog.d/3548.bugfix new file mode 100644 index 0000000000..38dc3b1232 --- /dev/null +++ b/changelog.d/3548.bugfix @@ -0,0 +1 @@ +Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis. diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index f6c7175f74..8dcae50b39 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -99,6 +99,17 @@ class ContextResourceUsage(object): self.db_sched_duration_sec = 0 self.evt_db_fetch_count = 0 + def __repr__(self): + return ("") % ( + self.ru_stime, + self.ru_utime, + self.db_txn_count, + self.db_txn_duration_sec, + self.db_sched_duration_sec, + self.evt_db_fetch_count,) + def __iadd__(self, other): """Add another ContextResourceUsage's stats to this one's. diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 6ba7107896..97f1267380 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -104,12 +104,19 @@ class Measure(object): logger.warn("Expected context. (%r)", self.name) return - usage = context.get_resource_usage() - self.start_usage - block_ru_utime.labels(self.name).inc(usage.ru_utime) - block_ru_stime.labels(self.name).inc(usage.ru_stime) - block_db_txn_count.labels(self.name).inc(usage.db_txn_count) - block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) - block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + current = context.get_resource_usage() + usage = current - self.start_usage + try: + block_ru_utime.labels(self.name).inc(usage.ru_utime) + block_ru_stime.labels(self.name).inc(usage.ru_stime) + block_db_txn_count.labels(self.name).inc(usage.db_txn_count) + block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) + block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) + except ValueError: + logger.warn( + "Failed to save metrics! OLD: %r, NEW: %r", + self.start_usage, current + ) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) -- cgit 1.5.1