summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2022-10-18 12:33:18 +0100
committerGitHub <noreply@github.com>2022-10-18 12:33:18 +0100
commitc3a4780080a5bcb04132283c0f32f7452655792a (patch)
tree5df23de9727a043922709a76125ae3665e5a828e /synapse
parentAllow poetry-core 1.3.2 (#14217) (diff)
downloadsynapse-c3a4780080a5bcb04132283c0f32f7452655792a.tar.xz
When restarting a partial join resync, prioritise the server which actioned a partial join (#14126)
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/device.py5
-rw-r--r--synapse/handlers/federation.py57
-rw-r--r--synapse/storage/database.py2
-rw-r--r--synapse/storage/databases/main/room.py43
-rw-r--r--synapse/storage/schema/main/delta/73/09partial_joined_via_destination.sql18
5 files changed, 94 insertions, 31 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f9cc5bddbc..c597639a7f 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -937,7 +937,10 @@ class DeviceListUpdater:
         # Check if we are partially joining any rooms. If so we need to store
         # all device list updates so that we can handle them correctly once we
         # know who is in the room.
-        partial_rooms = await self.store.get_partial_state_rooms_and_servers()
+        # TODO(faster joins): this fetches and processes a bunch of data that we don't
+        # use. Could be replaced by a tighter query e.g.
+        #   SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
+        partial_rooms = await self.store.get_partial_state_room_resync_info()
         if partial_rooms:
             await self.store.add_remote_device_list_to_pending(
                 user_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5f7e0a1f79..ccc045d36f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -632,6 +632,7 @@ class FederationHandler:
                     room_id=room_id,
                     servers=ret.servers_in_room,
                     device_lists_stream_id=self.store.get_device_stream_token(),
+                    joined_via=origin,
                 )
 
             try:
@@ -1615,13 +1616,13 @@ class FederationHandler:
         """Resumes resyncing of all partial-state rooms after a restart."""
         assert not self.config.worker.worker_app
 
-        partial_state_rooms = await self.store.get_partial_state_rooms_and_servers()
-        for room_id, servers_in_room in partial_state_rooms.items():
+        partial_state_rooms = await self.store.get_partial_state_room_resync_info()
+        for room_id, resync_info in partial_state_rooms.items():
             run_as_background_process(
                 desc="sync_partial_state_room",
                 func=self._sync_partial_state_room,
-                initial_destination=None,
-                other_destinations=servers_in_room,
+                initial_destination=resync_info.joined_via,
+                other_destinations=resync_info.servers_in_room,
                 room_id=room_id,
             )
 
@@ -1650,28 +1651,12 @@ class FederationHandler:
         #   really leave, that might mean we have difficulty getting the room state over
         #   federation.
         #   https://github.com/matrix-org/synapse/issues/12802
-        #
-        # TODO(faster_joins): we need some way of prioritising which homeservers in
-        #   `other_destinations` to try first, otherwise we'll spend ages trying dead
-        #   homeservers for large rooms.
-        #   https://github.com/matrix-org/synapse/issues/12999
-
-        if initial_destination is None and len(other_destinations) == 0:
-            raise ValueError(
-                f"Cannot resync state of {room_id}: no destinations provided"
-            )
 
         # Make an infinite iterator of destinations to try. Once we find a working
         # destination, we'll stick with it until it flakes.
-        destinations: Collection[str]
-        if initial_destination is not None:
-            # Move `initial_destination` to the front of the list.
-            destinations = list(other_destinations)
-            if initial_destination in destinations:
-                destinations.remove(initial_destination)
-            destinations = [initial_destination] + destinations
-        else:
-            destinations = other_destinations
+        destinations = _prioritise_destinations_for_partial_state_resync(
+            initial_destination, other_destinations, room_id
+        )
         destination_iter = itertools.cycle(destinations)
 
         # `destination` is the current remote homeserver we're pulling from.
@@ -1769,3 +1754,29 @@ class FederationHandler:
                             room_id,
                             destination,
                         )
+
+
+def _prioritise_destinations_for_partial_state_resync(
+    initial_destination: Optional[str],
+    other_destinations: Collection[str],
+    room_id: str,
+) -> Collection[str]:
+    """Work out the order in which we should ask servers to resync events.
+
+    If an `initial_destination` is given, it takes top priority. Otherwise
+    all servers are treated equally.
+
+    :raises ValueError: if no destination is provided at all.
+    """
+    if initial_destination is None and len(other_destinations) == 0:
+        raise ValueError(f"Cannot resync state of {room_id}: no destinations provided")
+
+    if initial_destination is None:
+        return other_destinations
+
+    # Move `initial_destination` to the front of the list.
+    destinations = list(other_destinations)
+    if initial_destination in destinations:
+        destinations.remove(initial_destination)
+    destinations = [initial_destination] + destinations
+    return destinations
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 7bb21f8f81..4717c9728a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1658,7 +1658,7 @@ class DatabasePool:
             table: string giving the table name
             keyvalues: dict of column names and values to select the row with
             retcol: string giving the name of the column to return
-            allow_none: If true, return None instead of failing if the SELECT
+            allow_none: If true, return None instead of raising StoreError if the SELECT
                 statement returns no rows
             desc: description of the transaction, for logging and metrics
         """
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index e41c99027a..7d97f8f60e 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -97,6 +97,12 @@ class RoomSortOrder(Enum):
     STATE_EVENTS = "state_events"
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class PartialStateResyncInfo:
+    joined_via: Optional[str]
+    servers_in_room: List[str] = attr.ib(factory=list)
+
+
 class RoomWorkerStore(CacheInvalidationWorkerStore):
     def __init__(
         self,
@@ -1160,17 +1166,29 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             desc="get_partial_state_servers_at_join",
         )
 
-    async def get_partial_state_rooms_and_servers(
+    async def get_partial_state_room_resync_info(
         self,
-    ) -> Mapping[str, Collection[str]]:
-        """Get all rooms containing events with partial state, and the servers known
-        to be in the room.
+    ) -> Mapping[str, PartialStateResyncInfo]:
+        """Get all rooms containing events with partial state, and the information
+        needed to restart a "resync" of those rooms.
 
         Returns:
             A dictionary of rooms with partial state, with room IDs as keys and
             lists of servers in rooms as values.
         """
-        room_servers: Dict[str, List[str]] = {}
+        room_servers: Dict[str, PartialStateResyncInfo] = {}
+
+        rows = await self.db_pool.simple_select_list(
+            table="partial_state_rooms",
+            keyvalues={},
+            retcols=("room_id", "joined_via"),
+            desc="get_server_which_served_partial_join",
+        )
+
+        for row in rows:
+            room_id = row["room_id"]
+            joined_via = row["joined_via"]
+            room_servers[room_id] = PartialStateResyncInfo(joined_via=joined_via)
 
         rows = await self.db_pool.simple_select_list(
             "partial_state_rooms_servers",
@@ -1182,7 +1200,15 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         for row in rows:
             room_id = row["room_id"]
             server_name = row["server_name"]
-            room_servers.setdefault(room_id, []).append(server_name)
+            entry = room_servers.get(room_id)
+            if entry is None:
+                # There is a foreign key constraint which enforces that every room_id in
+                # partial_state_rooms_servers appears in partial_state_rooms. So we
+                # expect `entry` to be non-null. (This reasoning fails if we've
+                # partial-joined between the two SELECTs, but this is unlikely to happen
+                # in practice.)
+                continue
+            entry.servers_in_room.append(server_name)
 
         return room_servers
 
@@ -1827,6 +1853,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
         room_id: str,
         servers: Collection[str],
         device_lists_stream_id: int,
+        joined_via: str,
     ) -> None:
         """Mark the given room as containing events with partial state.
 
@@ -1842,6 +1869,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             servers: other servers known to be in the room
             device_lists_stream_id: the device_lists stream ID at the time when we first
                 joined the room.
+            joined_via: the server name we requested a partial join from.
         """
         await self.db_pool.runInteraction(
             "store_partial_state_room",
@@ -1849,6 +1877,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             room_id,
             servers,
             device_lists_stream_id,
+            joined_via,
         )
 
     def _store_partial_state_room_txn(
@@ -1857,6 +1886,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
         room_id: str,
         servers: Collection[str],
         device_lists_stream_id: int,
+        joined_via: str,
     ) -> None:
         DatabasePool.simple_insert_txn(
             txn,
@@ -1866,6 +1896,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
                 "device_lists_stream_id": device_lists_stream_id,
                 # To be updated later once the join event is persisted.
                 "join_event_id": None,
+                "joined_via": joined_via,
             },
         )
         DatabasePool.simple_insert_many_txn(
diff --git a/synapse/storage/schema/main/delta/73/09partial_joined_via_destination.sql b/synapse/storage/schema/main/delta/73/09partial_joined_via_destination.sql
new file mode 100644
index 0000000000..066d602b18
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/09partial_joined_via_destination.sql
@@ -0,0 +1,18 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- When we resync partial state, we prioritise doing so using the server we
+-- partial-joined from. To do this we need to record which server that was!
+ALTER TABLE partial_state_rooms ADD COLUMN joined_via TEXT;