diff --git a/changelog.d/9639.bugfix b/changelog.d/9639.bugfix
new file mode 100644
index 0000000000..51b3746707
--- /dev/null
+++ b/changelog.d/9639.bugfix
@@ -0,0 +1 @@
+Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 0309661841..b7072f1f5e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache
@@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore):
stream_ordering: the stream_ordering of the event
"""
- return await self.db_pool.runInteraction(
- "store_destination_rooms_entries",
- self._store_destination_rooms_entries_txn,
- destinations,
- room_id,
- stream_ordering,
+ await self.db_pool.simple_upsert_many(
+ table="destinations",
+ key_names=("destination",),
+ key_values=[(d,) for d in destinations],
+ value_names=[],
+ value_values=[],
+ desc="store_destination_rooms_entries_dests",
)
- def _store_destination_rooms_entries_txn(
- self,
- txn: LoggingTransaction,
- destinations: Iterable[str],
- room_id: str,
- stream_ordering: int,
- ) -> None:
-
- # ensure we have a `destinations` row for this destination, as there is
- # a foreign key constraint.
- if isinstance(self.database_engine, PostgresEngine):
- q = """
- INSERT INTO destinations (destination)
- VALUES (?)
- ON CONFLICT DO NOTHING;
- """
- elif isinstance(self.database_engine, Sqlite3Engine):
- q = """
- INSERT OR IGNORE INTO destinations (destination)
- VALUES (?);
- """
- else:
- raise RuntimeError("Unknown database engine")
-
- txn.execute_batch(q, ((destination,) for destination in destinations))
-
rows = [(destination, room_id) for destination in destinations]
-
- self.db_pool.simple_upsert_many_txn(
- txn,
+ await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
+ desc="store_destination_rooms_entries_rooms",
)
async def get_destination_last_successful_stream_ordering(
|