summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py53
1 files changed, 28 insertions, 25 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 627acc6a4f..06d0320b1a 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -24,7 +24,6 @@ from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
 )
 from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@@ -122,8 +121,12 @@ class FederationClient(FederationBase):
             pdu.event_id
         )
 
+    def send_presence(self, destination, states):
+        if destination != self.server_name:
+            self._transaction_queue.enqueue_presence(destination, states)
+
     @log_function
-    def send_edu(self, destination, edu_type, content):
+    def send_edu(self, destination, edu_type, content, key=None):
         edu = Edu(
             origin=self.server_name,
             destination=destination,
@@ -134,10 +137,16 @@ class FederationClient(FederationBase):
         sent_edus_counter.inc()
 
         # TODO, add errback, etc.
-        self._transaction_queue.enqueue_edu(edu)
+        self._transaction_queue.enqueue_edu(edu, key=key)
         return defer.succeed(None)
 
     @log_function
+    def send_device_messages(self, destination):
+        """Sends the device messages in the local database to the remote
+        destination"""
+        self._transaction_queue.enqueue_device_messages(destination)
+
+    @log_function
     def send_failure(self, failure, destination):
         self._transaction_queue.enqueue_failure(failure, destination)
         return defer.succeed(None)
@@ -166,7 +175,7 @@ class FederationClient(FederationBase):
         )
 
     @log_function
-    def query_client_keys(self, destination, content):
+    def query_client_keys(self, destination, content, timeout):
         """Query device keys for a device hosted on a remote server.
 
         Args:
@@ -178,10 +187,12 @@ class FederationClient(FederationBase):
             response
         """
         sent_queries_counter.inc("client_device_keys")
-        return self.transport_layer.query_client_keys(destination, content)
+        return self.transport_layer.query_client_keys(
+            destination, content, timeout
+        )
 
     @log_function
-    def claim_client_keys(self, destination, content):
+    def claim_client_keys(self, destination, content, timeout):
         """Claims one-time keys for a device hosted on a remote server.
 
         Args:
@@ -193,7 +204,9 @@ class FederationClient(FederationBase):
             response
         """
         sent_queries_counter.inc("client_one_time_keys")
-        return self.transport_layer.claim_client_keys(destination, content)
+        return self.transport_layer.claim_client_keys(
+            destination, content, timeout
+        )
 
     @defer.inlineCallbacks
     @log_function
@@ -471,7 +484,7 @@ class FederationClient(FederationBase):
                 defer.DeferredList(deferreds, consumeErrors=True)
             )
             for success, result in res:
-                if success:
+                if success and result:
                     signed_events.append(result)
                     batch.discard(result.event_id)
 
@@ -705,24 +718,14 @@ class FederationClient(FederationBase):
 
         raise RuntimeError("Failed to send to any server.")
 
-    @defer.inlineCallbacks
-    def get_public_rooms(self, destinations):
-        results_by_server = {}
-
-        @defer.inlineCallbacks
-        def _get_result(s):
-            if s == self.server_name:
-                defer.returnValue()
-
-            try:
-                result = yield self.transport_layer.get_public_rooms(s)
-                results_by_server[s] = result
-            except:
-                logger.exception("Error getting room list from server %r", s)
-
-        yield concurrently_execute(_get_result, destinations, 3)
+    def get_public_rooms(self, destination, limit=None, since_token=None,
+                         search_filter=None):
+        if destination == self.server_name:
+            return
 
-        defer.returnValue(results_by_server)
+        return self.transport_layer.get_public_rooms(
+            destination, limit, since_token, search_filter
+        )
 
     @defer.inlineCallbacks
     def query_auth(self, destination, room_id, event_id, local_auth):