-rw-r--r--  changelog.d/9639.bugfix                          1
-rw-r--r--  synapse/storage/databases/main/transactions.py  45
2 files changed, 10 insertions, 36 deletions
diff --git a/changelog.d/9639.bugfix b/changelog.d/9639.bugfix
new file mode 100644
index 0000000000..51b3746707
--- /dev/null
+++ b/changelog.d/9639.bugfix
@@ -0,0 +1 @@
+Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 0309661841..b7072f1f5e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, LoggingTransaction
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import JsonDict
 from synapse.util.caches.expiringcache import ExpiringCache
 
@@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore):
             stream_ordering: the stream_ordering of the event
         """
 
-        return await self.db_pool.runInteraction(
-            "store_destination_rooms_entries",
-            self._store_destination_rooms_entries_txn,
-            destinations,
-            room_id,
-            stream_ordering,
+        await self.db_pool.simple_upsert_many(
+            table="destinations",
+            key_names=("destination",),
+            key_values=[(d,) for d in destinations],
+            value_names=[],
+            value_values=[],
+            desc="store_destination_rooms_entries_dests",
         )
 
-    def _store_destination_rooms_entries_txn(
-        self,
-        txn: LoggingTransaction,
-        destinations: Iterable[str],
-        room_id: str,
-        stream_ordering: int,
-    ) -> None:
-
-        # ensure we have a `destinations` row for this destination, as there is
-        # a foreign key constraint.
-        if isinstance(self.database_engine, PostgresEngine):
-            q = """
-                INSERT INTO destinations (destination)
-                    VALUES (?)
-                    ON CONFLICT DO NOTHING;
-            """
-        elif isinstance(self.database_engine, Sqlite3Engine):
-            q = """
-                INSERT OR IGNORE INTO destinations (destination)
-                    VALUES (?);
-            """
-        else:
-            raise RuntimeError("Unknown database engine")
-
-        txn.execute_batch(q, ((destination,) for destination in destinations))
-
         rows = [(destination, room_id) for destination in destinations]
-
-        self.db_pool.simple_upsert_many_txn(
-            txn,
+        await self.db_pool.simple_upsert_many(
             table="destination_rooms",
             key_names=("destination", "room_id"),
             key_values=rows,
             value_names=["stream_ordering"],
             value_values=[(stream_ordering,)] * len(rows),
+            desc="store_destination_rooms_entries_rooms",
         )
 
     async def get_destination_last_successful_stream_ordering(
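
For context, a minimal usage sketch of the rewritten method, assuming a handle to the transaction store; the destination names, room ID and stream ordering below are invented for illustration and do not come from this commit.

    # Hypothetical call site (values made up): record that an event at the given
    # stream ordering still needs to be sent to each listed destination.
    await store.store_destination_rooms_entries(
        destinations=("remote1.example.org", "remote2.example.org"),
        room_id="!someroom:example.org",
        stream_ordering=1234,
    )

As the diff shows, each call now issues two `simple_upsert_many` operations (one to satisfy the `destinations` foreign key, one for `destination_rooms`) instead of a single `runInteraction` containing engine-specific INSERT statements.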