summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/_scripts/synapse_port_db.py1
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/storage/databases/main/roommember.py148
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql20
5 files changed, 179 insertions, 1 deletions
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py

index 3f67a739a0..1bb9940180 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py
@@ -128,6 +128,7 @@ BOOLEAN_COLUMNS = { "pushers": ["enabled"], "redactions": ["have_censored"], "remote_media_cache": ["authenticated"], + "room_memberships": ["participant"], "room_stats_state": ["is_federatable"], "rooms": ["is_public", "has_auth_chain_index"], "sliding_sync_joined_rooms": ["is_encrypted"], diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4642b8b578..52c61cfa54 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -1462,6 +1462,12 @@ class EventCreationHandler: ) return prev_event + if not event.is_state() and event.type in [ + EventTypes.Message, + EventTypes.Encrypted, + ]: + await self.store.set_room_participation(event.user_id, event.room_id) + if event.internal_metadata.is_out_of_band_membership(): # the only sort of out-of-band-membership events we expect to see here are # invite rejections and rescinded knocks that we have generated ourselves. diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 50ed6a28bf..a0a6dcd04e 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -79,6 +79,7 @@ logger = logging.getLogger(__name__) _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME = "current_state_events_membership" +_POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000 @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -1606,6 +1607,66 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): from_ts, ) + async def set_room_participation(self, user_id: str, room_id: str) -> None: + """ + Record the provided user as participating in the given room + + Args: + user_id: the user ID of the user + room_id: ID of the room to set the participant in + """ + + def _set_room_participation_txn( + txn: LoggingTransaction, user_id: str, room_id: str + ) -> None: + sql = """ + UPDATE room_memberships + SET participant = true + WHERE (user_id, room_id) IN ( + SELECT user_id, room_id + FROM room_memberships + WHERE user_id = ? + AND room_id = ? + ORDER BY event_stream_ordering DESC + LIMIT 1 + ) + """ + txn.execute(sql, (user_id, room_id)) + + await self.db_pool.runInteraction( + "_set_room_participation_txn", _set_room_participation_txn, user_id, room_id + ) + + async def get_room_participation(self, user_id: str, room_id: str) -> bool: + """ + Check whether a user is listed as a participant in a room + + Args: + user_id: user ID of the user + room_id: ID of the room to check in + """ + + def _get_room_participation_txn( + txn: LoggingTransaction, user_id: str, room_id: str + ) -> bool: + sql = """ + SELECT participant + FROM room_memberships + WHERE user_id = ? + AND room_id = ? + ORDER BY event_stream_ordering DESC + LIMIT 1 + """ + txn.execute(sql, (user_id, room_id)) + res = txn.fetchone() + if res: + return res[0] + return False + + return await self.db_pool.runInteraction( + "_get_room_participation_txn", _get_room_participation_txn, user_id, room_id + ) + class RoomMemberBackgroundUpdateStore(SQLBaseStore): def __init__( @@ -1636,6 +1697,93 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): columns=["user_id", "room_id"], ) + self.db_pool.updates.register_background_update_handler( + "populate_participant_bg_update", self._populate_participant + ) + + async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int: + """ + Background update to populate column `participant` on `room_memberships` table + + A 'participant' is someone who is currently joined to a room and has sent at least + one `m.room.message` or `m.room.encrypted` event. + + This background update will set the `participant` column across all rows in + `room_memberships` based on the user's *current* join status, and if + they've *ever* sent a message or encrypted event. Therefore one should + never assume the `participant` column's value is based solely on whether + the user participated in a previous "session" (where a "session" is defined + as a period between the user joining and leaving). See + https://github.com/element-hq/synapse/pull/18068#discussion_r1931070291 + for further detail. + """ + stream_token = progress.get("last_stream_token", None) + + def _get_max_stream_token_txn(txn: LoggingTransaction) -> int: + sql = """ + SELECT event_stream_ordering from room_memberships + ORDER BY event_stream_ordering DESC + LIMIT 1; + """ + txn.execute(sql) + res = txn.fetchone() + if not res or not res[0]: + return 0 + return res[0] + + def _background_populate_participant_txn( + txn: LoggingTransaction, stream_token: str + ) -> None: + sql = """ + UPDATE room_memberships + SET participant = True + FROM ( + SELECT DISTINCT c.state_key, e.room_id + FROM current_state_events AS c + INNER JOIN events AS e ON c.room_id = e.room_id + WHERE c.membership = 'join' + AND c.state_key = e.sender + AND ( + e.type = 'm.room.message' + OR e.type = 'm.room.encrypted' + ) + ) AS subquery + WHERE room_memberships.user_id = subquery.state_key + AND room_memberships.room_id = subquery.room_id + AND room_memberships.event_stream_ordering <= ? + AND room_memberships.event_stream_ordering > ?; + """ + batch = int(stream_token) - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + txn.execute(sql, (stream_token, batch)) + + if stream_token is None: + stream_token = await self.db_pool.runInteraction( + "_get_max_stream_token", _get_max_stream_token_txn + ) + + if stream_token < 0: + await self.db_pool.updates._end_background_update( + "populate_participant_bg_update" + ) + return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + + await self.db_pool.runInteraction( + "_background_populate_participant_txn", + _background_populate_participant_txn, + stream_token, + ) + + progress["last_stream_token"] = ( + stream_token - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + ) + await self.db_pool.runInteraction( + "populate_participant_bg_update", + self.db_pool.updates._background_update_progress_txn, + "populate_participant_bg_update", + progress, + ) + return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE + async def _background_add_membership_profile( self, progress: JsonDict, batch_size: int ) -> int: diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 49e648a92f..f87b1a4a0a 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 89 # remember to update the list below when updating +SCHEMA_VERSION = 90 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -158,6 +158,9 @@ Changes in SCHEMA_VERSION = 88 Changes in SCHEMA_VERSION = 89 - Add `state_groups_pending_deletion` and `state_groups_persisting` tables. + +Changes in SCHEMA_VERSION = 90 + - Add a column `participant` to `room_memberships` table """ diff --git a/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql b/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql new file mode 100644
index 0000000000..672be1031e --- /dev/null +++ b/synapse/storage/schema/main/delta/90/01_add_column_participant_room_memberships_table.sql
@@ -0,0 +1,20 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +-- Add a column `participant` to `room_memberships` table to track whether a room member has sent +-- a `m.room.message` or `m.room.encrypted` event into a room they are a member of +ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE; + +-- Add a background update to populate `participant` column +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (9001, 'populate_participant_bg_update', '{}'); \ No newline at end of file