summary refs log tree commit diff
path: root/synapse/federation/transport/client.py
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2021-12-14 14:22:01 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2021-12-14 14:22:01 +0000
commit4dd9ea8f4fc002f35fa604361a792ea1d3d6671c (patch)
tree1690b0d4658301b51907afeacf9e46bda7ce1152 /synapse/federation/transport/client.py
parentRevert accidental fast-forward merge from v1.49.0rc1 (diff)
downloadsynapse-4dd9ea8f4fc002f35fa604361a792ea1d3d6671c.tar.xz
Revert "Revert accidental fast-forward merge from v1.49.0rc1"
This reverts commit 158d73ebdd61eef33831ae5f6990acf07244fc55.
Diffstat (limited to 'synapse/federation/transport/client.py')
-rw-r--r--synapse/federation/transport/client.py91
1 files changed, 79 insertions, 12 deletions
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 10b5aa5af8..9fc4c31c93 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -21,6 +21,7 @@ from typing import (
     Callable,
     Collection,
     Dict,
+    Generator,
     Iterable,
     List,
     Mapping,
@@ -149,6 +150,42 @@ class TransportLayerClient:
         )
 
     @log_function
+    async def timestamp_to_event(
+        self, destination: str, room_id: str, timestamp: int, direction: str
+    ) -> Union[JsonDict, List]:
+        """
+        Calls a remote federating server at `destination` asking for their
+        closest event to the given timestamp in the given direction.
+
+        Args:
+            destination: Domain name of the remote homeserver
+            room_id: Room to fetch the event from
+            timestamp: The point in time (inclusive) we should navigate from in
+                the given direction to find the closest event.
+            direction: ["f"|"b"] to indicate whether we should navigate forward
+                or backward from the given timestamp to find the closest event.
+
+        Returns:
+            Response dict received from the remote homeserver.
+
+        Raises:
+            Various exceptions when the request fails
+        """
+        path = _create_path(
+            FEDERATION_UNSTABLE_PREFIX,
+            "/org.matrix.msc3030/timestamp_to_event/%s",
+            room_id,
+        )
+
+        args = {"ts": [str(timestamp)], "dir": [direction]}
+
+        remote_response = await self.client.get_json(
+            destination, path=path, args=args, try_trailing_slash_on_400=True
+        )
+
+        return remote_response
+
+    @log_function
     async def send_transaction(
         self,
         transaction: Transaction,
@@ -199,11 +236,16 @@ class TransportLayerClient:
 
     @log_function
     async def make_query(
-        self, destination, query_type, args, retry_on_dns_fail, ignore_backoff=False
-    ):
+        self,
+        destination: str,
+        query_type: str,
+        args: dict,
+        retry_on_dns_fail: bool,
+        ignore_backoff: bool = False,
+    ) -> JsonDict:
         path = _create_v1_path("/query/%s", query_type)
 
-        content = await self.client.get_json(
+        return await self.client.get_json(
             destination=destination,
             path=path,
             args=args,
@@ -212,8 +254,6 @@ class TransportLayerClient:
             ignore_backoff=ignore_backoff,
         )
 
-        return content
-
     @log_function
     async def make_membership_event(
         self,
@@ -1192,10 +1232,24 @@ class TransportLayerClient:
         )
 
     async def get_room_hierarchy(
-        self,
-        destination: str,
-        room_id: str,
-        suggested_only: bool,
+        self, destination: str, room_id: str, suggested_only: bool
+    ) -> JsonDict:
+        """
+        Args:
+            destination: The remote server
+            room_id: The room ID to ask about.
+            suggested_only: if True, only suggested rooms will be returned
+        """
+        path = _create_v1_path("/hierarchy/%s", room_id)
+
+        return await self.client.get_json(
+            destination=destination,
+            path=path,
+            args={"suggested_only": "true" if suggested_only else "false"},
+        )
+
+    async def get_room_hierarchy_unstable(
+        self, destination: str, room_id: str, suggested_only: bool
     ) -> JsonDict:
         """
         Args:
@@ -1267,7 +1321,7 @@ class SendJoinResponse:
 
 
 @ijson.coroutine
-def _event_parser(event_dict: JsonDict):
+def _event_parser(event_dict: JsonDict) -> Generator[None, Tuple[str, Any], None]:
     """Helper function for use with `ijson.kvitems_coro` to parse key-value pairs
     to add them to a given dictionary.
     """
@@ -1278,7 +1332,9 @@ def _event_parser(event_dict: JsonDict):
 
 
 @ijson.coroutine
-def _event_list_parser(room_version: RoomVersion, events: List[EventBase]):
+def _event_list_parser(
+    room_version: RoomVersion, events: List[EventBase]
+) -> Generator[None, JsonDict, None]:
     """Helper function for use with `ijson.items_coro` to parse an array of
     events and add them to the given list.
     """
@@ -1317,15 +1373,26 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
             prefix + "auth_chain.item",
             use_float=True,
         )
-        self._coro_event = ijson.kvitems_coro(
+        # TODO Remove the unstable prefix when servers have updated.
+        #
+        # By re-using the same event dictionary this will cause the parsing of
+        # org.matrix.msc3083.v2.event and event to stomp over each other.
+        # Generally this should be fine.
+        self._coro_unstable_event = ijson.kvitems_coro(
             _event_parser(self._response.event_dict),
             prefix + "org.matrix.msc3083.v2.event",
             use_float=True,
         )
+        self._coro_event = ijson.kvitems_coro(
+            _event_parser(self._response.event_dict),
+            prefix + "event",
+            use_float=True,
+        )
 
     def write(self, data: bytes) -> int:
         self._coro_state.send(data)
         self._coro_auth.send(data)
+        self._coro_unstable_event.send(data)
         self._coro_event.send(data)
 
         return len(data)