summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-09-28 14:42:43 +0100
committerGitHub <noreply@github.com>2022-09-28 13:42:43 +0000
commit4b17a5ace846d82b09fccce79da77a8207a6765f (patch)
tree053a043932b0cbfc26a86347c3b09d31b8cb032b /synapse/storage
parentfix: Push notifications for invite over federation (#13719) (diff)
downloadsynapse-4b17a5ace846d82b09fccce79da77a8207a6765f.tar.xz
Handle remote device list updates during partial join (#13913)
c.f. #12993 (comment), point 3

This stores all device list updates that we receive while partial joins are ongoing, and processes them once we have the full state.

Note: We don't actually process the device lists in the same ways as if we weren't partially joined. Instead of updating the device list remote cache, we simply notify local users that a change in the remote user's devices has happened. I think this is safe as if the local user requests the keys for the remote user and we don't have them we'll simply fetch them as normal.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/devices.py55
-rw-r--r--synapse/storage/databases/main/room.py20
-rw-r--r--synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql28
3 files changed, 103 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 1151fb0cc3..1e562d4a40 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1995,3 +1995,58 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 add_device_list_outbound_pokes_txn,
                 stream_ids,
             )
+
+    async def add_remote_device_list_to_pending(
+        self, user_id: str, device_id: str
+    ) -> None:
+        """Add a device list update to the table tracking remote device list
+        updates during partial joins.
+        """
+
+        async with self._device_list_id_gen.get_next() as stream_id:  # type: ignore[attr-defined]
+            await self.db_pool.simple_upsert(
+                table="device_lists_remote_pending",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                },
+                values={"stream_id": stream_id},
+                desc="add_remote_device_list_to_pending",
+            )
+
+    async def get_pending_remote_device_list_updates_for_room(
+        self, room_id: str
+    ) -> Collection[Tuple[str, str]]:
+        """Get the set of remote device list updates from the pending table for
+        the room.
+        """
+
+        min_device_stream_id = await self.db_pool.simple_select_one_onecol(
+            table="partial_state_rooms",
+            keyvalues={
+                "room_id": room_id,
+            },
+            retcol="device_lists_stream_id",
+            desc="get_pending_remote_device_list_updates_for_room_device",
+        )
+
+        sql = """
+            SELECT user_id, device_id FROM device_lists_remote_pending AS d
+            INNER JOIN current_state_events AS c ON
+                type = 'm.room.member'
+                AND state_key = user_id
+                AND membership = 'join'
+            WHERE
+                room_id = ? AND stream_id > ?
+        """
+
+        def get_pending_remote_device_list_updates_for_room_txn(
+            txn: LoggingTransaction,
+        ) -> Collection[Tuple[str, str]]:
+            txn.execute(sql, (room_id, min_device_stream_id))
+            return cast(Collection[Tuple[str, str]], txn.fetchall())
+
+        return await self.db_pool.runInteraction(
+            "get_pending_remote_device_list_updates_for_room",
+            get_pending_remote_device_list_updates_for_room_txn,
+        )
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 064c332fb7..672c9a03fc 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1217,6 +1217,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         )
         self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
 
+        # We now delete anything from `device_lists_remote_pending` with a
+        # stream ID less than the minimum
+        # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+        device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={},
+            retcol="MIN(device_lists_stream_id)",
+            allow_none=True,
+        )
+        if device_lists_stream_id is None:
+            # There are no rooms being currently partially joined, so we delete everything.
+            txn.execute("DELETE FROM device_lists_remote_pending")
+        else:
+            sql = """
+                DELETE FROM device_lists_remote_pending
+                WHERE stream_id <= ?
+            """
+            txn.execute(sql, (device_lists_stream_id,))
+
     @cached()
     async def is_partial_state_room(self, room_id: str) -> bool:
         """Checks if this room has partial state.
diff --git a/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql b/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql
new file mode 100644
index 0000000000..dbd78d677d
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql
@@ -0,0 +1,28 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stores remote device lists we have received for remote users while a partial
+-- join is in progress.
+--
+-- This allows us to replay any device list updates if it turns out the remote
+-- user was in the partially joined room
+CREATE TABLE device_lists_remote_pending(
+    stream_id BIGINT PRIMARY KEY,
+    user_id TEXT NOT NULL,
+    device_id TEXT NOT NULL
+);
+
+-- We only keep the most recent update for a given user/device pair.
+CREATE UNIQUE INDEX device_lists_remote_pending_user_device_id ON device_lists_remote_pending(user_id, device_id);