summary refs log tree commit diff
diff options
context:
space:
mode:
author Sean Quah <seanq@matrix.org> 2023-04-15 01:23:01 +0100
committer Sean Quah <seanq@matrix.org> 2023-04-15 02:52:42 +0100
commit 0a734d0cf298daaa413dad15048e2df7baddcfbe (patch)
tree 3b4a83a2748eb94aec323c538229b7614ec2b1f1
parent Add background update to populate `profiles.full_user_id` (diff)
download synapse-0a734d0cf298daaa413dad15048e2df7baddcfbe.tar.xz
Add background update to populate `user_filters.full_user_id`
Signed-off-by: Sean Quah <seanq@matrix.org>
-rw-r--r-- synapse/storage/databases/main/__init__.py 4
-rw-r--r-- synapse/storage/databases/main/filtering.py 75
-rw-r--r-- synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql 3
3 files changed, 78 insertions, 4 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index dc3948c170..837dc7646e 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -43,7 +43,7 @@ from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events_bg_updates import EventsBackgroundUpdatesStore
 from .events_forward_extremities import EventForwardExtremitiesStore
-from .filtering import FilteringWorkerStore
+from .filtering import FilteringStore
 from .keys import KeyStore
 from .lock import LockStore
 from .media_repository import MediaRepositoryStore
@@ -99,7 +99,7 @@ class DataStore(
     EventFederationStore,
     MediaRepositoryStore,
     RejectionsStore,
-    FilteringWorkerStore,
+    FilteringStore,
     PusherStore,
     PushRuleStore,
     ApplicationServiceTransactionStore,
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 8e57c8e5a0..88be0f5f2f 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -13,16 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Optional, Tuple, Union, cast
 
 from canonicaljson import encode_canonical_json
 
 from synapse.api.errors import Codes, StoreError, SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json
-from synapse.storage.database import LoggingTransaction
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+)
 from synapse.types import JsonDict
 from synapse.util.caches.descriptors import cached
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class FilteringWorkerStore(SQLBaseStore):
     @cached(num_args=2)
@@ -97,3 +104,67 @@ class FilteringWorkerStore(SQLBaseStore):
 
                 if attempts >= 5:
                     raise StoreError(500, "Couldn't generate a filter ID.")
+
+
+class FilteringBackgroundUpdateStore(FilteringWorkerStore):
+    POPULATE_USER_FILTERS_FULL_USER_ID = "populate_user_filters_full_user_id"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_USER_FILTERS_FULL_USER_ID,
+            self._populate_user_filters_full_user_id,
+        )
+
+    async def _populate_user_filters_full_user_id(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Populates the `user_filters.full_user_id` column.
+
+        In a future Synapse version, this column will be renamed to `user_id`, replacing
+        the existing `user_id` column.
+
+        Note that completion of this background update does not imply that there are no
+        longer any `NULL` values in `full_user_id`. Until the old `user_id` column has
+        been removed, Synapse may be rolled back to a previous version which does not
+        populate `full_user_id` after the background update has finished.
+        """
+
+        def _populate_user_filters_full_user_id_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+            sql = """
+                UPDATE user_filters
+                SET full_user_id = '@' || user_id || ':' || ?
+                WHERE user_id IN (
+                    SELECT user_id
+                    FROM user_filters
+                    WHERE full_user_id IS NULL
+                    LIMIT ?
+                )
+            """
+            txn.execute(sql, (self.hs.hostname, batch_size))
+
+            return txn.rowcount == 0
+
+        finished = await self.db_pool.runInteraction(
+            "_populate_user_filters_full_user_id_txn",
+            _populate_user_filters_full_user_id_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_USER_FILTERS_FULL_USER_ID
+            )
+
+        return batch_size
+
+
+class FilteringStore(FilteringBackgroundUpdateStore):
+    pass
diff --git a/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql b/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql
index cfe4e7cb00..ba993d95c4 100644
--- a/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql
+++ b/synapse/storage/schema/main/delta/75/02_add_user_filters_full_user_id_column.sql
@@ -21,3 +21,6 @@ CREATE UNIQUE INDEX full_user_filters_unique ON user_filters (full_user_id, filt
 -- NB: This will lock the table for writes while the index is being built.
 --     There are around 4,000,000 user_filters on matrix.org so we expect this to take
 --     a couple of seconds at most.
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (7502, 'populate_user_filters_full_user_id', '{}');