summary refs log tree commit diff
path: root/synapse/storage/databases/main/roommember.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/roommember.py')
-rw-r--r--synapse/storage/databases/main/roommember.py69
1 files changed, 45 insertions, 24 deletions
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index daad58291a..e068f27a10 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -82,7 +82,7 @@ class EventIdMembership:
     membership: str
 
 
-class RoomMemberWorkerStore(EventsWorkerStore):
+class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             _is_local_host_in_room_ignoring_users_txn,
         )
 
+    async def forget(self, user_id: str, room_id: str) -> None:
+        """Indicate that user_id wishes to discard history for room_id."""
+
+        def f(txn: LoggingTransaction) -> None:
+            self.db_pool.simple_update_txn(
+                txn,
+                table="room_memberships",
+                keyvalues={"user_id": user_id, "room_id": room_id},
+                updatevalues={"forgotten": 1},
+            )
+
+            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+            self._invalidate_cache_and_stream(
+                txn, self.get_forgotten_rooms_for_user, (user_id,)
+            )
+
+        await self.db_pool.runInteraction("forget_membership", f)
+
+    async def get_room_forgetter_stream_pos(self) -> int:
+        """Get the stream position of the background process to forget rooms when left
+        by users.
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="room_forgetter_stream_pos",
+        )
+
+    async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
+        """Update the stream position of the background process to forget rooms when
+        left by users.
+
+        Must only be used by the worker running the background process.
+        """
+        assert self.hs.config.worker.run_background_tasks
+
+        await self.db_pool.simple_update_one(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="room_forgetter_stream_pos",
+        )
+
 
 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(
@@ -1553,29 +1597,6 @@ class RoomMemberStore(
     ):
         super().__init__(database, db_conn, hs)
 
-    async def forget(self, user_id: str, room_id: str) -> None:
-        """Indicate that user_id wishes to discard history for room_id."""
-
-        def f(txn: LoggingTransaction) -> None:
-            sql = (
-                "UPDATE"
-                "  room_memberships"
-                " SET"
-                "  forgotten = 1"
-                " WHERE"
-                "  user_id = ?"
-                " AND"
-                "  room_id = ?"
-            )
-            txn.execute(sql, (user_id, room_id))
-
-            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
-            self._invalidate_cache_and_stream(
-                txn, self.get_forgotten_rooms_for_user, (user_id,)
-            )
-
-        await self.db_pool.runInteraction("forget_membership", f)
-
 
 def extract_heroes_from_room_summary(
     details: Mapping[str, MemberSummary], me: str