summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/databases/main/roommember.py87
-rw-r--r--synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql6
-rw-r--r--synapse/storage/schema/main/delta/92/02_remove_populate_participant_bg_update.sql17
3 files changed, 18 insertions, 92 deletions
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py

index dfa7dd48d9..b8c78baa6c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -1693,93 +1693,6 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): columns=["user_id", "room_id"], ) - self.db_pool.updates.register_background_update_handler( - "populate_participant_bg_update", self._populate_participant - ) - - async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int: - """ - Background update to populate column `participant` on `room_memberships` table - - A 'participant' is someone who is currently joined to a room and has sent at least - one `m.room.message` or `m.room.encrypted` event. - - This background update will set the `participant` column across all rows in - `room_memberships` based on the user's *current* join status, and if - they've *ever* sent a message or encrypted event. Therefore one should - never assume the `participant` column's value is based solely on whether - the user participated in a previous "session" (where a "session" is defined - as a period between the user joining and leaving). See - https://github.com/element-hq/synapse/pull/18068#discussion_r1931070291 - for further detail. - """ - stream_token = progress.get("last_stream_token", None) - - def _get_max_stream_token_txn(txn: LoggingTransaction) -> int: - sql = """ - SELECT event_stream_ordering from room_memberships - ORDER BY event_stream_ordering DESC - LIMIT 1; - """ - txn.execute(sql) - res = txn.fetchone() - if not res or not res[0]: - return 0 - return res[0] - - def _background_populate_participant_txn( - txn: LoggingTransaction, stream_token: str - ) -> None: - sql = """ - UPDATE room_memberships - SET participant = True - FROM ( - SELECT DISTINCT c.state_key, e.room_id - FROM current_state_events AS c - INNER JOIN events AS e ON c.room_id = e.room_id - WHERE c.membership = 'join' - AND c.state_key = e.sender - AND ( - e.type = 'm.room.message' - OR e.type = 'm.room.encrypted' - ) - ) AS subquery - WHERE room_memberships.user_id = subquery.state_key - AND room_memberships.room_id = subquery.room_id - AND room_memberships.event_stream_ordering <= ? - AND room_memberships.event_stream_ordering > ?; - """ - batch = int(stream_token) - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE - txn.execute(sql, (stream_token, batch)) - - if stream_token is None: - stream_token = await self.db_pool.runInteraction( - "_get_max_stream_token", _get_max_stream_token_txn - ) - - if stream_token < 0: - await self.db_pool.updates._end_background_update( - "populate_participant_bg_update" - ) - return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE - - await self.db_pool.runInteraction( - "_background_populate_participant_txn", - _background_populate_participant_txn, - stream_token, - ) - - progress["last_stream_token"] = ( - stream_token - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE - ) - await self.db_pool.runInteraction( - "populate_participant_bg_update", - self.db_pool.updates._background_update_progress_txn, - "populate_participant_bg_update", - progress, - ) - return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE - async def _background_add_membership_profile( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql b/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql
index 672be1031e..dafd046499 100644 --- a/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql +++ b/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql
@@ -13,8 +13,4 @@ -- Add a column `participant` to `room_memberships` table to track whether a room member has sent -- a `m.room.message` or `m.room.encrypted` event into a room they are a member of -ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE; - --- Add a background update to populate `participant` column -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (9001, 'populate_participant_bg_update', '{}'); \ No newline at end of file +ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE; \ No newline at end of file diff --git a/synapse/storage/schema/main/delta/92/02_remove_populate_participant_bg_update.sql b/synapse/storage/schema/main/delta/92/02_remove_populate_participant_bg_update.sql new file mode 100644
index 0000000000..e1f377c37d --- /dev/null +++ b/synapse/storage/schema/main/delta/92/02_remove_populate_participant_bg_update.sql
@@ -0,0 +1,17 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +-- Remove the background update if it was scheduled, as it is not rollback-safe +-- See https://github.com/element-hq/synapse/issues/18356 for context +DELETE FROM background_updates +WHERE update_name = 'populate_participant_bg_update'; \ No newline at end of file