summary refs log tree commit diff
diff options
context:
space:
mode:
authorJonathan de Jong <jonathan@automatia.nl>2021-03-23 14:52:30 +0100
committerGitHub <noreply@github.com>2021-03-23 13:52:30 +0000
commit0caf2a338eacda084454bae84514875af6349eeb (patch)
treeca3e3215004aeed7dd164b9276dedecfd09a5757
parentFederation API for Space summary (#9652) (diff)
downloadsynapse-0caf2a338eacda084454bae84514875af6349eeb.tar.xz
Fix federation stall on concurrent access errors (#9639)
Diffstat (limited to '')
-rw-r--r--changelog.d/9639.bugfix1
-rw-r--r--synapse/storage/databases/main/transactions.py45
2 files changed, 10 insertions, 36 deletions
diff --git a/changelog.d/9639.bugfix b/changelog.d/9639.bugfix
new file mode 100644

index 0000000000..51b3746707 --- /dev/null +++ b/changelog.d/9639.bugfix
@@ -0,0 +1 @@ +Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 0309661841..b7072f1f5e 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -22,7 +22,6 @@ from canonicaljson import encode_canonical_json from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction -from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util.caches.expiringcache import ExpiringCache @@ -312,49 +311,23 @@ class TransactionStore(TransactionWorkerStore): stream_ordering: the stream_ordering of the event """ - return await self.db_pool.runInteraction( - "store_destination_rooms_entries", - self._store_destination_rooms_entries_txn, - destinations, - room_id, - stream_ordering, + await self.db_pool.simple_upsert_many( + table="destinations", + key_names=("destination",), + key_values=[(d,) for d in destinations], + value_names=[], + value_values=[], + desc="store_destination_rooms_entries_dests", ) - def _store_destination_rooms_entries_txn( - self, - txn: LoggingTransaction, - destinations: Iterable[str], - room_id: str, - stream_ordering: int, - ) -> None: - - # ensure we have a `destinations` row for this destination, as there is - # a foreign key constraint. - if isinstance(self.database_engine, PostgresEngine): - q = """ - INSERT INTO destinations (destination) - VALUES (?) - ON CONFLICT DO NOTHING; - """ - elif isinstance(self.database_engine, Sqlite3Engine): - q = """ - INSERT OR IGNORE INTO destinations (destination) - VALUES (?); - """ - else: - raise RuntimeError("Unknown database engine") - - txn.execute_batch(q, ((destination,) for destination in destinations)) - rows = [(destination, room_id) for destination in destinations] - - self.db_pool.simple_upsert_many_txn( - txn, + await self.db_pool.simple_upsert_many( table="destination_rooms", key_names=("destination", "room_id"), key_values=rows, value_names=["stream_ordering"], value_values=[(stream_ordering,)] * len(rows), + desc="store_destination_rooms_entries_rooms", ) async def get_destination_last_successful_stream_ordering(