diff --git a/changelog.d/13913.misc b/changelog.d/13913.misc
new file mode 100644
index 0000000000..30b4401049
--- /dev/null
+++ b/changelog.d/13913.misc
@@ -0,0 +1 @@
+Faster remote room joins: correctly handle remote device list updates during a partial join.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index bad262731c..f2ef591103 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -309,6 +309,17 @@ class DeviceWorkerHandler:
"self_signing_key": self_signing_key,
}
+ async def handle_room_un_partial_stated(self, room_id: str) -> None:
+ """Handles sending appropriate device list updates in a room that has
+ gone from partial to full state.
+ """
+
+ # TODO(faster_joins): worker mode support
+ # https://github.com/matrix-org/synapse/issues/12994
+ logger.error(
+ "Trying handling device list state for partial join: not supported on workers."
+ )
+
class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs: "HomeServer"):
@@ -746,6 +757,15 @@ class DeviceHandler(DeviceWorkerHandler):
finally:
self._handle_new_device_update_is_processing = False
+ async def handle_room_un_partial_stated(self, room_id: str) -> None:
+ """Handles sending appropriate device list updates in a room that has
+ gone from partial to full state.
+ """
+
+ # We defer to the device list updater implementation as we're on the
+ # right worker.
+ await self.device_list_updater.handle_room_un_partial_stated(room_id)
+
def _update_device_from_client_ips(
device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
@@ -836,6 +856,16 @@ class DeviceListUpdater:
)
return
+ # Check if we are partially joining any rooms. If so we need to store
+ # all device list updates so that we can handle them correctly once we
+ # know who is in the room.
+ partial_rooms = await self.store.get_partial_state_rooms_and_servers()
+ if partial_rooms:
+ await self.store.add_remote_device_list_to_pending(
+ user_id,
+ device_id,
+ )
+
room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
@@ -1175,3 +1205,35 @@ class DeviceListUpdater:
device_ids.append(verify_key.version)
return device_ids
+
+ async def handle_room_un_partial_stated(self, room_id: str) -> None:
+ """Handles sending appropriate device list updates in a room that has
+ gone from partial to full state.
+ """
+
+ pending_updates = (
+ await self.store.get_pending_remote_device_list_updates_for_room(room_id)
+ )
+
+ for user_id, device_id in pending_updates:
+ logger.info(
+ "Got pending device list update in room %s: %s / %s",
+ room_id,
+ user_id,
+ device_id,
+ )
+ position = await self.store.add_device_change_to_streams(
+ user_id,
+ [device_id],
+ room_ids=[room_id],
+ )
+
+ if not position:
+ # This should only happen if there are no updates, which
+ # shouldn't happen when we've passed in a non-empty set of
+ # device IDs.
+ continue
+
+ self.device_handler.notifier.on_new_event(
+ StreamKeyType.DEVICE_LIST, position, rooms=[room_id]
+ )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8f847ff845..360ab6fee2 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -149,6 +149,7 @@ class FederationHandler:
self.http_client = hs.get_proxied_blacklisted_http_client()
self._replication = hs.get_replication_data_handler()
self._federation_event_handler = hs.get_federation_event_handler()
+ self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
@@ -1631,6 +1632,9 @@ class FederationHandler:
# https://github.com/matrix-org/synapse/issues/12994
await self.state_handler.update_current_state(room_id)
+ logger.info("Handling any pending device list updates")
+ await self._device_handler.handle_room_un_partial_stated(room_id)
+
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 1151fb0cc3..1e562d4a40 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1995,3 +1995,58 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
add_device_list_outbound_pokes_txn,
stream_ids,
)
+
+ async def add_remote_device_list_to_pending(
+ self, user_id: str, device_id: str
+ ) -> None:
+ """Add a device list update to the table tracking remote device list
+ updates during partial joins.
+ """
+
+ async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
+ await self.db_pool.simple_upsert(
+ table="device_lists_remote_pending",
+ keyvalues={
+ "user_id": user_id,
+ "device_id": device_id,
+ },
+ values={"stream_id": stream_id},
+ desc="add_remote_device_list_to_pending",
+ )
+
+ async def get_pending_remote_device_list_updates_for_room(
+ self, room_id: str
+ ) -> Collection[Tuple[str, str]]:
+ """Get the set of remote device list updates from the pending table for
+ the room.
+ """
+
+ min_device_stream_id = await self.db_pool.simple_select_one_onecol(
+ table="partial_state_rooms",
+ keyvalues={
+ "room_id": room_id,
+ },
+ retcol="device_lists_stream_id",
+ desc="get_pending_remote_device_list_updates_for_room_device",
+ )
+
+ sql = """
+ SELECT user_id, device_id FROM device_lists_remote_pending AS d
+ INNER JOIN current_state_events AS c ON
+ type = 'm.room.member'
+ AND state_key = user_id
+ AND membership = 'join'
+ WHERE
+ room_id = ? AND stream_id > ?
+ """
+
+ def get_pending_remote_device_list_updates_for_room_txn(
+ txn: LoggingTransaction,
+ ) -> Collection[Tuple[str, str]]:
+ txn.execute(sql, (room_id, min_device_stream_id))
+ return cast(Collection[Tuple[str, str]], txn.fetchall())
+
+ return await self.db_pool.runInteraction(
+ "get_pending_remote_device_list_updates_for_room",
+ get_pending_remote_device_list_updates_for_room_txn,
+ )
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 064c332fb7..672c9a03fc 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1217,6 +1217,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
)
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
+ # We now delete anything from `device_lists_remote_pending` with a
+ # stream ID less than the minimum
+ # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+ device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={},
+ retcol="MIN(device_lists_stream_id)",
+ allow_none=True,
+ )
+ if device_lists_stream_id is None:
+ # There are no rooms being currently partially joined, so we delete everything.
+ txn.execute("DELETE FROM device_lists_remote_pending")
+ else:
+ sql = """
+ DELETE FROM device_lists_remote_pending
+ WHERE stream_id <= ?
+ """
+ txn.execute(sql, (device_lists_stream_id,))
+
@cached()
async def is_partial_state_room(self, room_id: str) -> bool:
"""Checks if this room has partial state.
diff --git a/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql b/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql
new file mode 100644
index 0000000000..dbd78d677d
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/04pending_device_list_updates.sql
@@ -0,0 +1,28 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stores remote device lists we have received for remote users while a partial
+-- join is in progress.
+--
+-- This allows us to replay any device list updates if it turns out the remote
+-- user was in the partially joined room
+CREATE TABLE device_lists_remote_pending(
+ stream_id BIGINT PRIMARY KEY,
+ user_id TEXT NOT NULL,
+ device_id TEXT NOT NULL
+);
+
+-- We only keep the most recent update for a given user/device pair.
+CREATE UNIQUE INDEX device_lists_remote_pending_user_device_id ON device_lists_remote_pending(user_id, device_id);
|