diff --git a/changelog.d/15224.feature b/changelog.d/15224.feature
new file mode 100644
index 0000000000..5d8413f8be
--- /dev/null
+++ b/changelog.d/15224.feature
@@ -0,0 +1 @@
+Add `forget_rooms_on_leave` config option to automatically forget rooms when users leave them or are removed from them.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index b6516191e8..14c21f73fe 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -3699,6 +3699,16 @@ default_power_level_content_override:
trusted_private_chat: null
public_chat: null
```
+---
+### `forget_rooms_on_leave`
+
+Set to true to automatically forget rooms for users when they leave them, either
+normally or via a kick or ban. Defaults to false.
+
+Example configuration:
+```yaml
+forget_rooms_on_leave: false
+```
---
## Opentracing
diff --git a/synapse/config/room.py b/synapse/config/room.py
index 4a7ac00540..b6696cd129 100644
--- a/synapse/config/room.py
+++ b/synapse/config/room.py
@@ -75,3 +75,7 @@ class RoomConfig(Config):
% preset
)
# We validate the actual overrides when we try to apply them.
+
+ # When enabled, users will forget rooms when they leave them, either via a
+ # leave, kick or ban.
+ self.forget_on_leave = config.get("forget_rooms_on_leave", False)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed805d6ec8..fbef600acd 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -16,7 +16,7 @@ import abc
import logging
import random
from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
from synapse import types
from synapse.api.constants import (
@@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
+from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.logging import opentracing
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.types import (
JsonDict,
@@ -280,9 +283,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
- @abc.abstractmethod
async def forget(self, user: UserID, room_id: str) -> None:
- raise NotImplementedError()
+ user_id = user.to_string()
+
+ member = await self._storage_controllers.state.get_current_state_event(
+ room_id=room_id, event_type=EventTypes.Member, state_key=user_id
+ )
+ membership = member.membership if member else None
+
+ if membership is not None and membership not in [
+ Membership.LEAVE,
+ Membership.BAN,
+ ]:
+ raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+
+ # Normally this call is only required if `membership` is not `None`.
+ # However, after the last member has left the room, the background update
+ # `_background_remove_left_rooms` deletes the rows related to this room from
+ # `current_state_events`, so `get_current_state_event` returns `None`.
+ await self.store.forget(user_id, room_id)
async def ratelimit_multiple_invites(
self,
@@ -2046,25 +2065,141 @@ class RoomMemberMasterHandler(RoomMemberHandler):
"""Implements RoomMemberHandler._user_left_room"""
user_left_room(self.distributor, target, room_id)
- async def forget(self, user: UserID, room_id: str) -> None:
- user_id = user.to_string()
- member = await self._storage_controllers.state.get_current_state_event(
- room_id=room_id, event_type=EventTypes.Member, state_key=user_id
- )
- membership = member.membership if member else None
+class RoomForgetterHandler(StateDeltasHandler):
+ """Forgets rooms when they are left, when enabled in the homeserver config.
- if membership is not None and membership not in [
- Membership.LEAVE,
- Membership.BAN,
- ]:
- raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+ For the purposes of this feature, kicks, bans and "leaves" via state resolution
+ weirdness are all considered to be leaves.
- # In normal case this call is only required if `membership` is not `None`.
- # But: After the last member had left the room, the background update
- # `_background_remove_left_rooms` is deleting rows related to this room from
- # the table `current_state_events` and `get_current_state_events` is `None`.
- await self.store.forget(user_id, room_id)
+ Implementation adapted from `StatsHandler` and `UserDirectoryHandler`.
+ """
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self._hs = hs
+ self._store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
+ self._clock = hs.get_clock()
+ self._notifier = hs.get_notifier()
+ self._room_member_handler = hs.get_room_member_handler()
+
+ # The current position in the current_state_delta stream
+ self.pos: Optional[int] = None
+
+ # Guard to ensure we only process deltas one at a time
+ self._is_processing = False
+
+ if hs.config.worker.run_background_tasks:
+ self._notifier.add_replication_callback(self.notify_new_event)
+
+ # We kick this off to pick up outstanding work from before the last restart.
+ self._clock.call_later(0, self.notify_new_event)
+
+ def notify_new_event(self) -> None:
+ """Called when there may be more deltas to process"""
+ if self._is_processing:
+ return
+
+ self._is_processing = True
+
+ async def process() -> None:
+ try:
+ await self._unsafe_process()
+ finally:
+ self._is_processing = False
+
+ run_as_background_process("room_forgetter.notify_new_event", process)
+
+ async def _unsafe_process(self) -> None:
+ # If self.pos is None, we haven't fetched it from the DB yet.
+ if self.pos is None:
+ self.pos = await self._store.get_room_forgetter_stream_pos()
+ room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+ if self.pos > room_max_stream_ordering:
+ # apparently, we've processed more events than exist in the database!
+ # this can happen if events are removed with history purge or similar.
+ logger.warning(
+ "Event stream ordering appears to have gone backwards (%i -> %i): "
+ "rewinding room forgetter processor",
+ self.pos,
+ room_max_stream_ordering,
+ )
+ self.pos = room_max_stream_ordering
+
+ if not self._hs.config.room.forget_on_leave:
+ # Update the processing position, so that if the server admin turns the
+ # feature on at a later date, we don't decide to forget every room that
+ # has ever been left in the past.
+ self.pos = self._store.get_room_max_stream_ordering()
+ await self._store.update_room_forgetter_stream_pos(self.pos)
+ return
+
+ # Loop round handling deltas until we're up to date
+
+ while True:
+ # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+ # deltas, since there is otherwise a chance that we could miss updates which arrive
+ # after we check the deltas.
+ room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+ if self.pos == room_max_stream_ordering:
+ break
+
+ logger.debug(
+ "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering
+ )
+ (
+ max_pos,
+ deltas,
+ ) = await self._storage_controllers.state.get_current_state_deltas(
+ self.pos, room_max_stream_ordering
+ )
+
+ logger.debug("Handling %d state deltas", len(deltas))
+ await self._handle_deltas(deltas)
+
+ self.pos = max_pos
+
+ # Expose current event processing position to prometheus
+ event_processing_positions.labels("room_forgetter").set(max_pos)
+
+ await self._store.update_room_forgetter_stream_pos(max_pos)
+
+ async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
+ """Called with the state deltas to process"""
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ prev_event_id = delta["prev_event_id"]
+
+ if typ != EventTypes.Member:
+ continue
+
+ if not self._hs.is_mine_id(state_key):
+ continue
+
+ change = await self._get_key_change(
+ prev_event_id,
+ event_id,
+ key_name="membership",
+ public_value=Membership.JOIN,
+ )
+ is_leave = change is MatchChange.now_false
+
+ if is_leave:
+ try:
+ await self._room_member_handler.forget(
+ UserID.from_string(state_key), room_id
+ )
+ except SynapseError as e:
+ if e.code == 400:
+ # The user is back in the room.
+ pass
+ else:
+ raise
def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 76e36b8a6d..e8ff1ad063 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -137,6 +137,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="left"
)
-
- async def forget(self, target: UserID, room_id: str) -> None:
- raise RuntimeError("Cannot forget rooms on workers.")
diff --git a/synapse/server.py b/synapse/server.py
index 75a902d64d..a0036578b1 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -94,7 +94,11 @@ from synapse.handlers.room import (
)
from synapse.handlers.room_batch import RoomBatchHandler
from synapse.handlers.room_list import RoomListHandler
-from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler
+from synapse.handlers.room_member import (
+ RoomForgetterHandler,
+ RoomMemberHandler,
+ RoomMemberMasterHandler,
+)
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
from synapse.handlers.room_summary import RoomSummaryHandler
from synapse.handlers.search import SearchHandler
@@ -233,6 +237,7 @@ class HomeServer(metaclass=abc.ABCMeta):
"message",
"pagination",
"profile",
+ "room_forgetter",
"stats",
]
@@ -848,6 +853,10 @@ class HomeServer(metaclass=abc.ABCMeta):
return PushRulesHandler(self)
@cache_in_self
+ def get_room_forgetter_handler(self) -> RoomForgetterHandler:
+ return RoomForgetterHandler(self)
+
+ @cache_in_self
def get_outbound_redis_connection(self) -> "ConnectionHandler":
"""
The Redis connection used for replication.
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index daad58291a..e068f27a10 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -82,7 +82,7 @@ class EventIdMembership:
membership: str
-class RoomMemberWorkerStore(EventsWorkerStore):
+class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
@@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore):
_is_local_host_in_room_ignoring_users_txn,
)
+ async def forget(self, user_id: str, room_id: str) -> None:
+ """Indicate that user_id wishes to discard history for room_id."""
+
+ def f(txn: LoggingTransaction) -> None:
+ self.db_pool.simple_update_txn(
+ txn,
+ table="room_memberships",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ updatevalues={"forgotten": 1},
+ )
+
+ self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+ self._invalidate_cache_and_stream(
+ txn, self.get_forgotten_rooms_for_user, (user_id,)
+ )
+
+ await self.db_pool.runInteraction("forget_membership", f)
+
+ async def get_room_forgetter_stream_pos(self) -> int:
+ """Get the stream position of the background process to forget rooms when left
+ by users.
+ """
+ return await self.db_pool.simple_select_one_onecol(
+ table="room_forgetter_stream_pos",
+ keyvalues={},
+ retcol="stream_id",
+ desc="room_forgetter_stream_pos",
+ )
+
+ async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
+ """Update the stream position of the background process to forget rooms when
+ left by users.
+
+ Must only be used by the worker running the background process.
+ """
+ assert self.hs.config.worker.run_background_tasks
+
+ await self.db_pool.simple_update_one(
+ table="room_forgetter_stream_pos",
+ keyvalues={},
+ updatevalues={"stream_id": stream_id},
+ desc="room_forgetter_stream_pos",
+ )
+
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
@@ -1553,29 +1597,6 @@ class RoomMemberStore(
):
super().__init__(database, db_conn, hs)
- async def forget(self, user_id: str, room_id: str) -> None:
- """Indicate that user_id wishes to discard history for room_id."""
-
- def f(txn: LoggingTransaction) -> None:
- sql = (
- "UPDATE"
- " room_memberships"
- " SET"
- " forgotten = 1"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- )
- txn.execute(sql, (user_id, room_id))
-
- self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
- self._invalidate_cache_and_stream(
- txn, self.get_forgotten_rooms_for_user, (user_id,)
- )
-
- await self.db_pool.runInteraction("forget_membership", f)
-
def extract_heroes_from_room_summary(
details: Mapping[str, MemberSummary], me: str
diff --git a/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
new file mode 100644
index 0000000000..be4b57d86f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
@@ -0,0 +1,24 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE room_forgetter_stream_pos (
+ Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
+ stream_id BIGINT NOT NULL,
+ CHECK (Lock='X')
+);
+
+INSERT INTO room_forgetter_stream_pos (
+ stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) from events;
diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py
index 6a38893b68..a444d822cd 100644
--- a/tests/handlers/test_room_member.py
+++ b/tests/handlers/test_room_member.py
@@ -333,6 +333,17 @@ class RoomMemberMasterHandlerTestCase(HomeserverTestCase):
self.get_success(self.store.is_locally_forgotten_room(self.room_id))
)
+ @override_config({"forget_rooms_on_leave": True})
+ def test_leave_and_auto_forget(self) -> None:
+ """Tests the `forget_rooms_on_leave` config option."""
+ self.helper.join(self.room_id, user=self.bob, tok=self.bob_token)
+
+ # alice leaves while bob is still joined, so she is not the last member to leave; the room should be forgotten for her
+ self.helper.leave(self.room_id, user=self.alice, tok=self.alice_token)
+ self.assertTrue(
+ self.get_success(self.store.did_forget(self.alice, self.room_id))
+ )
+
def test_leave_and_forget_last_user(self) -> None:
"""Tests that forget a room is successfully when the last user has left the room."""
|