diff options
-rw-r--r-- | changelog.d/10113.feature | 1 | ||||
-rw-r--r-- | synapse/logging/opentracing.py | 6 | ||||
-rw-r--r-- | synapse/storage/database.py | 86 |
3 files changed, 60 insertions, 33 deletions
diff --git a/changelog.d/10113.feature b/changelog.d/10113.feature new file mode 100644 index 0000000000..2658ab8918 --- /dev/null +++ b/changelog.d/10113.feature @@ -0,0 +1 @@ +Report OpenTracing spans for database activity. diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index f64845b80c..68f0c00151 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -271,6 +271,12 @@ class SynapseTags: # HTTP request tag (used to distinguish full vs incremental syncs, etc) REQUEST_TAG = "request_tag" + # Text description of a database transaction + DB_TXN_DESC = "db.txn_desc" + + # Uniqueish ID of a database transaction + DB_TXN_ID = "db.txn_id" + # Block everything by default # A regex which matches the server_names to expose traces for. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a761ad603b..974703d13a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -40,6 +40,7 @@ from twisted.enterprise import adbapi from synapse.api.errors import StoreError from synapse.config.database import DatabaseConnectionConfig +from synapse.logging import opentracing from synapse.logging.context import ( LoggingContext, current_context, @@ -313,7 +314,14 @@ class LoggingTransaction: start = time.time() try: - return func(sql, *args) + with opentracing.start_active_span( + "db.query", + tags={ + opentracing.tags.DATABASE_TYPE: "sql", + opentracing.tags.DATABASE_STATEMENT: sql, + }, + ): + return func(sql, *args) except Exception as e: sql_logger.debug("[SQL FAIL] {%s} %s", self.name, e) raise @@ -525,9 +533,16 @@ class DatabasePool: exception_callbacks=exception_callbacks, ) try: - r = func(cursor, *args, **kwargs) - conn.commit() - return r + with opentracing.start_active_span( + "db.txn", + tags={ + opentracing.SynapseTags.DB_TXN_DESC: desc, + opentracing.SynapseTags.DB_TXN_ID: name, + }, + ): + r = func(cursor, *args, **kwargs) + conn.commit() + return r except self.engine.module.OperationalError as e: # This can happen if the database disappears mid # transaction. @@ -653,16 +668,17 @@ class DatabasePool: logger.warning("Starting db txn '%s' from sentinel context", desc) try: - result = await self.runWithConnection( - self.new_transaction, - desc, - after_callbacks, - exception_callbacks, - func, - *args, - db_autocommit=db_autocommit, - **kwargs, - ) + with opentracing.start_active_span(f"db.{desc}"): + result = await self.runWithConnection( + self.new_transaction, + desc, + after_callbacks, + exception_callbacks, + func, + *args, + db_autocommit=db_autocommit, + **kwargs, + ) for after_callback, after_args, after_kwargs in after_callbacks: after_callback(*after_args, **after_kwargs) @@ -718,25 +734,29 @@ class DatabasePool: with LoggingContext( str(curr_context), parent_context=parent_context ) as context: - sched_duration_sec = monotonic_time() - start_time - sql_scheduling_timer.observe(sched_duration_sec) - context.add_database_scheduled(sched_duration_sec) - - if self.engine.is_connection_closed(conn): - logger.debug("Reconnecting closed database connection") - conn.reconnect() - - try: - if db_autocommit: - self.engine.attempt_to_set_autocommit(conn, True) - - db_conn = LoggingDatabaseConnection( - conn, self.engine, "runWithConnection" - ) - return func(db_conn, *args, **kwargs) - finally: - if db_autocommit: - self.engine.attempt_to_set_autocommit(conn, False) + with opentracing.start_active_span( + operation_name="db.connection", + ): + sched_duration_sec = monotonic_time() - start_time + sql_scheduling_timer.observe(sched_duration_sec) + context.add_database_scheduled(sched_duration_sec) + + if self.engine.is_connection_closed(conn): + logger.debug("Reconnecting closed database connection") + conn.reconnect() + opentracing.log_kv({"message": "reconnected"}) + + try: + if db_autocommit: + self.engine.attempt_to_set_autocommit(conn, True) + + db_conn = LoggingDatabaseConnection( + conn, self.engine, "runWithConnection" + ) + return func(db_conn, *args, **kwargs) + finally: + if db_autocommit: + self.engine.attempt_to_set_autocommit(conn, False) return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) |