summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/device.py62
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/storage/databases/main/devices.py55
-rw-r--r--synapse/storage/databases/main/room.py20
-rw-r--r--synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql28
5 files changed, 169 insertions, 0 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index bad262731c..f2ef591103 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -309,6 +309,17 @@ class DeviceWorkerHandler:
             "self_signing_key": self_signing_key,
         }
 
+    async def handle_room_un_partial_stated(self, room_id: str) -> None:
+        """Handles sending appropriate device list updates in a room that has
+        gone from partial to full state.
+        """
+
+        # TODO(faster_joins): worker mode support
+        #   https://github.com/matrix-org/synapse/issues/12994
+        logger.error(
+            "Trying handling device list state for partial join: not supported on workers."
+        )
+
 
 class DeviceHandler(DeviceWorkerHandler):
     def __init__(self, hs: "HomeServer"):
@@ -746,6 +757,15 @@ class DeviceHandler(DeviceWorkerHandler):
         finally:
             self._handle_new_device_update_is_processing = False
 
+    async def handle_room_un_partial_stated(self, room_id: str) -> None:
+        """Handles sending appropriate device list updates in a room that has
+        gone from partial to full state.
+        """
+
+        # We defer to the device list updater implementation as we're on the
+        # right worker.
+        await self.device_list_updater.handle_room_un_partial_stated(room_id)
+
 
 def _update_device_from_client_ips(
     device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
@@ -836,6 +856,16 @@ class DeviceListUpdater:
             )
             return
 
+        # Check if we are partially joining any rooms. If so we need to store
+        # all device list updates so that we can handle them correctly once we
+        # know who is in the room.
+        partial_rooms = await self.store.get_partial_state_rooms_and_servers()
+        if partial_rooms:
+            await self.store.add_remote_device_list_to_pending(
+                user_id,
+                device_id,
+            )
+
         room_ids = await self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
@@ -1175,3 +1205,35 @@ class DeviceListUpdater:
             device_ids.append(verify_key.version)
 
         return device_ids
+
+    async def handle_room_un_partial_stated(self, room_id: str) -> None:
+        """Handles sending appropriate device list updates in a room that has
+        gone from partial to full state.
+        """
+
+        pending_updates = (
+            await self.store.get_pending_remote_device_list_updates_for_room(room_id)
+        )
+
+        for user_id, device_id in pending_updates:
+            logger.info(
+                "Got pending device list update in room %s: %s / %s",
+                room_id,
+                user_id,
+                device_id,
+            )
+            position = await self.store.add_device_change_to_streams(
+                user_id,
+                [device_id],
+                room_ids=[room_id],
+            )
+
+            if not position:
+                # This should only happen if there are no updates, which
+                # shouldn't happen when we've passed in a non-empty set of
+                # device IDs.
+                continue
+
+            self.device_handler.notifier.on_new_event(
+                StreamKeyType.DEVICE_LIST, position, rooms=[room_id]
+            )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8f847ff845..360ab6fee2 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -149,6 +149,7 @@ class FederationHandler:
         self.http_client = hs.get_proxied_blacklisted_http_client()
         self._replication = hs.get_replication_data_handler()
         self._federation_event_handler = hs.get_federation_event_handler()
+        self._device_handler = hs.get_device_handler()
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
 
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
@@ -1631,6 +1632,9 @@ class FederationHandler:
                 #   https://github.com/matrix-org/synapse/issues/12994
                 await self.state_handler.update_current_state(room_id)
 
+                logger.info("Handling any pending device list updates")
+                await self._device_handler.handle_room_un_partial_stated(room_id)
+
                 logger.info("Clearing partial-state flag for %s", room_id)
                 success = await self.store.clear_partial_state_room(room_id)
                 if success:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 1151fb0cc3..1e562d4a40 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1995,3 +1995,58 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 add_device_list_outbound_pokes_txn,
                 stream_ids,
             )
+
+    async def add_remote_device_list_to_pending(
+        self, user_id: str, device_id: str
+    ) -> None:
+        """Add a device list update to the table tracking remote device list
+        updates during partial joins.
+        """
+
+        async with self._device_list_id_gen.get_next() as stream_id:  # type: ignore[attr-defined]
+            await self.db_pool.simple_upsert(
+                table="device_lists_remote_pending",
+                keyvalues={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                },
+                values={"stream_id": stream_id},
+                desc="add_remote_device_list_to_pending",
+            )
+
+    async def get_pending_remote_device_list_updates_for_room(
+        self, room_id: str
+    ) -> Collection[Tuple[str, str]]:
+        """Get the set of remote device list updates from the pending table for
+        the room.
+        """
+
+        min_device_stream_id = await self.db_pool.simple_select_one_onecol(
+            table="partial_state_rooms",
+            keyvalues={
+                "room_id": room_id,
+            },
+            retcol="device_lists_stream_id",
+            desc="get_pending_remote_device_list_updates_for_room_device",
+        )
+
+        sql = """
+            SELECT user_id, device_id FROM device_lists_remote_pending AS d
+            INNER JOIN current_state_events AS c ON
+                type = 'm.room.member'
+                AND state_key = user_id
+                AND membership = 'join'
+            WHERE
+                room_id = ? AND stream_id > ?
+        """
+
+        def get_pending_remote_device_list_updates_for_room_txn(
+            txn: LoggingTransaction,
+        ) -> Collection[Tuple[str, str]]:
+            txn.execute(sql, (room_id, min_device_stream_id))
+            return cast(Collection[Tuple[str, str]], txn.fetchall())
+
+        return await self.db_pool.runInteraction(
+            "get_pending_remote_device_list_updates_for_room",
+            get_pending_remote_device_list_updates_for_room_txn,
+        )
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 064c332fb7..672c9a03fc 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1217,6 +1217,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         )
         self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
 
+        # We now delete anything from `device_lists_remote_pending` with a
+        # stream ID less than the minimum
+        # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+        device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={},
+            retcol="MIN(device_lists_stream_id)",
+            allow_none=True,
+        )
+        if device_lists_stream_id is None:
+            # There are no rooms being currently partially joined, so we delete everything.
+            txn.execute("DELETE FROM device_lists_remote_pending")
+        else:
+            sql = """
+                DELETE FROM device_lists_remote_pending
+                WHERE stream_id <= ?
+            """
+            txn.execute(sql, (device_lists_stream_id,))
+
     @cached()
     async def is_partial_state_room(self, room_id: str) -> bool:
         """Checks if this room has partial state.
diff --git a/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql b/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql
new file mode 100644
index 0000000000..dbd78d677d
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql
@@ -0,0 +1,28 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stores remote device lists we have received for remote users while a partial
+-- join is in progress.
+--
+-- This allows us to replay any device list updates if it turns out the remote
+-- user was in the partially joined room
+CREATE TABLE device_lists_remote_pending(
+    stream_id BIGINT PRIMARY KEY,
+    user_id TEXT NOT NULL,
+    device_id TEXT NOT NULL
+);
+
+-- We only keep the most recent update for a given user/device pair.
+CREATE UNIQUE INDEX device_lists_remote_pending_user_device_id ON device_lists_remote_pending(user_id, device_id);