summary refs log tree commit diff
path: root/synapse/storage/engines/psycopg.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/engines/psycopg.py')
-rw-r--r--synapse/storage/engines/psycopg.py89
1 files changed, 89 insertions, 0 deletions
diff --git a/synapse/storage/engines/psycopg.py b/synapse/storage/engines/psycopg.py
new file mode 100644

index 0000000000..f5d2bf8471 --- /dev/null +++ b/synapse/storage/engines/psycopg.py
@@ -0,0 +1,89 @@ +# Copyright 2022-2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, Any, Mapping, Optional + +import psycopg +import psycopg.errors +import psycopg.sql + +from twisted.enterprise.adbapi import Connection as TxConnection + +from synapse.storage.engines import PostgresEngine +from synapse.storage.engines._base import IsolationLevel + +if TYPE_CHECKING: + pass + + +logger = logging.getLogger(__name__) + + +class PsycopgEngine(PostgresEngine[psycopg.Connection, psycopg.Cursor]): + OperationalError = psycopg.OperationalError + + def __init__(self, database_config: Mapping[str, Any]): + super().__init__(psycopg, database_config) + # psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) + + # Disables passing `bytes` to txn.execute, c.f. #6186. If you do + # actually want to use bytes than wrap it in `bytearray`. + # def _disable_bytes_adapter(_: bytes) -> NoReturn: + # raise Exception("Passing bytes to DB is disabled.") + + self.isolation_level_map: Mapping[int, psycopg.IsolationLevel] = { + IsolationLevel.READ_COMMITTED: psycopg.IsolationLevel.READ_COMMITTED, + IsolationLevel.REPEATABLE_READ: psycopg.IsolationLevel.REPEATABLE_READ, + IsolationLevel.SERIALIZABLE: psycopg.IsolationLevel.SERIALIZABLE, + } + self.default_isolation_level = psycopg.IsolationLevel.REPEATABLE_READ + + def get_server_version(self, db_conn: psycopg.Connection) -> int: + return db_conn.info.server_version + + def convert_param_style(self, sql: str) -> str: + if isinstance(sql, psycopg.sql.Composed): + return sql + + return sql.replace("?", "%s") + + def is_deadlock(self, error: Exception) -> bool: + if isinstance(error, psycopg.errors.Error): + # https://www.postgresql.org/docs/current/static/errcodes-appendix.html + # "40001" serialization_failure + # "40P01" deadlock_detected + return error.sqlstate in ["40001", "40P01"] + return False + + def in_transaction(self, conn: psycopg.Connection) -> bool: + return conn.info.transaction_status != psycopg.pq.TransactionStatus.IDLE + + def attempt_to_set_autocommit( + self, conn: psycopg.Connection, autocommit: bool + ) -> None: + # Sometimes this gets called with a Twisted connection instead, unwrap + # it because it doesn't support __setattr__. + if isinstance(conn, TxConnection): + conn = conn._connection + conn.autocommit = autocommit + + def attempt_to_set_isolation_level( + self, conn: psycopg.Connection, isolation_level: Optional[int] + ) -> None: + if isolation_level is None: + pg_isolation_level = self.default_isolation_level + else: + pg_isolation_level = self.isolation_level_map[isolation_level] + conn.isolation_level = pg_isolation_level