diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f9cc5bddbc..c597639a7f 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -937,7 +937,10 @@ class DeviceListUpdater:
# Check if we are partially joining any rooms. If so we need to store
# all device list updates so that we can handle them correctly once we
# know who is in the room.
- partial_rooms = await self.store.get_partial_state_rooms_and_servers()
+ # TODO(faster joins): this fetches and processes a bunch of data that we don't
+ # use. Could be replaced by a tighter query e.g.
+ # SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
+ partial_rooms = await self.store.get_partial_state_room_resync_info()
if partial_rooms:
await self.store.add_remote_device_list_to_pending(
user_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5f7e0a1f79..ccc045d36f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -632,6 +632,7 @@ class FederationHandler:
room_id=room_id,
servers=ret.servers_in_room,
device_lists_stream_id=self.store.get_device_stream_token(),
+ joined_via=origin,
)
try:
@@ -1615,13 +1616,13 @@ class FederationHandler:
"""Resumes resyncing of all partial-state rooms after a restart."""
assert not self.config.worker.worker_app
- partial_state_rooms = await self.store.get_partial_state_rooms_and_servers()
- for room_id, servers_in_room in partial_state_rooms.items():
+ partial_state_rooms = await self.store.get_partial_state_room_resync_info()
+ for room_id, resync_info in partial_state_rooms.items():
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
- initial_destination=None,
- other_destinations=servers_in_room,
+ initial_destination=resync_info.joined_via,
+ other_destinations=resync_info.servers_in_room,
room_id=room_id,
)
@@ -1650,28 +1651,12 @@ class FederationHandler:
# really leave, that might mean we have difficulty getting the room state over
# federation.
# https://github.com/matrix-org/synapse/issues/12802
- #
- # TODO(faster_joins): we need some way of prioritising which homeservers in
- # `other_destinations` to try first, otherwise we'll spend ages trying dead
- # homeservers for large rooms.
- # https://github.com/matrix-org/synapse/issues/12999
-
- if initial_destination is None and len(other_destinations) == 0:
- raise ValueError(
- f"Cannot resync state of {room_id}: no destinations provided"
- )
# Make an infinite iterator of destinations to try. Once we find a working
# destination, we'll stick with it until it flakes.
- destinations: Collection[str]
- if initial_destination is not None:
- # Move `initial_destination` to the front of the list.
- destinations = list(other_destinations)
- if initial_destination in destinations:
- destinations.remove(initial_destination)
- destinations = [initial_destination] + destinations
- else:
- destinations = other_destinations
+ destinations = _prioritise_destinations_for_partial_state_resync(
+ initial_destination, other_destinations, room_id
+ )
destination_iter = itertools.cycle(destinations)
# `destination` is the current remote homeserver we're pulling from.
@@ -1769,3 +1754,29 @@ class FederationHandler:
room_id,
destination,
)
+
+
+def _prioritise_destinations_for_partial_state_resync(
+ initial_destination: Optional[str],
+ other_destinations: Collection[str],
+ room_id: str,
+) -> Collection[str]:
+ """Work out the order in which we should ask servers to resync events.
+
+ If an `initial_destination` is given, it takes top priority. Otherwise
+ all servers are treated equally.
+
+ :raises ValueError: if no destination is provided at all.
+ """
+ if initial_destination is None and len(other_destinations) == 0:
+ raise ValueError(f"Cannot resync state of {room_id}: no destinations provided")
+
+ if initial_destination is None:
+ return other_destinations
+
+ # Move `initial_destination` to the front of the list.
+ destinations = list(other_destinations)
+ if initial_destination in destinations:
+ destinations.remove(initial_destination)
+ destinations = [initial_destination] + destinations
+ return destinations
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 7bb21f8f81..4717c9728a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1658,7 +1658,7 @@ class DatabasePool:
table: string giving the table name
keyvalues: dict of column names and values to select the row with
retcol: string giving the name of the column to return
- allow_none: If true, return None instead of failing if the SELECT
+ allow_none: If true, return None instead of raising StoreError if the SELECT
statement returns no rows
desc: description of the transaction, for logging and metrics
"""
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index e41c99027a..7d97f8f60e 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -97,6 +97,12 @@ class RoomSortOrder(Enum):
STATE_EVENTS = "state_events"
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class PartialStateResyncInfo:
+ joined_via: Optional[str]
+ servers_in_room: List[str] = attr.ib(factory=list)
+
+
class RoomWorkerStore(CacheInvalidationWorkerStore):
def __init__(
self,
@@ -1160,17 +1166,29 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
desc="get_partial_state_servers_at_join",
)
- async def get_partial_state_rooms_and_servers(
+ async def get_partial_state_room_resync_info(
self,
- ) -> Mapping[str, Collection[str]]:
- """Get all rooms containing events with partial state, and the servers known
- to be in the room.
+ ) -> Mapping[str, PartialStateResyncInfo]:
+ """Get all rooms containing events with partial state, and the information
+ needed to restart a "resync" of those rooms.
Returns:
A dictionary of rooms with partial state, with room IDs as keys and
lists of servers in rooms as values.
"""
- room_servers: Dict[str, List[str]] = {}
+ room_servers: Dict[str, PartialStateResyncInfo] = {}
+
+ rows = await self.db_pool.simple_select_list(
+ table="partial_state_rooms",
+ keyvalues={},
+ retcols=("room_id", "joined_via"),
+ desc="get_server_which_served_partial_join",
+ )
+
+ for row in rows:
+ room_id = row["room_id"]
+ joined_via = row["joined_via"]
+ room_servers[room_id] = PartialStateResyncInfo(joined_via=joined_via)
rows = await self.db_pool.simple_select_list(
"partial_state_rooms_servers",
@@ -1182,7 +1200,15 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
for row in rows:
room_id = row["room_id"]
server_name = row["server_name"]
- room_servers.setdefault(room_id, []).append(server_name)
+ entry = room_servers.get(room_id)
+ if entry is None:
+ # There is a foreign key constraint which enforces that every room_id in
+ # partial_state_rooms_servers appears in partial_state_rooms. So we
+ # expect `entry` to be non-null. (This reasoning fails if we've
+ # partial-joined between the two SELECTs, but this is unlikely to happen
+ # in practice.)
+ continue
+ entry.servers_in_room.append(server_name)
return room_servers
@@ -1827,6 +1853,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id: str,
servers: Collection[str],
device_lists_stream_id: int,
+ joined_via: str,
) -> None:
"""Mark the given room as containing events with partial state.
@@ -1842,6 +1869,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
servers: other servers known to be in the room
device_lists_stream_id: the device_lists stream ID at the time when we first
joined the room.
+ joined_via: the server name we requested a partial join from.
"""
await self.db_pool.runInteraction(
"store_partial_state_room",
@@ -1849,6 +1877,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id,
servers,
device_lists_stream_id,
+ joined_via,
)
def _store_partial_state_room_txn(
@@ -1857,6 +1886,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
room_id: str,
servers: Collection[str],
device_lists_stream_id: int,
+ joined_via: str,
) -> None:
DatabasePool.simple_insert_txn(
txn,
@@ -1866,6 +1896,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"device_lists_stream_id": device_lists_stream_id,
# To be updated later once the join event is persisted.
"join_event_id": None,
+ "joined_via": joined_via,
},
)
DatabasePool.simple_insert_many_txn(
diff --git a/synapse/storage/schema/main/delta/73/09partial_joined_via_destination.sql b/synapse/storage/schema/main/delta/73/09partial_joined_via_destination.sql
new file mode 100644
index 0000000000..066d602b18
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/09partial_joined_via_destination.sql
@@ -0,0 +1,18 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- When we resync partial state, we prioritise doing so using the server we
+-- partial-joined from. To do this we need to record which server that was!
+ALTER TABLE partial_state_rooms ADD COLUMN joined_via TEXT;
|