summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py41
-rw-r--r--synapse/federation/transport/client.py18
-rw-r--r--synapse/federation/transport/server.py10
3 files changed, 39 insertions, 30 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index 3395c9e41e..94e76b1978 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -24,7 +24,6 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @@ -137,9 +136,7 @@ class FederationClient(FederationBase): sent_edus_counter.inc() - # TODO, add errback, etc. self._transaction_queue.enqueue_edu(edu, key=key) - return defer.succeed(None) @log_function def send_device_messages(self, destination): @@ -176,7 +173,7 @@ class FederationClient(FederationBase): ) @log_function - def query_client_keys(self, destination, content): + def query_client_keys(self, destination, content, timeout): """Query device keys for a device hosted on a remote server. Args: @@ -188,10 +185,12 @@ class FederationClient(FederationBase): response """ sent_queries_counter.inc("client_device_keys") - return self.transport_layer.query_client_keys(destination, content) + return self.transport_layer.query_client_keys( + destination, content, timeout + ) @log_function - def claim_client_keys(self, destination, content): + def claim_client_keys(self, destination, content, timeout): """Claims one-time keys for a device hosted on a remote server. Args: @@ -203,7 +202,9 @@ class FederationClient(FederationBase): response """ sent_queries_counter.inc("client_one_time_keys") - return self.transport_layer.claim_client_keys(destination, content) + return self.transport_layer.claim_client_keys( + destination, content, timeout + ) @defer.inlineCallbacks @log_function @@ -481,7 +482,7 @@ class FederationClient(FederationBase): defer.DeferredList(deferreds, consumeErrors=True) ) for success, result in res: - if success: + if success and result: signed_events.append(result) batch.discard(result.event_id) @@ -715,24 +716,14 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") - @defer.inlineCallbacks - def get_public_rooms(self, destinations): - results_by_server = {} - - @defer.inlineCallbacks - def _get_result(s): - if s == self.server_name: - defer.returnValue() - - try: - result = yield self.transport_layer.get_public_rooms(s) - results_by_server[s] = result - except: - logger.exception("Error getting room list from server %r", s) - - yield concurrently_execute(_get_result, destinations, 3) + def get_public_rooms(self, destination, limit=None, since_token=None, + search_filter=None): + if destination == self.server_name: + return - defer.returnValue(results_by_server) + return self.transport_layer.get_public_rooms( + destination, limit, since_token, search_filter + ) @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 3d088e43cb..db45c7826c 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py
@@ -248,12 +248,22 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def get_public_rooms(self, remote_server): + def get_public_rooms(self, remote_server, limit, since_token, + search_filter=None): path = PREFIX + "/publicRooms" + args = {} + if limit: + args["limit"] = [str(limit)] + if since_token: + args["since"] = [since_token] + + # TODO(erikj): Actually send the search_filter across federation. + response = yield self.client.get_json( destination=remote_server, path=path, + args=args, ) defer.returnValue(response) @@ -298,7 +308,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def query_client_keys(self, destination, query_content): + def query_client_keys(self, destination, query_content, timeout): """Query the device keys for a list of user ids hosted on a remote server. @@ -327,12 +337,13 @@ class TransportLayerClient(object): destination=destination, path=path, data=query_content, + timeout=timeout, ) defer.returnValue(content) @defer.inlineCallbacks @log_function - def claim_client_keys(self, destination, query_content): + def claim_client_keys(self, destination, query_content, timeout): """Claim one-time keys for a list of devices hosted on a remote server. Request: @@ -363,6 +374,7 @@ class TransportLayerClient(object): destination=destination, path=path, data=query_content, + timeout=timeout, ) defer.returnValue(content) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 37c0d4fbc4..fec337be64 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py
@@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import ( + parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, +) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string @@ -554,7 +556,11 @@ class PublicRoomList(BaseFederationServlet): @defer.inlineCallbacks def on_GET(self, origin, content, query): - data = yield self.room_list_handler.get_local_public_room_list() + limit = parse_integer_from_args(query, "limit", 0) + since_token = parse_string_from_args(query, "since", None) + data = yield self.room_list_handler.get_local_public_room_list( + limit, since_token + ) defer.returnValue((200, data))