30 files changed, 748 insertions, 380 deletions
diff --git a/changelog.d/15224.feature b/changelog.d/15224.feature new file mode 100644 index 0000000000..5d8413f8be --- /dev/null +++ b/changelog.d/15224.feature @@ -0,0 +1 @@ +Add `forget_rooms_on_leave` config option to automatically forget rooms when users leave them or are removed from them. diff --git a/changelog.d/15437.misc b/changelog.d/15437.misc new file mode 100644 index 0000000000..2dea23784f --- /dev/null +++ b/changelog.d/15437.misc @@ -0,0 +1 @@ +Make the `thread_id` column on `event_push_actions`, `event_push_actions_staging`, and `event_push_summary` non-null. diff --git a/changelog.d/15516.feature b/changelog.d/15516.feature new file mode 100644 index 0000000000..02a101bb88 --- /dev/null +++ b/changelog.d/15516.feature @@ -0,0 +1 @@ +Add a config option to delay push notifications by a random amount, to discourage time-based profiling. diff --git a/changelog.d/15522.misc b/changelog.d/15522.misc new file mode 100644 index 0000000000..a5a229e4a0 --- /dev/null +++ b/changelog.d/15522.misc @@ -0,0 +1 @@ +Remove references to supporting per-user flag for [MSC2654](https://github.com/matrix-org/matrix-spec-proposals/pull/2654) (#15522). diff --git a/changelog.d/15527.misc b/changelog.d/15527.misc new file mode 100644 index 0000000000..752a32adeb --- /dev/null +++ b/changelog.d/15527.misc @@ -0,0 +1 @@ +Don't use a trusted key server when running the demo scripts. \ No newline at end of file diff --git a/changelog.d/15529.misc b/changelog.d/15529.misc new file mode 100644 index 0000000000..7ad424d8df --- /dev/null +++ b/changelog.d/15529.misc @@ -0,0 +1 @@ +Speed up rebuilding of the user directory for local users. diff --git a/changelog.d/15531.misc b/changelog.d/15531.misc new file mode 100644 index 0000000000..6d4da961b5 --- /dev/null +++ b/changelog.d/15531.misc @@ -0,0 +1 @@ +Speed up deleting of old rows in `event_push_actions`. diff --git a/demo/start.sh b/demo/start.sh index fdd75816fb..06ec6f985f 100755 --- a/demo/start.sh +++ b/demo/start.sh @@ -46,7 +46,7 @@ for port in 8080 8081 8082; do echo '' # Warning, this heredoc depends on the interaction of tabs and spaces. - # Please don't accidentaly bork me with your fancy settings. + # Please don't accidentally bork me with your fancy settings. listeners=$(cat <<-PORTLISTENERS # Configure server to listen on both $https_port and $port # This overides some of the default settings above @@ -80,12 +80,8 @@ for port in 8080 8081 8082; do echo "tls_certificate_path: \"$DIR/$port/localhost:$port.tls.crt\"" echo "tls_private_key_path: \"$DIR/$port/localhost:$port.tls.key\"" - # Ignore keys from the trusted keys server - echo '# Ignore keys from the trusted keys server' - echo 'trusted_key_servers:' - echo ' - server_name: "matrix.org"' - echo ' accept_keys_insecurely: true' - echo '' + # Request keys directly from servers contacted over federation + echo 'trusted_key_servers: []' # Allow the servers to communicate over localhost. allow_list=$(cat <<-ALLOW_LIST diff --git a/docs/admin_api/experimental_features.md b/docs/admin_api/experimental_features.md index c1aebe4b01..07b630915d 100644 --- a/docs/admin_api/experimental_features.md +++ b/docs/admin_api/experimental_features.md @@ -1,10 +1,12 @@ # Experimental Features API This API allows a server administrator to enable or disable some experimental features on a per-user -basis. 
Currently supported features are [msc3026](https://github.com/matrix-org/matrix-spec-proposals/pull/3026): busy -presence state enabled, [msc2654](https://github.com/matrix-org/matrix-spec-proposals/pull/2654): enable unread counts, -[msc3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881): enable remotely toggling push notifications -for another client, and [msc3967](https://github.com/matrix-org/matrix-spec-proposals/pull/3967): do not require +basis. The currently supported features are: +- [MSC3026](https://github.com/matrix-org/matrix-spec-proposals/pull/3026): busy +presence state enabled +- [MSC3881](https://github.com/matrix-org/matrix-spec-proposals/pull/3881): enable remotely toggling push notifications +for another client +- [MSC3967](https://github.com/matrix-org/matrix-spec-proposals/pull/3967): do not require UIA when first uploading cross-signing keys. @@ -19,7 +21,7 @@ provide a body containing the user id and listing the features to enable/disable { "features": { "msc3026":true, - "msc2654":true + "msc3881":true } } ``` @@ -46,7 +48,6 @@ user like so: { "features": { "msc3026": true, - "msc2654": true, "msc3881": false, "msc3967": false } diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 1b6f256949..14c21f73fe 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3442,6 +3442,9 @@ This option has a number of sub-options. They are as follows: user has unread messages in. Defaults to true, meaning push clients will see the number of rooms with unread messages in them. Set to false to instead send the number of unread messages. +* `jitter_delay`: Delays push notifications by a random amount up to the given + duration. Useful for mitigating timing attacks. Optional, defaults to no + delay. _Added in Synapse 1.84.0._ Example configuration: ```yaml @@ -3449,6 +3452,7 @@ push: enabled: true include_content: false group_unread_count_by_room: false + jitter_delay: "10s" ``` --- ## Rooms @@ -3695,6 +3699,16 @@ default_power_level_content_override: trusted_private_chat: null public_chat: null ``` +--- +### `forget_rooms_on_leave` + +Set to true to automatically forget rooms for users when they leave them, either +normally or via a kick or ban. Defaults to false. + +Example configuration: +```yaml +forget_rooms_on_leave: false +``` --- ## Opentracing diff --git a/synapse/config/push.py b/synapse/config/push.py index 3b5378e6ea..8177ff52e2 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -42,11 +42,17 @@ class PushConfig(Config): # Now check for the one in the 'email' section and honour it, # with a warning. - push_config = config.get("email") or {} - redact_content = push_config.get("redact_content") + email_push_config = config.get("email") or {} + redact_content = email_push_config.get("redact_content") if redact_content is not None: print( "The 'email.redact_content' option is deprecated: " "please set push.include_content instead" ) self.push_include_content = not redact_content + + # Whether to apply a random delay to outbound push. 
+ self.push_jitter_delay_ms = None + push_jitter_delay = push_config.get("jitter_delay", None) + if push_jitter_delay: + self.push_jitter_delay_ms = self.parse_duration(push_jitter_delay) diff --git a/synapse/config/room.py b/synapse/config/room.py index 4a7ac00540..b6696cd129 100644 --- a/synapse/config/room.py +++ b/synapse/config/room.py @@ -75,3 +75,7 @@ class RoomConfig(Config): % preset ) # We validate the actual overrides when we try to apply them. + + # When enabled, users will forget rooms when they leave them, either via a + # leave, kick or ban. + self.forget_on_leave = config.get("forget_rooms_on_leave", False) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed805d6ec8..fbef600acd 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -16,7 +16,7 @@ import abc import logging import random from http import HTTPStatus -from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple from synapse import types from synapse.api.constants import ( @@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN +from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.logging import opentracing +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.types import ( JsonDict, @@ -280,9 +283,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): """ raise NotImplementedError() - @abc.abstractmethod async def forget(self, user: UserID, room_id: str) -> None: - raise NotImplementedError() + user_id = user.to_string() + + member = await self._storage_controllers.state.get_current_state_event( + room_id=room_id, event_type=EventTypes.Member, state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, + Membership.BAN, + ]: + raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) + + # In normal case this call is only required if `membership` is not `None`. + # But: After the last member had left the room, the background update + # `_background_remove_left_rooms` is deleting rows related to this room from + # the table `current_state_events` and `get_current_state_events` is `None`. + await self.store.forget(user_id, room_id) async def ratelimit_multiple_invites( self, @@ -2046,25 +2065,141 @@ class RoomMemberMasterHandler(RoomMemberHandler): """Implements RoomMemberHandler._user_left_room""" user_left_room(self.distributor, target, room_id) - async def forget(self, user: UserID, room_id: str) -> None: - user_id = user.to_string() - member = await self._storage_controllers.state.get_current_state_event( - room_id=room_id, event_type=EventTypes.Member, state_key=user_id - ) - membership = member.membership if member else None +class RoomForgetterHandler(StateDeltasHandler): + """Forgets rooms when they are left, when enabled in the homeserver config. 
- if membership is not None and membership not in [ - Membership.LEAVE, - Membership.BAN, - ]: - raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) + For the purposes of this feature, kicks, bans and "leaves" via state resolution + weirdness are all considered to be leaves. - # In normal case this call is only required if `membership` is not `None`. - # But: After the last member had left the room, the background update - # `_background_remove_left_rooms` is deleting rows related to this room from - # the table `current_state_events` and `get_current_state_events` is `None`. - await self.store.forget(user_id, room_id) + Derived from `StatsHandler` and `UserDirectoryHandler`. + """ + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._hs = hs + self._store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() + self._clock = hs.get_clock() + self._notifier = hs.get_notifier() + self._room_member_handler = hs.get_room_member_handler() + + # The current position in the current_state_delta stream + self.pos: Optional[int] = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + if hs.config.worker.run_background_tasks: + self._notifier.add_replication_callback(self.notify_new_event) + + # We kick this off to pick up outstanding work from before the last restart. + self._clock.call_later(0, self.notify_new_event) + + def notify_new_event(self) -> None: + """Called when there may be more deltas to process""" + if self._is_processing: + return + + self._is_processing = True + + async def process() -> None: + try: + await self._unsafe_process() + finally: + self._is_processing = False + + run_as_background_process("room_forgetter.notify_new_event", process) + + async def _unsafe_process(self) -> None: + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None: + self.pos = await self._store.get_room_forgetter_stream_pos() + room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding room forgetter processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering + + if not self._hs.config.room.forget_on_leave: + # Update the processing position, so that if the server admin turns the + # feature on at a later date, we don't decide to forget every room that + # has ever been left in the past. + self.pos = self._store.get_room_max_stream_ordering() + await self._store.update_room_forgetter_stream_pos(self.pos) + return + + # Loop round handling deltas until we're up to date + + while True: + # Be sure to read the max stream_ordering *before* checking if there are any outstanding + # deltas, since there is otherwise a chance that we could miss updates which arrive + # after we check the deltas. 
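A compressed illustration of why that read order matters, using made-up stream positions rather than anything from a real deployment:

```python
# Illustrative stream positions only, not real data.
pos = 10        # last delta this handler has processed
room_max = 15   # maximum stream ordering, read *before* fetching deltas

# Fetch and handle deltas in (10, 15], then set pos = 15.  Events 16, 17, ...
# that arrive while we are working are simply picked up on the next loop
# iteration, because the loop only exits when pos equals a freshly read
# maximum.  Reading the maximum *after* fetching deltas could instead advance
# pos past deltas that were never handled.
```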
+ room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos == room_max_stream_ordering: + break + + logger.debug( + "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering + ) + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) + + logger.debug("Handling %d state deltas", len(deltas)) + await self._handle_deltas(deltas) + + self.pos = max_pos + + # Expose current event processing position to prometheus + event_processing_positions.labels("room_forgetter").set(max_pos) + + await self._store.update_room_forgetter_stream_pos(max_pos) + + async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: + """Called with the state deltas to process""" + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + if typ != EventTypes.Member: + continue + + if not self._hs.is_mine_id(state_key): + continue + + change = await self._get_key_change( + prev_event_id, + event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + is_leave = change is MatchChange.now_false + + if is_leave: + try: + await self._room_member_handler.forget( + UserID.from_string(state_key), room_id + ) + except SynapseError as e: + if e.code == 400: + # The user is back in the room. + pass + else: + raise def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 76e36b8a6d..e8ff1ad063 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -137,6 +137,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler): await self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left" ) - - async def forget(self, target: UserID, room_id: str) -> None: - raise RuntimeError("Cannot forget rooms on workers.") diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 4f8fa445d9..e91ee05e99 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import urllib.parse from typing import TYPE_CHECKING, Dict, List, Optional, Union @@ -114,6 +115,8 @@ class HttpPusher(Pusher): ) self._pusherpool = hs.get_pusherpool() + self.push_jitter_delay_ms = hs.config.push.push_jitter_delay_ms + self.data = pusher_config.data if self.data is None: raise PusherConfigException("'data' key can not be null for HTTP pusher") @@ -326,6 +329,21 @@ class HttpPusher(Pusher): event = await self.store.get_event(push_action.event_id, allow_none=True) if event is None: return True # It's been redacted + + # Check if we should delay sending out the notification by a random + # amount. + # + # Note: we base the delay off of when the event was sent, rather than + # now, to handle the case where we need to send out many notifications + # at once. If we just slept the random amount each loop then the last + # push notification in the set could be delayed by many times the max + # delay. 
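To make that timing concrete, here is a minimal standalone sketch of the delay calculation the comment describes; the function name and the use of wall-clock time are illustrative, not the pusher's actual internals:

```python
import random
import time


def remaining_jitter_ms(origin_server_ts_ms: int, max_jitter_ms: int) -> int:
    """Pick a random target delay measured from the event's send time and
    return how much longer, if anything, we still need to wait."""
    delay_ms = random.randint(1, max_jitter_ms)
    now_ms = int(time.time() * 1000)
    return max(0, origin_server_ts_ms + delay_ms - now_ms)


# With jitter_delay: "10s", an event sent 4 seconds ago waits at most ~6 more
# seconds, so a backlog of queued notifications still drains within roughly
# one jitter window instead of accumulating a fresh sleep per notification.
```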
+ if self.push_jitter_delay_ms: + delay_ms = random.randint(1, self.push_jitter_delay_ms) + diff_ms = event.origin_server_ts + delay_ms - self.clock.time_msec() + if diff_ms > 0: + await self.clock.sleep(diff_ms / 1000) + rejected = await self.dispatch_push_event(event, tweaks, badge) if rejected is False: return False diff --git a/synapse/rest/admin/experimental_features.py b/synapse/rest/admin/experimental_features.py index 1d409ac2b7..abf273af10 100644 --- a/synapse/rest/admin/experimental_features.py +++ b/synapse/rest/admin/experimental_features.py @@ -33,7 +33,6 @@ class ExperimentalFeature(str, Enum): """ MSC3026 = "msc3026" - MSC2654 = "msc2654" MSC3881 = "msc3881" MSC3967 = "msc3967" diff --git a/synapse/server.py b/synapse/server.py index 08ad97b952..e597627a6d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -93,7 +93,11 @@ from synapse.handlers.room import ( ) from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler +from synapse.handlers.room_member import ( + RoomForgetterHandler, + RoomMemberHandler, + RoomMemberMasterHandler, +) from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.room_summary import RoomSummaryHandler from synapse.handlers.search import SearchHandler @@ -232,6 +236,7 @@ class HomeServer(metaclass=abc.ABCMeta): "message", "pagination", "profile", + "room_forgetter", "stats", ] @@ -827,6 +832,10 @@ class HomeServer(metaclass=abc.ABCMeta): return PushRulesHandler(self) @cache_in_self + def get_room_forgetter_handler(self) -> RoomForgetterHandler: + return RoomForgetterHandler(self) + + @cache_in_self def get_outbound_redis_connection(self) -> "ConnectionHandler": """ The Redis connection used for replication. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index a99aea8926..ca085ef800 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -561,6 +561,50 @@ class BackgroundUpdater: updater, oneshot=True ) + def register_background_validate_constraint( + self, update_name: str, constraint_name: str, table: str + ) -> None: + """Helper for store classes to do a background validate constraint. + + This only applies on PostgreSQL. + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('validate_my_constraint', '{}'); + + 2. 
In the Store constructor, call this method + + Args: + update_name: update_name to register for + constraint_name: name of constraint to validate + table: table the constraint is applied to + """ + + def runner(conn: Connection) -> None: + c = conn.cursor() + + sql = f""" + ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name}; + """ + logger.debug("[SQL] %s", sql) + c.execute(sql) + + async def updater(progress: JsonDict, batch_size: int) -> int: + assert isinstance( + self.db_pool.engine, engines.PostgresEngine + ), "validate constraint background update registered for non-Postres database" + + logger.info("Validating constraint %s to %s", constraint_name, table) + await self.db_pool.runWithConnection(runner) + await self._end_background_update(update_name) + return 1 + + self._background_update_handlers[update_name] = _BackgroundUpdateHandler( + updater, oneshot=True + ) + async def create_index_in_background( self, index_name: str, diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 1f5f5eb6f8..313cf1a8d0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -386,13 +386,20 @@ class LoggingTransaction: self.executemany(sql, args) def execute_values( - self, sql: str, values: Iterable[Iterable[Any]], fetch: bool = True + self, + sql: str, + values: Iterable[Iterable[Any]], + template: Optional[str] = None, + fetch: bool = True, ) -> List[Tuple]: """Corresponds to psycopg2.extras.execute_values. Only available when using postgres. The `fetch` parameter must be set to False if the query does not return rows (e.g. INSERTs). + + The `template` is the snippet to merge to every item in argslist to + compose the query. """ assert isinstance(self.database_engine, PostgresEngine) from psycopg2.extras import execute_values @@ -400,7 +407,9 @@ class LoggingTransaction: return self._do_execute( # TODO: is it safe for values to be Iterable[Iterable[Any]] here? # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence] - lambda the_sql: execute_values(self.txn, the_sql, values, fetch=fetch), + lambda the_sql: execute_values( + self.txn, the_sql, values, template=template, fetch=fetch + ), sql, ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index eeccf5db24..2e98a29fef 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -100,7 +100,6 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -289,180 +288,22 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas unique=True, ) - self.db_pool.updates.register_background_update_handler( - "event_push_backfill_thread_id", - self._background_backfill_thread_id, + self.db_pool.updates.register_background_validate_constraint( + "event_push_actions_staging_thread_id", + constraint_name="event_push_actions_staging_thread_id", + table="event_push_actions_staging", ) - - # Indexes which will be used to quickly make the thread_id column non-null. 
- self.db_pool.updates.register_background_index_update( - "event_push_actions_thread_id_null", - index_name="event_push_actions_thread_id_null", + self.db_pool.updates.register_background_validate_constraint( + "event_push_actions_thread_id", + constraint_name="event_push_actions_thread_id", table="event_push_actions", - columns=["thread_id"], - where_clause="thread_id IS NULL", ) - self.db_pool.updates.register_background_index_update( - "event_push_summary_thread_id_null", - index_name="event_push_summary_thread_id_null", + self.db_pool.updates.register_background_validate_constraint( + "event_push_summary_thread_id", + constraint_name="event_push_summary_thread_id", table="event_push_summary", - columns=["thread_id"], - where_clause="thread_id IS NULL", ) - # Check ASAP (and then later, every 1s) to see if we have finished - # background updates the event_push_actions and event_push_summary tables. - self._clock.call_later(0.0, self._check_event_push_backfill_thread_id) - self._event_push_backfill_thread_id_done = False - - @wrap_as_background_process("check_event_push_backfill_thread_id") - async def _check_event_push_backfill_thread_id(self) -> None: - """ - Has thread_id finished backfilling? - - If not, we need to just-in-time update it so the queries work. - """ - done = await self.db_pool.updates.has_completed_background_update( - "event_push_backfill_thread_id" - ) - - if done: - self._event_push_backfill_thread_id_done = True - else: - # Reschedule to run. - self._clock.call_later(15.0, self._check_event_push_backfill_thread_id) - - async def _background_backfill_thread_id( - self, progress: JsonDict, batch_size: int - ) -> int: - """ - Fill in the thread_id field for event_push_actions and event_push_summary. - - This is preparatory so that it can be made non-nullable in the future. - - Because all current (null) data is done in an unthreaded manner this - simply assumes it is on the "main" timeline. Since event_push_actions - are periodically cleared it is not possible to correctly re-calculate - the thread_id. - """ - event_push_actions_done = progress.get("event_push_actions_done", False) - - def add_thread_id_txn( - txn: LoggingTransaction, start_stream_ordering: int - ) -> int: - sql = """ - SELECT stream_ordering - FROM event_push_actions - WHERE - thread_id IS NULL - AND stream_ordering > ? - ORDER BY stream_ordering - LIMIT ? - """ - txn.execute(sql, (start_stream_ordering, batch_size)) - - # No more rows to process. - rows = txn.fetchall() - if not rows: - progress["event_push_actions_done"] = True - self.db_pool.updates._background_update_progress_txn( - txn, "event_push_backfill_thread_id", progress - ) - return 0 - - # Update the thread ID for any of those rows. - max_stream_ordering = rows[-1][0] - - sql = """ - UPDATE event_push_actions - SET thread_id = 'main' - WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL - """ - txn.execute( - sql, - ( - start_stream_ordering, - max_stream_ordering, - ), - ) - - # Update progress. 
- processed_rows = txn.rowcount - progress["max_event_push_actions_stream_ordering"] = max_stream_ordering - self.db_pool.updates._background_update_progress_txn( - txn, "event_push_backfill_thread_id", progress - ) - - return processed_rows - - def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: - min_user_id = progress.get("max_summary_user_id", "") - min_room_id = progress.get("max_summary_room_id", "") - - # Slightly overcomplicated query for getting the Nth user ID / room - # ID tuple, or the last if there are less than N remaining. - sql = """ - SELECT user_id, room_id FROM ( - SELECT user_id, room_id FROM event_push_summary - WHERE (user_id, room_id) > (?, ?) - AND thread_id IS NULL - ORDER BY user_id, room_id - LIMIT ? - ) AS e - ORDER BY user_id DESC, room_id DESC - LIMIT 1 - """ - - txn.execute(sql, (min_user_id, min_room_id, batch_size)) - row = txn.fetchone() - if not row: - return 0 - - max_user_id, max_room_id = row - - sql = """ - UPDATE event_push_summary - SET thread_id = 'main' - WHERE - (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?) - AND thread_id IS NULL - """ - txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) - processed_rows = txn.rowcount - - progress["max_summary_user_id"] = max_user_id - progress["max_summary_room_id"] = max_room_id - self.db_pool.updates._background_update_progress_txn( - txn, "event_push_backfill_thread_id", progress - ) - - return processed_rows - - # First update the event_push_actions table, then the event_push_summary table. - # - # Note that the event_push_actions_staging table is ignored since it is - # assumed that items in that table will only exist for a short period of - # time. - if not event_push_actions_done: - result = await self.db_pool.runInteraction( - "event_push_backfill_thread_id", - add_thread_id_txn, - progress.get("max_event_push_actions_stream_ordering", 0), - ) - else: - result = await self.db_pool.runInteraction( - "event_push_backfill_thread_id", - add_thread_id_summary_txn, - ) - - # Only done after the event_push_summary table is done. - if not result: - await self.db_pool.updates._end_background_update( - "event_push_backfill_thread_id" - ) - - return result - async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: """Get the notification count by room for a user. Only considers notifications, not highlight or unread counts, and threads are currently aggregated under their room. @@ -711,25 +552,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) - # First ensure that the existing rows have an updated thread_id field. - if not self._event_push_backfill_thread_id_done: - txn.execute( - """ - UPDATE event_push_summary - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - txn.execute( - """ - UPDATE event_push_actions - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - # First we pull the counts from the summary table. # # We check that `last_receipt_stream_ordering` matches the stream ordering of the @@ -1545,25 +1367,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (room_id, user_id, stream_ordering, *thread_args), ) - # First ensure that the existing rows have an updated thread_id field. 
- if not self._event_push_backfill_thread_id_done: - txn.execute( - """ - UPDATE event_push_summary - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - txn.execute( - """ - UPDATE event_push_actions - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - # Fetch the notification counts between the stream ordering of the # latest receipt and what was previously summarised. unread_counts = self._get_notif_unread_count_for_user_room( @@ -1698,19 +1501,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas rotate_to_stream_ordering: The new maximum event stream ordering to summarise. """ - # Ensure that any new actions have an updated thread_id. - if not self._event_push_backfill_thread_id_done: - txn.execute( - """ - UPDATE event_push_actions - SET thread_id = ? - WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL - """, - (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering), - ) - - # XXX Do we need to update summaries here too? - # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, thread_id, @@ -1773,20 +1563,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas logger.info("Rotating notifications, handling %d rows", len(summaries)) - # Ensure that any updated threads have the proper thread_id. - if not self._event_push_backfill_thread_id_done: - txn.execute_batch( - """ - UPDATE event_push_summary - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - [ - (MAIN_TIMELINE, room_id, user_id) - for user_id, room_id, _ in summaries - ], - ) - self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", @@ -1836,6 +1612,15 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # deletes. batch_size = self._rotate_count + if isinstance(self.database_engine, PostgresEngine): + # Temporarily disable sequential scans in this transaction. We + # need to do this as the postgres statistics don't take into + # account the `highlight = 0` part when estimating the + # distribution of `stream_ordering`. I.e. since we keep old + # highlight rows the query planner thinks there are way more old + # rows to delete than there actually are. 
+ txn.execute("SET LOCAL enable_seqscan=off") + txn.execute( """ SELECT stream_ordering FROM event_push_actions diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index daad58291a..e068f27a10 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -82,7 +82,7 @@ class EventIdMembership: membership: str -class RoomMemberWorkerStore(EventsWorkerStore): +class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): def __init__( self, database: DatabasePool, @@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore): _is_local_host_in_room_ignoring_users_txn, ) + async def forget(self, user_id: str, room_id: str) -> None: + """Indicate that user_id wishes to discard history for room_id.""" + + def f(txn: LoggingTransaction) -> None: + self.db_pool.simple_update_txn( + txn, + table="room_memberships", + keyvalues={"user_id": user_id, "room_id": room_id}, + updatevalues={"forgotten": 1}, + ) + + self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.get_forgotten_rooms_for_user, (user_id,) + ) + + await self.db_pool.runInteraction("forget_membership", f) + + async def get_room_forgetter_stream_pos(self) -> int: + """Get the stream position of the background process to forget rooms when left + by users. + """ + return await self.db_pool.simple_select_one_onecol( + table="room_forgetter_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="room_forgetter_stream_pos", + ) + + async def update_room_forgetter_stream_pos(self, stream_id: int) -> None: + """Update the stream position of the background process to forget rooms when + left by users. + + Must only be used by the worker running the background process. + """ + assert self.hs.config.worker.run_background_tasks + + await self.db_pool.simple_update_one( + table="room_forgetter_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="room_forgetter_stream_pos", + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( @@ -1553,29 +1597,6 @@ class RoomMemberStore( ): super().__init__(database, db_conn, hs) - async def forget(self, user_id: str, room_id: str) -> None: - """Indicate that user_id wishes to discard history for room_id.""" - - def f(txn: LoggingTransaction) -> None: - sql = ( - "UPDATE" - " room_memberships" - " SET" - " forgotten = 1" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - ) - txn.execute(sql, (user_id, room_id)) - - self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.get_forgotten_rooms_for_user, (user_id,) - ) - - await self.db_pool.runInteraction("forget_membership", f) - def extract_heroes_from_room_summary( details: Mapping[str, MemberSummary], me: str diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 5d65faed16..b7d58978de 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -27,6 +27,8 @@ from typing import ( cast, ) +import attr + try: # Figure out if ICU support is available for searching users. 
import icu @@ -66,6 +68,19 @@ logger = logging.getLogger(__name__) TEMP_TABLE = "_temp_populate_user_directory" +@attr.s(auto_attribs=True, frozen=True) +class _UserDirProfile: + """Helper type for the user directory code for an entry to be inserted into + the directory. + """ + + user_id: str + + # If the display name or avatar URL are unexpected types, replace with None + display_name: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none) + avatar_url: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none) + + class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # How many records do we calculate before sending it to # add_users_who_share_private_rooms? @@ -381,25 +396,65 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): % (len(users_to_work_on), progress["remaining"]) ) - for user_id in users_to_work_on: - if await self.should_include_local_user_in_dir(user_id): - profile = await self.get_profileinfo(get_localpart_from_id(user_id)) # type: ignore[attr-defined] - await self.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url - ) - - # We've finished processing a user. Delete it from the table. - await self.db_pool.simple_delete_one( - TEMP_TABLE + "_users", {"user_id": user_id} - ) - # Update the remaining counter. - progress["remaining"] -= 1 - await self.db_pool.runInteraction( - "populate_user_directory", - self.db_pool.updates._background_update_progress_txn, - "populate_user_directory_process_users", - progress, + # First filter down to users we want to insert into the user directory. + users_to_insert = [ + user_id + for user_id in users_to_work_on + if await self.should_include_local_user_in_dir(user_id) + ] + + # Next fetch their profiles. Note that the `user_id` here is the + # *localpart*, and that not all users have profiles. + profile_rows = await self.db_pool.simple_select_many_batch( + table="profiles", + column="user_id", + iterable=[get_localpart_from_id(u) for u in users_to_insert], + retcols=( + "user_id", + "displayname", + "avatar_url", + ), + keyvalues={}, + desc="populate_user_directory_process_users_get_profiles", + ) + profiles = { + f"@{row['user_id']}:{self.server_name}": _UserDirProfile( + f"@{row['user_id']}:{self.server_name}", + row["displayname"], + row["avatar_url"], ) + for row in profile_rows + } + + profiles_to_insert = [ + profiles.get(user_id) or _UserDirProfile(user_id) + for user_id in users_to_insert + ] + + # Actually insert the users with their profiles into the directory. + await self.db_pool.runInteraction( + "populate_user_directory_process_users_insertion", + self._update_profiles_in_user_dir_txn, + profiles_to_insert, + ) + + # We've finished processing the users. Delete it from the table. + await self.db_pool.simple_delete_many( + table=TEMP_TABLE + "_users", + column="user_id", + iterable=users_to_work_on, + keyvalues={}, + desc="populate_user_directory_process_users_delete", + ) + + # Update the remaining counter. + progress["remaining"] -= len(users_to_work_on) + await self.db_pool.runInteraction( + "populate_user_directory", + self.db_pool.updates._background_update_progress_txn, + "populate_user_directory_process_users", + progress, + ) return len(users_to_work_on) @@ -584,72 +639,102 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): Update or add a user's profile in the user directory. If the user is remote, the profile will be marked as not stale. """ - # If the display name or avatar URL are unexpected types, replace with None. 
- display_name = non_null_str_or_none(display_name) - avatar_url = non_null_str_or_none(avatar_url) + await self.db_pool.runInteraction( + "update_profiles_in_user_dir", + self._update_profiles_in_user_dir_txn, + [_UserDirProfile(user_id, display_name, avatar_url)], + ) + + def _update_profiles_in_user_dir_txn( + self, + txn: LoggingTransaction, + profiles: Sequence[_UserDirProfile], + ) -> None: + self.db_pool.simple_upsert_many_txn( + txn, + table="user_directory", + key_names=("user_id",), + key_values=[(p.user_id,) for p in profiles], + value_names=("display_name", "avatar_url"), + value_values=[ + ( + p.display_name, + p.avatar_url, + ) + for p in profiles + ], + ) - def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None: - self.db_pool.simple_upsert_txn( + # Remote users: Make sure the profile is not marked as stale anymore. + remote_users = [ + p.user_id for p in profiles if not self.hs.is_mine_id(p.user_id) + ] + if remote_users: + self.db_pool.simple_delete_many_txn( txn, - table="user_directory", - keyvalues={"user_id": user_id}, - values={"display_name": display_name, "avatar_url": avatar_url}, + table="user_directory_stale_remote_users", + column="user_id", + values=remote_users, + keyvalues={}, ) - if not self.hs.is_mine_id(user_id): - # Remote users: Make sure the profile is not marked as stale anymore. - self.db_pool.simple_delete_txn( - txn, - table="user_directory_stale_remote_users", - keyvalues={"user_id": user_id}, + if isinstance(self.database_engine, PostgresEngine): + # We weight the localpart most highly, then display name and finally + # server name + template = """ + ( + %s, + setweight(to_tsvector('simple', %s), 'A') + || setweight(to_tsvector('simple', %s), 'D') + || setweight(to_tsvector('simple', COALESCE(%s, '')), 'B') ) + """ - # The display name that goes into the database index. - index_display_name = display_name - if index_display_name is not None: - index_display_name = _filter_text_for_index(index_display_name) - - if isinstance(self.database_engine, PostgresEngine): - # We weight the localpart most highly, then display name and finally - # server name - sql = """ - INSERT INTO user_directory_search(user_id, vector) - VALUES (?, - setweight(to_tsvector('simple', ?), 'A') - || setweight(to_tsvector('simple', ?), 'D') - || setweight(to_tsvector('simple', COALESCE(?, '')), 'B') - ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector - """ - txn.execute( - sql, + sql = """ + INSERT INTO user_directory_search(user_id, vector) + VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector + """ + txn.execute_values( + sql, + [ ( - user_id, - get_localpart_from_id(user_id), - get_domain_from_id(user_id), - index_display_name, - ), - ) - elif isinstance(self.database_engine, Sqlite3Engine): - value = ( - "%s %s" % (user_id, index_display_name) - if index_display_name - else user_id - ) - self.db_pool.simple_upsert_txn( - txn, - table="user_directory_search", - keyvalues={"user_id": user_id}, - values={"value": value}, - ) - else: - # This should be unreachable. 
- raise Exception("Unrecognized database engine") + p.user_id, + get_localpart_from_id(p.user_id), + get_domain_from_id(p.user_id), + _filter_text_for_index(p.display_name) + if p.display_name + else None, + ) + for p in profiles + ], + template=template, + fetch=False, + ) + elif isinstance(self.database_engine, Sqlite3Engine): + values = [] + for p in profiles: + if p.display_name is not None: + index_display_name = _filter_text_for_index(p.display_name) + value = f"{p.user_id} {index_display_name}" + else: + value = p.user_id - txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) + values.append((value,)) - await self.db_pool.runInteraction( - "update_profile_in_user_dir", _update_profile_in_user_dir_txn - ) + self.db_pool.simple_upsert_many_txn( + txn, + table="user_directory_search", + key_names=("user_id",), + key_values=[(p.user_id,) for p in profiles], + value_names=("value",), + value_values=values, + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + for p in profiles: + txn.call_after(self.get_user_in_directory.invalidate, (p.user_id,)) async def add_users_who_share_private_room( self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]] diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 1672976209..741563abc6 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -106,6 +106,9 @@ Changes in SCHEMA_VERSION = 76: SCHEMA_COMPAT_VERSION = ( # Queries against `event_stream_ordering` columns in membership tables must # be disambiguated. + # + # The threads_id column must written to with non-null values for the + # event_push_actions, event_push_actions_staging, and event_push_summary tables. 74 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql new file mode 100644 index 0000000000..be4b57d86f --- /dev/null +++ b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql @@ -0,0 +1,24 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE room_forgetter_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. 
+ stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO room_forgetter_stream_pos ( + stream_id +) SELECT COALESCE(MAX(stream_ordering), 0) from events; diff --git a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql new file mode 100644 index 0000000000..ce6f9ff937 --- /dev/null +++ b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql @@ -0,0 +1,28 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Force the background updates from 06thread_notifications.sql to run in the +-- foreground as code will now require those to be "done". + +DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id'; + +-- Overwrite any null thread_id values. +UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL; +UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL; +UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL; + +-- Drop the background updates to calculate the indexes used to find null thread_ids. +DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null'; +DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null'; diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres new file mode 100644 index 0000000000..40936def6f --- /dev/null +++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres @@ -0,0 +1,37 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- The thread_id columns can now be made non-nullable, this is done by using a +-- constraint (and not altering the column) to avoid taking out a full table lock. +-- +-- We initially add an invalid constraint which guards against new data (this +-- doesn't lock the table). 
+ALTER TABLE event_push_actions_staging + ADD CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; +ALTER TABLE event_push_actions + ADD CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; +ALTER TABLE event_push_summary + ADD CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; + +-- We then validate the constraint which doesn't need to worry about new data. It +-- only needs a SHARE UPDATE EXCLUSIVE lock but can still take a while to complete. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7605, 'event_push_actions_staging_thread_id', '{}'), + (7605, 'event_push_actions_thread_id', '{}'), + (7605, 'event_push_summary_thread_id', '{}'); + +-- Drop the indexes used to find null thread_ids. +DROP INDEX IF EXISTS event_push_actions_thread_id_null; +DROP INDEX IF EXISTS event_push_summary_thread_id_null; diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite new file mode 100644 index 0000000000..e9372b6cf9 --- /dev/null +++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite @@ -0,0 +1,102 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + -- The thread_id columns can now be made non-nullable. +-- +-- SQLite doesn't support modifying columns to an existing table, so it must +-- be recreated. + +-- Create the new tables. +CREATE TABLE event_push_actions_staging_new ( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + actions TEXT NOT NULL, + notif SMALLINT NOT NULL, + highlight SMALLINT NOT NULL, + unread SMALLINT, + thread_id TEXT, + inserted_ts BIGINT, + CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id is NOT NULL) +); + +CREATE TABLE event_push_actions_new ( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + topological_ordering BIGINT, + stream_ordering BIGINT, + notif SMALLINT, + highlight SMALLINT, + unread SMALLINT, + thread_id TEXT, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag), + CONSTRAINT event_push_actions_thread_id CHECK (thread_id is NOT NULL) +); + +CREATE TABLE event_push_summary_new ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + notif_count BIGINT NOT NULL, + stream_ordering BIGINT NOT NULL, + unread_count BIGINT, + last_receipt_stream_ordering BIGINT, + thread_id TEXT, + CONSTRAINT event_push_summary_thread_id CHECK (thread_id is NOT NULL) +); + +-- Copy the data. 
+INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts) + SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts + FROM event_push_actions_staging; + +INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id) + SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id + FROM event_push_actions; + +INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id) + SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id + FROM event_push_summary; + +-- Drop the old tables. +DROP TABLE event_push_actions_staging; +DROP TABLE event_push_actions; +DROP TABLE event_push_summary; + +-- Rename the tables. +ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging; +ALTER TABLE event_push_actions_new RENAME TO event_push_actions; +ALTER TABLE event_push_summary_new RENAME TO event_push_summary; + +-- Recreate the indexes. +CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id); + +CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering); +CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering ); +CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id); +CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id ); +CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering); + +CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id) ; + +-- Recreate some indexes in the background, by re-running the background updates +-- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql. 
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7403, 'event_push_summary_unique_index2', '{}') + ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}'; +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7403, 'event_push_actions_stream_highlight_index', '{}') + ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}'; diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py index 6a38893b68..a444d822cd 100644 --- a/tests/handlers/test_room_member.py +++ b/tests/handlers/test_room_member.py @@ -333,6 +333,17 @@ class RoomMemberMasterHandlerTestCase(HomeserverTestCase): self.get_success(self.store.is_locally_forgotten_room(self.room_id)) ) + @override_config({"forget_rooms_on_leave": True}) + def test_leave_and_auto_forget(self) -> None: + """Tests the `forget_rooms_on_leave` config option.""" + self.helper.join(self.room_id, user=self.bob, tok=self.bob_token) + + # alice is not the last room member that leaves and forgets the room + self.helper.leave(self.room_id, user=self.alice, tok=self.alice_token) + self.assertTrue( + self.get_success(self.store.did_forget(self.alice, self.room_id)) + ) + def test_leave_and_forget_last_user(self) -> None: """Tests that forget a room is successfully when the last user has left the room.""" diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 99cec0836b..54f558742d 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -962,3 +962,40 @@ class HTTPPusherTests(HomeserverTestCase): channel.json_body["pushers"][0]["org.matrix.msc3881.device_id"], lookup_result.device_id, ) + + @override_config({"push": {"jitter_delay": "10s"}}) + def test_jitter(self) -> None: + """Tests that enabling jitter actually delays sending push.""" + user_id, access_token = self._make_user_with_pusher("user") + other_user_id, other_access_token = self._make_user_with_pusher("otheruser") + + room = self.helper.create_room_as(user_id, tok=access_token) + self.helper.join(room=room, user=other_user_id, tok=other_access_token) + + # Send a message and check that it did not generate a push, as it should + # be delayed. + self.helper.send(room, body="Hi!", tok=other_access_token) + self.assertEqual(len(self.push_attempts), 0) + + # Now advance time past the max jitter, and assert the message was sent. + self.reactor.advance(15) + self.assertEqual(len(self.push_attempts), 1) + + self.push_attempts[0][0].callback({}) + + # Now we send a bunch of messages and assert that they were all sent + # within the 10s max delay. 
+ for _ in range(10): + self.helper.send(room, body="Hi!", tok=other_access_token) + + index = 1 + for _ in range(11): + while len(self.push_attempts) > index: + self.push_attempts[index][0].callback({}) + self.pump() + index += 1 + + self.reactor.advance(1) + self.pump() + + self.assertEqual(len(self.push_attempts), 11) diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py index 645a00b4b1..695e84357a 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py @@ -399,7 +399,7 @@ class ExperimentalFeaturesTestCase(unittest.HomeserverTestCase): "PUT", url, content={ - "features": {"msc3026": True, "msc2654": True}, + "features": {"msc3026": True, "msc3881": True}, }, access_token=self.admin_user_tok, ) @@ -420,7 +420,7 @@ class ExperimentalFeaturesTestCase(unittest.HomeserverTestCase): ) self.assertEqual( True, - channel.json_body["features"]["msc2654"], + channel.json_body["features"]["msc3881"], ) # test disabling a feature works @@ -448,10 +448,6 @@ class ExperimentalFeaturesTestCase(unittest.HomeserverTestCase): ) self.assertEqual( True, - channel.json_body["features"]["msc2654"], - ) - self.assertEqual( - False, channel.json_body["features"]["msc3881"], ) self.assertEqual( |