diff options
Diffstat (limited to 'synapse/storage/database.py')
-rw-r--r-- | synapse/storage/database.py | 32 |
1 files changed, 26 insertions, 6 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py index e20c5c5302..fec4ae5b97 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -34,6 +34,7 @@ from typing import ( Tuple, Type, TypeVar, + Union, cast, overload, ) @@ -100,6 +101,15 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { } +class _PoolConnection(Connection): + """ + A Connection from twisted.enterprise.adbapi.Connection. + """ + + def reconnect(self) -> None: + ... + + def make_pool( reactor: IReactorCore, db_config: DatabaseConnectionConfig, @@ -499,6 +509,7 @@ class DatabasePool: """ _TXN_ID = 0 + engine: BaseDatabaseEngine def __init__( self, @@ -671,7 +682,15 @@ class DatabasePool: f = cast(types.FunctionType, func) # type: ignore[redundant-cast] if f.__closure__: for i, cell in enumerate(f.__closure__): - if inspect.isgenerator(cell.cell_contents): + try: + contents = cell.cell_contents + except ValueError: + # cell.cell_contents can raise if the "cell" is empty, + # which indicates that the variable is currently + # unbound. + continue + + if inspect.isgenerator(contents): logger.error( "Programming error: function %s references generator %s " "via its closure", @@ -847,7 +866,8 @@ class DatabasePool: try: with opentracing.start_active_span(f"db.{desc}"): result = await self.runWithConnection( - self.new_transaction, + # mypy seems to have an issue with this, maybe a bug? + self.new_transaction, # type: ignore[arg-type] desc, after_callbacks, async_after_callbacks, @@ -883,7 +903,7 @@ class DatabasePool: async def runWithConnection( self, - func: Callable[..., R], + func: Callable[Concatenate[LoggingDatabaseConnection, P], R], *args: Any, db_autocommit: bool = False, isolation_level: Optional[int] = None, @@ -917,7 +937,7 @@ class DatabasePool: start_time = monotonic_time() - def inner_func(conn, *args, **kwargs): + def inner_func(conn: _PoolConnection, *args: P.args, **kwargs: P.kwargs) -> R: # We shouldn't be in a transaction. If we are then something # somewhere hasn't committed after doing work. (This is likely only # possible during startup, as `run*` will ensure changes are @@ -1010,7 +1030,7 @@ class DatabasePool: decoder: Optional[Callable[[Cursor], R]], query: str, *args: Any, - ) -> R: + ) -> Union[List[Tuple[Any, ...]], R]: """Runs a single query for a result set. Args: @@ -1023,7 +1043,7 @@ class DatabasePool: The result of decoder(results) """ - def interaction(txn): + def interaction(txn: LoggingTransaction) -> Union[List[Tuple[Any, ...]], R]: txn.execute(query, args) if decoder: return decoder(txn) |