summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/room.py4
-rw-r--r--synapse/handlers/room_member.py173
-rw-r--r--synapse/handlers/room_member_worker.py3
-rw-r--r--synapse/server.py11
-rw-r--r--synapse/storage/databases/main/roommember.py69
-rw-r--r--synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql24
6 files changed, 237 insertions, 47 deletions
diff --git a/synapse/config/room.py b/synapse/config/room.py
index 4a7ac00540..b6696cd129 100644
--- a/synapse/config/room.py
+++ b/synapse/config/room.py
@@ -75,3 +75,7 @@ class RoomConfig(Config):
                         % preset
                     )
                 # We validate the actual overrides when we try to apply them.
+
+        # When enabled, users will forget rooms when they leave them, either via a
+        # leave, kick or ban.
+        self.forget_on_leave = config.get("forget_rooms_on_leave", False)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed805d6ec8..fbef600acd 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -16,7 +16,7 @@ import abc
 import logging
 import random
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse import types
 from synapse.api.constants import (
@@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
+from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.logging import opentracing
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.types import (
     JsonDict,
@@ -280,9 +283,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         """
         raise NotImplementedError()
 
-    @abc.abstractmethod
     async def forget(self, user: UserID, room_id: str) -> None:
-        raise NotImplementedError()
+        user_id = user.to_string()
+
+        member = await self._storage_controllers.state.get_current_state_event(
+            room_id=room_id, event_type=EventTypes.Member, state_key=user_id
+        )
+        membership = member.membership if member else None
+
+        if membership is not None and membership not in [
+            Membership.LEAVE,
+            Membership.BAN,
+        ]:
+            raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+
+        # In normal case this call is only required if `membership` is not `None`.
+        # But: After the last member had left the room, the background update
+        # `_background_remove_left_rooms` is deleting rows related to this room from
+        # the table `current_state_events` and `get_current_state_events` is `None`.
+        await self.store.forget(user_id, room_id)
 
     async def ratelimit_multiple_invites(
         self,
@@ -2046,25 +2065,141 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         """Implements RoomMemberHandler._user_left_room"""
         user_left_room(self.distributor, target, room_id)
 
-    async def forget(self, user: UserID, room_id: str) -> None:
-        user_id = user.to_string()
 
-        member = await self._storage_controllers.state.get_current_state_event(
-            room_id=room_id, event_type=EventTypes.Member, state_key=user_id
-        )
-        membership = member.membership if member else None
+class RoomForgetterHandler(StateDeltasHandler):
+    """Forgets rooms when they are left, when enabled in the homeserver config.
 
-        if membership is not None and membership not in [
-            Membership.LEAVE,
-            Membership.BAN,
-        ]:
-            raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+    For the purposes of this feature, kicks, bans and "leaves" via state resolution
+    weirdness are all considered to be leaves.
 
-        # In normal case this call is only required if `membership` is not `None`.
-        # But: After the last member had left the room, the background update
-        # `_background_remove_left_rooms` is deleting rows related to this room from
-        # the table `current_state_events` and `get_current_state_events` is `None`.
-        await self.store.forget(user_id, room_id)
+    Derived from `StatsHandler` and `UserDirectoryHandler`.
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._hs = hs
+        self._store = hs.get_datastores().main
+        self._storage_controllers = hs.get_storage_controllers()
+        self._clock = hs.get_clock()
+        self._notifier = hs.get_notifier()
+        self._room_member_handler = hs.get_room_member_handler()
+
+        # The current position in the current_state_delta stream
+        self.pos: Optional[int] = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if hs.config.worker.run_background_tasks:
+            self._notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off to pick up outstanding work from before the last restart.
+            self._clock.call_later(0, self.notify_new_event)
+
+    def notify_new_event(self) -> None:
+        """Called when there may be more deltas to process"""
+        if self._is_processing:
+            return
+
+        self._is_processing = True
+
+        async def process() -> None:
+            try:
+                await self._unsafe_process()
+            finally:
+                self._is_processing = False
+
+        run_as_background_process("room_forgetter.notify_new_event", process)
+
+    async def _unsafe_process(self) -> None:
+        # If self.pos is None then means we haven't fetched it from DB
+        if self.pos is None:
+            self.pos = await self._store.get_room_forgetter_stream_pos()
+            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+            if self.pos > room_max_stream_ordering:
+                # apparently, we've processed more events than exist in the database!
+                # this can happen if events are removed with history purge or similar.
+                logger.warning(
+                    "Event stream ordering appears to have gone backwards (%i -> %i): "
+                    "rewinding room forgetter processor",
+                    self.pos,
+                    room_max_stream_ordering,
+                )
+                self.pos = room_max_stream_ordering
+
+        if not self._hs.config.room.forget_on_leave:
+            # Update the processing position, so that if the server admin turns the
+            # feature on at a later date, we don't decide to forget every room that
+            # has ever been left in the past.
+            self.pos = self._store.get_room_max_stream_ordering()
+            await self._store.update_room_forgetter_stream_pos(self.pos)
+            return
+
+        # Loop round handling deltas until we're up to date
+
+        while True:
+            # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+            # deltas, since there is otherwise a chance that we could miss updates which arrive
+            # after we check the deltas.
+            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+            if self.pos == room_max_stream_ordering:
+                break
+
+            logger.debug(
+                "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering
+            )
+            (
+                max_pos,
+                deltas,
+            ) = await self._storage_controllers.state.get_current_state_deltas(
+                self.pos, room_max_stream_ordering
+            )
+
+            logger.debug("Handling %d state deltas", len(deltas))
+            await self._handle_deltas(deltas)
+
+            self.pos = max_pos
+
+            # Expose current event processing position to prometheus
+            event_processing_positions.labels("room_forgetter").set(max_pos)
+
+            await self._store.update_room_forgetter_stream_pos(max_pos)
+
+    async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
+        """Called with the state deltas to process"""
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            if typ != EventTypes.Member:
+                continue
+
+            if not self._hs.is_mine_id(state_key):
+                continue
+
+            change = await self._get_key_change(
+                prev_event_id,
+                event_id,
+                key_name="membership",
+                public_value=Membership.JOIN,
+            )
+            is_leave = change is MatchChange.now_false
+
+            if is_leave:
+                try:
+                    await self._room_member_handler.forget(
+                        UserID.from_string(state_key), room_id
+                    )
+                except SynapseError as e:
+                    if e.code == 400:
+                        # The user is back in the room.
+                        pass
+                    else:
+                        raise
 
 
 def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 76e36b8a6d..e8ff1ad063 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -137,6 +137,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
         await self._notify_change_client(
             user_id=target.to_string(), room_id=room_id, change="left"
         )
-
-    async def forget(self, target: UserID, room_id: str) -> None:
-        raise RuntimeError("Cannot forget rooms on workers.")
diff --git a/synapse/server.py b/synapse/server.py
index 75a902d64d..a0036578b1 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -94,7 +94,11 @@ from synapse.handlers.room import (
 )
 from synapse.handlers.room_batch import RoomBatchHandler
 from synapse.handlers.room_list import RoomListHandler
-from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler
+from synapse.handlers.room_member import (
+    RoomForgetterHandler,
+    RoomMemberHandler,
+    RoomMemberMasterHandler,
+)
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.room_summary import RoomSummaryHandler
 from synapse.handlers.search import SearchHandler
@@ -233,6 +237,7 @@ class HomeServer(metaclass=abc.ABCMeta):
         "message",
         "pagination",
         "profile",
+        "room_forgetter",
         "stats",
     ]
 
@@ -848,6 +853,10 @@ class HomeServer(metaclass=abc.ABCMeta):
         return PushRulesHandler(self)
 
     @cache_in_self
+    def get_room_forgetter_handler(self) -> RoomForgetterHandler:
+        return RoomForgetterHandler(self)
+
+    @cache_in_self
     def get_outbound_redis_connection(self) -> "ConnectionHandler":
         """
         The Redis connection used for replication.
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index daad58291a..e068f27a10 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -82,7 +82,7 @@ class EventIdMembership:
     membership: str
 
 
-class RoomMemberWorkerStore(EventsWorkerStore):
+class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             _is_local_host_in_room_ignoring_users_txn,
         )
 
+    async def forget(self, user_id: str, room_id: str) -> None:
+        """Indicate that user_id wishes to discard history for room_id."""
+
+        def f(txn: LoggingTransaction) -> None:
+            self.db_pool.simple_update_txn(
+                txn,
+                table="room_memberships",
+                keyvalues={"user_id": user_id, "room_id": room_id},
+                updatevalues={"forgotten": 1},
+            )
+
+            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+            self._invalidate_cache_and_stream(
+                txn, self.get_forgotten_rooms_for_user, (user_id,)
+            )
+
+        await self.db_pool.runInteraction("forget_membership", f)
+
+    async def get_room_forgetter_stream_pos(self) -> int:
+        """Get the stream position of the background process to forget rooms when left
+        by users.
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="room_forgetter_stream_pos",
+        )
+
+    async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
+        """Update the stream position of the background process to forget rooms when
+        left by users.
+
+        Must only be used by the worker running the background process.
+        """
+        assert self.hs.config.worker.run_background_tasks
+
+        await self.db_pool.simple_update_one(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="room_forgetter_stream_pos",
+        )
+
 
 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(
@@ -1553,29 +1597,6 @@ class RoomMemberStore(
     ):
         super().__init__(database, db_conn, hs)
 
-    async def forget(self, user_id: str, room_id: str) -> None:
-        """Indicate that user_id wishes to discard history for room_id."""
-
-        def f(txn: LoggingTransaction) -> None:
-            sql = (
-                "UPDATE"
-                "  room_memberships"
-                " SET"
-                "  forgotten = 1"
-                " WHERE"
-                "  user_id = ?"
-                " AND"
-                "  room_id = ?"
-            )
-            txn.execute(sql, (user_id, room_id))
-
-            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
-            self._invalidate_cache_and_stream(
-                txn, self.get_forgotten_rooms_for_user, (user_id,)
-            )
-
-        await self.db_pool.runInteraction("forget_membership", f)
-
 
 def extract_heroes_from_room_summary(
     details: Mapping[str, MemberSummary], me: str
diff --git a/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
new file mode 100644
index 0000000000..be4b57d86f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
@@ -0,0 +1,24 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE room_forgetter_stream_pos (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id  BIGINT NOT NULL,
+    CHECK (Lock='X')
+);
+
+INSERT INTO room_forgetter_stream_pos (
+    stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) from events;