summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/roommember.py69
-rw-r--r--synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql24
2 files changed, 69 insertions, 24 deletions
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index daad58291a..e068f27a10 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -82,7 +82,7 @@ class EventIdMembership:
     membership: str
 
 
-class RoomMemberWorkerStore(EventsWorkerStore):
+class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             _is_local_host_in_room_ignoring_users_txn,
         )
 
+    async def forget(self, user_id: str, room_id: str) -> None:
+        """Indicate that user_id wishes to discard history for room_id."""
+
+        def f(txn: LoggingTransaction) -> None:
+            self.db_pool.simple_update_txn(
+                txn,
+                table="room_memberships",
+                keyvalues={"user_id": user_id, "room_id": room_id},
+                updatevalues={"forgotten": 1},
+            )
+
+            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+            self._invalidate_cache_and_stream(
+                txn, self.get_forgotten_rooms_for_user, (user_id,)
+            )
+
+        await self.db_pool.runInteraction("forget_membership", f)
+
+    async def get_room_forgetter_stream_pos(self) -> int:
+        """Get the stream position of the background process to forget rooms when left
+        by users.
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="room_forgetter_stream_pos",
+        )
+
+    async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
+        """Update the stream position of the background process to forget rooms when
+        left by users.
+
+        Must only be used by the worker running the background process.
+        """
+        assert self.hs.config.worker.run_background_tasks
+
+        await self.db_pool.simple_update_one(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="room_forgetter_stream_pos",
+        )
+
 
 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(
@@ -1553,29 +1597,6 @@ class RoomMemberStore(
     ):
         super().__init__(database, db_conn, hs)
 
-    async def forget(self, user_id: str, room_id: str) -> None:
-        """Indicate that user_id wishes to discard history for room_id."""
-
-        def f(txn: LoggingTransaction) -> None:
-            sql = (
-                "UPDATE"
-                "  room_memberships"
-                " SET"
-                "  forgotten = 1"
-                " WHERE"
-                "  user_id = ?"
-                " AND"
-                "  room_id = ?"
-            )
-            txn.execute(sql, (user_id, room_id))
-
-            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
-            self._invalidate_cache_and_stream(
-                txn, self.get_forgotten_rooms_for_user, (user_id,)
-            )
-
-        await self.db_pool.runInteraction("forget_membership", f)
-
 
 def extract_heroes_from_room_summary(
     details: Mapping[str, MemberSummary], me: str
diff --git a/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
new file mode 100644
index 0000000000..be4b57d86f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
@@ -0,0 +1,24 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE room_forgetter_stream_pos (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id  BIGINT NOT NULL,
+    CHECK (Lock='X')
+);
+
+INSERT INTO room_forgetter_stream_pos (
+    stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) from events;