summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-08-07 16:37:17 -0500
committerEric Eastwood <eric.eastwood@beta.gouv.fr>2024-08-07 16:37:17 -0500
commit87d95615d4fdc199f9196c19eb519781e78739ca (patch)
tree58556cf87c19864d7a0822ed0d53f71ce81a6a2c
parentServer left room test (diff)
downloadsynapse-87d95615d4fdc199f9196c19eb519781e78739ca.tar.xz
Change to updating the latest membership in the room
-rw-r--r--synapse/storage/databases/main/events.py196
-rw-r--r--synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql6
2 files changed, 113 insertions, 89 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef73740ef8..6f9ab3c31f 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -42,7 +42,12 @@ import attr
 from prometheus_client import Counter
 
 import synapse.metrics
-from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.constants import (
+    EventContentFields,
+    EventTypes,
+    RelationTypes,
+    Membership,
+)
 from synapse.api.errors import PartialStateConflictError
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, relation_from_event
@@ -1392,102 +1397,119 @@ class PersistEventsStore:
         # We now update `sliding_sync_non_join_memberships`. We do this regardless of
         # whether the server is still in the room or not because we still want a row if
         # a local user was just left/kicked or got banned from the room.
+        if to_delete:
+            txn.execute_batch(
+                "DELETE FROM sliding_sync_non_join_memberships"
+                " WHERE room_id = ? AND user_id = ?",
+                (
+                    (room_id, state_key)
+                    for event_type, state_key in to_delete
+                    if event_type == EventTypes.Member and self.is_mine_id(state_key)
+                ),
+            )
+
         if to_insert:
             membership_event_id_to_user_id_map: Dict[str, str] = {}
             for state_key, event_id in to_insert.items():
                 if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
                     membership_event_id_to_user_id_map[event_id] = state_key[1]
 
-            # Fetch the events from the database
-            #
-            # TODO: We should gather this data before we delete the
-            # `current_state_events` in a `no_longer_in_room` situation.
-            (
-                event_type_and_state_key_in_list_clause,
-                event_type_and_state_key_args,
-            ) = make_tuple_in_list_sql_clause(
-                self.database_engine,
-                ("type", "state_key"),
-                [
-                    (EventTypes.Create, ""),
-                    (EventTypes.RoomEncryption, ""),
-                    (EventTypes.Name, ""),
-                ],
-            )
-            txn.execute(
-                f"""
-                SELECT c.event_id, c.type, c.state_key, j.json
-                FROM current_state_events AS c
-                INNER JOIN event_json AS j USING (event_id)
-                WHERE
-                    c.room_id = ?
-                    AND {event_type_and_state_key_in_list_clause}
-                """,
-                [room_id] + event_type_and_state_key_args,
-            )
+            if len(membership_event_id_to_user_id_map) > 0:
+                # Fetch the events from the database
+                #
+                # TODO: We should gather this data before we delete the
+                # `current_state_events` in a `no_longer_in_room` situation.
+                (
+                    event_type_and_state_key_in_list_clause,
+                    event_type_and_state_key_args,
+                ) = make_tuple_in_list_sql_clause(
+                    self.database_engine,
+                    ("type", "state_key"),
+                    [
+                        (EventTypes.Create, ""),
+                        (EventTypes.RoomEncryption, ""),
+                        (EventTypes.Name, ""),
+                    ],
+                )
+                txn.execute(
+                    f"""
+                    SELECT c.event_id, c.type, c.state_key, j.json
+                    FROM current_state_events AS c
+                    INNER JOIN event_json AS j USING (event_id)
+                    WHERE
+                        c.room_id = ?
+                        AND {event_type_and_state_key_in_list_clause}
+                    """,
+                    [room_id] + event_type_and_state_key_args,
+                )
 
-            # Parse the raw event JSON
-            sliding_sync_non_joined_rooms_insert_map: Dict[
-                str, Optional[Union[str, bool]]
-            ] = {}
-            for row in txn:
-                event_id, event_type, state_key, json = row
-                event_json = db_to_json(json)
+                # Parse the raw event JSON
+                sliding_sync_non_joined_rooms_insert_map: Dict[
+                    str, Optional[Union[str, bool]]
+                ] = {}
+                for row in txn:
+                    event_id, event_type, state_key, json = row
+                    event_json = db_to_json(json)
+
+                    if event_type == EventTypes.Create:
+                        room_type = event_json.get("content", {}).get(
+                            EventContentFields.ROOM_TYPE
+                        )
+                        sliding_sync_non_joined_rooms_insert_map["room_type"] = (
+                            room_type
+                        )
+                    elif event_type == EventTypes.RoomEncryption:
+                        is_encrypted = event_json.get("content", {}).get(
+                            EventContentFields.ENCRYPTION_ALGORITHM
+                        )
+                        sliding_sync_non_joined_rooms_insert_map["is_encrypted"] = (
+                            is_encrypted
+                        )
+                    elif event_type == EventTypes.Name:
+                        room_name = event_json.get("content", {}).get(
+                            EventContentFields.ROOM_NAME
+                        )
+                        sliding_sync_non_joined_rooms_insert_map["room_name"] = (
+                            room_name
+                        )
+                    else:
+                        raise AssertionError(
+                            f"Unexpected event (we should not be fetching extra events): ({event_type}, {state_key})"
+                        )
 
-                if event_type == EventTypes.Create:
-                    room_type = event_json.get("content", {}).get(
-                        EventContentFields.ROOM_TYPE
-                    )
-                    sliding_sync_non_joined_rooms_insert_map["room_type"] = room_type
-                elif event_type == EventTypes.RoomEncryption:
-                    is_encrypted = event_json.get("content", {}).get(
-                        EventContentFields.ENCRYPTION_ALGORITHM
-                    )
-                    sliding_sync_non_joined_rooms_insert_map["is_encrypted"] = (
-                        is_encrypted
-                    )
-                elif event_type == EventTypes.Name:
-                    room_name = event_json.get("content", {}).get(
-                        EventContentFields.ROOM_NAME
-                    )
-                    sliding_sync_non_joined_rooms_insert_map["room_name"] = room_name
-                else:
-                    raise AssertionError(
-                        f"Unexpected event (we should not be fetching extra events): ({event_type}, {state_key})"
+                # Update the `sliding_sync_non_join_memberships` table
+                insert_keys = sliding_sync_non_joined_rooms_insert_map.keys()
+                insert_values = sliding_sync_non_joined_rooms_insert_map.values()
+                # TODO: Only do this for non-join membership
+                txn.execute_batch(
+                    f"""
+                    INSERT INTO sliding_sync_non_join_memberships
+                        (room_id, user_id, membership_event_id, membership, event_stream_ordering, {", ".join(insert_keys)})
+                    VALUES (
+                        ?, ?, ?,
+                        (SELECT membership FROM room_memberships WHERE event_id = ?),
+                        (SELECT stream_ordering FROM events WHERE event_id = ?),
+                        {", ".join("?" for _ in insert_values)}
                     )
-
-            # Update the `sliding_sync_non_join_memberships` table
-            insert_keys = sliding_sync_non_joined_rooms_insert_map.keys()
-            insert_values = sliding_sync_non_joined_rooms_insert_map.values()
-            # We `DO NOTHING` on conflict because if the row is already in the database,
-            # we just assume that it was already processed (values should be the same anyways).
-            #
-            # TODO: Only do this for non-join membership
-            txn.execute_batch(
-                f"""
-                INSERT INTO sliding_sync_non_join_memberships
-                    (room_id, membership_event_id, user_id, membership, event_stream_ordering, {", ".join(insert_keys)})
-                VALUES (
-                    ?, ?, ?,
-                    (SELECT membership FROM room_memberships WHERE event_id = ?),
-                    (SELECT stream_ordering FROM events WHERE event_id = ?),
-                    {", ".join("?" for _ in insert_values)}
-                )
-                ON CONFLICT (room_id, membership_event_id)
-                DO NOTHING
-                """,
-                [
+                    ON CONFLICT (room_id, user_id)
+                    DO UPDATE SET
+                        membership_event_id = EXCLUDED.membership_event_id,
+                        membership = EXCLUDED.membership,
+                        event_stream_ordering = EXCLUDED.event_stream_ordering,
+                        {", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)}
+                    """,
                     [
-                        room_id,
-                        membership_event_id,
-                        user_id,
-                        membership_event_id,
-                        membership_event_id,
-                    ]
-                    + list(insert_values)
-                    for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
-                ],
-            )
+                        [
+                            room_id,
+                            user_id,
+                            membership_event_id,
+                            membership_event_id,
+                            membership_event_id,
+                        ]
+                        + list(insert_values)
+                        for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
+                    ],
+                )
 
         txn.call_after(
             self.store._curr_state_delta_stream_cache.entity_has_changed,
diff --git a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
index d496a5a91a..1afdecd8e9 100644
--- a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
+++ b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql
@@ -36,8 +36,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms(
 -- the membership event itself as the `bump_stamp`.
 CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships(
     room_id TEXT NOT NULL REFERENCES rooms(room_id),
-    membership_event_id TEXT NOT NULL REFERENCES events(event_id),
     user_id TEXT NOT NULL,
+    membership_event_id TEXT NOT NULL REFERENCES events(event_id),
     membership TEXT NOT NULL,
     -- `stream_ordering` of the `membership_event_id`
     event_stream_ordering BIGINT REFERENCES events(stream_ordering),
@@ -50,6 +50,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships(
     -- `m.room.encryption` -> `content.algorithm` (according to the current state at the
     -- time of the membership)
     is_encrypted BOOLEAN,
-    PRIMARY KEY (room_id, membership_event_id)
+    PRIMARY KEY (room_id, user_id)
 );
 
+CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_event_stream_ordering ON sliding_sync_non_join_memberships(event_stream_ordering);
+CREATE UNIQUE INDEX IF NOT EXISTS sliding_sync_non_join_memberships_membership_event_id ON sliding_sync_non_join_memberships(membership_event_id);