From 61210567405b1ac7efaa23d5513cc0b443da0a3a Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Wed, 16 Mar 2022 15:07:41 +0000 Subject: Handle cancellation in `DatabasePool.runInteraction()` (#12199) To handle cancellation, we ensure that `after_callback`s and `exception_callback`s are always run, since the transaction will complete on another thread regardless of cancellation. We also wait until everything is done before releasing the `CancelledError`, so that logging contexts won't get used after they have been finished. Signed-off-by: Sean Quah --- synapse/storage/database.py | 61 +++++++++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 24 deletions(-) (limited to 'synapse/storage/database.py') diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 99802228c9..9749f0c06e 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -41,6 +41,7 @@ from prometheus_client import Histogram from typing_extensions import Literal from twisted.enterprise import adbapi +from twisted.internet import defer from synapse.api.errors import StoreError from synapse.config.database import DatabaseConnectionConfig @@ -55,6 +56,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.types import Connection, Cursor +from synapse.util.async_helpers import delay_cancellation from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -732,34 +734,45 @@ class DatabasePool: Returns: The result of func """ - after_callbacks: List[_CallbackListEntry] = [] - exception_callbacks: List[_CallbackListEntry] = [] - if not current_context(): - logger.warning("Starting db txn '%s' from sentinel context", desc) + async def _runInteraction() -> R: + after_callbacks: List[_CallbackListEntry] = [] + exception_callbacks: List[_CallbackListEntry] = [] - try: - with opentracing.start_active_span(f"db.{desc}"): - result = await self.runWithConnection( - self.new_transaction, - desc, - after_callbacks, - exception_callbacks, - func, - *args, - db_autocommit=db_autocommit, - isolation_level=isolation_level, - **kwargs, - ) + if not current_context(): + logger.warning("Starting db txn '%s' from sentinel context", desc) - for after_callback, after_args, after_kwargs in after_callbacks: - after_callback(*after_args, **after_kwargs) - except Exception: - for after_callback, after_args, after_kwargs in exception_callbacks: - after_callback(*after_args, **after_kwargs) - raise + try: + with opentracing.start_active_span(f"db.{desc}"): + result = await self.runWithConnection( + self.new_transaction, + desc, + after_callbacks, + exception_callbacks, + func, + *args, + db_autocommit=db_autocommit, + isolation_level=isolation_level, + **kwargs, + ) - return cast(R, result) + for after_callback, after_args, after_kwargs in after_callbacks: + after_callback(*after_args, **after_kwargs) + + return cast(R, result) + except Exception: + for after_callback, after_args, after_kwargs in exception_callbacks: + after_callback(*after_args, **after_kwargs) + raise + + # To handle cancellation, we ensure that `after_callback`s and + # `exception_callback`s are always run, since the transaction will complete + # on another thread regardless of cancellation. + # + # We also wait until everything above is done before releasing the + # `CancelledError`, so that logging contexts won't get used after they have been + # finished. + return await delay_cancellation(defer.ensureDeferred(_runInteraction())) async def runWithConnection( self, -- cgit 1.5.1 From f4c5e5864cdc04aa61ad13d6f6ba870df811a881 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 23 Mar 2022 14:03:24 +0000 Subject: Use psycopg2 type stubs (#12269) --- changelog.d/12269.misc | 1 + setup.py | 1 + synapse/storage/database.py | 14 +++++++++++--- synapse/storage/engines/__init__.py | 2 +- 4 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 changelog.d/12269.misc (limited to 'synapse/storage/database.py') diff --git a/changelog.d/12269.misc b/changelog.d/12269.misc new file mode 100644 index 0000000000..ed79cbb528 --- /dev/null +++ b/changelog.d/12269.misc @@ -0,0 +1 @@ +Use type stubs for `psycopg2`. diff --git a/setup.py b/setup.py index 439ed75d72..63da71ad7b 100755 --- a/setup.py +++ b/setup.py @@ -108,6 +108,7 @@ CONDITIONAL_REQUIREMENTS["mypy"] = [ "types-jsonschema>=3.2.0", "types-opentracing>=2.4.2", "types-Pillow>=8.3.4", + "types-psycopg2>=2.9.9", "types-pyOpenSSL>=20.0.7", "types-PyYAML>=5.4.10", "types-requests>=2.26.0", diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 9749f0c06e..367709a1a7 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -288,7 +288,7 @@ class LoggingTransaction: """ if isinstance(self.database_engine, PostgresEngine): - from psycopg2.extras import execute_batch # type: ignore + from psycopg2.extras import execute_batch self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args) else: @@ -302,10 +302,18 @@ class LoggingTransaction: rows (e.g. INSERTs). """ assert isinstance(self.database_engine, PostgresEngine) - from psycopg2.extras import execute_values # type: ignore + from psycopg2.extras import execute_values return self._do_execute( - lambda *x: execute_values(self.txn, *x, fetch=fetch), sql, *args + # Type ignore: mypy is unhappy because if `x` is a 5-tuple, then there will + # be two values for `fetch`: one given positionally, and another given + # as a keyword argument. We might be able to fix this by + # - propagating the signature of psycopg2.extras.execute_values to this + # function, or + # - changing `*args: Any` to `values: T` for some appropriate T. + lambda *x: execute_values(self.txn, *x, fetch=fetch), # type: ignore[misc] + sql, + *args, ) def execute(self, sql: str, *args: Any) -> None: diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 9abc02046e..afb7d5054d 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -27,7 +27,7 @@ def create_engine(database_config) -> BaseDatabaseEngine: if name == "psycopg2": # Note that psycopg2cffi-compat provides the psycopg2 module on pypy. - import psycopg2 # type: ignore + import psycopg2 return PostgresEngine(psycopg2, database_config) -- cgit 1.5.1