summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-30 14:17:51 +0100
committerErik Johnston <erik@matrix.org>2024-07-30 14:17:51 +0100
commitb1125d0ec15442040d3495d4bdbfb2d2f5595d79 (patch)
treeebc70da436513b4d0c56be50dc51a98fe34eb4f6
parentDon't set the initial flag (diff)
parentBump ruff from 0.5.4 to 0.5.5 (#17494) (diff)
downloadsynapse-b1125d0ec15442040d3495d4bdbfb2d2f5595d79.tar.xz
Merge remote-tracking branch 'origin/develop' into erikj/ss_hacks
-rw-r--r--changelog.d/17499.bugfix1
-rw-r--r--changelog.d/17501.misc1
-rw-r--r--poetry.lock40
-rw-r--r--pyproject.toml2
-rw-r--r--synapse/handlers/e2e_keys.py26
-rw-r--r--synapse/handlers/sliding_sync.py73
-rw-r--r--synapse/rest/client/sync.py29
-rw-r--r--synapse/storage/databases/main/stream.py10
-rw-r--r--synapse/types/handlers/__init__.py7
-rw-r--r--tests/handlers/test_e2e_keys.py59
-rw-r--r--tests/rest/client/test_sync.py168
11 files changed, 249 insertions, 167 deletions
diff --git a/changelog.d/17499.bugfix b/changelog.d/17499.bugfix
new file mode 100644
index 0000000000..5cb7b3c30e
--- /dev/null
+++ b/changelog.d/17499.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.110.0 which caused `/keys/query` to return incomplete results, leading to high network activity and CPU usage on Matrix clients.
diff --git a/changelog.d/17501.misc b/changelog.d/17501.misc
new file mode 100644
index 0000000000..ba96472acb
--- /dev/null
+++ b/changelog.d/17501.misc
@@ -0,0 +1 @@
+Add some opentracing tags and logging to the experimental sliding sync implementation.
diff --git a/poetry.lock b/poetry.lock
index 945b91e022..417f6850b8 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2358,29 +2358,29 @@ files = [
 
 [[package]]
 name = "ruff"
-version = "0.5.4"
+version = "0.5.5"
 description = "An extremely fast Python linter and code formatter, written in Rust."
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "ruff-0.5.4-py3-none-linux_armv6l.whl", hash = "sha256:82acef724fc639699b4d3177ed5cc14c2a5aacd92edd578a9e846d5b5ec18ddf"},
-    {file = "ruff-0.5.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:da62e87637c8838b325e65beee485f71eb36202ce8e3cdbc24b9fcb8b99a37be"},
-    {file = "ruff-0.5.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:e98ad088edfe2f3b85a925ee96da652028f093d6b9b56b76fc242d8abb8e2059"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4c55efbecc3152d614cfe6c2247a3054cfe358cefbf794f8c79c8575456efe19"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f9b85eaa1f653abd0a70603b8b7008d9e00c9fa1bbd0bf40dad3f0c0bdd06793"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0cf497a47751be8c883059c4613ba2f50dd06ec672692de2811f039432875278"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:09c14ed6a72af9ccc8d2e313d7acf7037f0faff43cde4b507e66f14e812e37f7"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:628f6b8f97b8bad2490240aa84f3e68f390e13fabc9af5c0d3b96b485921cd60"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3520a00c0563d7a7a7c324ad7e2cde2355733dafa9592c671fb2e9e3cd8194c1"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:93789f14ca2244fb91ed481456f6d0bb8af1f75a330e133b67d08f06ad85b516"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:029454e2824eafa25b9df46882f7f7844d36fd8ce51c1b7f6d97e2615a57bbcc"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:9492320eed573a13a0bc09a2957f17aa733fff9ce5bf00e66e6d4a88ec33813f"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:a6e1f62a92c645e2919b65c02e79d1f61e78a58eddaebca6c23659e7c7cb4ac7"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:768fa9208df2bec4b2ce61dbc7c2ddd6b1be9fb48f1f8d3b78b3332c7d71c1ff"},
-    {file = "ruff-0.5.4-py3-none-win32.whl", hash = "sha256:e1e7393e9c56128e870b233c82ceb42164966f25b30f68acbb24ed69ce9c3a4e"},
-    {file = "ruff-0.5.4-py3-none-win_amd64.whl", hash = "sha256:58b54459221fd3f661a7329f177f091eb35cf7a603f01d9eb3eb11cc348d38c4"},
-    {file = "ruff-0.5.4-py3-none-win_arm64.whl", hash = "sha256:bd53da65f1085fb5b307c38fd3c0829e76acf7b2a912d8d79cadcdb4875c1eb7"},
-    {file = "ruff-0.5.4.tar.gz", hash = "sha256:2795726d5f71c4f4e70653273d1c23a8182f07dd8e48c12de5d867bfb7557eed"},
+    {file = "ruff-0.5.5-py3-none-linux_armv6l.whl", hash = "sha256:605d589ec35d1da9213a9d4d7e7a9c761d90bba78fc8790d1c5e65026c1b9eaf"},
+    {file = "ruff-0.5.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:00817603822a3e42b80f7c3298c8269e09f889ee94640cd1fc7f9329788d7bf8"},
+    {file = "ruff-0.5.5-py3-none-macosx_11_0_arm64.whl", hash = "sha256:187a60f555e9f865a2ff2c6984b9afeffa7158ba6e1eab56cb830404c942b0f3"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fe26fc46fa8c6e0ae3f47ddccfbb136253c831c3289bba044befe68f467bfb16"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:4ad25dd9c5faac95c8e9efb13e15803cd8bbf7f4600645a60ffe17c73f60779b"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f70737c157d7edf749bcb952d13854e8f745cec695a01bdc6e29c29c288fc36e"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:cfd7de17cef6ab559e9f5ab859f0d3296393bc78f69030967ca4d87a541b97a0"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a09b43e02f76ac0145f86a08e045e2ea452066f7ba064fd6b0cdccb486f7c3e7"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d0b856cb19c60cd40198be5d8d4b556228e3dcd545b4f423d1ad812bfdca5884"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3687d002f911e8a5faf977e619a034d159a8373514a587249cc00f211c67a091"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ac9dc814e510436e30d0ba535f435a7f3dc97f895f844f5b3f347ec8c228a523"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:af9bdf6c389b5add40d89b201425b531e0a5cceb3cfdcc69f04d3d531c6be74f"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:d40a8533ed545390ef8315b8e25c4bb85739b90bd0f3fe1280a29ae364cc55d8"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:cab904683bf9e2ecbbe9ff235bfe056f0eba754d0168ad5407832928d579e7ab"},
+    {file = "ruff-0.5.5-py3-none-win32.whl", hash = "sha256:696f18463b47a94575db635ebb4c178188645636f05e934fdf361b74edf1bb2d"},
+    {file = "ruff-0.5.5-py3-none-win_amd64.whl", hash = "sha256:50f36d77f52d4c9c2f1361ccbfbd09099a1b2ea5d2b2222c586ab08885cf3445"},
+    {file = "ruff-0.5.5-py3-none-win_arm64.whl", hash = "sha256:3191317d967af701f1b73a31ed5788795936e423b7acce82a2b63e26eb3e89d6"},
+    {file = "ruff-0.5.5.tar.gz", hash = "sha256:cc5516bdb4858d972fbc31d246bdb390eab8df1a26e2353be2dbc0c2d7f5421a"},
 ]
 
 [[package]]
@@ -3215,4 +3215,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8.0"
-content-hash = "e65fbd044230964cae8810c84289bcf0bc43b27532ea5a5ef8843eab4f6514af"
+content-hash = "5f458ce53b7469844af2e0c5a9c5ef720736de5f080c4eb8d3a0e60286424f44"
diff --git a/pyproject.toml b/pyproject.toml
index 1adf8e087f..fe6289839c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -322,7 +322,7 @@ all = [
 # This helps prevents merge conflicts when running a batch of dependabot updates.
 isort = ">=5.10.1"
 black = ">=22.7.0"
-ruff = "0.5.4"
+ruff = "0.5.5"
 # Type checking only works with the pydantic.v1 compat module from pydantic v2
 pydantic = "^2"
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 668cec513b..f78e66ad0a 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -291,13 +291,20 @@ class E2eKeysHandler:
 
             # Only try and fetch keys for destinations that are not marked as
             # down.
-            filtered_destinations = await filter_destinations_by_retry_limiter(
-                remote_queries_not_in_cache.keys(),
-                self.clock,
-                self.store,
-                # Let's give an arbitrary grace period for those hosts that are
-                # only recently down
-                retry_due_within_ms=60 * 1000,
+            unfiltered_destinations = remote_queries_not_in_cache.keys()
+            filtered_destinations = set(
+                await filter_destinations_by_retry_limiter(
+                    unfiltered_destinations,
+                    self.clock,
+                    self.store,
+                    # Let's give an arbitrary grace period for those hosts that are
+                    # only recently down
+                    retry_due_within_ms=60 * 1000,
+                )
+            )
+            failures.update(
+                (dest, _NOT_READY_FOR_RETRY_FAILURE)
+                for dest in (unfiltered_destinations - filtered_destinations)
             )
 
             await concurrently_execute(
@@ -1641,6 +1648,9 @@ def _check_device_signature(
         raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)
 
 
+_NOT_READY_FOR_RETRY_FAILURE = {"status": 503, "message": "Not ready for retry"}
+
+
 def _exception_to_failure(e: Exception) -> JsonDict:
     if isinstance(e, SynapseError):
         return {"status": e.code, "errcode": e.errcode, "message": str(e)}
@@ -1649,7 +1659,7 @@ def _exception_to_failure(e: Exception) -> JsonDict:
         return {"status": e.code, "message": str(e)}
 
     if isinstance(e, NotRetryingDestination):
-        return {"status": 503, "message": "Not ready for retry"}
+        return _NOT_READY_FOR_RETRY_FAILURE
 
     # include ConnectionRefused and other errors
     #
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 3e8e833367..ebb15a8451 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -41,7 +41,7 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import set_tag, start_active_span, tag_args, trace
+from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
@@ -631,21 +631,39 @@ class SlidingSyncHandler:
         # previously.
         if from_token:
             rooms_should_send = set()
+
+            # First we check if there are rooms that match a list/room
+            # subscription and have updates we need to send (i.e. either because
+            # we haven't sent the room down, or we have but there are missing
+            # updates).
             for room_id in relevant_room_map:
                 status = await self.connection_store.have_sent_room(
                     sync_config,
                     from_token.connection_position,
                     room_id,
                 )
-                if status.status != HaveSentRoomFlag.LIVE:
+                if (
+                    # The room was never sent down before so the client needs to know
+                    # about it regardless of any updates.
+                    status.status == HaveSentRoomFlag.NEVER
+                    # `PREVIOUSLY` literally means the "room was sent down before *AND*
+                    # there are updates we haven't sent down" so we already know this
+                    # room has updates.
+                    or status.status == HaveSentRoomFlag.PREVIOUSLY
+                ):
                     rooms_should_send.add(room_id)
+                elif status.status == HaveSentRoomFlag.LIVE:
+                    # We know that we've sent all updates up until `from_token`,
+                    # so we just need to check if there have been updates since
+                    # then.
+                    pass
+                else:
+                    assert_never(status.status)
 
-            # We only need to check for any new events and not state changes, as
-            # state changes can only happen if an event has also been sent.
-            rooms_that_have_updates = (
-                self.store._events_stream_cache.get_entities_changed(
-                    relevant_room_map, from_token.stream_token.room_key.stream
-                )
+            # We only need to check for new events since any state changes
+            # will also come down as new events.
+            rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+                relevant_room_map.keys(), from_token.stream_token.room_key
             )
             rooms_should_send.update(rooms_that_have_updates)
             relevant_room_map = {
@@ -672,8 +690,9 @@ class SlidingSyncHandler:
             if room_sync_result or not from_token:
                 rooms[room_id] = room_sync_result
 
-        with start_active_span("sliding_sync.generate_room_entries"):
-            await concurrently_execute(handle_room, relevant_room_map, 10)
+        if relevant_room_map:
+            with start_active_span("sliding_sync.generate_room_entries"):
+                await concurrently_execute(handle_room, relevant_room_map, 10)
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
@@ -684,22 +703,22 @@ class SlidingSyncHandler:
         )
 
         if has_lists or has_room_subscriptions:
-            connection_token = await self.connection_store.record_rooms(
+            connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
-                from_token=from_token,
                 relevant_room_map=relevant_room_map,
+                from_token=from_token,
                 sent_room_ids=relevant_room_map.keys(),
                 # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
                 unsent_room_ids=[],
             )
         elif from_token:
-            connection_token = from_token.connection_position
+            connection_position = from_token.connection_position
         else:
             # Initial sync without a `from_token` starts at `0`
-            connection_token = 0
+            connection_position = 0
 
         return SlidingSyncResult(
-            next_pos=SlidingSyncStreamToken(to_token, connection_token),
+            next_pos=SlidingSyncStreamToken(to_token, connection_position),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
@@ -1473,7 +1492,6 @@ class SlidingSyncHandler:
                 connection_token=from_token.connection_position,
                 room_id=room_id,
             )
-
             if room_status.status == HaveSentRoomFlag.LIVE:
                 from_bound = from_token.stream_token.room_key
                 initial = False
@@ -1493,11 +1511,9 @@ class SlidingSyncHandler:
             ):
                 from_bound = None
 
-            set_tag("sliding_sync.from_bound", from_bound)
-            set_tag("sliding_sync.room_status", room_status.status)
+            log_kv({"sliding_sync.room_status": room_status})
 
-        set_tag("sliding_sync.initial", initial)
-        set_tag("room_id", room_id)
+        log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
 
         # Assemble the list of timeline events
         #
@@ -1914,6 +1930,7 @@ class SlidingSyncHandler:
             highlight_count=0,
         )
 
+    @trace
     async def get_extensions_response(
         self,
         sync_config: SlidingSyncConfig,
@@ -2384,10 +2401,13 @@ class SlidingSyncConnectionStore:
     """In-memory store of per-connection state, including what rooms we have
     previously sent down a sliding sync connection.
 
-    Note: This is NOT safe to run in a worker setup.
+    Note: This is NOT safe to run in a worker setup because connection positions will
+    point to different sets of rooms on different workers. e.g. for the same connection,
+    a connection position of 5 might have totally different states on worker A and
+    worker B.
 
-    The complication here is that we need to handle requests being resent, i.e.
-    if we sent down a room in a response that the client received, we must
+    One complication that we need to deal with here is needing to handle requests being
+    resent, i.e. if we sent down a room in a response that the client received, we must
     consider the room *not* sent when we get the request again.
 
     This is handled by using an integer "token", which is returned to the client
@@ -2428,9 +2448,9 @@ class SlidingSyncConnectionStore:
     async def record_rooms(
         self,
         sync_config: SlidingSyncConfig,
+        relevant_room_map: Dict[str, RoomSyncConfig],
         from_token: Optional[SlidingSyncStreamToken],
         *,
-        relevant_room_map: Dict[str, RoomSyncConfig],
         sent_room_ids: StrCollection,
         unsent_room_ids: StrCollection,
     ) -> int:
@@ -2469,9 +2489,7 @@ class SlidingSyncConnectionStore:
         # end we can treat this as a noop.
         have_updated = False
         for room_id in sent_room_ids:
-            new_room_statuses[room_id] = HaveSentRoom(
-                HaveSentRoomFlag.LIVE, None, relevant_room_map[room_id].timeline_limit
-            )
+            new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
             have_updated = True
 
         # Whether we add/update the entries for unsent rooms depends on the
@@ -2494,7 +2512,6 @@ class SlidingSyncConnectionStore:
                 if from_token:
                     new_room_statuses[room_id] = HaveSentRoom.previously(
                         from_token.stream_token.room_key,
-                        None,
                         relevant_room_map[room_id].timeline_limit,
                     )
                 else:
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 27be64aa0c..9cd39a3df9 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -46,11 +46,10 @@ from synapse.handlers.sync import (
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     RestServlet,
+    parse_and_validate_json_object_from_request,
     parse_boolean,
     parse_integer,
-    parse_json_object_from_request,
     parse_string,
-    validate_json_object,
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
@@ -897,14 +896,24 @@ class SlidingSyncRestServlet(RestServlet):
         # maybe some filters like sync v2  where they are built up once and referenced
         # by filter ID. For now, we will just prototype with always passing everything
         # in.
-        content = parse_json_object_from_request(request, allow_empty_body=False)
-        body = validate_json_object(content, SlidingSyncBody)
-        logger.info("Sliding sync request: %r", body)
-        # logger.info("Sliding sync json: %r", content)
-        log_kv({"request_body": body})
-
-        if body.lists:
-            set_tag("sliding_sync.lists", True)
+        body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
+
+        # Tag and log useful data to differentiate requests.
+        set_tag("sliding_sync.conn_id", body.conn_id or "")
+        log_kv(
+            {
+                "sliding_sync.lists": {
+                    list_name: {
+                        "ranges": list_config.ranges,
+                        "timeline_limit": list_config.timeline_limit,
+                    }
+                    for list_name, list_config in (body.lists or {}).items()
+                },
+                "sliding_sync.room_subscriptions": list(
+                    (body.room_subscriptions or {}).keys()
+                ),
+            }
+        )
 
         sync_config = SlidingSyncConfig(
             user=user,
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 395a1f46af..430c837828 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -2245,3 +2245,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             )
 
         return rows
+
+    def get_rooms_that_might_have_updates(
+        self, room_ids: StrCollection, from_token: RoomStreamToken
+    ) -> StrCollection:
+        """Filters given room IDs down to those that might have updates, i.e.
+        removes rooms that definitely do not have updates.
+        """
+        return self._events_stream_cache.get_entities_changed(
+            room_ids, from_token.stream
+        )
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index faebece6fd..12bdb94d3a 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -240,7 +240,10 @@ class SlidingSyncResult:
 
         def __bool__(self) -> bool:
             return (
+                # If this is the first time the client is seeing the room, we should not filter it out
+                # under any circumstance.
                 self.initial
+                # We need to let the client know if there are any new events
                 or bool(self.required_state)
                 or bool(self.timeline_events)
                 or bool(self.stripped_state)
@@ -391,6 +394,10 @@ class SlidingSyncResult:
         to tell if the notifier needs to wait for more events when polling for
         events.
         """
+        # We don't include `self.lists` here, as a) `lists` is always non-empty even if
+        # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
+        # the latest activity, anything that would cause the order to change would end
+        # up in `self.rooms` and cause us to send down the change.
         return bool(self.rooms or self.extensions)
 
     @staticmethod
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 0e6352ff4b..8a3dfdcf75 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -43,9 +43,7 @@ from tests.unittest import override_config
 class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         self.appservice_api = mock.AsyncMock()
-        return self.setup_test_homeserver(
-            federation_client=mock.Mock(), application_service_api=self.appservice_api
-        )
+        return self.setup_test_homeserver(application_service_api=self.appservice_api)
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.handler = hs.get_e2e_keys_handler()
@@ -1224,6 +1222,61 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
             },
         )
 
+    def test_query_devices_remote_down(self) -> None:
+        """Tests that querying keys for a remote user on an unreachable server returns
+        results in the "failures" property
+        """
+
+        remote_user_id = "@test:other"
+        local_user_id = "@test:test"
+
+        # The backoff code treats time zero as special
+        self.reactor.advance(5)
+
+        self.hs.get_federation_http_client().agent.request = mock.AsyncMock(  # type: ignore[method-assign]
+            side_effect=Exception("boop")
+        )
+
+        e2e_handler = self.hs.get_e2e_keys_handler()
+
+        query_result = self.get_success(
+            e2e_handler.query_devices(
+                {
+                    "device_keys": {remote_user_id: []},
+                },
+                timeout=10,
+                from_user_id=local_user_id,
+                from_device_id="some_device_id",
+            )
+        )
+
+        self.assertEqual(
+            query_result["failures"],
+            {
+                "other": {
+                    "message": "Failed to send request: Exception: boop",
+                    "status": 503,
+                }
+            },
+        )
+
+        # Do it again: we should hit the backoff
+        query_result = self.get_success(
+            e2e_handler.query_devices(
+                {
+                    "device_keys": {remote_user_id: []},
+                },
+                timeout=10,
+                from_user_id=local_user_id,
+                from_device_id="some_device_id",
+            )
+        )
+
+        self.assertEqual(
+            query_result["failures"],
+            {"other": {"message": "Not ready for retry", "status": 503}},
+        )
+
     @parameterized.expand(
         [
             # The remote homeserver's response indicates that this user has 0/1/2 devices.
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 7c961cf939..a97660e2f2 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -4457,7 +4457,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
         # `world_readable` but currently we don't support this.
         self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
 
-    def test_incremental_sync_incremental_state(self) -> None:
+    def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
         """Test that we only get state updates in incremental sync for rooms
         we've already seen (LIVE).
         """
@@ -4512,6 +4512,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
             self.storage_controllers.state.get_current_state(room_id1)
         )
 
+        self.assertNotIn("initial", response_body["rooms"][room_id1])
         self._assertRequiredStateIncludes(
             response_body["rooms"][room_id1]["required_state"],
             {
@@ -4520,78 +4521,8 @@ class SlidingSyncTestCase(SlidingSyncBase):
             exact=True,
         )
 
-    def test_incremental_sync_full_state_new_room(self) -> None:
-        """Test that we get all state in incremental sync for rooms that
-        we haven't seen before.
-        """
-
-        user1_id = self.register_user("user1", "pass")
-        user1_tok = self.login(user1_id, "pass")
-        user2_id = self.register_user("user2", "pass")
-        user2_tok = self.login(user2_id, "pass")
-
-        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-
-        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
-        self.helper.join(room_id2, user1_id, tok=user1_tok)
-
-        # Make the Sliding Sync request, we'll only receive room_id2
-        sync_body = {
-            "lists": {
-                "foo-list": {
-                    "ranges": [[0, 0]],
-                    "required_state": [
-                        [EventTypes.Create, ""],
-                        [EventTypes.RoomHistoryVisibility, ""],
-                        # This one doesn't exist in the room
-                        [EventTypes.Name, ""],
-                    ],
-                    "timeline_limit": 0,
-                }
-            }
-        }
-
-        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
-
-        state_map = self.get_success(
-            self.storage_controllers.state.get_current_state(room_id2)
-        )
-
-        self._assertRequiredStateIncludes(
-            response_body["rooms"][room_id2]["required_state"],
-            {
-                state_map[(EventTypes.Create, "")],
-                state_map[(EventTypes.RoomHistoryVisibility, "")],
-            },
-            exact=True,
-        )
-        self.assertNotIn(room_id1, response_body["rooms"])
-
-        # Send a state event in room 1
-        self.helper.send_state(
-            room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
-        )
-
-        # We should get room_id1 down sync, with the full state.
-        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
-
-        state_map = self.get_success(
-            self.storage_controllers.state.get_current_state(room_id1)
-        )
-
-        self._assertRequiredStateIncludes(
-            response_body["rooms"][room_id1]["required_state"],
-            {
-                state_map[(EventTypes.Create, "")],
-                state_map[(EventTypes.RoomHistoryVisibility, "")],
-                state_map[(EventTypes.Name, "")],
-            },
-            exact=True,
-        )
-
     @parameterized.expand([(False,), (True,)])
-    def test_incremental_sync_full_state_previously(self, limited: bool) -> None:
+    def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None:
         """
         Test getting room data where we have previously sent down the room, but
         we missed sending down some timeline events previously and so its status
@@ -4615,12 +4546,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
             "lists": {
                 "foo-list": {
                     "ranges": [[0, 0]],
-                    "required_state": [
-                        [EventTypes.Create, ""],
-                        [EventTypes.RoomHistoryVisibility, ""],
-                        # This one doesn't exist in the room
-                        [EventTypes.Name, ""],
-                    ],
+                    "required_state": [],
                     "timeline_limit": timeline_limit,
                 }
             },
@@ -4699,6 +4625,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
         self.assertCountEqual(
             response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
         )
+        self.assertNotIn("initial", response_body["rooms"][room_id1])
 
         self.assertEqual(
             [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
@@ -4707,7 +4634,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
         self.assertEqual(response_body["rooms"][room_id1]["limited"], limited)
         self.assertEqual(response_body["rooms"][room_id1].get("required_state"), None)
 
-    def test_incremental_sync_full_state_previously_state(self) -> None:
+    def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:
         """
         Test getting room data where we have previously sent down the room, but
         we missed sending down some state previously and so its status is
@@ -4722,7 +4649,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
 
         self.helper.send(room_id1, "msg", tok=user1_tok)
 
-        timeline_limit = 5
         conn_id = "conn_id"
         sync_body = {
             "lists": {
@@ -4734,7 +4660,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
                         # This one doesn't exist in the room
                         [EventTypes.Name, ""],
                     ],
-                    "timeline_limit": timeline_limit,
+                    "timeline_limit": 0,
                 }
             },
             "conn_id": "conn_id",
@@ -4746,7 +4672,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
             response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
         )
 
-        # We now send down some state in room1 (depending on the test param).
+        # We now send down some state in room1
         resp = self.helper.send_state(
             room_id1, EventTypes.Name, {"name": "foo"}, tok=user1_tok
         )
@@ -4807,6 +4733,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
         self.assertCountEqual(
             response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
         )
+        self.assertNotIn("initial", response_body["rooms"][room_id1])
 
         # We should only see the name change.
         self.assertEqual(
@@ -4817,9 +4744,9 @@ class SlidingSyncTestCase(SlidingSyncBase):
             [name_change_id],
         )
 
-    def test_incremental_sync_full_state_never(self) -> None:
+    def test_rooms_required_state_incremental_sync_NEVER(self) -> None:
         """
-        Test getting room data where we have not previously sent down the room
+        Test getting `required_state` where we have NEVER sent down the room before
         """
 
         user1_id = self.register_user("user1", "pass")
@@ -4857,8 +4784,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
 
         # We now send another event to room1, so we should send down the full
         # room.
-        resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
-        latest_message_event = resp["event_id"]
+        self.helper.send(room_id1, "msg2", tok=user1_tok)
 
         # This sync should contain the messages from room1 not yet sent down.
         response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
@@ -4867,11 +4793,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
             response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
         )
 
-        self.assertEqual(
-            [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
-            [latest_message_event],
-        )
-        self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
         self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
 
         state_map = self.get_success(
@@ -4887,6 +4808,61 @@ class SlidingSyncTestCase(SlidingSyncBase):
             exact=True,
         )
 
+    def test_rooms_timeline_incremental_sync_NEVER(self) -> None:
+        """
+        Test getting timeline room data where we have NEVER sent down the room
+        before
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [],
+                    "timeline_limit": 5,
+                }
+            },
+        }
+
+        expected_events = []
+        for _ in range(4):
+            resp = self.helper.send(room_id1, "msg", tok=user1_tok)
+            expected_events.append(resp["event_id"])
+
+        # A message happens in the other room, so room1 won't get sent down.
+        self.helper.send(room_id2, "msg", tok=user1_tok)
+
+        # Only the second room gets sent down sync.
+        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+        )
+
+        # We now send another event to room1 so it comes down sync
+        resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
+        expected_events.append(resp["event_id"])
+
+        # This sync should contain the messages from room1 not yet sent down.
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+        )
+
+        self.assertEqual(
+            [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
+            expected_events,
+        )
+        self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
+        self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
+
     def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
         """
         Test that rooms with no updates are returned in subsequent incremental
@@ -4908,18 +4884,16 @@ class SlidingSyncTestCase(SlidingSyncBase):
             }
         }
 
-        _, after_room_token = self.do_sync(sync_body, tok=user1_tok)
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # Make the Sliding Sync request
-        response_body, _ = self.do_sync(
-            sync_body, since=after_room_token, tok=user1_tok
-        )
+        # Make the incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # Nothing has happened in the room, so the room should not come down
         # /sync.
         self.assertIsNone(response_body["rooms"].get(room_id1))
 
-    def test_empty_room_comes_down_sync(self) -> None:
+    def test_empty_initial_room_comes_down_sync(self) -> None:
         """
         Test that rooms come down /sync even with empty required state and
         timeline limit in initial sync.