summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2022-02-17 16:11:59 +0000
committerGitHub <noreply@github.com>2022-02-17 16:11:59 +0000
commitda0e9f8efdac1571eab35ad2cc842073ca54769c (patch)
tree75824b2f9e0c9b1ba5eb8f943b21b13a411044ec
parentConfigure `tox` to use `venv` (#12015) (diff)
downloadsynapse-da0e9f8efdac1571eab35ad2cc842073ca54769c.tar.xz
Faster joins: parse msc3706 fields in send_join response (#12011)
Part of my work on #11249: add code to handle the new fields added in MSC3706.
-rw-r--r--changelog.d/12011.misc1
-rw-r--r--synapse/config/experimental.py4
-rw-r--r--synapse/federation/federation_client.py15
-rw-r--r--synapse/federation/transport/client.py118
-rw-r--r--synapse/python_dependencies.py3
-rw-r--r--tests/federation/transport/test_client.py32
6 files changed, 140 insertions, 33 deletions
diff --git a/changelog.d/12011.misc b/changelog.d/12011.misc
new file mode 100644
index 0000000000..258b0e389f
--- /dev/null
+++ b/changelog.d/12011.misc
@@ -0,0 +1 @@
+Preparation for faster-room-join work: parse msc3706 fields in send_join response.
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 09d692d9a1..12b5638cf8 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -64,3 +64,7 @@ class ExperimentalConfig(Config):
 
         # MSC3706 (server-side support for partial state in /send_join responses)
         self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False)
+
+        # experimental support for faster joins over federation (msc2775, msc3706)
+        # requires a target server with msc3706_enabled enabled.
+        self.faster_joins_enabled: bool = experimental.get("faster_joins", False)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 74f17aa4da..9f56f97d99 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1,4 +1,4 @@
-# Copyright 2015-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2015-2022 The Matrix.org Foundation C.I.C.
 # Copyright 2020 Sorunome
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -89,6 +89,12 @@ class SendJoinResult:
     state: List[EventBase]
     auth_chain: List[EventBase]
 
+    # True if 'state' elides non-critical membership events
+    partial_state: bool
+
+    # if 'partial_state' is set, a list of the servers in the room (otherwise empty)
+    servers_in_room: List[str]
+
 
 class FederationClient(FederationBase):
     def __init__(self, hs: "HomeServer"):
@@ -876,11 +882,18 @@ class FederationClient(FederationBase):
                     % (auth_chain_create_events,)
                 )
 
+            if response.partial_state and not response.servers_in_room:
+                raise InvalidResponseError(
+                    "partial_state was set, but no servers were listed in the room"
+                )
+
             return SendJoinResult(
                 event=event,
                 state=signed_state,
                 auth_chain=signed_auth,
                 origin=destination,
+                partial_state=response.partial_state,
+                servers_in_room=response.servers_in_room or [],
             )
 
         # MSC3083 defines additional error codes for room joins.
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 8782586cd6..dca6e5c45d 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1,4 +1,4 @@
-# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2014-2022 The Matrix.org Foundation C.I.C.
 # Copyright 2020 Sorunome
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -60,6 +60,7 @@ class TransportLayerClient:
     def __init__(self, hs):
         self.server_name = hs.hostname
         self.client = hs.get_federation_http_client()
+        self._faster_joins_enabled = hs.config.experimental.faster_joins_enabled
 
     async def get_room_state_ids(
         self, destination: str, room_id: str, event_id: str
@@ -336,10 +337,15 @@ class TransportLayerClient:
         content: JsonDict,
     ) -> "SendJoinResponse":
         path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
+        query_params: Dict[str, str] = {}
+        if self._faster_joins_enabled:
+            # lazy-load state on join
+            query_params["org.matrix.msc3706.partial_state"] = "true"
 
         return await self.client.put_json(
             destination=destination,
             path=path,
+            args=query_params,
             data=content,
             parser=SendJoinParser(room_version, v1_api=False),
             max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
@@ -1271,6 +1277,12 @@ class SendJoinResponse:
     # "event" is not included in the response.
     event: Optional[EventBase] = None
 
+    # The room state is incomplete
+    partial_state: bool = False
+
+    # List of servers in the room
+    servers_in_room: Optional[List[str]] = None
+
 
 @ijson.coroutine
 def _event_parser(event_dict: JsonDict) -> Generator[None, Tuple[str, Any], None]:
@@ -1297,6 +1309,32 @@ def _event_list_parser(
         events.append(event)
 
 
+@ijson.coroutine
+def _partial_state_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
+    """Helper function for use with `ijson.items_coro`
+
+    Parses the partial_state field in send_join responses
+    """
+    while True:
+        val = yield
+        if not isinstance(val, bool):
+            raise TypeError("partial_state must be a boolean")
+        response.partial_state = val
+
+
+@ijson.coroutine
+def _servers_in_room_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
+    """Helper function for use with `ijson.items_coro`
+
+    Parses the servers_in_room field in send_join responses
+    """
+    while True:
+        val = yield
+        if not isinstance(val, list) or any(not isinstance(x, str) for x in val):
+            raise TypeError("servers_in_room must be a list of strings")
+        response.servers_in_room = val
+
+
 class SendJoinParser(ByteParser[SendJoinResponse]):
     """A parser for the response to `/send_join` requests.
 
@@ -1308,44 +1346,62 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
     CONTENT_TYPE = "application/json"
 
     def __init__(self, room_version: RoomVersion, v1_api: bool):
-        self._response = SendJoinResponse([], [], {})
+        self._response = SendJoinResponse([], [], event_dict={})
         self._room_version = room_version
+        self._coros = []
 
         # The V1 API has the shape of `[200, {...}]`, which we handle by
         # prefixing with `item.*`.
         prefix = "item." if v1_api else ""
 
-        self._coro_state = ijson.items_coro(
-            _event_list_parser(room_version, self._response.state),
-            prefix + "state.item",
-            use_float=True,
-        )
-        self._coro_auth = ijson.items_coro(
-            _event_list_parser(room_version, self._response.auth_events),
-            prefix + "auth_chain.item",
-            use_float=True,
-        )
-        # TODO Remove the unstable prefix when servers have updated.
-        #
-        # By re-using the same event dictionary this will cause the parsing of
-        # org.matrix.msc3083.v2.event and event to stomp over each other.
-        # Generally this should be fine.
-        self._coro_unstable_event = ijson.kvitems_coro(
-            _event_parser(self._response.event_dict),
-            prefix + "org.matrix.msc3083.v2.event",
-            use_float=True,
-        )
-        self._coro_event = ijson.kvitems_coro(
-            _event_parser(self._response.event_dict),
-            prefix + "event",
-            use_float=True,
-        )
+        self._coros = [
+            ijson.items_coro(
+                _event_list_parser(room_version, self._response.state),
+                prefix + "state.item",
+                use_float=True,
+            ),
+            ijson.items_coro(
+                _event_list_parser(room_version, self._response.auth_events),
+                prefix + "auth_chain.item",
+                use_float=True,
+            ),
+            # TODO Remove the unstable prefix when servers have updated.
+            #
+            # By re-using the same event dictionary this will cause the parsing of
+            # org.matrix.msc3083.v2.event and event to stomp over each other.
+            # Generally this should be fine.
+            ijson.kvitems_coro(
+                _event_parser(self._response.event_dict),
+                prefix + "org.matrix.msc3083.v2.event",
+                use_float=True,
+            ),
+            ijson.kvitems_coro(
+                _event_parser(self._response.event_dict),
+                prefix + "event",
+                use_float=True,
+            ),
+        ]
+
+        if not v1_api:
+            self._coros.append(
+                ijson.items_coro(
+                    _partial_state_parser(self._response),
+                    "org.matrix.msc3706.partial_state",
+                    use_float="True",
+                )
+            )
+
+            self._coros.append(
+                ijson.items_coro(
+                    _servers_in_room_parser(self._response),
+                    "org.matrix.msc3706.servers_in_room",
+                    use_float="True",
+                )
+            )
 
     def write(self, data: bytes) -> int:
-        self._coro_state.send(data)
-        self._coro_auth.send(data)
-        self._coro_unstable_event.send(data)
-        self._coro_event.send(data)
+        for c in self._coros:
+            c.send(data)
 
         return len(data)
 
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 86162e0f2c..f43fbb5842 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -87,7 +87,8 @@ REQUIREMENTS = [
     # We enforce that we have a `cryptography` version that bundles an `openssl`
     # with the latest security patches.
     "cryptography>=3.4.7",
-    "ijson>=3.1",
+    # ijson 3.1.4 fixes a bug with "." in property names
+    "ijson>=3.1.4",
     "matrix-common~=1.1.0",
 ]
 
diff --git a/tests/federation/transport/test_client.py b/tests/federation/transport/test_client.py
index a7031a55f2..c2320ce133 100644
--- a/tests/federation/transport/test_client.py
+++ b/tests/federation/transport/test_client.py
@@ -62,3 +62,35 @@ class SendJoinParserTestCase(TestCase):
         self.assertEqual(len(parsed_response.state), 1, parsed_response)
         self.assertEqual(parsed_response.event_dict, {}, parsed_response)
         self.assertIsNone(parsed_response.event, parsed_response)
+        self.assertFalse(parsed_response.partial_state, parsed_response)
+        self.assertEqual(parsed_response.servers_in_room, None, parsed_response)
+
+    def test_partial_state(self) -> None:
+        """Check that the partial_state flag is correctly parsed"""
+        parser = SendJoinParser(RoomVersions.V1, False)
+        response = {
+            "org.matrix.msc3706.partial_state": True,
+        }
+
+        serialised_response = json.dumps(response).encode()
+
+        # Send data to the parser
+        parser.write(serialised_response)
+
+        # Retrieve and check the parsed SendJoinResponse
+        parsed_response = parser.finish()
+        self.assertTrue(parsed_response.partial_state)
+
+    def test_servers_in_room(self) -> None:
+        """Check that the servers_in_room field is correctly parsed"""
+        parser = SendJoinParser(RoomVersions.V1, False)
+        response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
+
+        serialised_response = json.dumps(response).encode()
+
+        # Send data to the parser
+        parser.write(serialised_response)
+
+        # Retrieve and check the parsed SendJoinResponse
+        parsed_response = parser.finish()
+        self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])