diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 837dc7646e..dc3948c170 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -43,7 +43,7 @@ from .event_federation import EventFederationStore
from .event_push_actions import EventPushActionsStore
from .events_bg_updates import EventsBackgroundUpdatesStore
from .events_forward_extremities import EventForwardExtremitiesStore
-from .filtering import FilteringStore
+from .filtering import FilteringWorkerStore
from .keys import KeyStore
from .lock import LockStore
from .media_repository import MediaRepositoryStore
@@ -99,7 +99,7 @@ class DataStore(
EventFederationStore,
MediaRepositoryStore,
RejectionsStore,
- FilteringStore,
+ FilteringWorkerStore,
PusherStore,
PushRuleStore,
ApplicationServiceTransactionStore,
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 12f3b601f1..8e57c8e5a0 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -17,7 +17,7 @@ from typing import Optional, Tuple, Union, cast
from canonicaljson import encode_canonical_json
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, StoreError, SynapseError
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.types import JsonDict
@@ -46,8 +46,6 @@ class FilteringWorkerStore(SQLBaseStore):
return db_to_json(def_json)
-
-class FilteringStore(FilteringWorkerStore):
async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int:
def_json = encode_canonical_json(user_filter)
@@ -79,4 +77,23 @@ class FilteringStore(FilteringWorkerStore):
return filter_id
- return await self.db_pool.runInteraction("add_user_filter", _do_txn)
+ attempts = 0
+ while True:
+ # Try a few times.
+ # This is technically needed if a user tries to create two filters at once,
+ # leading to two concurrent transactions.
+ # The failure case would be:
+ # - SELECT filter_id ... filter_json = ? → both transactions return no rows
+ # - SELECT MAX(filter_id) ... → both transactions return e.g. 5
+ # - INSERT INTO ... → both transactions insert filter_id = 6
+ # One of the transactions will commit. The other will get a unique key
+ # constraint violation error (IntegrityError). This is not the same as a
+ # serialisability violation, which would be automatically retried by
+ # `runInteraction`.
+ try:
+ return await self.db_pool.runInteraction("add_user_filter", _do_txn)
+ except self.db_pool.engine.module.IntegrityError:
+ attempts += 1
+
+ if attempts >= 5:
+ raise StoreError(500, "Couldn't generate a filter ID.")
|