-rw-r--r--  poetry.lock  94
-rw-r--r--  pyproject.toml  5
-rw-r--r--  synapse/config/database.py  2
-rw-r--r--  synapse/storage/background_updates.py  14
-rw-r--r--  synapse/storage/database.py  101
-rw-r--r--  synapse/storage/databases/main/event_federation.py  3
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py  2
-rw-r--r--  synapse/storage/databases/main/relations.py  4
-rw-r--r--  synapse/storage/databases/main/search.py  8
-rw-r--r--  synapse/storage/databases/main/user_directory.py  87
-rw-r--r--  synapse/storage/databases/state/bg_updates.py  4
-rw-r--r--  synapse/storage/engines/__init__.py  37
-rw-r--r--  synapse/storage/engines/_base.py  12
-rw-r--r--  synapse/storage/engines/postgres.py  93
-rw-r--r--  synapse/storage/engines/psycopg.py  89
-rw-r--r--  synapse/storage/engines/psycopg2.py  90
-rw-r--r--  synapse/storage/engines/sqlite.py  10
-rw-r--r--  synapse/storage/prepare_database.py  11
-rw-r--r--  synapse/storage/schema/main/delta/56/unique_user_filter_index.py  4
-rw-r--r--  synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py  9
-rw-r--r--  synapse/storage/schema/main/delta/69/01as_txn_seq.py  18
-rw-r--r--  synapse/storage/types.py  2
-rw-r--r--  synapse/storage/util/id_generators.py  2
-rw-r--r--  tests/server.py  25
-rw-r--r--  tests/utils.py  7
25 files changed, 547 insertions(+), 186 deletions(-)
diff --git a/poetry.lock b/poetry.lock
index 77643b0569..fa8261c400 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand.
 
 [[package]]
 name = "alabaster"
@@ -109,6 +109,34 @@ files = [
 pytz = {version = ">=2015.7", markers = "python_version < \"3.9\""}
 
 [[package]]
+name = "backports-zoneinfo"
+version = "0.2.1"
+description = "Backport of the standard library zoneinfo module"
+optional = true
+python-versions = ">=3.6"
+files = [
+    {file = "backports.zoneinfo-0.2.1-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:da6013fd84a690242c310d77ddb8441a559e9cb3d3d59ebac9aca1a57b2e18bc"},
+    {file = "backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:89a48c0d158a3cc3f654da4c2de1ceba85263fafb861b98b59040a5086259722"},
+    {file = "backports.zoneinfo-0.2.1-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:1c5742112073a563c81f786e77514969acb58649bcdf6cdf0b4ed31a348d4546"},
+    {file = "backports.zoneinfo-0.2.1-cp36-cp36m-win32.whl", hash = "sha256:e8236383a20872c0cdf5a62b554b27538db7fa1bbec52429d8d106effbaeca08"},
+    {file = "backports.zoneinfo-0.2.1-cp36-cp36m-win_amd64.whl", hash = "sha256:8439c030a11780786a2002261569bdf362264f605dfa4d65090b64b05c9f79a7"},
+    {file = "backports.zoneinfo-0.2.1-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:f04e857b59d9d1ccc39ce2da1021d196e47234873820cbeaad210724b1ee28ac"},
+    {file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:17746bd546106fa389c51dbea67c8b7c8f0d14b5526a579ca6ccf5ed72c526cf"},
+    {file = "backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:5c144945a7752ca544b4b78c8c41544cdfaf9786f25fe5ffb10e838e19a27570"},
+    {file = "backports.zoneinfo-0.2.1-cp37-cp37m-win32.whl", hash = "sha256:e55b384612d93be96506932a786bbcde5a2db7a9e6a4bb4bffe8b733f5b9036b"},
+    {file = "backports.zoneinfo-0.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:a76b38c52400b762e48131494ba26be363491ac4f9a04c1b7e92483d169f6582"},
+    {file = "backports.zoneinfo-0.2.1-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:8961c0f32cd0336fb8e8ead11a1f8cd99ec07145ec2931122faaac1c8f7fd987"},
+    {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e81b76cace8eda1fca50e345242ba977f9be6ae3945af8d46326d776b4cf78d1"},
+    {file = "backports.zoneinfo-0.2.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:7b0a64cda4145548fed9efc10322770f929b944ce5cee6c0dfe0c87bf4c0c8c9"},
+    {file = "backports.zoneinfo-0.2.1-cp38-cp38-win32.whl", hash = "sha256:1b13e654a55cd45672cb54ed12148cd33628f672548f373963b0bff67b217328"},
+    {file = "backports.zoneinfo-0.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:4a0f800587060bf8880f954dbef70de6c11bbe59c673c3d818921f042f9954a6"},
+    {file = "backports.zoneinfo-0.2.1.tar.gz", hash = "sha256:fadbfe37f74051d024037f223b8e001611eac868b5c5b06144ef4d8b799862f2"},
+]
+
+[package.extras]
+tzdata = ["tzdata"]
+
+[[package]]
 name = "bcrypt"
 version = "4.0.1"
 description = "Modern password hashing for your software and your servers"
@@ -763,17 +791,6 @@ files = [
     {file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4a3a6a2fbbe7550ffe52d151cf76065e6b89cfb3e9d0463e49a7e322a25d0426"},
     {file = "ijson-3.2.3-cp311-cp311-win32.whl", hash = "sha256:6a4db2f7fb9acfb855c9ae1aae602e4648dd1f88804a0d5cfb78c3639bcf156c"},
     {file = "ijson-3.2.3-cp311-cp311-win_amd64.whl", hash = "sha256:ccd6be56335cbb845f3d3021b1766299c056c70c4c9165fb2fbe2d62258bae3f"},
-    {file = "ijson-3.2.3-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:055b71bbc37af5c3c5861afe789e15211d2d3d06ac51ee5a647adf4def19c0ea"},
-    {file = "ijson-3.2.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:c075a547de32f265a5dd139ab2035900fef6653951628862e5cdce0d101af557"},
-    {file = "ijson-3.2.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:457f8a5fc559478ac6b06b6d37ebacb4811f8c5156e997f0d87d708b0d8ab2ae"},
-    {file = "ijson-3.2.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9788f0c915351f41f0e69ec2618b81ebfcf9f13d9d67c6d404c7f5afda3e4afb"},
-    {file = "ijson-3.2.3-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fa234ab7a6a33ed51494d9d2197fb96296f9217ecae57f5551a55589091e7853"},
-    {file = "ijson-3.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bdd0dc5da4f9dc6d12ab6e8e0c57d8b41d3c8f9ceed31a99dae7b2baf9ea769a"},
-    {file = "ijson-3.2.3-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:c6beb80df19713e39e68dc5c337b5c76d36ccf69c30b79034634e5e4c14d6904"},
-    {file = "ijson-3.2.3-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:a2973ce57afb142d96f35a14e9cfec08308ef178a2c76b8b5e1e98f3960438bf"},
-    {file = "ijson-3.2.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:105c314fd624e81ed20f925271ec506523b8dd236589ab6c0208b8707d652a0e"},
-    {file = "ijson-3.2.3-cp312-cp312-win32.whl", hash = "sha256:ac44781de5e901ce8339352bb5594fcb3b94ced315a34dbe840b4cff3450e23b"},
-    {file = "ijson-3.2.3-cp312-cp312-win_amd64.whl", hash = "sha256:0567e8c833825b119e74e10a7c29761dc65fcd155f5d4cb10f9d3b8916ef9912"},
     {file = "ijson-3.2.3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:eeb286639649fb6bed37997a5e30eefcacddac79476d24128348ec890b2a0ccb"},
     {file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:396338a655fb9af4ac59dd09c189885b51fa0eefc84d35408662031023c110d1"},
     {file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0e0243d166d11a2a47c17c7e885debf3b19ed136be2af1f5d1c34212850236ac"},
@@ -1755,6 +1772,30 @@ files = [
 twisted = ["twisted"]
 
 [[package]]
+name = "psycopg"
+version = "3.1.12"
+description = "PostgreSQL database adapter for Python"
+optional = true
+python-versions = ">=3.7"
+files = [
+    {file = "psycopg-3.1.12-py3-none-any.whl", hash = "sha256:8ec5230d6a7eb654b4fb3cf2d3eda8871d68f24807b934790504467f1deee9f8"},
+    {file = "psycopg-3.1.12.tar.gz", hash = "sha256:cec7ad2bc6a8510e56c45746c631cf9394148bdc8a9a11fd8cf8554ce129ae78"},
+]
+
+[package.dependencies]
+"backports.zoneinfo" = {version = ">=0.2.0", markers = "python_version < \"3.9\""}
+typing-extensions = ">=4.1"
+tzdata = {version = "*", markers = "sys_platform == \"win32\""}
+
+[package.extras]
+binary = ["psycopg-binary (==3.1.12)"]
+c = ["psycopg-c (==3.1.12)"]
+dev = ["black (>=23.1.0)", "dnspython (>=2.1)", "flake8 (>=4.0)", "mypy (>=1.4.1)", "types-setuptools (>=57.4)", "wheel (>=0.37)"]
+docs = ["Sphinx (>=5.0)", "furo (==2022.6.21)", "sphinx-autobuild (>=2021.3.14)", "sphinx-autodoc-typehints (>=1.12)"]
+pool = ["psycopg-pool"]
+test = ["anyio (>=3.6.2,<4.0)", "mypy (>=1.4.1)", "pproxy (>=2.7)", "pytest (>=6.2.5)", "pytest-cov (>=3.0)", "pytest-randomly (>=3.5)"]
+
+[[package]]
 name = "psycopg2"
 version = "2.9.9"
 description = "psycopg2 - Python-PostgreSQL Database Adapter"
@@ -1765,8 +1806,6 @@ files = [
     {file = "psycopg2-2.9.9-cp310-cp310-win_amd64.whl", hash = "sha256:426f9f29bde126913a20a96ff8ce7d73fd8a216cfb323b1f04da402d452853c3"},
     {file = "psycopg2-2.9.9-cp311-cp311-win32.whl", hash = "sha256:ade01303ccf7ae12c356a5e10911c9e1c51136003a9a1d92f7aa9d010fb98372"},
     {file = "psycopg2-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:121081ea2e76729acfb0673ff33755e8703d45e926e416cb59bae3a86c6a4981"},
-    {file = "psycopg2-2.9.9-cp312-cp312-win32.whl", hash = "sha256:d735786acc7dd25815e89cc4ad529a43af779db2e25aa7c626de864127e5a024"},
-    {file = "psycopg2-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:a7653d00b732afb6fc597e29c50ad28087dcb4fbfb28e86092277a559ae4e693"},
     {file = "psycopg2-2.9.9-cp37-cp37m-win32.whl", hash = "sha256:5e0d98cade4f0e0304d7d6f25bbfbc5bd186e07b38eac65379309c4ca3193efa"},
     {file = "psycopg2-2.9.9-cp37-cp37m-win_amd64.whl", hash = "sha256:7e2dacf8b009a1c1e843b5213a87f7c544b2b042476ed7755be813eaf4e8347a"},
     {file = "psycopg2-2.9.9-cp38-cp38-win32.whl", hash = "sha256:ff432630e510709564c01dafdbe996cb552e0b9f3f065eb89bdce5bd31fabf4c"},
@@ -2182,7 +2221,6 @@ files = [
     {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"},
     {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"},
     {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"},
-    {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"},
     {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"},
     {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"},
     {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"},
@@ -2190,15 +2228,8 @@ files = [
     {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"},
     {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"},
     {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"},
-    {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"},
     {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"},
     {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
-    {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"},
-    {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"},
-    {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"},
-    {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"},
-    {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"},
-    {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"},
     {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"},
     {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"},
     {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"},
@@ -2215,7 +2246,6 @@ files = [
     {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"},
     {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"},
     {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"},
-    {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"},
     {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"},
     {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"},
     {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"},
@@ -2223,7 +2253,6 @@ files = [
     {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"},
     {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"},
     {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"},
-    {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"},
     {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"},
     {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"},
     {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"},
@@ -3187,6 +3216,17 @@ files = [
 ]
 
 [[package]]
+name = "tzdata"
+version = "2023.3"
+description = "Provider of IANA time zone data"
+optional = true
+python-versions = ">=2"
+files = [
+    {file = "tzdata-2023.3-py2.py3-none-any.whl", hash = "sha256:7e65763eef3120314099b6939b5546db7adce1e7d6f2e179e3df563c70511eda"},
+    {file = "tzdata-2023.3.tar.gz", hash = "sha256:11ef1e08e54acb0d4f95bdb1be05da659673de4acbd21bf9c69e94cc5e907a3a"},
+]
+
+[[package]]
 name = "unpaddedbase64"
 version = "2.1.0"
 description = "Encode and decode Base64 without \"=\" padding"
@@ -3429,13 +3469,13 @@ docs = ["Sphinx", "repoze.sphinx.autointerface"]
 test = ["zope.i18nmessageid", "zope.testing", "zope.testrunner"]
 
 [extras]
-all = ["Pympler", "authlib", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pyicu", "pysaml2", "sentry-sdk", "txredisapi"]
+all = ["Pympler", "authlib", "hiredis", "jaeger-client", "lxml", "matrix-synapse-ldap3", "opentracing", "psycopg", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pyicu", "pysaml2", "sentry-sdk", "txredisapi"]
 cache-memory = ["Pympler"]
 jwt = ["authlib"]
 matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
 oidc = ["authlib"]
 opentracing = ["jaeger-client", "opentracing"]
-postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
+postgres = ["psycopg", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
 redis = ["hiredis", "txredisapi"]
 saml2 = ["pysaml2"]
 sentry = ["sentry-sdk"]
@@ -3447,4 +3487,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8.0"
-content-hash = "369455d6a67753a6bcfbad3cd86801b1dd02896d0180080e2ba9501e007353ec"
+content-hash = "73869fa198a8ddd7f724915dbea25b462a44fa418ce47a425b1adf16ecd1b82f"
diff --git a/pyproject.toml b/pyproject.toml
index df132c0236..75fa24c45d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -233,6 +233,7 @@ matrix-synapse-ldap3 = { version = ">=0.1", optional = true }
 psycopg2 = { version = ">=2.8", markers = "platform_python_implementation != 'PyPy'", optional = true }
 psycopg2cffi = { version = ">=2.8", markers = "platform_python_implementation == 'PyPy'", optional = true }
 psycopg2cffi-compat = { version = "==1.1", markers = "platform_python_implementation == 'PyPy'", optional = true }
+psycopg = { version = "^3.1", optional = true }
 pysaml2 = { version = ">=4.5.0", optional = true }
 authlib = { version = ">=0.15.1", optional = true }
 # systemd-python is necessary for logging to the systemd journal via
@@ -255,7 +256,7 @@ pyicu = { version = ">=2.10.2", optional = true }
 # NB: Packages that should be part of `pip install matrix-synapse[all]` need to be specified
 # twice: once here, and once in the `all` extra.
 matrix-synapse-ldap3 = ["matrix-synapse-ldap3"]
-postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"]
+postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "psycopg"]
 saml2 = ["pysaml2"]
 oidc = ["authlib"]
 # systemd-python is necessary for logging to the systemd journal via
@@ -292,7 +293,7 @@ all = [
     # matrix-synapse-ldap3
     "matrix-synapse-ldap3",
     # postgres
-    "psycopg2", "psycopg2cffi", "psycopg2cffi-compat",
+    "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "psycopg",
     # saml2
     "pysaml2",
    # oidc and jwt
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 596d8769fe..76423c429b 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -50,7 +50,7 @@ class DatabaseConnectionConfig:
     def __init__(self, name: str, db_config: dict):
         db_engine = db_config.get("name", "sqlite3")
 
-        if db_engine not in ("sqlite3", "psycopg2"):
+        if db_engine not in ("sqlite3", "psycopg2", "psycopg"):
             raise ConfigError("Unsupported database type %r" % (db_engine,))
 
         if db_engine == "sqlite3":
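With this change the database `name` setting accepts `psycopg` alongside `sqlite3` and `psycopg2`. As a rough illustration (not taken from the diff), the dict handed to `DatabaseConnectionConfig` would look like the following, using the `dbname` key that the test changes below switch to; all credentials are placeholders:

```python
# Hypothetical config for the new engine; "psycopg" selects psycopg 3.
db_config = {
    "name": "psycopg",
    "args": {
        "dbname": "synapse",  # psycopg 3 uses the libpq name "dbname"
        "user": "synapse_user",
        "password": "secret",
        "host": "localhost",
    },
}
```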
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 7426dbcad6..1b1e46a64c 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -49,7 +49,11 @@ else:
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
-    from synapse.storage.database import DatabasePool, LoggingTransaction
+    from synapse.storage.database import (
+        DatabasePool,
+        LoggingDatabaseConnection,
+        LoggingTransaction,
+    )
 
 logger = logging.getLogger(__name__)
 
@@ -746,10 +750,10 @@ class BackgroundUpdater:
             The named index will be dropped upon completion of the new index.
         """
 
-        def create_index_psql(conn: Connection) -> None:
+        def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
             conn.rollback()
             # postgres insists on autocommit for the index
-            conn.set_session(autocommit=True)  # type: ignore
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
 
             try:
                 c = conn.cursor()
@@ -793,9 +797,9 @@ class BackgroundUpdater:
                 undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
                 conn.cursor().execute(undo_timeout_sql)
 
-                conn.set_session(autocommit=False)  # type: ignore
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
-        def create_index_sqlite(conn: Connection) -> None:
+        def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
             # Sqlite doesn't support concurrent creation of indexes.
             #
             # We assume that sqlite doesn't give us invalid indices; however
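The `conn.engine.attempt_to_set_autocommit(conn.conn, ...)` indirection exists because `CREATE INDEX CONCURRENTLY` refuses to run inside a transaction block, and the two drivers toggle autocommit differently: psycopg2 via `set_session(autocommit=True)`, psycopg 3 via the `autocommit` attribute. A minimal sketch of the pattern using plain psycopg 3 (connection string and index are made up):

```python
import psycopg

conn = psycopg.connect("dbname=synapse")  # placeholder conninfo
conn.rollback()          # leave any implicit transaction first
conn.autocommit = True   # psycopg2 equivalent: conn.set_session(autocommit=True)
try:
    with conn.cursor() as cur:
        # Must run outside a transaction block.
        cur.execute("CREATE INDEX CONCURRENTLY example_idx ON events (room_id)")
finally:
    conn.autocommit = False
```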
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index f50a4ce2fc..05775425b7 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import inspect
+import itertools
 import logging
 import time
 import types
@@ -56,7 +57,13 @@ from synapse.logging.context import (
 from synapse.metrics import LaterGauge, register_threadpool
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import (
+    BaseDatabaseEngine,
+    PostgresEngine,
+    Psycopg2Engine,
+    PsycopgEngine,
+    Sqlite3Engine,
+)
 from synapse.storage.types import Connection, Cursor, SQLQueryParameters
 from synapse.util.async_helpers import delay_cancellation
 from synapse.util.iterutils import batch_iter
@@ -343,7 +350,8 @@ class LoggingTransaction:
     def fetchone(self) -> Optional[Tuple]:
         return self.txn.fetchone()
 
-    def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
+    def fetchmany(self, size: int = 0) -> List[Tuple]:
+        # XXX This can also be called with no arguments.
         return self.txn.fetchmany(size=size)
 
     def fetchall(self) -> List[Tuple]:
@@ -369,7 +377,7 @@ class LoggingTransaction:
 
         More efficient than `executemany` on PostgreSQL
        """
-        if isinstance(self.database_engine, PostgresEngine):
+        if isinstance(self.database_engine, Psycopg2Engine):
             from psycopg2.extras import execute_batch
 
             # TODO: is it safe for values to be Iterable[Iterable[Any]] here?
@@ -378,6 +386,8 @@ class LoggingTransaction:
             self._do_execute(
                 lambda the_sql: execute_batch(self.txn, the_sql, args), sql
             )
+
+        # TODO Can psycopg3 do anything better?
         else:
             # TODO: is it safe for values to be Iterable[Iterable[Any]] here?
             # https://docs.python.org/3/library/sqlite3.html?highlight=sqlite3#sqlite3.Cursor.executemany
@@ -389,7 +399,7 @@ class LoggingTransaction:
     def execute_values(
         self,
         sql: str,
-        values: Iterable[Iterable[Any]],
+        values: Sequence[Sequence[Any]],
         template: Optional[str] = None,
         fetch: bool = True,
     ) -> List[Tuple]:
@@ -403,17 +413,56 @@ class LoggingTransaction:
             compose the query.
         """
         assert isinstance(self.database_engine, PostgresEngine)
-        from psycopg2.extras import execute_values
 
-        return self._do_execute(
-            # TODO: is it safe for values to be Iterable[Iterable[Any]] here?
-            # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
-            lambda the_sql, the_values: execute_values(
-                self.txn, the_sql, the_values, template=template, fetch=fetch
-            ),
-            sql,
-            values,
-        )
+        if isinstance(self.database_engine, Psycopg2Engine):
+            from psycopg2.extras import execute_values
+
+            return self._do_execute(
+                # TODO: is it safe for values to be Iterable[Iterable[Any]] here?
+                # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
+                lambda the_sql, the_values: execute_values(
+                    self.txn, the_sql, the_values, template=template, fetch=fetch
+                ),
+                sql,
+                values,
+            )
+        else:
+            # We use fetch = False to mean a writable query. You *might* be able
+            # to morph that into a COPY (...) FROM STDIN, but it isn't worth the
+            # effort for the few places we set fetch = False.
+            assert fetch is True
+
+            # execute_values requires a single replacement, but we need to expand it
+            # for COPY. This assumes all inner sequences are the same length.
+            value_str = "(" + ", ".join("?" for _ in next(iter(values))) + ")"
+            sql = sql.replace("?", ", ".join(value_str for _ in values))
+
+            # Wrap the SQL in the COPY statement.
+            sql = f"COPY ({sql}) TO STDOUT"
+
+            def f(
+                the_sql: str, the_args: Sequence[Sequence[Any]]
+            ) -> Iterable[Tuple[Any, ...]]:
+                with self.txn.copy(the_sql, the_args) as copy:
+                    yield from copy.rows()
+
+            # Flatten the values.
+            return self._do_execute(f, sql, list(itertools.chain.from_iterable(values)))
+
+    def copy_write(
+        self, sql: str, args: Iterable[Any], values: Iterable[Iterable[Any]]
+    ) -> None:
+        """Corresponds to a PostgreSQL COPY (...) FROM STDIN call."""
+        assert isinstance(self.database_engine, PsycopgEngine)
+
+        def f(
+            the_sql: str, the_args: Iterable[Any], the_values: Iterable[Iterable[Any]]
+        ) -> None:
+            with self.txn.copy(the_sql, the_args) as copy:
+                for record in the_values:
+                    copy.write_row(record)
+
+        self._do_execute(f, sql, args, values)
 
     def execute(self, sql: str, parameters: SQLQueryParameters = ()) -> None:
         self._do_execute(self.txn.execute, sql, parameters)
@@ -445,6 +494,12 @@ class LoggingTransaction:
 
     def _make_sql_one_line(self, sql: str) -> str:
         "Strip newlines out of SQL so that the loggers in the DB are on one line"
+        if isinstance(self.database_engine, PsycopgEngine):
+            import psycopg.sql
+
+            if isinstance(sql, psycopg.sql.Composed):
+                return sql.as_string(None)
+
         return " ".join(line.strip() for line in sql.splitlines() if line.strip())
 
     def _do_execute(
@@ -485,7 +540,7 @@ class LoggingTransaction:
         finally:
             secs = time.time() - start
             sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
-            sql_query_timer.labels(sql.split()[0]).observe(secs)
+            sql_query_timer.labels(one_line_sql.split()[0]).observe(secs)
 
     def close(self) -> None:
         self.txn.close()
@@ -1117,7 +1172,7 @@ class DatabasePool:
         txn: LoggingTransaction,
         table: str,
         keys: Collection[str],
-        values: Collection[Iterable[Any]],
+        values: Sequence[Sequence[Any]],
     ) -> None:
         """Executes an INSERT query on the named table.
 
@@ -1134,7 +1189,7 @@ class DatabasePool:
         if not values:
             return
 
-        if isinstance(txn.database_engine, PostgresEngine):
+        if isinstance(txn.database_engine, Psycopg2Engine):
             # We use `execute_values` as it can be a lot faster than `execute_batch`,
             # but it's only available on postgres.
             sql = "INSERT INTO %s (%s) VALUES ?" % (
@@ -1143,6 +1198,14 @@ class DatabasePool:
             )
 
             txn.execute_values(sql, values, fetch=False)
+
+        elif isinstance(txn.database_engine, PsycopgEngine):
+            sql = "COPY %s (%s) FROM STDIN" % (
+                table,
+                ", ".join(k for k in keys),
+            )
+            txn.copy_write(sql, (), values)
+
         else:
             sql = "INSERT INTO %s (%s) VALUES(%s)" % (
                 table,
@@ -1566,7 +1629,7 @@ class DatabasePool:
         for x, y in zip(key_values, value_values):
             args.append(tuple(x) + tuple(y))
 
-        if isinstance(txn.database_engine, PostgresEngine):
+        if isinstance(txn.database_engine, Psycopg2Engine):
             # We use `execute_values` as it can be a lot faster than `execute_batch`,
             # but it's only available on postgres.
             sql = "INSERT INTO %s (%s) VALUES ? ON CONFLICT (%s) DO %s" % (
@@ -2323,7 +2386,7 @@ class DatabasePool:
             values: for each row, a list of values in the same order as `keys`
         """
 
-        if isinstance(txn.database_engine, PostgresEngine):
+        if isinstance(txn.database_engine, Psycopg2Engine):
             # We use `execute_values` as it can be a lot faster than `execute_batch`,
             # but it's only available on postgres.
             sql = "DELETE FROM %s WHERE (%s) IN (VALUES ?)" % (
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f1b0991503..b8bbd1eccd 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -47,7 +47,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import PostgresEngine, Psycopg2Engine, Sqlite3Engine
 from synapse.types import JsonDict, StrCollection
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -320,7 +320,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                     c.chain_id = l.chain_id
                     AND sequence_number <= max_seq
             """
-
             rows = txn.execute_values(sql, chains.items())
             results.update(r for r, in rows)
         else:
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 39556481ff..e4dc68c0d8 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -96,10 +96,10 @@ from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
-    PostgresEngine,
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
+from synapse.storage.engines import PostgresEngine
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 419b2c7a22..d0bc78b2e3 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -46,7 +46,7 @@ from synapse.storage.databases.main.stream import (
     generate_pagination_bounds,
     generate_pagination_where_clause,
 )
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Psycopg2Engine
 from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken
 from synapse.util.caches.descriptors import cached, cachedList
 
@@ -139,7 +139,7 @@ class RelationsWorkerStore(SQLBaseStore):
             ON CONFLICT (room_id, thread_id)
             DO NOTHING
         """
-        if isinstance(txn.database_engine, PostgresEngine):
+        if isinstance(txn.database_engine, Psycopg2Engine):
             txn.execute_values(sql % ("?",), rows, fetch=False)
         else:
             txn.execute_batch(sql % ("(?, ?, ?, ?, ?)",), rows)
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f4bef4c99b..4c4112e3b2 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             # we have to set autocommit, because postgres refuses to
             # CREATE INDEX CONCURRENTLY without it.
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
 
             try:
                 c = conn.cursor()
@@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 # we should now be able to delete the GIST index.
                 c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
             finally:
-                conn.set_session(autocommit=False)
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
         if isinstance(self.database_engine, PostgresEngine):
             await self.db_pool.runWithConnection(create_index)
@@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
         def create_index(conn: LoggingDatabaseConnection) -> None:
             conn.rollback()
-            conn.set_session(autocommit=True)
+            conn.attempt_to_set_autocommit(True)
             c = conn.cursor()
 
             # We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
                 """
             )
-            conn.set_session(autocommit=False)
+            conn.attempt_to_set_autocommit(False)
 
         await self.db_pool.runWithConnection(create_index)
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index a9f5d68b63..d4b86ed7a6 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -54,7 +54,12 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.state import StateFilter
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import (
+    PostgresEngine,
+    Psycopg2Engine,
+    PsycopgEngine,
+    Sqlite3Engine,
+)
 from synapse.types import (
     JsonDict,
     UserID,
@@ -719,35 +724,65 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         if isinstance(self.database_engine, PostgresEngine):
             # We weight the localpart most highly, then display name and finally
             # server name
-            template = """
-                (
-                    %s,
-                    setweight(to_tsvector('simple', %s), 'A')
-                    || setweight(to_tsvector('simple', %s), 'D')
-                    || setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
-                )
-            """
-            sql = """
-                INSERT INTO user_directory_search(user_id, vector)
-                VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
-            """
-            txn.execute_values(
-                sql,
-                [
-                    (
-                        p.user_id,
-                        get_localpart_from_id(p.user_id),
-                        get_domain_from_id(p.user_id),
-                        _filter_text_for_index(p.display_name)
-                        if p.display_name
-                        else None,
-                    )
-                    for p in profiles
-                ],
-                template=template,
-                fetch=False,
-            )
+            if isinstance(self.database_engine, Psycopg2Engine):
+                template = """
+                    (
+                        %s,
+                        setweight(to_tsvector('simple', %s), 'A')
+                        || setweight(to_tsvector('simple', %s), 'D')
+                        || setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
+                    )
+                """
+
+                sql = """
+                    INSERT INTO user_directory_search(user_id, vector)
+                    VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
+                """
+
+                txn.execute_values(
+                    sql,
+                    [
+                        (
+                            p.user_id,
+                            get_localpart_from_id(p.user_id),
+                            get_domain_from_id(p.user_id),
+                            _filter_text_for_index(p.display_name)
+                            if p.display_name
+                            else None,
+                        )
+                        for p in profiles
+                    ],
+                    template=template,
+                    fetch=False,
+                )
+            elif isinstance(self.database_engine, PsycopgEngine):
+                sql = """
+                    INSERT INTO user_directory_search(user_id, vector)
+                    VALUES
+                    (
+                        ?,
+                        setweight(to_tsvector('simple', ?), 'A')
+                        || setweight(to_tsvector('simple', ?), 'D')
+                        || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
+                    )
+                    ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
+                """
+
+                txn.executemany(
+                    sql,
+                    [
+                        (
+                            p.user_id,
+                            get_localpart_from_id(p.user_id),
+                            get_domain_from_id(p.user_id),
+                            _filter_text_for_index(p.display_name)
+                            if p.display_name
+                            else None,
+                        )
+                        for p in profiles
+                    ],
+                )
         elif isinstance(self.database_engine, Sqlite3Engine):
             values = []
             for p in profiles:
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 0f9c550b27..2c3151526d 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
             conn.rollback()
             if isinstance(self.database_engine, PostgresEngine):
                 # postgres insists on autocommit for the index
-                conn.set_session(autocommit=True)
+                conn.engine.attempt_to_set_autocommit(conn.conn, True)
                 try:
                     txn = conn.cursor()
                     txn.execute(
@@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
                     )
                     txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
                 finally:
-                    conn.set_session(autocommit=False)
+                    conn.engine.attempt_to_set_autocommit(conn.conn, False)
             else:
                 txn = conn.cursor()
                 txn.execute(
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index d1ccb7390a..53465b274a 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -11,35 +11,41 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Any, Mapping, NoReturn
+from typing import Any, Mapping, NoReturn, cast
 
 from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
+from .postgres import PostgresEngine
 
 # The classes `PostgresEngine` and `Sqlite3Engine` must always be importable, because
 # we use `isinstance(engine, PostgresEngine)` to write different queries for postgres
 # and sqlite. But the database driver modules are both optional: they may not be
 # installed. To account for this, create dummy classes on import failure so we can
 # still run `isinstance()` checks.
-try:
-    from .postgres import PostgresEngine
-except ImportError:
-
-    class PostgresEngine(BaseDatabaseEngine):  # type: ignore[no-redef]
+def dummy_engine(name: str, module: str) -> BaseDatabaseEngine:
+    class Engine(BaseDatabaseEngine):
         def __new__(cls, *args: object, **kwargs: object) -> NoReturn:
             raise RuntimeError(
-                f"Cannot create {cls.__name__} -- psycopg2 module is not installed"
+                f"Cannot create {name} -- {module} module is not installed"
             )
 
+    return cast(BaseDatabaseEngine, Engine)
+
+
 try:
-    from .sqlite import Sqlite3Engine
+    from .psycopg2 import Psycopg2Engine
 except ImportError:
+    Psycopg2Engine = dummy_engine("Psycopg2Engine", "psycopg2")  # type: ignore[misc,assignment]
 
-    class Sqlite3Engine(BaseDatabaseEngine):  # type: ignore[no-redef]
-        def __new__(cls, *args: object, **kwargs: object) -> NoReturn:
-            raise RuntimeError(
-                f"Cannot create {cls.__name__} -- sqlite3 module is not installed"
-            )
+try:
+    from .psycopg import PsycopgEngine
+except ImportError:
+    PsycopgEngine = dummy_engine("PsycopgEngine", "psycopg")  # type: ignore[misc,assignment]
+
+try:
+    from .sqlite import Sqlite3Engine
+except ImportError:
+    Sqlite3Engine = dummy_engine("Sqlite3Engine", "sqlite3")  # type: ignore[misc,assignment]
 
 
 def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine:
@@ -49,7 +55,10 @@ def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine:
         return Sqlite3Engine(database_config)
 
     if name == "psycopg2":
-        return PostgresEngine(database_config)
+        return Psycopg2Engine(database_config)
+
+    if name == "psycopg":
+        return PsycopgEngine(database_config)
 
     raise RuntimeError("Unsupported database engine '%s'" % (name,))
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index b1a2418cbd..830ae5fea3 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -58,6 +58,18 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
         """Do we support the `RETURNING` clause in insert/update/delete?"""
         ...
 
+    @property
+    @abc.abstractmethod
+    def supports_select_distinct_on(self) -> bool:
+        """Do we support the `DISTINCT ON` clause in SELECT?"""
+        ...
+
+    @property
+    @abc.abstractmethod
+    def supports_sequences(self) -> bool:
+        """Do we support the `CREATE SEQUENCE` clause?"""
+        ...
+
     @abc.abstractmethod
     def check_database(
         self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 6309363217..f357d876ce 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -12,17 +12,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
 import logging
-from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Any, Mapping, Optional, Tuple, Type, cast
 
-import psycopg2.extensions
+from psycopg import sql
 
 from synapse.storage.engines._base import (
     BaseDatabaseEngine,
+    ConnectionType,
+    CursorType,
     IncorrectDatabaseSetup,
-    IsolationLevel,
 )
-from synapse.storage.types import Cursor
+from synapse.storage.types import Cursor, DBAPI2Module
 
 if TYPE_CHECKING:
     from synapse.storage.database import LoggingDatabaseConnection
@@ -32,18 +34,15 @@ logger = logging.getLogger(__name__)
 
 
 class PostgresEngine(
-    BaseDatabaseEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor]
+    BaseDatabaseEngine[ConnectionType, CursorType], metaclass=abc.ABCMeta
 ):
-    def __init__(self, database_config: Mapping[str, Any]):
-        super().__init__(psycopg2, database_config)
-        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+    isolation_level_map: Mapping[int, int]
+    default_isolation_level: int
+    OperationalError: Type[Exception]
 
-        # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
-        # actually want to use bytes than wrap it in `bytearray`.
-        def _disable_bytes_adapter(_: bytes) -> NoReturn:
-            raise Exception("Passing bytes to DB is disabled.")
+    def __init__(self, module: DBAPI2Module, database_config: Mapping[str, Any]):
+        super().__init__(module, database_config)
 
-        psycopg2.extensions.register_adapter(bytes, _disable_bytes_adapter)
         self.synchronous_commit: bool = database_config.get("synchronous_commit", True)
         # Set the statement timeout to 1 hour by default.
         # Any query taking more than 1 hour should probably be considered a bug;
@@ -56,16 +55,15 @@ class PostgresEngine(
         )
         self._version: Optional[int] = None  # unknown as yet
 
-        self.isolation_level_map: Mapping[int, int] = {
-            IsolationLevel.READ_COMMITTED: psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
-            IsolationLevel.REPEATABLE_READ: psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
-            IsolationLevel.SERIALIZABLE: psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
-        }
-        self.default_isolation_level = (
-            psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ
-        )
         self.config = database_config
 
+    @abc.abstractmethod
+    def get_server_version(self, db_conn: ConnectionType) -> int:
+        """Gets called when setting up a brand new database. This allows us to
+        apply stricter checks on new databases versus existing database.
+        """
+        ...
+
     @property
     def single_threaded(self) -> bool:
         return False
@@ -79,14 +77,14 @@ class PostgresEngine(
 
     def check_database(
         self,
-        db_conn: psycopg2.extensions.connection,
+        db_conn: ConnectionType,
         allow_outdated_version: bool = False,
     ) -> None:
         # Get the version of PostgreSQL that we're using. As per the psycopg2
         # docs: The number is formed by converting the major, minor, and
         # revision numbers into two-decimal-digit numbers and appending them
         # together. For example, version 8.1.5 will be returned as 80105
-        self._version = db_conn.server_version
+        self._version = self.get_server_version(db_conn)
         allow_unsafe_locale = self.config.get("allow_unsafe_locale", False)
 
         # Are we on a supported PostgreSQL version?
@@ -154,7 +152,7 @@ class PostgresEngine(
         return sql.replace("?", "%s")
 
     def on_new_connection(self, db_conn: "LoggingDatabaseConnection") -> None:
-        db_conn.set_isolation_level(self.default_isolation_level)
+        self.attempt_to_set_isolation_level(db_conn.conn, self.default_isolation_level)
 
         # Set the bytea output to escape, vs the default of hex
         cursor = db_conn.cursor()
@@ -168,7 +166,15 @@ class PostgresEngine(
 
         # Abort really long-running statements and turn them into errors.
         if self.statement_timeout is not None:
-            cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
+            # TODO Avoid a circular import, this needs to be abstracted.
+            if self.__class__.__name__ == "Psycopg2Engine":
+                cursor.execute("SET statement_timeout TO ?", (self.statement_timeout,))
+            else:
+                cursor.execute(
+                    sql.SQL("SET statement_timeout TO {}").format(
+                        self.statement_timeout
+                    )
+                )
 
         cursor.close()
         db_conn.commit()
@@ -183,15 +189,17 @@ class PostgresEngine(
         """Do we support the `RETURNING` clause in insert/update/delete?"""
         return True
 
-    def is_deadlock(self, error: Exception) -> bool:
-        if isinstance(error, psycopg2.DatabaseError):
-            # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
-            # "40001" serialization_failure
-            # "40P01" deadlock_detected
-            return error.pgcode in ["40001", "40P01"]
-        return False
+    @property
+    def supports_select_distinct_on(self) -> bool:
+        """Do we support the `DISTINCT ON` clause in SELECT?"""
+        return True
 
-    def is_connection_closed(self, conn: psycopg2.extensions.connection) -> bool:
+    @property
+    def supports_sequences(self) -> bool:
+        """Do we support the `CREATE SEQUENCE` clause?"""
+        return True
+
+    def is_connection_closed(self, conn: ConnectionType) -> bool:
         return bool(conn.closed)
 
     def lock_table(self, txn: Cursor, table: str) -> None:
@@ -215,25 +223,8 @@ class PostgresEngine(
     def row_id_name(self) -> str:
         return "ctid"
 
-    def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
-        return conn.status != psycopg2.extensions.STATUS_READY
-
-    def attempt_to_set_autocommit(
-        self, conn: psycopg2.extensions.connection, autocommit: bool
-    ) -> None:
-        return conn.set_session(autocommit=autocommit)
-
-    def attempt_to_set_isolation_level(
-        self, conn: psycopg2.extensions.connection, isolation_level: Optional[int]
-    ) -> None:
-        if isolation_level is None:
-            isolation_level = self.default_isolation_level
-        else:
-            isolation_level = self.isolation_level_map[isolation_level]
-        return conn.set_isolation_level(isolation_level)
-
     @staticmethod
-    def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
+    def executescript(cursor: CursorType, script: str) -> None:
         """Execute a chunk of SQL containing multiple semicolon-delimited statements.
 
         Psycopg2 seems happy to do this in DBAPI2's `execute()` function.
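The branch on `self.__class__.__name__` works around a psycopg 3 behaviour: it binds parameters server-side by default, and PostgreSQL does not accept bound parameters in `SET`, so the timeout value has to be composed into the statement text rather than passed as a `?` placeholder. A standalone sketch of that composition API (conninfo is a placeholder):

```python
from psycopg import connect, sql

conn = connect("dbname=synapse")
timeout_ms = 60 * 60 * 1000  # mirror the one-hour default above

with conn.cursor() as cur:
    # sql.Literal() quotes the value safely; a bound parameter would fail
    # here because SET cannot take server-side parameters.
    cur.execute(
        sql.SQL("SET statement_timeout TO {}").format(sql.Literal(timeout_ms))
    )
```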
diff --git a/synapse/storage/engines/psycopg.py b/synapse/storage/engines/psycopg.py
new file mode 100644
index 0000000000..f5d2bf8471
--- /dev/null
+++ b/synapse/storage/engines/psycopg.py
@@ -0,0 +1,89 @@
+# Copyright 2022-2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Any, Mapping, Optional
+
+import psycopg
+import psycopg.errors
+import psycopg.sql
+
+from twisted.enterprise.adbapi import Connection as TxConnection
+
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines._base import IsolationLevel
+
+if TYPE_CHECKING:
+    pass
+
+
+logger = logging.getLogger(__name__)
+
+
+class PsycopgEngine(PostgresEngine[psycopg.Connection, psycopg.Cursor]):
+    OperationalError = psycopg.OperationalError
+
+    def __init__(self, database_config: Mapping[str, Any]):
+        super().__init__(psycopg, database_config)
+        # psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+
+        # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
+        # actually want to use bytes than wrap it in `bytearray`.
+        # def _disable_bytes_adapter(_: bytes) -> NoReturn:
+        #     raise Exception("Passing bytes to DB is disabled.")
+
+        self.isolation_level_map: Mapping[int, psycopg.IsolationLevel] = {
+            IsolationLevel.READ_COMMITTED: psycopg.IsolationLevel.READ_COMMITTED,
+            IsolationLevel.REPEATABLE_READ: psycopg.IsolationLevel.REPEATABLE_READ,
+            IsolationLevel.SERIALIZABLE: psycopg.IsolationLevel.SERIALIZABLE,
+        }
+        self.default_isolation_level = psycopg.IsolationLevel.REPEATABLE_READ
+
+    def get_server_version(self, db_conn: psycopg.Connection) -> int:
+        return db_conn.info.server_version
+
+    def convert_param_style(self, sql: str) -> str:
+        if isinstance(sql, psycopg.sql.Composed):
+            return sql
+
+        return sql.replace("?", "%s")
+
+    def is_deadlock(self, error: Exception) -> bool:
+        if isinstance(error, psycopg.errors.Error):
+            # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
+            # "40001" serialization_failure
+            # "40P01" deadlock_detected
+            return error.sqlstate in ["40001", "40P01"]
+        return False
+
+    def in_transaction(self, conn: psycopg.Connection) -> bool:
+        return conn.info.transaction_status != psycopg.pq.TransactionStatus.IDLE
+
+    def attempt_to_set_autocommit(
+        self, conn: psycopg.Connection, autocommit: bool
+    ) -> None:
+        # Sometimes this gets called with a Twisted connection instead, unwrap
+        # it because it doesn't support __setattr__.
+        if isinstance(conn, TxConnection):
+            conn = conn._connection
+        conn.autocommit = autocommit
+
+    def attempt_to_set_isolation_level(
+        self, conn: psycopg.Connection, isolation_level: Optional[int]
+    ) -> None:
+        if isolation_level is None:
+            pg_isolation_level = self.default_isolation_level
+        else:
+            pg_isolation_level = self.isolation_level_map[isolation_level]
+        conn.isolation_level = pg_isolation_level
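Most of this adapter comes down to psycopg 3 exposing connection state as attributes and an `info` object instead of psycopg2's setter methods. Roughly, against a raw connection:

```python
import psycopg

conn = psycopg.connect("dbname=synapse")  # placeholder conninfo

# Attributes replace set_session()/set_isolation_level(); both must be
# changed while no transaction is in progress.
conn.autocommit = True
conn.isolation_level = psycopg.IsolationLevel.REPEATABLE_READ

# Introspection moves onto conn.info.
print(conn.info.server_version)  # e.g. 140005 for PostgreSQL 14.5
in_txn = conn.info.transaction_status != psycopg.pq.TransactionStatus.IDLE
```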
diff --git a/synapse/storage/engines/psycopg2.py b/synapse/storage/engines/psycopg2.py
new file mode 100644
index 0000000000..5e57d8faf8
--- /dev/null
+++ b/synapse/storage/engines/psycopg2.py
@@ -0,0 +1,90 @@
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Any, Mapping, Optional
+
+import psycopg2.extensions
+
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines._base import IsolationLevel
+
+if TYPE_CHECKING:
+    pass
+
+
+logger = logging.getLogger(__name__)
+
+
+class Psycopg2Engine(
+    PostgresEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor]
+):
+    OperationalError = psycopg2.OperationalError
+
+    def __init__(self, database_config: Mapping[str, Any]):
+        super().__init__(psycopg2, database_config)
+        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+
+        self.isolation_level_map: Mapping[int, int] = {
+            IsolationLevel.READ_COMMITTED: psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
+            IsolationLevel.REPEATABLE_READ: psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
+            IsolationLevel.SERIALIZABLE: psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
+        }
+        self.default_isolation_level = (
+            psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ
+        )
+        self.config = database_config
+
+    def get_server_version(self, db_conn: psycopg2.extensions.connection) -> int:
+        return db_conn.server_version
+
+    def convert_param_style(self, sql: str) -> str:
+        return sql.replace("?", "%s")
+
+    def is_deadlock(self, error: Exception) -> bool:
+        if isinstance(error, psycopg2.DatabaseError):
+            # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
+            # "40001" serialization_failure
+            # "40P01" deadlock_detected
+            return error.pgcode in ["40001", "40P01"]
+        return False
+
+    def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
+        return conn.status != psycopg2.extensions.STATUS_READY
+
+    def attempt_to_set_autocommit(
+        self, conn: psycopg2.extensions.connection, autocommit: bool
+    ) -> None:
+        return conn.set_session(autocommit=autocommit)
+
+    def attempt_to_set_isolation_level(
+        self, conn: psycopg2.extensions.connection, isolation_level: Optional[int]
+    ) -> None:
+        if isolation_level is None:
+            isolation_level = self.default_isolation_level
+        else:
+            isolation_level = self.isolation_level_map[isolation_level]
+        return conn.set_isolation_level(isolation_level)
+
+    @staticmethod
+    def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
+        """Execute a chunk of SQL containing multiple semicolon-delimited statements.
+
+        Psycopg2 seems happy to do this in DBAPI2's `execute()` function.
+
+        For consistency with SQLite, any ongoing transaction is committed before
+        executing the script in its own transaction. The script transaction is
+        left open and it is the responsibility of the caller to commit it.
+        """
+        cursor.execute(f"COMMIT; BEGIN TRANSACTION; {script}")
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 802069e1e1..4d63b31f31 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -65,6 +65,16 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
         """Do we support the `RETURNING` clause in insert/update/delete?"""
         return sqlite3.sqlite_version_info >= (3, 35, 0)
 
+    @property
+    def supports_select_distinct_on(self) -> bool:
+        """Do we support the `DISTINCT ON` clause in SELECT?"""
+        return False
+
+    @property
+    def supports_sequences(self) -> bool:
+        """Do we support the `CREATE SEQUENCE` clause?"""
+        return False
+
     def check_database(
         self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
     ) -> None:
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 31501fd573..498209896e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -31,7 +31,12 @@ import attr
 
 from synapse.config.homeserver import HomeServerConfig
 from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import (
+    BaseDatabaseEngine,
+    PostgresEngine,
+    PsycopgEngine,
+    Sqlite3Engine,
+)
 from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
 from synapse.storage.types import Cursor
 
@@ -270,7 +275,7 @@ def _setup_new_database(
         for file_name in os.listdir(directory)
     )
 
-    if isinstance(database_engine, PostgresEngine):
+    if isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
         specific = "postgres"
     else:
         specific = "sqlite"
@@ -414,7 +419,7 @@ def _upgrade_existing_database(
 
     logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas)
 
-    if isinstance(database_engine, PostgresEngine):
+    if isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
         specific_engine_extension = ".postgres"
     else:
         specific_engine_extension = ".sqlite"
diff --git a/synapse/storage/schema/main/delta/56/unique_user_filter_index.py b/synapse/storage/schema/main/delta/56/unique_user_filter_index.py
index 2461f87d77..29a2f7b65d 100644
--- a/synapse/storage/schema/main/delta/56/unique_user_filter_index.py
+++ b/synapse/storage/schema/main/delta/56/unique_user_filter_index.py
@@ -2,7 +2,7 @@ import logging
 from io import StringIO
 
 from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.prepare_database import execute_statements_from_stream
 
 logger = logging.getLogger(__name__)
@@ -18,7 +18,7 @@ This migration updates the user_filters table as follows:
 
 
 def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    if isinstance(database_engine, PostgresEngine):
+    if database_engine.supports_select_distinct_on:
         select_clause = """
             SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
             FROM user_filters
diff --git a/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py b/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py
index 9210026dde..f3efe4704c 100644
--- a/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py
+++ b/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py
@@ -19,7 +19,12 @@ This migration adds triggers to the partial_state_events tables to enforce uniqu
 Triggers cannot be expressed in .sql files, so we have to use a separate file.
 """
 from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import (
+    BaseDatabaseEngine,
+    PostgresEngine,
+    PsycopgEngine,
+    Sqlite3Engine,
+)
 
 
 def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
@@ -43,7 +48,7 @@ def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) ->
             END;
             """
         )
-    elif isinstance(database_engine, PostgresEngine):
+    elif isinstance(database_engine, (PostgresEngine, PsycopgEngine)):
         cur.execute(
             """
             CREATE OR REPLACE FUNCTION check_partial_state_events() RETURNS trigger AS $BODY$
diff --git a/synapse/storage/schema/main/delta/69/01as_txn_seq.py b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
index 6c112425f2..9dd5a27a3f 100644
--- a/synapse/storage/schema/main/delta/69/01as_txn_seq.py
+++ b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
@@ -18,11 +18,11 @@ Adds a postgres SEQUENCE for generating application service transaction IDs.
 """
 
 from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine, PsycopgEngine
 
 
 def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    if isinstance(database_engine, PostgresEngine):
+    if database_engine.supports_sequences:
         # If we already have some AS TXNs we want to start from the current
         # maximum value. There are two potential places this is stored - the
         # actual TXNs themselves *and* the AS state table. At time of migration
@@ -41,7 +41,13 @@ def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) ->
 
         start_val = max(last_txn_max, txn_max) + 1
 
-        cur.execute(
-            "CREATE SEQUENCE application_services_txn_id_seq START WITH %s",
-            (start_val,),
-        )
+        # XXX This is a hack.
+        sql = f"CREATE SEQUENCE application_services_txn_id_seq START WITH {start_val}"
+        args = ()
+
+        if isinstance(database_engine, PsycopgEngine):
+            import psycopg.sql
+
+            cur.execute(psycopg.sql.SQL(sql).format(args))
+        else:
+            cur.execute(sql, args)
diff --git a/synapse/storage/types.py b/synapse/storage/types.py
index afaeef9a5a..1d9f510784 100644
--- a/synapse/storage/types.py
+++ b/synapse/storage/types.py
@@ -44,7 +44,7 @@ class Cursor(Protocol):
     def fetchone(self) -> Optional[Tuple]:
         ...
 
-    def fetchmany(self, size: Optional[int] = ...) -> List[Tuple]:
+    def fetchmany(self, size: int = ...) -> List[Tuple]:
         ...
 
     def fetchall(self) -> List[Tuple]:
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 9c3eafb562..52d708ad17 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -877,7 +877,7 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
             "agg": "GREATEST" if self._positive else "LEAST",
         }
 
-        pos = (self.get_current_token_for_writer(self._instance_name),)
+        pos = self.get_current_token_for_writer(self._instance_name)
         txn.execute(sql, (self._stream_name, self._instance_name, pos))
diff --git a/tests/server.py b/tests/server.py
index c8342db399..74dd00cd3f 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -971,10 +971,14 @@ def setup_test_homeserver(
     if USE_POSTGRES_FOR_TESTS:
         test_db = "synapse_test_%s" % uuid.uuid4().hex
 
+        if USE_POSTGRES_FOR_TESTS == "psycopg":
+            db_type = "psycopg"
+        else:
+            db_type = "psycopg2"
         database_config = {
-            "name": "psycopg2",
+            "name": db_type,
             "args": {
-                "database": test_db,
+                "dbname": test_db,
                 "host": POSTGRES_HOST,
                 "password": POSTGRES_PASSWORD,
                 "user": POSTGRES_USER,
@@ -1030,17 +1034,14 @@ def setup_test_homeserver(
     # Create the database before we actually try and connect to it, based off
     # the template database we generate in setupdb()
     if isinstance(db_engine, PostgresEngine):
-        import psycopg2.extensions
-
         db_conn = db_engine.module.connect(
-            database=POSTGRES_BASE_DB,
+            dbname=POSTGRES_BASE_DB,
             user=POSTGRES_USER,
             host=POSTGRES_HOST,
             port=POSTGRES_PORT,
             password=POSTGRES_PASSWORD,
         )
-        assert isinstance(db_conn, psycopg2.extensions.connection)
-        db_conn.autocommit = True
+        db_engine.attempt_to_set_autocommit(db_conn, True)
         cur = db_conn.cursor()
         cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
         cur.execute(
@@ -1070,9 +1071,6 @@ def setup_test_homeserver(
 
         # We need to do cleanup on PostgreSQL
         def cleanup() -> None:
-            import psycopg2
-            import psycopg2.extensions
-
             # Close all the db pools
             database_pool._db_pool.close()
 
@@ -1080,14 +1078,13 @@ def setup_test_homeserver(
 
             # Drop the test database
             db_conn = db_engine.module.connect(
-                database=POSTGRES_BASE_DB,
+                dbname=POSTGRES_BASE_DB,
                 user=POSTGRES_USER,
                 host=POSTGRES_HOST,
                 port=POSTGRES_PORT,
                 password=POSTGRES_PASSWORD,
             )
-            assert isinstance(db_conn, psycopg2.extensions.connection)
-            db_conn.autocommit = True
+            db_engine.attempt_to_set_autocommit(db_conn, True)
             cur = db_conn.cursor()
 
             # Try a few times to drop the DB. Some things may hold on to the
@@ -1099,7 +1096,7 @@ def setup_test_homeserver(
                     cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,))
                     db_conn.commit()
                     dropped = True
-                except psycopg2.OperationalError as e:
+                except db_engine.OperationalError as e:
                     warnings.warn(
                         "Couldn't drop old db: " + str(e),
                         category=UserWarning,
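The `database=` to `dbname=` keyword change is what keeps these `connect()` calls portable: psycopg2 accepts `database` as a legacy alias, but psycopg 3 only understands the libpq parameter name `dbname`. For example, under either driver module (values are placeholders):

```python
# Works with both psycopg2 and psycopg (3).
db_conn = db_engine.module.connect(
    dbname="synapse_test",
    user="postgres",
    host="localhost",
    password="secret",
)
```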
diff --git a/tests/utils.py b/tests/utils.py
index e73b46944b..9be02b8ea7 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -59,7 +59,10 @@ def setupdb() -> None:
     # If we're using PostgreSQL, set up the db once
     if USE_POSTGRES_FOR_TESTS:
         # create a PostgresEngine
-        db_engine = create_engine({"name": "psycopg2", "args": {}})
+        if USE_POSTGRES_FOR_TESTS == "psycopg":
+            db_engine = create_engine({"name": "psycopg", "args": {}})
+        else:
+            db_engine = create_engine({"name": "psycopg2", "args": {}})
         # connect to postgres to create the base database.
         db_conn = db_engine.module.connect(
             user=POSTGRES_USER,
@@ -80,11 +83,11 @@ def setupdb() -> None:
 
     # Set up in the db
     db_conn = db_engine.module.connect(
-        database=POSTGRES_BASE_DB,
         user=POSTGRES_USER,
         host=POSTGRES_HOST,
         port=POSTGRES_PORT,
         password=POSTGRES_PASSWORD,
+        dbname=POSTGRES_BASE_DB,
     )
     logging_conn = LoggingDatabaseConnection(db_conn, db_engine, "tests")
     prepare_database(logging_conn, db_engine, None)