summary refs log tree commit diff
path: root/synapse/storage/databases/main/transactions.py
diff options
context:
space:
mode:
authorJonathan de Jong <jonathan@automatia.nl>2021-03-23 14:52:30 +0100
committerGitHub <noreply@github.com>2021-03-23 13:52:30 +0000
commit0caf2a338eacda084454bae84514875af6349eeb (patch)
treeca3e3215004aeed7dd164b9276dedecfd09a5757 /synapse/storage/databases/main/transactions.py
parentFederation API for Space summary (#9652) (diff)
downloadsynapse-0caf2a338eacda084454bae84514875af6349eeb.tar.xz
Fix federation stall on concurrent access errors (#9639)
Diffstat (limited to 'synapse/storage/databases/main/transactions.py')
-rw-r--r--synapse/storage/databases/main/transactions.py45
1 files changed, 9 insertions, 36 deletions
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 0309661841..b7072f1f5e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, LoggingTransaction
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import JsonDict
 from synapse.util.caches.expiringcache import ExpiringCache
 
@@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore):
             stream_ordering: the stream_ordering of the event
         """
 
-        return await self.db_pool.runInteraction(
-            "store_destination_rooms_entries",
-            self._store_destination_rooms_entries_txn,
-            destinations,
-            room_id,
-            stream_ordering,
+        await self.db_pool.simple_upsert_many(
+            table="destinations",
+            key_names=("destination",),
+            key_values=[(d,) for d in destinations],
+            value_names=[],
+            value_values=[],
+            desc="store_destination_rooms_entries_dests",
         )
 
-    def _store_destination_rooms_entries_txn(
-        self,
-        txn: LoggingTransaction,
-        destinations: Iterable[str],
-        room_id: str,
-        stream_ordering: int,
-    ) -> None:
-
-        # ensure we have a `destinations` row for this destination, as there is
-        # a foreign key constraint.
-        if isinstance(self.database_engine, PostgresEngine):
-            q = """
-                INSERT INTO destinations (destination)
-                    VALUES (?)
-                    ON CONFLICT DO NOTHING;
-            """
-        elif isinstance(self.database_engine, Sqlite3Engine):
-            q = """
-                INSERT OR IGNORE INTO destinations (destination)
-                    VALUES (?);
-            """
-        else:
-            raise RuntimeError("Unknown database engine")
-
-        txn.execute_batch(q, ((destination,) for destination in destinations))
-
         rows = [(destination, room_id) for destination in destinations]
-
-        self.db_pool.simple_upsert_many_txn(
-            txn,
+        await self.db_pool.simple_upsert_many(
             table="destination_rooms",
             key_names=("destination", "room_id"),
             key_values=rows,
             value_names=["stream_ordering"],
             value_values=[(stream_ordering,)] * len(rows),
+            desc="store_destination_rooms_entries_rooms",
         )
 
     async def get_destination_last_successful_stream_ordering(