Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/push.py | 10
-rw-r--r-- | synapse/config/room.py | 4
-rw-r--r-- | synapse/handlers/room_member.py | 173
-rw-r--r-- | synapse/handlers/room_member_worker.py | 3
-rw-r--r-- | synapse/push/httppusher.py | 18
-rw-r--r-- | synapse/rest/admin/experimental_features.py | 1
-rw-r--r-- | synapse/server.py | 11
-rw-r--r-- | synapse/storage/background_updates.py | 44
-rw-r--r-- | synapse/storage/database.py | 13
-rw-r--r-- | synapse/storage/databases/main/event_push_actions.py | 253
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 69
-rw-r--r-- | synapse/storage/databases/main/user_directory.py | 235
-rw-r--r-- | synapse/storage/schema/__init__.py | 3
-rw-r--r-- | synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql | 24
-rw-r--r-- | synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql | 28
-rw-r--r-- | synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres | 37
-rw-r--r-- | synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite | 102
17 files changed, 667 insertions, 361 deletions
diff --git a/synapse/config/push.py b/synapse/config/push.py index 3b5378e6ea..8177ff52e2 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -42,11 +42,17 @@ class PushConfig(Config): # Now check for the one in the 'email' section and honour it, # with a warning. - push_config = config.get("email") or {} - redact_content = push_config.get("redact_content") + email_push_config = config.get("email") or {} + redact_content = email_push_config.get("redact_content") if redact_content is not None: print( "The 'email.redact_content' option is deprecated: " "please set push.include_content instead" ) self.push_include_content = not redact_content + + # Whether to apply a random delay to outbound push. + self.push_jitter_delay_ms = None + push_jitter_delay = push_config.get("jitter_delay", None) + if push_jitter_delay: + self.push_jitter_delay_ms = self.parse_duration(push_jitter_delay) diff --git a/synapse/config/room.py b/synapse/config/room.py index 4a7ac00540..b6696cd129 100644 --- a/synapse/config/room.py +++ b/synapse/config/room.py @@ -75,3 +75,7 @@ class RoomConfig(Config): % preset ) # We validate the actual overrides when we try to apply them. + + # When enabled, users will forget rooms when they leave them, either via a + # leave, kick or ban. + self.forget_on_leave = config.get("forget_rooms_on_leave", False) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed805d6ec8..fbef600acd 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -16,7 +16,7 @@ import abc import logging import random from http import HTTPStatus -from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple from synapse import types from synapse.api.constants import ( @@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN +from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.logging import opentracing +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.types import ( JsonDict, @@ -280,9 +283,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): """ raise NotImplementedError() - @abc.abstractmethod async def forget(self, user: UserID, room_id: str) -> None: - raise NotImplementedError() + user_id = user.to_string() + + member = await self._storage_controllers.state.get_current_state_event( + room_id=room_id, event_type=EventTypes.Member, state_key=user_id + ) + membership = member.membership if member else None + + if membership is not None and membership not in [ + Membership.LEAVE, + Membership.BAN, + ]: + raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) + + # In normal case this call is only required if `membership` is not `None`. + # But: After the last member had left the room, the background update + # `_background_remove_left_rooms` is deleting rows related to this room from + # the table `current_state_events` and `get_current_state_events` is `None`. 
+ await self.store.forget(user_id, room_id) async def ratelimit_multiple_invites( self, @@ -2046,25 +2065,141 @@ class RoomMemberMasterHandler(RoomMemberHandler): """Implements RoomMemberHandler._user_left_room""" user_left_room(self.distributor, target, room_id) - async def forget(self, user: UserID, room_id: str) -> None: - user_id = user.to_string() - member = await self._storage_controllers.state.get_current_state_event( - room_id=room_id, event_type=EventTypes.Member, state_key=user_id - ) - membership = member.membership if member else None +class RoomForgetterHandler(StateDeltasHandler): + """Forgets rooms when they are left, when enabled in the homeserver config. - if membership is not None and membership not in [ - Membership.LEAVE, - Membership.BAN, - ]: - raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) + For the purposes of this feature, kicks, bans and "leaves" via state resolution + weirdness are all considered to be leaves. - # In normal case this call is only required if `membership` is not `None`. - # But: After the last member had left the room, the background update - # `_background_remove_left_rooms` is deleting rows related to this room from - # the table `current_state_events` and `get_current_state_events` is `None`. - await self.store.forget(user_id, room_id) + Derived from `StatsHandler` and `UserDirectoryHandler`. + """ + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self._hs = hs + self._store = hs.get_datastores().main + self._storage_controllers = hs.get_storage_controllers() + self._clock = hs.get_clock() + self._notifier = hs.get_notifier() + self._room_member_handler = hs.get_room_member_handler() + + # The current position in the current_state_delta stream + self.pos: Optional[int] = None + + # Guard to ensure we only process deltas one at a time + self._is_processing = False + + if hs.config.worker.run_background_tasks: + self._notifier.add_replication_callback(self.notify_new_event) + + # We kick this off to pick up outstanding work from before the last restart. + self._clock.call_later(0, self.notify_new_event) + + def notify_new_event(self) -> None: + """Called when there may be more deltas to process""" + if self._is_processing: + return + + self._is_processing = True + + async def process() -> None: + try: + await self._unsafe_process() + finally: + self._is_processing = False + + run_as_background_process("room_forgetter.notify_new_event", process) + + async def _unsafe_process(self) -> None: + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None: + self.pos = await self._store.get_room_forgetter_stream_pos() + room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos > room_max_stream_ordering: + # apparently, we've processed more events than exist in the database! + # this can happen if events are removed with history purge or similar. + logger.warning( + "Event stream ordering appears to have gone backwards (%i -> %i): " + "rewinding room forgetter processor", + self.pos, + room_max_stream_ordering, + ) + self.pos = room_max_stream_ordering + + if not self._hs.config.room.forget_on_leave: + # Update the processing position, so that if the server admin turns the + # feature on at a later date, we don't decide to forget every room that + # has ever been left in the past. 
+ self.pos = self._store.get_room_max_stream_ordering() + await self._store.update_room_forgetter_stream_pos(self.pos) + return + + # Loop round handling deltas until we're up to date + + while True: + # Be sure to read the max stream_ordering *before* checking if there are any outstanding + # deltas, since there is otherwise a chance that we could miss updates which arrive + # after we check the deltas. + room_max_stream_ordering = self._store.get_room_max_stream_ordering() + if self.pos == room_max_stream_ordering: + break + + logger.debug( + "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering + ) + ( + max_pos, + deltas, + ) = await self._storage_controllers.state.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) + + logger.debug("Handling %d state deltas", len(deltas)) + await self._handle_deltas(deltas) + + self.pos = max_pos + + # Expose current event processing position to prometheus + event_processing_positions.labels("room_forgetter").set(max_pos) + + await self._store.update_room_forgetter_stream_pos(max_pos) + + async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None: + """Called with the state deltas to process""" + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] + + if typ != EventTypes.Member: + continue + + if not self._hs.is_mine_id(state_key): + continue + + change = await self._get_key_change( + prev_event_id, + event_id, + key_name="membership", + public_value=Membership.JOIN, + ) + is_leave = change is MatchChange.now_false + + if is_leave: + try: + await self._room_member_handler.forget( + UserID.from_string(state_key), room_id + ) + except SynapseError as e: + if e.code == 400: + # The user is back in the room. + pass + else: + raise def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]: diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 76e36b8a6d..e8ff1ad063 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -137,6 +137,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler): await self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left" ) - - async def forget(self, target: UserID, room_id: str) -> None: - raise RuntimeError("Cannot forget rooms on workers.") diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 4f8fa445d9..e91ee05e99 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import urllib.parse from typing import TYPE_CHECKING, Dict, List, Optional, Union @@ -114,6 +115,8 @@ class HttpPusher(Pusher): ) self._pusherpool = hs.get_pusherpool() + self.push_jitter_delay_ms = hs.config.push.push_jitter_delay_ms + self.data = pusher_config.data if self.data is None: raise PusherConfigException("'data' key can not be null for HTTP pusher") @@ -326,6 +329,21 @@ class HttpPusher(Pusher): event = await self.store.get_event(push_action.event_id, allow_none=True) if event is None: return True # It's been redacted + + # Check if we should delay sending out the notification by a random + # amount. 
+ # + # Note: we base the delay off of when the event was sent, rather than + # now, to handle the case where we need to send out many notifications + # at once. If we just slept the random amount each loop then the last + # push notification in the set could be delayed by many times the max + # delay. + if self.push_jitter_delay_ms: + delay_ms = random.randint(1, self.push_jitter_delay_ms) + diff_ms = event.origin_server_ts + delay_ms - self.clock.time_msec() + if diff_ms > 0: + await self.clock.sleep(diff_ms / 1000) + rejected = await self.dispatch_push_event(event, tweaks, badge) if rejected is False: return False diff --git a/synapse/rest/admin/experimental_features.py b/synapse/rest/admin/experimental_features.py index 1d409ac2b7..abf273af10 100644 --- a/synapse/rest/admin/experimental_features.py +++ b/synapse/rest/admin/experimental_features.py @@ -33,7 +33,6 @@ class ExperimentalFeature(str, Enum): """ MSC3026 = "msc3026" - MSC2654 = "msc2654" MSC3881 = "msc3881" MSC3967 = "msc3967" diff --git a/synapse/server.py b/synapse/server.py index 08ad97b952..e597627a6d 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -93,7 +93,11 @@ from synapse.handlers.room import ( ) from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler -from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler +from synapse.handlers.room_member import ( + RoomForgetterHandler, + RoomMemberHandler, + RoomMemberMasterHandler, +) from synapse.handlers.room_member_worker import RoomMemberWorkerHandler from synapse.handlers.room_summary import RoomSummaryHandler from synapse.handlers.search import SearchHandler @@ -232,6 +236,7 @@ class HomeServer(metaclass=abc.ABCMeta): "message", "pagination", "profile", + "room_forgetter", "stats", ] @@ -827,6 +832,10 @@ class HomeServer(metaclass=abc.ABCMeta): return PushRulesHandler(self) @cache_in_self + def get_room_forgetter_handler(self) -> RoomForgetterHandler: + return RoomForgetterHandler(self) + + @cache_in_self def get_outbound_redis_connection(self) -> "ConnectionHandler": """ The Redis connection used for replication. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index a99aea8926..ca085ef800 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -561,6 +561,50 @@ class BackgroundUpdater: updater, oneshot=True ) + def register_background_validate_constraint( + self, update_name: str, constraint_name: str, table: str + ) -> None: + """Helper for store classes to do a background validate constraint. + + This only applies on PostgreSQL. + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('validate_my_constraint', '{}'); + + 2. 
In the Store constructor, call this method + + Args: + update_name: update_name to register for + constraint_name: name of constraint to validate + table: table the constraint is applied to + """ + + def runner(conn: Connection) -> None: + c = conn.cursor() + + sql = f""" + ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name}; + """ + logger.debug("[SQL] %s", sql) + c.execute(sql) + + async def updater(progress: JsonDict, batch_size: int) -> int: + assert isinstance( + self.db_pool.engine, engines.PostgresEngine + ), "validate constraint background update registered for non-Postres database" + + logger.info("Validating constraint %s to %s", constraint_name, table) + await self.db_pool.runWithConnection(runner) + await self._end_background_update(update_name) + return 1 + + self._background_update_handlers[update_name] = _BackgroundUpdateHandler( + updater, oneshot=True + ) + async def create_index_in_background( self, index_name: str, diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 1f5f5eb6f8..313cf1a8d0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -386,13 +386,20 @@ class LoggingTransaction: self.executemany(sql, args) def execute_values( - self, sql: str, values: Iterable[Iterable[Any]], fetch: bool = True + self, + sql: str, + values: Iterable[Iterable[Any]], + template: Optional[str] = None, + fetch: bool = True, ) -> List[Tuple]: """Corresponds to psycopg2.extras.execute_values. Only available when using postgres. The `fetch` parameter must be set to False if the query does not return rows (e.g. INSERTs). + + The `template` is the snippet to merge to every item in argslist to + compose the query. """ assert isinstance(self.database_engine, PostgresEngine) from psycopg2.extras import execute_values @@ -400,7 +407,9 @@ class LoggingTransaction: return self._do_execute( # TODO: is it safe for values to be Iterable[Iterable[Any]] here? # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence] - lambda the_sql: execute_values(self.txn, the_sql, values, fetch=fetch), + lambda the_sql: execute_values( + self.txn, the_sql, values, template=template, fetch=fetch + ), sql, ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index eeccf5db24..2e98a29fef 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -100,7 +100,6 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -289,180 +288,22 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas unique=True, ) - self.db_pool.updates.register_background_update_handler( - "event_push_backfill_thread_id", - self._background_backfill_thread_id, + self.db_pool.updates.register_background_validate_constraint( + "event_push_actions_staging_thread_id", + constraint_name="event_push_actions_staging_thread_id", + table="event_push_actions_staging", ) - - # Indexes which will be used to quickly make the thread_id column non-null. 
- self.db_pool.updates.register_background_index_update( - "event_push_actions_thread_id_null", - index_name="event_push_actions_thread_id_null", + self.db_pool.updates.register_background_validate_constraint( + "event_push_actions_thread_id", + constraint_name="event_push_actions_thread_id", table="event_push_actions", - columns=["thread_id"], - where_clause="thread_id IS NULL", ) - self.db_pool.updates.register_background_index_update( - "event_push_summary_thread_id_null", - index_name="event_push_summary_thread_id_null", + self.db_pool.updates.register_background_validate_constraint( + "event_push_summary_thread_id", + constraint_name="event_push_summary_thread_id", table="event_push_summary", - columns=["thread_id"], - where_clause="thread_id IS NULL", ) - # Check ASAP (and then later, every 1s) to see if we have finished - # background updates the event_push_actions and event_push_summary tables. - self._clock.call_later(0.0, self._check_event_push_backfill_thread_id) - self._event_push_backfill_thread_id_done = False - - @wrap_as_background_process("check_event_push_backfill_thread_id") - async def _check_event_push_backfill_thread_id(self) -> None: - """ - Has thread_id finished backfilling? - - If not, we need to just-in-time update it so the queries work. - """ - done = await self.db_pool.updates.has_completed_background_update( - "event_push_backfill_thread_id" - ) - - if done: - self._event_push_backfill_thread_id_done = True - else: - # Reschedule to run. - self._clock.call_later(15.0, self._check_event_push_backfill_thread_id) - - async def _background_backfill_thread_id( - self, progress: JsonDict, batch_size: int - ) -> int: - """ - Fill in the thread_id field for event_push_actions and event_push_summary. - - This is preparatory so that it can be made non-nullable in the future. - - Because all current (null) data is done in an unthreaded manner this - simply assumes it is on the "main" timeline. Since event_push_actions - are periodically cleared it is not possible to correctly re-calculate - the thread_id. - """ - event_push_actions_done = progress.get("event_push_actions_done", False) - - def add_thread_id_txn( - txn: LoggingTransaction, start_stream_ordering: int - ) -> int: - sql = """ - SELECT stream_ordering - FROM event_push_actions - WHERE - thread_id IS NULL - AND stream_ordering > ? - ORDER BY stream_ordering - LIMIT ? - """ - txn.execute(sql, (start_stream_ordering, batch_size)) - - # No more rows to process. - rows = txn.fetchall() - if not rows: - progress["event_push_actions_done"] = True - self.db_pool.updates._background_update_progress_txn( - txn, "event_push_backfill_thread_id", progress - ) - return 0 - - # Update the thread ID for any of those rows. - max_stream_ordering = rows[-1][0] - - sql = """ - UPDATE event_push_actions - SET thread_id = 'main' - WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL - """ - txn.execute( - sql, - ( - start_stream_ordering, - max_stream_ordering, - ), - ) - - # Update progress. 
- processed_rows = txn.rowcount - progress["max_event_push_actions_stream_ordering"] = max_stream_ordering - self.db_pool.updates._background_update_progress_txn( - txn, "event_push_backfill_thread_id", progress - ) - - return processed_rows - - def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: - min_user_id = progress.get("max_summary_user_id", "") - min_room_id = progress.get("max_summary_room_id", "") - - # Slightly overcomplicated query for getting the Nth user ID / room - # ID tuple, or the last if there are less than N remaining. - sql = """ - SELECT user_id, room_id FROM ( - SELECT user_id, room_id FROM event_push_summary - WHERE (user_id, room_id) > (?, ?) - AND thread_id IS NULL - ORDER BY user_id, room_id - LIMIT ? - ) AS e - ORDER BY user_id DESC, room_id DESC - LIMIT 1 - """ - - txn.execute(sql, (min_user_id, min_room_id, batch_size)) - row = txn.fetchone() - if not row: - return 0 - - max_user_id, max_room_id = row - - sql = """ - UPDATE event_push_summary - SET thread_id = 'main' - WHERE - (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?) - AND thread_id IS NULL - """ - txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) - processed_rows = txn.rowcount - - progress["max_summary_user_id"] = max_user_id - progress["max_summary_room_id"] = max_room_id - self.db_pool.updates._background_update_progress_txn( - txn, "event_push_backfill_thread_id", progress - ) - - return processed_rows - - # First update the event_push_actions table, then the event_push_summary table. - # - # Note that the event_push_actions_staging table is ignored since it is - # assumed that items in that table will only exist for a short period of - # time. - if not event_push_actions_done: - result = await self.db_pool.runInteraction( - "event_push_backfill_thread_id", - add_thread_id_txn, - progress.get("max_event_push_actions_stream_ordering", 0), - ) - else: - result = await self.db_pool.runInteraction( - "event_push_backfill_thread_id", - add_thread_id_summary_txn, - ) - - # Only done after the event_push_summary table is done. - if not result: - await self.db_pool.updates._end_background_update( - "event_push_backfill_thread_id" - ) - - return result - async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: """Get the notification count by room for a user. Only considers notifications, not highlight or unread counts, and threads are currently aggregated under their room. @@ -711,25 +552,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) - # First ensure that the existing rows have an updated thread_id field. - if not self._event_push_backfill_thread_id_done: - txn.execute( - """ - UPDATE event_push_summary - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - txn.execute( - """ - UPDATE event_push_actions - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - # First we pull the counts from the summary table. # # We check that `last_receipt_stream_ordering` matches the stream ordering of the @@ -1545,25 +1367,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (room_id, user_id, stream_ordering, *thread_args), ) - # First ensure that the existing rows have an updated thread_id field. 
- if not self._event_push_backfill_thread_id_done: - txn.execute( - """ - UPDATE event_push_summary - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - txn.execute( - """ - UPDATE event_push_actions - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - (MAIN_TIMELINE, room_id, user_id), - ) - # Fetch the notification counts between the stream ordering of the # latest receipt and what was previously summarised. unread_counts = self._get_notif_unread_count_for_user_room( @@ -1698,19 +1501,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas rotate_to_stream_ordering: The new maximum event stream ordering to summarise. """ - # Ensure that any new actions have an updated thread_id. - if not self._event_push_backfill_thread_id_done: - txn.execute( - """ - UPDATE event_push_actions - SET thread_id = ? - WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL - """, - (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering), - ) - - # XXX Do we need to update summaries here too? - # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, thread_id, @@ -1773,20 +1563,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas logger.info("Rotating notifications, handling %d rows", len(summaries)) - # Ensure that any updated threads have the proper thread_id. - if not self._event_push_backfill_thread_id_done: - txn.execute_batch( - """ - UPDATE event_push_summary - SET thread_id = ? - WHERE room_id = ? AND user_id = ? AND thread_id is NULL - """, - [ - (MAIN_TIMELINE, room_id, user_id) - for user_id, room_id, _ in summaries - ], - ) - self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", @@ -1836,6 +1612,15 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # deletes. batch_size = self._rotate_count + if isinstance(self.database_engine, PostgresEngine): + # Temporarily disable sequential scans in this transaction. We + # need to do this as the postgres statistics don't take into + # account the `highlight = 0` part when estimating the + # distribution of `stream_ordering`. I.e. since we keep old + # highlight rows the query planner thinks there are way more old + # rows to delete than there actually are. 
+ txn.execute("SET LOCAL enable_seqscan=off") + txn.execute( """ SELECT stream_ordering FROM event_push_actions diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index daad58291a..e068f27a10 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -82,7 +82,7 @@ class EventIdMembership: membership: str -class RoomMemberWorkerStore(EventsWorkerStore): +class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): def __init__( self, database: DatabasePool, @@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore): _is_local_host_in_room_ignoring_users_txn, ) + async def forget(self, user_id: str, room_id: str) -> None: + """Indicate that user_id wishes to discard history for room_id.""" + + def f(txn: LoggingTransaction) -> None: + self.db_pool.simple_update_txn( + txn, + table="room_memberships", + keyvalues={"user_id": user_id, "room_id": room_id}, + updatevalues={"forgotten": 1}, + ) + + self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.get_forgotten_rooms_for_user, (user_id,) + ) + + await self.db_pool.runInteraction("forget_membership", f) + + async def get_room_forgetter_stream_pos(self) -> int: + """Get the stream position of the background process to forget rooms when left + by users. + """ + return await self.db_pool.simple_select_one_onecol( + table="room_forgetter_stream_pos", + keyvalues={}, + retcol="stream_id", + desc="room_forgetter_stream_pos", + ) + + async def update_room_forgetter_stream_pos(self, stream_id: int) -> None: + """Update the stream position of the background process to forget rooms when + left by users. + + Must only be used by the worker running the background process. + """ + assert self.hs.config.worker.run_background_tasks + + await self.db_pool.simple_update_one( + table="room_forgetter_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="room_forgetter_stream_pos", + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( @@ -1553,29 +1597,6 @@ class RoomMemberStore( ): super().__init__(database, db_conn, hs) - async def forget(self, user_id: str, room_id: str) -> None: - """Indicate that user_id wishes to discard history for room_id.""" - - def f(txn: LoggingTransaction) -> None: - sql = ( - "UPDATE" - " room_memberships" - " SET" - " forgotten = 1" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - ) - txn.execute(sql, (user_id, room_id)) - - self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.get_forgotten_rooms_for_user, (user_id,) - ) - - await self.db_pool.runInteraction("forget_membership", f) - def extract_heroes_from_room_summary( details: Mapping[str, MemberSummary], me: str diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 5d65faed16..b7d58978de 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -27,6 +27,8 @@ from typing import ( cast, ) +import attr + try: # Figure out if ICU support is available for searching users. 
import icu @@ -66,6 +68,19 @@ logger = logging.getLogger(__name__) TEMP_TABLE = "_temp_populate_user_directory" +@attr.s(auto_attribs=True, frozen=True) +class _UserDirProfile: + """Helper type for the user directory code for an entry to be inserted into + the directory. + """ + + user_id: str + + # If the display name or avatar URL are unexpected types, replace with None + display_name: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none) + avatar_url: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none) + + class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # How many records do we calculate before sending it to # add_users_who_share_private_rooms? @@ -381,25 +396,65 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): % (len(users_to_work_on), progress["remaining"]) ) - for user_id in users_to_work_on: - if await self.should_include_local_user_in_dir(user_id): - profile = await self.get_profileinfo(get_localpart_from_id(user_id)) # type: ignore[attr-defined] - await self.update_profile_in_user_dir( - user_id, profile.display_name, profile.avatar_url - ) - - # We've finished processing a user. Delete it from the table. - await self.db_pool.simple_delete_one( - TEMP_TABLE + "_users", {"user_id": user_id} - ) - # Update the remaining counter. - progress["remaining"] -= 1 - await self.db_pool.runInteraction( - "populate_user_directory", - self.db_pool.updates._background_update_progress_txn, - "populate_user_directory_process_users", - progress, + # First filter down to users we want to insert into the user directory. + users_to_insert = [ + user_id + for user_id in users_to_work_on + if await self.should_include_local_user_in_dir(user_id) + ] + + # Next fetch their profiles. Note that the `user_id` here is the + # *localpart*, and that not all users have profiles. + profile_rows = await self.db_pool.simple_select_many_batch( + table="profiles", + column="user_id", + iterable=[get_localpart_from_id(u) for u in users_to_insert], + retcols=( + "user_id", + "displayname", + "avatar_url", + ), + keyvalues={}, + desc="populate_user_directory_process_users_get_profiles", + ) + profiles = { + f"@{row['user_id']}:{self.server_name}": _UserDirProfile( + f"@{row['user_id']}:{self.server_name}", + row["displayname"], + row["avatar_url"], ) + for row in profile_rows + } + + profiles_to_insert = [ + profiles.get(user_id) or _UserDirProfile(user_id) + for user_id in users_to_insert + ] + + # Actually insert the users with their profiles into the directory. + await self.db_pool.runInteraction( + "populate_user_directory_process_users_insertion", + self._update_profiles_in_user_dir_txn, + profiles_to_insert, + ) + + # We've finished processing the users. Delete it from the table. + await self.db_pool.simple_delete_many( + table=TEMP_TABLE + "_users", + column="user_id", + iterable=users_to_work_on, + keyvalues={}, + desc="populate_user_directory_process_users_delete", + ) + + # Update the remaining counter. + progress["remaining"] -= len(users_to_work_on) + await self.db_pool.runInteraction( + "populate_user_directory", + self.db_pool.updates._background_update_progress_txn, + "populate_user_directory_process_users", + progress, + ) return len(users_to_work_on) @@ -584,72 +639,102 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): Update or add a user's profile in the user directory. If the user is remote, the profile will be marked as not stale. """ - # If the display name or avatar URL are unexpected types, replace with None. 
- display_name = non_null_str_or_none(display_name) - avatar_url = non_null_str_or_none(avatar_url) + await self.db_pool.runInteraction( + "update_profiles_in_user_dir", + self._update_profiles_in_user_dir_txn, + [_UserDirProfile(user_id, display_name, avatar_url)], + ) + + def _update_profiles_in_user_dir_txn( + self, + txn: LoggingTransaction, + profiles: Sequence[_UserDirProfile], + ) -> None: + self.db_pool.simple_upsert_many_txn( + txn, + table="user_directory", + key_names=("user_id",), + key_values=[(p.user_id,) for p in profiles], + value_names=("display_name", "avatar_url"), + value_values=[ + ( + p.display_name, + p.avatar_url, + ) + for p in profiles + ], + ) - def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None: - self.db_pool.simple_upsert_txn( + # Remote users: Make sure the profile is not marked as stale anymore. + remote_users = [ + p.user_id for p in profiles if not self.hs.is_mine_id(p.user_id) + ] + if remote_users: + self.db_pool.simple_delete_many_txn( txn, - table="user_directory", - keyvalues={"user_id": user_id}, - values={"display_name": display_name, "avatar_url": avatar_url}, + table="user_directory_stale_remote_users", + column="user_id", + values=remote_users, + keyvalues={}, ) - if not self.hs.is_mine_id(user_id): - # Remote users: Make sure the profile is not marked as stale anymore. - self.db_pool.simple_delete_txn( - txn, - table="user_directory_stale_remote_users", - keyvalues={"user_id": user_id}, + if isinstance(self.database_engine, PostgresEngine): + # We weight the localpart most highly, then display name and finally + # server name + template = """ + ( + %s, + setweight(to_tsvector('simple', %s), 'A') + || setweight(to_tsvector('simple', %s), 'D') + || setweight(to_tsvector('simple', COALESCE(%s, '')), 'B') ) + """ - # The display name that goes into the database index. - index_display_name = display_name - if index_display_name is not None: - index_display_name = _filter_text_for_index(index_display_name) - - if isinstance(self.database_engine, PostgresEngine): - # We weight the localpart most highly, then display name and finally - # server name - sql = """ - INSERT INTO user_directory_search(user_id, vector) - VALUES (?, - setweight(to_tsvector('simple', ?), 'A') - || setweight(to_tsvector('simple', ?), 'D') - || setweight(to_tsvector('simple', COALESCE(?, '')), 'B') - ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector - """ - txn.execute( - sql, + sql = """ + INSERT INTO user_directory_search(user_id, vector) + VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector + """ + txn.execute_values( + sql, + [ ( - user_id, - get_localpart_from_id(user_id), - get_domain_from_id(user_id), - index_display_name, - ), - ) - elif isinstance(self.database_engine, Sqlite3Engine): - value = ( - "%s %s" % (user_id, index_display_name) - if index_display_name - else user_id - ) - self.db_pool.simple_upsert_txn( - txn, - table="user_directory_search", - keyvalues={"user_id": user_id}, - values={"value": value}, - ) - else: - # This should be unreachable. 
- raise Exception("Unrecognized database engine") + p.user_id, + get_localpart_from_id(p.user_id), + get_domain_from_id(p.user_id), + _filter_text_for_index(p.display_name) + if p.display_name + else None, + ) + for p in profiles + ], + template=template, + fetch=False, + ) + elif isinstance(self.database_engine, Sqlite3Engine): + values = [] + for p in profiles: + if p.display_name is not None: + index_display_name = _filter_text_for_index(p.display_name) + value = f"{p.user_id} {index_display_name}" + else: + value = p.user_id - txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) + values.append((value,)) - await self.db_pool.runInteraction( - "update_profile_in_user_dir", _update_profile_in_user_dir_txn - ) + self.db_pool.simple_upsert_many_txn( + txn, + table="user_directory_search", + key_names=("user_id",), + key_values=[(p.user_id,) for p in profiles], + value_names=("value",), + value_values=values, + ) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + for p in profiles: + txn.call_after(self.get_user_in_directory.invalidate, (p.user_id,)) async def add_users_who_share_private_room( self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]] diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 1672976209..741563abc6 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -106,6 +106,9 @@ Changes in SCHEMA_VERSION = 76: SCHEMA_COMPAT_VERSION = ( # Queries against `event_stream_ordering` columns in membership tables must # be disambiguated. + # + # The threads_id column must written to with non-null values for the + # event_push_actions, event_push_actions_staging, and event_push_summary tables. 74 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql new file mode 100644 index 0000000000..be4b57d86f --- /dev/null +++ b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql @@ -0,0 +1,24 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE room_forgetter_stream_pos ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. 
+ stream_id BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO room_forgetter_stream_pos ( + stream_id +) SELECT COALESCE(MAX(stream_ordering), 0) from events; diff --git a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql new file mode 100644 index 0000000000..ce6f9ff937 --- /dev/null +++ b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql @@ -0,0 +1,28 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Force the background updates from 06thread_notifications.sql to run in the +-- foreground as code will now require those to be "done". + +DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id'; + +-- Overwrite any null thread_id values. +UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL; +UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL; +UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL; + +-- Drop the background updates to calculate the indexes used to find null thread_ids. +DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null'; +DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null'; diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres new file mode 100644 index 0000000000..40936def6f --- /dev/null +++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres @@ -0,0 +1,37 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- The thread_id columns can now be made non-nullable, this is done by using a +-- constraint (and not altering the column) to avoid taking out a full table lock. +-- +-- We initially add an invalid constraint which guards against new data (this +-- doesn't lock the table). 
+ALTER TABLE event_push_actions_staging + ADD CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; +ALTER TABLE event_push_actions + ADD CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; +ALTER TABLE event_push_summary + ADD CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; + +-- We then validate the constraint which doesn't need to worry about new data. It +-- only needs a SHARE UPDATE EXCLUSIVE lock but can still take a while to complete. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7605, 'event_push_actions_staging_thread_id', '{}'), + (7605, 'event_push_actions_thread_id', '{}'), + (7605, 'event_push_summary_thread_id', '{}'); + +-- Drop the indexes used to find null thread_ids. +DROP INDEX IF EXISTS event_push_actions_thread_id_null; +DROP INDEX IF EXISTS event_push_summary_thread_id_null; diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite new file mode 100644 index 0000000000..e9372b6cf9 --- /dev/null +++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite @@ -0,0 +1,102 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + -- The thread_id columns can now be made non-nullable. +-- +-- SQLite doesn't support modifying columns to an existing table, so it must +-- be recreated. + +-- Create the new tables. +CREATE TABLE event_push_actions_staging_new ( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + actions TEXT NOT NULL, + notif SMALLINT NOT NULL, + highlight SMALLINT NOT NULL, + unread SMALLINT, + thread_id TEXT, + inserted_ts BIGINT, + CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id is NOT NULL) +); + +CREATE TABLE event_push_actions_new ( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + topological_ordering BIGINT, + stream_ordering BIGINT, + notif SMALLINT, + highlight SMALLINT, + unread SMALLINT, + thread_id TEXT, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag), + CONSTRAINT event_push_actions_thread_id CHECK (thread_id is NOT NULL) +); + +CREATE TABLE event_push_summary_new ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + notif_count BIGINT NOT NULL, + stream_ordering BIGINT NOT NULL, + unread_count BIGINT, + last_receipt_stream_ordering BIGINT, + thread_id TEXT, + CONSTRAINT event_push_summary_thread_id CHECK (thread_id is NOT NULL) +); + +-- Copy the data. 
+INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts) + SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts + FROM event_push_actions_staging; + +INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id) + SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id + FROM event_push_actions; + +INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id) + SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id + FROM event_push_summary; + +-- Drop the old tables. +DROP TABLE event_push_actions_staging; +DROP TABLE event_push_actions; +DROP TABLE event_push_summary; + +-- Rename the tables. +ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging; +ALTER TABLE event_push_actions_new RENAME TO event_push_actions; +ALTER TABLE event_push_summary_new RENAME TO event_push_summary; + +-- Recreate the indexes. +CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id); + +CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering); +CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering ); +CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id); +CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id ); +CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering); + +CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id) ; + +-- Recreate some indexes in the background, by re-running the background updates +-- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7403, 'event_push_summary_unique_index2', '{}') + ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}'; +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7403, 'event_push_actions_stream_highlight_index', '{}') + ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}'; |
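
For readers skimming the diff, the jitter behaviour added to synapse/push/httppusher.py reduces to the standalone sketch below. The helper name and the use of time.time() are illustrative assumptions for this sketch only; Synapse computes the delay inline in HttpPusher using its own clock.

# Minimal sketch of the push jitter calculation, assuming a configured
# maximum jitter in milliseconds and the event's origin_server_ts.
import random
import time


def push_delay_seconds(origin_server_ts_ms: int, max_jitter_ms: int) -> float:
    """How long to sleep before dispatching a push for this event.

    The delay is anchored to the event's origin_server_ts rather than to
    "now", so that when a backlog of notifications is flushed in one loop
    the total added latency stays bounded by max_jitter_ms instead of
    accumulating once per event.
    """
    delay_ms = random.randint(1, max_jitter_ms)
    now_ms = int(time.time() * 1000)
    diff_ms = origin_server_ts_ms + delay_ms - now_ms
    # If the event is already older than the chosen delay, send immediately.
    return max(0.0, diff_ms / 1000)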