diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2019-11-04 17:09:22 +0000 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-11-04 17:09:22 +0000 |
commit | 09957ce0e4dcfd84c2de4039653059faae03065b (patch) | |
tree | fc31ab750a3c485a0cfb9d23b0ab52114a91d954 /synapse/storage | |
parent | Merge pull request #6301 from matrix-org/babolivier/msc2326 (diff) | |
download | synapse-09957ce0e4dcfd84c2de4039653059faae03065b.tar.xz |
Implement per-room message retention policies
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/data_stores/main/events.py | 3 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/room.py | 252 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/schema/delta/56/room_retention.sql | 33 |
3 files changed, 288 insertions, 0 deletions
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 301f8ea128..b332a42d82 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -929,6 +929,9 @@ class EventsStore( elif event.type == EventTypes.Redaction: # Insert into the redactions table. self._store_redaction(txn, event) + elif event.type == EventTypes.Retention: + # Update the room_retention table. + self._store_retention_policy_for_room_txn(txn, event) self._handle_event_relations(txn, event) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index 67bb1b6f60..54a7d24c73 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -19,10 +19,13 @@ import logging import re from typing import Optional, Tuple +from six import integer_types + from canonicaljson import json from twisted.internet import defer +from synapse.api.constants import EventTypes from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore from synapse.storage.data_stores.main.search import SearchStore @@ -302,6 +305,85 @@ class RoomWorkerStore(SQLBaseStore): class RoomStore(RoomWorkerStore, SearchStore): + def __init__(self, db_conn, hs): + super(RoomStore, self).__init__(db_conn, hs) + + self.config = hs.config + + self.register_background_update_handler( + "insert_room_retention", self._background_insert_retention, + ) + + @defer.inlineCallbacks + def _background_insert_retention(self, progress, batch_size): + """Retrieves a list of all rooms within a range and inserts an entry for each of + them into the room_retention table. + NULLs the property's columns if missing from the retention event in the room's + state (or NULLs all of them if there's no retention event in the room's state), + so that we fall back to the server's retention policy. + """ + + last_room = progress.get("room_id", "") + + def _background_insert_retention_txn(txn): + txn.execute( + """ + SELECT state.room_id, state.event_id, events.json + FROM current_state_events as state + LEFT JOIN event_json AS events ON (state.event_id = events.event_id) + WHERE state.room_id > ? AND state.type = '%s' + ORDER BY state.room_id ASC + LIMIT ?; + """ % EventTypes.Retention, + (last_room, batch_size) + ) + + rows = self.cursor_to_dict(txn) + + if not rows: + return True + + for row in rows: + if not row["json"]: + retention_policy = {} + else: + ev = json.loads(row["json"]) + retention_policy = json.dumps(ev["content"]) + + self._simple_insert_txn( + txn=txn, + table="room_retention", + values={ + "room_id": row["room_id"], + "event_id": row["event_id"], + "min_lifetime": retention_policy.get("min_lifetime"), + "max_lifetime": retention_policy.get("max_lifetime"), + } + ) + + logger.info("Inserted %d rows into room_retention", len(rows)) + + self._background_update_progress_txn( + txn, "insert_room_retention", { + "room_id": rows[-1]["room_id"], + } + ) + + if batch_size > len(rows): + return True + else: + return False + + end = yield self.runInteraction( + "insert_room_retention", + _background_insert_retention_txn, + ) + + if end: + yield self._end_background_update("insert_room_retention") + + defer.returnValue(batch_size) + @defer.inlineCallbacks def store_room(self, room_id, room_creator_user_id, is_public): """Stores a room. @@ -502,6 +584,37 @@ class RoomStore(RoomWorkerStore, SearchStore): txn, event, "content.body", event.content["body"] ) + def _store_retention_policy_for_room_txn(self, txn, event): + if ( + hasattr(event, "content") + and ("min_lifetime" in event.content or "max_lifetime" in event.content) + ): + if ( + ("min_lifetime" in event.content and not isinstance( + event.content.get("min_lifetime"), integer_types + )) + or ("max_lifetime" in event.content and not isinstance( + event.content.get("max_lifetime"), integer_types + )) + ): + # Ignore the event if one of the value isn't an integer. + return + + self._simple_insert_txn( + txn=txn, + table="room_retention", + values={ + "room_id": event.room_id, + "event_id": event.event_id, + "min_lifetime": event.content.get("min_lifetime"), + "max_lifetime": event.content.get("max_lifetime"), + }, + ) + + self._invalidate_cache_and_stream( + txn, self.get_retention_policy_for_room, (event.room_id,) + ) + def add_event_report( self, room_id, event_id, user_id, reason, content, received_ts ): @@ -683,3 +796,142 @@ class RoomStore(RoomWorkerStore, SearchStore): remote_media_mxcs.append((hostname, media_id)) return local_media_mxcs, remote_media_mxcs + + @defer.inlineCallbacks + def get_rooms_for_retention_period_in_range(self, min_ms, max_ms, include_null=False): + """Retrieves all of the rooms within the given retention range. + + Optionally includes the rooms which don't have a retention policy. + + Args: + min_ms (int|None): Duration in milliseconds that define the lower limit of + the range to handle (exclusive). If None, doesn't set a lower limit. + max_ms (int|None): Duration in milliseconds that define the upper limit of + the range to handle (inclusive). If None, doesn't set an upper limit. + include_null (bool): Whether to include rooms which retention policy is NULL + in the returned set. + + Returns: + dict[str, dict]: The rooms within this range, along with their retention + policy. The key is "room_id", and maps to a dict describing the retention + policy associated with this room ID. The keys for this nested dict are + "min_lifetime" (int|None), and "max_lifetime" (int|None). + """ + + def get_rooms_for_retention_period_in_range_txn(txn): + range_conditions = [] + args = [] + + if min_ms is not None: + range_conditions.append("max_lifetime > ?") + args.append(min_ms) + + if max_ms is not None: + range_conditions.append("max_lifetime <= ?") + args.append(max_ms) + + # Do a first query which will retrieve the rooms that have a retention policy + # in their current state. + sql = """ + SELECT room_id, min_lifetime, max_lifetime FROM room_retention + INNER JOIN current_state_events USING (event_id, room_id) + """ + + if len(range_conditions): + sql += " WHERE (" + " AND ".join(range_conditions) + ")" + + if include_null: + sql += " OR max_lifetime IS NULL" + + txn.execute(sql, args) + + rows = self.cursor_to_dict(txn) + rooms_dict = {} + + for row in rows: + rooms_dict[row["room_id"]] = { + "min_lifetime": row["min_lifetime"], + "max_lifetime": row["max_lifetime"], + } + + if include_null: + # If required, do a second query that retrieves all of the rooms we know + # of so we can handle rooms with no retention policy. + sql = "SELECT DISTINCT room_id FROM current_state_events" + + txn.execute(sql) + + rows = self.cursor_to_dict(txn) + + # If a room isn't already in the dict (i.e. it doesn't have a retention + # policy in its state), add it with a null policy. + for row in rows: + if row["room_id"] not in rooms_dict: + rooms_dict[row["room_id"]] = { + "min_lifetime": None, + "max_lifetime": None, + } + + return rooms_dict + + rooms = yield self.runInteraction( + "get_rooms_for_retention_period_in_range", + get_rooms_for_retention_period_in_range_txn, + ) + + defer.returnValue(rooms) + + @cachedInlineCallbacks() + def get_retention_policy_for_room(self, room_id): + """Get the retention policy for a given room. + + If no retention policy has been found for this room, returns a policy defined + by the configured default policy (which has None as both the 'min_lifetime' and + the 'max_lifetime' if no default policy has been defined in the server's + configuration). + + Args: + room_id (str): The ID of the room to get the retention policy of. + + Returns: + dict[int, int]: "min_lifetime" and "max_lifetime" for this room. + """ + + def get_retention_policy_for_room_txn(txn): + txn.execute( + """ + SELECT min_lifetime, max_lifetime FROM room_retention + INNER JOIN current_state_events USING (event_id, room_id) + WHERE room_id = ?; + """, + (room_id,) + ) + + return self.cursor_to_dict(txn) + + ret = yield self.runInteraction( + "get_retention_policy_for_room", + get_retention_policy_for_room_txn, + ) + + # If we don't know this room ID, ret will be None, in this case return the default + # policy. + if not ret: + defer.returnValue({ + "min_lifetime": self.config.retention_default_min_lifetime, + "max_lifetime": self.config.retention_default_max_lifetime, + }) + + row = ret[0] + + # If one of the room's policy's attributes isn't defined, use the matching + # attribute from the default policy. + # The default values will be None if no default policy has been defined, or if one + # of the attributes is missing from the default policy. + if row["min_lifetime"] is None: + row["min_lifetime"] = self.config.retention_default_min_lifetime + + if row["max_lifetime"] is None: + row["max_lifetime"] = self.config.retention_default_max_lifetime + + defer.returnValue(row) diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql new file mode 100644 index 0000000000..ee6cdf7a14 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/room_retention.sql @@ -0,0 +1,33 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Tracks the retention policy of a room. +-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in +-- the room's retention policy state event. +-- If a room doesn't have a retention policy state event in its state, both max_lifetime +-- and min_lifetime are NULL. +CREATE TABLE IF NOT EXISTS room_retention( + room_id TEXT, + event_id TEXT, + min_lifetime BIGINT, + max_lifetime BIGINT, + + PRIMARY KEY(room_id, event_id) +); + +CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime); + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('insert_room_retention', '{}'); |