diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index daad58291a..e068f27a10 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -82,7 +82,7 @@ class EventIdMembership:
membership: str
-class RoomMemberWorkerStore(EventsWorkerStore):
+class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
@@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore):
_is_local_host_in_room_ignoring_users_txn,
)
+ async def forget(self, user_id: str, room_id: str) -> None:
+ """Indicate that user_id wishes to discard history for room_id."""
+
+ def f(txn: LoggingTransaction) -> None:
+ self.db_pool.simple_update_txn(
+ txn,
+ table="room_memberships",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ updatevalues={"forgotten": 1},
+ )
+
+ self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+ self._invalidate_cache_and_stream(
+ txn, self.get_forgotten_rooms_for_user, (user_id,)
+ )
+
+ await self.db_pool.runInteraction("forget_membership", f)
+
+ async def get_room_forgetter_stream_pos(self) -> int:
+ """Get the stream position of the background process to forget rooms when left
+ by users.
+ """
+ return await self.db_pool.simple_select_one_onecol(
+ table="room_forgetter_stream_pos",
+ keyvalues={},
+ retcol="stream_id",
+ desc="room_forgetter_stream_pos",
+ )
+
+ async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
+ """Update the stream position of the background process to forget rooms when
+ left by users.
+
+ Must only be used by the worker running the background process.
+ """
+ assert self.hs.config.worker.run_background_tasks
+
+ await self.db_pool.simple_update_one(
+ table="room_forgetter_stream_pos",
+ keyvalues={},
+ updatevalues={"stream_id": stream_id},
+ desc="room_forgetter_stream_pos",
+ )
+
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
@@ -1553,29 +1597,6 @@ class RoomMemberStore(
):
super().__init__(database, db_conn, hs)
- async def forget(self, user_id: str, room_id: str) -> None:
- """Indicate that user_id wishes to discard history for room_id."""
-
- def f(txn: LoggingTransaction) -> None:
- sql = (
- "UPDATE"
- " room_memberships"
- " SET"
- " forgotten = 1"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- )
- txn.execute(sql, (user_id, room_id))
-
- self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
- self._invalidate_cache_and_stream(
- txn, self.get_forgotten_rooms_for_user, (user_id,)
- )
-
- await self.db_pool.runInteraction("forget_membership", f)
-
def extract_heroes_from_room_summary(
details: Mapping[str, MemberSummary], me: str
diff --git a/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
new file mode 100644
index 0000000000..be4b57d86f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
@@ -0,0 +1,24 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE room_forgetter_stream_pos (
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+INSERT INTO room_forgetter_stream_pos (
+ stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) from events;
|