diff options
author | Erik Johnston <erik@matrix.org> | 2016-09-19 17:20:25 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-09-19 17:20:25 +0100 |
commit | 88acb99747c05f43f4563774f498993b27915493 (patch) | |
tree | 3184c8b912fdf11d29f7254ab879447f677e353b /synapse/federation/federation_client.py | |
parent | Merge branch 'release-v0.17.3' of github.com:matrix-org/synapse (diff) | |
parent | Bump version and changelog (diff) | |
download | synapse-88acb99747c05f43f4563774f498993b27915493.tar.xz |
Merge branch 'release-v0.18.0' of github.com:matrix-org/synapse v0.18.0
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r-- | synapse/federation/federation_client.py | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 627acc6a4f..06d0320b1a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,7 +24,6 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @@ -122,8 +121,12 @@ class FederationClient(FederationBase): pdu.event_id ) + def send_presence(self, destination, states): + if destination != self.server_name: + self._transaction_queue.enqueue_presence(destination, states) + @log_function - def send_edu(self, destination, edu_type, content): + def send_edu(self, destination, edu_type, content, key=None): edu = Edu( origin=self.server_name, destination=destination, @@ -134,10 +137,16 @@ class FederationClient(FederationBase): sent_edus_counter.inc() # TODO, add errback, etc. - self._transaction_queue.enqueue_edu(edu) + self._transaction_queue.enqueue_edu(edu, key=key) return defer.succeed(None) @log_function + def send_device_messages(self, destination): + """Sends the device messages in the local database to the remote + destination""" + self._transaction_queue.enqueue_device_messages(destination) + + @log_function def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) return defer.succeed(None) @@ -166,7 +175,7 @@ class FederationClient(FederationBase): ) @log_function - def query_client_keys(self, destination, content): + def query_client_keys(self, destination, content, timeout): """Query device keys for a device hosted on a remote server. Args: @@ -178,10 +187,12 @@ class FederationClient(FederationBase): response """ sent_queries_counter.inc("client_device_keys") - return self.transport_layer.query_client_keys(destination, content) + return self.transport_layer.query_client_keys( + destination, content, timeout + ) @log_function - def claim_client_keys(self, destination, content): + def claim_client_keys(self, destination, content, timeout): """Claims one-time keys for a device hosted on a remote server. Args: @@ -193,7 +204,9 @@ class FederationClient(FederationBase): response """ sent_queries_counter.inc("client_one_time_keys") - return self.transport_layer.claim_client_keys(destination, content) + return self.transport_layer.claim_client_keys( + destination, content, timeout + ) @defer.inlineCallbacks @log_function @@ -471,7 +484,7 @@ class FederationClient(FederationBase): defer.DeferredList(deferreds, consumeErrors=True) ) for success, result in res: - if success: + if success and result: signed_events.append(result) batch.discard(result.event_id) @@ -705,24 +718,14 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") - @defer.inlineCallbacks - def get_public_rooms(self, destinations): - results_by_server = {} - - @defer.inlineCallbacks - def _get_result(s): - if s == self.server_name: - defer.returnValue() - - try: - result = yield self.transport_layer.get_public_rooms(s) - results_by_server[s] = result - except: - logger.exception("Error getting room list from server %r", s) - - yield concurrently_execute(_get_result, destinations, 3) + def get_public_rooms(self, destination, limit=None, since_token=None, + search_filter=None): + if destination == self.server_name: + return - defer.returnValue(results_by_server) + return self.transport_layer.get_public_rooms( + destination, limit, since_token, search_filter + ) @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): |