diff options
-rw-r--r-- | synapse/storage/databases/main/events.py | 196 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql | 6 |
2 files changed, 113 insertions, 89 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index ef73740ef8..6f9ab3c31f 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -42,7 +42,12 @@ import attr from prometheus_client import Counter import synapse.metrics -from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + RelationTypes, + Membership, +) from synapse.api.errors import PartialStateConflictError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event @@ -1392,102 +1397,119 @@ class PersistEventsStore: # We now update `sliding_sync_non_join_memberships`. We do this regardless of # whether the server is still in the room or not because we still want a row if # a local user was just left/kicked or got banned from the room. + if to_delete: + txn.execute_batch( + "DELETE FROM sliding_sync_non_join_memberships" + " WHERE room_id = ? AND user_id = ?", + ( + (room_id, state_key) + for event_type, state_key in to_delete + if event_type == EventTypes.Member and self.is_mine_id(state_key) + ), + ) + if to_insert: membership_event_id_to_user_id_map: Dict[str, str] = {} for state_key, event_id in to_insert.items(): if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]): membership_event_id_to_user_id_map[event_id] = state_key[1] - # Fetch the events from the database - # - # TODO: We should gather this data before we delete the - # `current_state_events` in a `no_longer_in_room` situation. - ( - event_type_and_state_key_in_list_clause, - event_type_and_state_key_args, - ) = make_tuple_in_list_sql_clause( - self.database_engine, - ("type", "state_key"), - [ - (EventTypes.Create, ""), - (EventTypes.RoomEncryption, ""), - (EventTypes.Name, ""), - ], - ) - txn.execute( - f""" - SELECT c.event_id, c.type, c.state_key, j.json - FROM current_state_events AS c - INNER JOIN event_json AS j USING (event_id) - WHERE - c.room_id = ? - AND {event_type_and_state_key_in_list_clause} - """, - [room_id] + event_type_and_state_key_args, - ) + if len(membership_event_id_to_user_id_map) > 0: + # Fetch the events from the database + # + # TODO: We should gather this data before we delete the + # `current_state_events` in a `no_longer_in_room` situation. + ( + event_type_and_state_key_in_list_clause, + event_type_and_state_key_args, + ) = make_tuple_in_list_sql_clause( + self.database_engine, + ("type", "state_key"), + [ + (EventTypes.Create, ""), + (EventTypes.RoomEncryption, ""), + (EventTypes.Name, ""), + ], + ) + txn.execute( + f""" + SELECT c.event_id, c.type, c.state_key, j.json + FROM current_state_events AS c + INNER JOIN event_json AS j USING (event_id) + WHERE + c.room_id = ? + AND {event_type_and_state_key_in_list_clause} + """, + [room_id] + event_type_and_state_key_args, + ) - # Parse the raw event JSON - sliding_sync_non_joined_rooms_insert_map: Dict[ - str, Optional[Union[str, bool]] - ] = {} - for row in txn: - event_id, event_type, state_key, json = row - event_json = db_to_json(json) + # Parse the raw event JSON + sliding_sync_non_joined_rooms_insert_map: Dict[ + str, Optional[Union[str, bool]] + ] = {} + for row in txn: + event_id, event_type, state_key, json = row + event_json = db_to_json(json) + + if event_type == EventTypes.Create: + room_type = event_json.get("content", {}).get( + EventContentFields.ROOM_TYPE + ) + sliding_sync_non_joined_rooms_insert_map["room_type"] = ( + room_type + ) + elif event_type == EventTypes.RoomEncryption: + is_encrypted = event_json.get("content", {}).get( + EventContentFields.ENCRYPTION_ALGORITHM + ) + sliding_sync_non_joined_rooms_insert_map["is_encrypted"] = ( + is_encrypted + ) + elif event_type == EventTypes.Name: + room_name = event_json.get("content", {}).get( + EventContentFields.ROOM_NAME + ) + sliding_sync_non_joined_rooms_insert_map["room_name"] = ( + room_name + ) + else: + raise AssertionError( + f"Unexpected event (we should not be fetching extra events): ({event_type}, {state_key})" + ) - if event_type == EventTypes.Create: - room_type = event_json.get("content", {}).get( - EventContentFields.ROOM_TYPE - ) - sliding_sync_non_joined_rooms_insert_map["room_type"] = room_type - elif event_type == EventTypes.RoomEncryption: - is_encrypted = event_json.get("content", {}).get( - EventContentFields.ENCRYPTION_ALGORITHM - ) - sliding_sync_non_joined_rooms_insert_map["is_encrypted"] = ( - is_encrypted - ) - elif event_type == EventTypes.Name: - room_name = event_json.get("content", {}).get( - EventContentFields.ROOM_NAME - ) - sliding_sync_non_joined_rooms_insert_map["room_name"] = room_name - else: - raise AssertionError( - f"Unexpected event (we should not be fetching extra events): ({event_type}, {state_key})" + # Update the `sliding_sync_non_join_memberships` table + insert_keys = sliding_sync_non_joined_rooms_insert_map.keys() + insert_values = sliding_sync_non_joined_rooms_insert_map.values() + # TODO: Only do this for non-join membership + txn.execute_batch( + f""" + INSERT INTO sliding_sync_non_join_memberships + (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)}) + VALUES ( + ?, ?, ?, + (SELECT membership FROM room_memberships WHERE event_id = ?), + (SELECT stream_ordering FROM events WHERE event_id = ?), + {", ".join("?" for _ in insert_values)} ) - - # Update the `sliding_sync_non_join_memberships` table - insert_keys = sliding_sync_non_joined_rooms_insert_map.keys() - insert_values = sliding_sync_non_joined_rooms_insert_map.values() - # We `DO NOTHING` on conflict because if the row is already in the database, - # we just assume that it was already processed (values should be the same anyways). - # - # TODO: Only do this for non-join membership - txn.execute_batch( - f""" - INSERT INTO sliding_sync_non_join_memberships - (room_id, membership_event_id, user_id, membership, event_stream_ordering, {", ".join(insert_keys)}) - VALUES ( - ?, ?, ?, - (SELECT membership FROM room_memberships WHERE event_id = ?), - (SELECT stream_ordering FROM events WHERE event_id = ?), - {", ".join("?" for _ in insert_values)} - ) - ON CONFLICT (room_id, membership_event_id) - DO NOTHING - """, - [ + ON CONFLICT (room_id, user_id) + DO UPDATE SET + membership_event_id = EXCLUDED.membership_event_id, + membership = EXCLUDED.membership, + event_stream_ordering = EXCLUDED.event_stream_ordering, + {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)} + """, [ - room_id, - membership_event_id, - user_id, - membership_event_id, - membership_event_id, - ] - + list(insert_values) - for membership_event_id, user_id in membership_event_id_to_user_id_map.items() - ], - ) + [ + room_id, + user_id, + membership_event_id, + membership_event_id, + membership_event_id, + ] + + list(insert_values) + for membership_event_id, user_id in membership_event_id_to_user_id_map.items() + ], + ) txn.call_after( self.store._curr_state_delta_stream_cache.entity_has_changed, diff --git a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql index d496a5a91a..1afdecd8e9 100644 --- a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql +++ b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql @@ -36,8 +36,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms( -- the membership event itself as the `bump_stamp`. CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships( room_id TEXT NOT NULL REFERENCES rooms(room_id), - membership_event_id TEXT NOT NULL REFERENCES events(event_id), user_id TEXT NOT NULL, + membership_event_id TEXT NOT NULL REFERENCES events(event_id), membership TEXT NOT NULL, -- `stream_ordering` of the `membership_event_id` event_stream_ordering BIGINT REFERENCES events(stream_ordering), @@ -50,6 +50,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships( -- `m.room.encryption` -> `content.algorithm` (according to the current state at the -- time of the membership) is_encrypted BOOLEAN, - PRIMARY KEY (room_id, membership_event_id) + PRIMARY KEY (room_id, user_id) ); +CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_event_stream_ordering ON sliding_sync_non_join_memberships(event_stream_ordering); +CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_membership_event_id ON sliding_sync_non_join_memberships(membership_event_id); |