diff --git a/pyproject.toml b/pyproject.toml
index 29e8383330..a0627e9ccc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -293,7 +293,7 @@ all = [
# matrix-synapse-ldap3
"matrix-synapse-ldap3",
# postgres
- "psycopg2", "psycopg2cffi", "psycopg2cffi-compat",
+ "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "psycopg",
# saml2
"pysaml2",
# oidc and jwt
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 6aeaedeaef..45962d07cc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -60,9 +60,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import (
BaseDatabaseEngine,
- PostgresEngine,
+ Psycopg2Engine,
PsycopgEngine,
- Sqlite3Engine, Psycopg2Engine,
+ Sqlite3Engine,
)
from synapse.storage.types import Connection, Cursor, SQLQueryParameters
from synapse.util.async_helpers import delay_cancellation
@@ -426,18 +426,14 @@ class LoggingTransaction:
values,
)
 
- def copy_write(
- self,
- sql: str, args: Iterable[Iterable[Any]]
- ) -> None:
+ def copy_write(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
# TODO use _do_execute
with self.txn.copy(sql) as copy:
for record in args:
copy.write_row(record)
 
def copy_read(
- self,
- sql: str, args: Iterable[Iterable[Any]]
+ self, sql: str, args: Iterable[Iterable[Any]]
) -> Iterable[Tuple[Any, ...]]:
# TODO use _do_execute
sql = self.database_engine.convert_param_style(sql)
@@ -466,6 +462,7 @@ class LoggingTransaction:
"Strip newlines out of SQL so that the loggers in the DB are on one line"
if isinstance(self.database_engine, PsycopgEngine):
import psycopg.sql
+
if isinstance(sql, psycopg.sql.Composed):
return sql.as_string(None)
@@ -1201,7 +1198,8 @@ class DatabasePool:
elif isinstance(txn.database_engine, PsycopgEngine):
sql = "COPY %s (%s) FROM STDIN" % (
- table, ", ".join(k for k in keys),
+ table,
+ ", ".join(k for k in keys),
)
txn.copy_write(sql, values)
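Note on the copy_write() path above: it wraps psycopg 3's Cursor.copy() context manager, which streams rows to the server over Postgres's COPY protocol. A minimal, self-contained sketch of that API (the connection string, table and column names below are placeholders, not part of this patch):

# Illustrative sketch of a COPY ... FROM STDIN write with psycopg 3.
# "dbname=example" and "mytable" are placeholder names.
import psycopg

with psycopg.connect("dbname=example") as conn:
    with conn.cursor() as cur:
        # Stream all rows to the server in a single COPY operation.
        with cur.copy("COPY mytable (id, name) FROM STDIN") as copy:
            for record in [(1, "alice"), (2, "bob")]:
                copy.write_row(record)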
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index e2e0717412..6d0d8a5402 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -47,7 +47,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine, Psycopg2Engine
+from synapse.storage.engines import PostgresEngine, Psycopg2Engine, Sqlite3Engine
from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -332,9 +332,13 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
AND sequence_number <= max_seq
)
TO STDOUT
- """ % (", ".join("(?, ?)" for _ in chains))
+ """ % (
+ ", ".join("(?, ?)" for _ in chains)
+ )
# Flatten the arguments.
- rows = txn.copy_read(sql, list(itertools.chain.from_iterable(chains.items())))
+ rows = txn.copy_read(
+ sql, list(itertools.chain.from_iterable(chains.items()))
+ )
results.update(r for r, in rows)
else:
# For SQLite we just fall back to doing a noddy for loop.
@@ -622,7 +626,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
AND min_seq < sequence_number AND sequence_number <= max_seq
)
TO STDOUT
- """ % (", ".join("(?, ?, ?)" for _ in args))
+ """ % (
+ ", ".join("(?, ?, ?)" for _ in args)
+ )
# Flatten the arguments.
rows = txn.copy_read(sql, list(itertools.chain.from_iterable(args)))
result.update(r for r, in rows)
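For context on the copy_read() calls above: the statement is first passed through convert_param_style() (see the database.py hunk earlier), and the COPY ... TO STDOUT output is then consumed via psycopg 3's Copy.rows(). A minimal sketch, reusing the same placeholder names as before:

# Illustrative sketch of reading a COPY ... TO STDOUT result with psycopg 3.
import psycopg

with psycopg.connect("dbname=example") as conn:
    with conn.cursor() as cur:
        with cur.copy("COPY (SELECT id, name FROM mytable) TO STDOUT") as copy:
            for row in copy.rows():
                # Each record is yielded as a tuple of values.
                print(row)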
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index ba99e63d26..ed29d1fa5d 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -96,10 +96,10 @@ from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
- PostgresEngine,
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
+from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 96379602ba..ed41e52201 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -54,8 +54,12 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.state import StateFilter
from synapse.storage.databases.main.state_deltas import StateDeltasStore
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine, PsycopgEngine, \
- Psycopg2Engine
+from synapse.storage.engines import (
+ PostgresEngine,
+ Psycopg2Engine,
+ PsycopgEngine,
+ Sqlite3Engine,
+)
from synapse.types import (
JsonDict,
UserID,
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index c28730c4ca..f357d876ce 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -33,7 +33,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
 
 
-class PostgresEngine(BaseDatabaseEngine[ConnectionType, CursorType], metaclass=abc.ABCMeta):
+class PostgresEngine(
+ BaseDatabaseEngine[ConnectionType, CursorType], metaclass=abc.ABCMeta
+):
isolation_level_map: Mapping[int, int]
default_isolation_level: int
OperationalError: Type[Exception]
@@ -168,7 +170,11 @@ class PostgresEngine(BaseDatabaseEngine[ConnectionType, CursorType], metaclass=a
if self.__class__.__name__ == "Psycopg2Engine":
cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
else:
- cursor.execute(sql.SQL("SET statement_timeout TO {}").format(self.statement_timeout))
+ cursor.execute(
+ sql.SQL("SET statement_timeout TO {}").format(
+ self.statement_timeout
+ )
+ )
cursor.close()
db_conn.commit()
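The cursor.execute(sql.SQL(...).format(...)) branch above uses psycopg 3's client-side SQL composition rather than a bound parameter; psycopg2 interpolates query parameters client-side while psycopg 3 binds them server-side, and Postgres does not accept bound parameters in utility statements such as SET, which is presumably why the two engines need different code paths here. A minimal sketch of the composition API, wrapping the value explicitly in sql.Literal (the timeout value and connection string are placeholders):

# Illustrative sketch of psycopg.sql composition for a SET statement.
import psycopg
from psycopg import sql

with psycopg.connect("dbname=example") as conn:
    with conn.cursor() as cur:
        stmt = sql.SQL("SET statement_timeout TO {}").format(sql.Literal(5000))
        cur.execute(stmt)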
diff --git a/synapse/storage/engines/psycopg.py b/synapse/storage/engines/psycopg.py
index ddd745217f..f5d2bf8471 100644
--- a/synapse/storage/engines/psycopg.py
+++ b/synapse/storage/engines/psycopg.py
@@ -22,9 +22,7 @@ import psycopg.sql
from twisted.enterprise.adbapi import Connection as TxConnection
from synapse.storage.engines import PostgresEngine
-from synapse.storage.engines._base import (
- IsolationLevel,
-)
+from synapse.storage.engines._base import IsolationLevel
if TYPE_CHECKING:
pass
diff --git a/synapse/storage/engines/psycopg2.py b/synapse/storage/engines/psycopg2.py
index 78b7a4c51f..5e57d8faf8 100644
--- a/synapse/storage/engines/psycopg2.py
+++ b/synapse/storage/engines/psycopg2.py
@@ -18,9 +18,7 @@ from typing import TYPE_CHECKING, Any, Mapping, Optional
import psycopg2.extensions
from synapse.storage.engines import PostgresEngine
-from synapse.storage.engines._base import (
- IsolationLevel,
-)
+from synapse.storage.engines._base import IsolationLevel
if TYPE_CHECKING:
pass
diff --git a/synapse/storage/schema/main/delta/69/01as_txn_seq.py b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
index fc14845df3..9dd5a27a3f 100644
--- a/synapse/storage/schema/main/delta/69/01as_txn_seq.py
+++ b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
@@ -47,8 +47,7 @@ def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) ->
if isinstance(database_engine, PsycopgEngine):
import psycopg.sql
- cur.execute(
- psycopg.sql.SQL(sql).format(args)
- )
+
+ cur.execute(psycopg.sql.SQL(sql).format(args))
else:
cur.execute(sql, args)
|