summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/15134.feature1
-rwxr-xr-xdocker/configure_workers_and_start.py1
-rw-r--r--docs/workers.md1
-rw-r--r--synapse/rest/__init__.py3
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/filtering.py25
6 files changed, 27 insertions, 8 deletions
diff --git a/changelog.d/15134.feature b/changelog.d/15134.feature
new file mode 100644
index 0000000000..0dbb30bc8f
--- /dev/null
+++ b/changelog.d/15134.feature
@@ -0,0 +1 @@
+Allow use of the `/filter` Client-Server APIs on workers.
\ No newline at end of file
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 58c62f2231..7f615e5066 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -142,6 +142,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
             "^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
             "^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/search",
+            "^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)",
         ],
         "shared_extra_conf": {},
         "worker_extra_conf": "",
diff --git a/docs/workers.md b/docs/workers.md
index 2eb970ffa6..35a96f12a9 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -232,6 +232,7 @@ information.
     ^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms$
     ^/_matrix/client/v1/rooms/.*/timestamp_to_event$
     ^/_matrix/client/(api/v1|r0|v3|unstable)/search$
+    ^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)
 
     # Encryption requests
     ^/_matrix/client/(r0|v3|unstable)/keys/query$
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 14c4e6ebbb..c327f15043 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -108,8 +108,7 @@ class ClientRestResource(JsonResource):
         if is_main_process:
             logout.register_servlets(hs, client_resource)
         sync.register_servlets(hs, client_resource)
-        if is_main_process:
-            filter.register_servlets(hs, client_resource)
+        filter.register_servlets(hs, client_resource)
         account.register_servlets(hs, client_resource)
         register.register_servlets(hs, client_resource)
         if is_main_process:
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 837dc7646e..dc3948c170 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -43,7 +43,7 @@ from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events_bg_updates import EventsBackgroundUpdatesStore
 from .events_forward_extremities import EventForwardExtremitiesStore
-from .filtering import FilteringStore
+from .filtering import FilteringWorkerStore
 from .keys import KeyStore
 from .lock import LockStore
 from .media_repository import MediaRepositoryStore
@@ -99,7 +99,7 @@ class DataStore(
     EventFederationStore,
     MediaRepositoryStore,
     RejectionsStore,
-    FilteringStore,
+    FilteringWorkerStore,
     PusherStore,
     PushRuleStore,
     ApplicationServiceTransactionStore,
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 12f3b601f1..8e57c8e5a0 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -17,7 +17,7 @@ from typing import Optional, Tuple, Union, cast
 
 from canonicaljson import encode_canonical_json
 
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, StoreError, SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import LoggingTransaction
 from synapse.types import JsonDict
@@ -46,8 +46,6 @@ class FilteringWorkerStore(SQLBaseStore):
 
         return db_to_json(def_json)
 
-
-class FilteringStore(FilteringWorkerStore):
     async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int:
         def_json = encode_canonical_json(user_filter)
 
@@ -79,4 +77,23 @@ class FilteringStore(FilteringWorkerStore):
 
             return filter_id
 
-        return await self.db_pool.runInteraction("add_user_filter", _do_txn)
+        attempts = 0
+        while True:
+            # Try a few times.
+            # This is technically needed if a user tries to create two filters at once,
+            # leading to two concurrent transactions.
+            # The failure case would be:
+            # - SELECT filter_id ... filter_json = ? → both transactions return no rows
+            # - SELECT MAX(filter_id) ... → both transactions return e.g. 5
+            # - INSERT INTO ... → both transactions insert filter_id = 6
+            # One of the transactions will commit. The other will get a unique key
+            # constraint violation error (IntegrityError). This is not the same as a
+            # serialisability violation, which would be automatically retried by
+            # `runInteraction`.
+            try:
+                return await self.db_pool.runInteraction("add_user_filter", _do_txn)
+            except self.db_pool.engine.module.IntegrityError:
+                attempts += 1
+
+                if attempts >= 5:
+                    raise StoreError(500, "Couldn't generate a filter ID.")