summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-02-04 16:28:12 +0000
committerErik Johnston <erik@matrix.org>2015-02-04 16:28:12 +0000
commitae46f10fc5dba0f81518e3144ab8d9ed7a7d03bc (patch)
treeca543f43db03346741aae11c9b575e66a39dbf7a /synapse/federation
parentWhen returning lists of servers from alias lookups, put the current server fi... (diff)
downloadsynapse-ae46f10fc5dba0f81518e3144ab8d9ed7a7d03bc.tar.xz
Apply sanity to the transport client interface. Convert 'make_join' and 'send_join' to accept iterables of destinations
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py82
-rw-r--r--synapse/federation/transaction_queue.py23
-rw-r--r--synapse/federation/transport/client.py42
3 files changed, 85 insertions, 62 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index d6b8c43916..eb36ec040b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Edu
 
+from synapse.api.errors import CodeMessageException
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 
@@ -180,7 +181,8 @@ class FederationClient(FederationBase):
                     pdu = yield self._check_sigs_and_hash(pdu)
 
                     break
-
+            except CodeMessageException:
+                raise
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
@@ -264,45 +266,63 @@ class FederationClient(FederationBase):
 
                 defer.returnValue(self.event_from_pdu_json(pdu_dict))
                 break
-            except Exception as e:
-                logger.warn("Failed to make_join via %s", destination)
+            except CodeMessageException:
+                raise
+            except RuntimeError as e:
+                logger.warn(
+                    "Failed to make_join via %s: %s",
+                    destination, e.message
+                )
+
+        raise RuntimeError("Failed to send to any server.")
 
     @defer.inlineCallbacks
-    def send_join(self, destination, pdu):
-        time_now = self._clock.time_msec()
-        _, content = yield self.transport_layer.send_join(
-            destination=destination,
-            room_id=pdu.room_id,
-            event_id=pdu.event_id,
-            content=pdu.get_pdu_json(time_now),
-        )
+    def send_join(self, destinations, pdu):
+        for destination in destinations:
+            try:
+                time_now = self._clock.time_msec()
+                _, content = yield self.transport_layer.send_join(
+                    destination=destination,
+                    room_id=pdu.room_id,
+                    event_id=pdu.event_id,
+                    content=pdu.get_pdu_json(time_now),
+                )
 
-        logger.debug("Got content: %s", content)
+                logger.debug("Got content: %s", content)
 
-        state = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("state", [])
-        ]
+                state = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("state", [])
+                ]
 
-        auth_chain = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("auth_chain", [])
-        ]
+                auth_chain = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("auth_chain", [])
+                ]
 
-        signed_state = yield self._check_sigs_and_hash_and_fetch(
-            destination, state, outlier=True
-        )
+                signed_state = yield self._check_sigs_and_hash_and_fetch(
+                    destination, state, outlier=True
+                )
 
-        signed_auth = yield self._check_sigs_and_hash_and_fetch(
-            destination, auth_chain, outlier=True
-        )
+                signed_auth = yield self._check_sigs_and_hash_and_fetch(
+                    destination, auth_chain, outlier=True
+                )
 
-        auth_chain.sort(key=lambda e: e.depth)
+                auth_chain.sort(key=lambda e: e.depth)
+
+                defer.returnValue({
+                    "state": signed_state,
+                    "auth_chain": signed_auth,
+                })
+            except CodeMessageException:
+                raise
+            except RuntimeError as e:
+                logger.warn(
+                    "Failed to send_join via %s: %s",
+                    destination, e.message
+                )
 
-        defer.returnValue({
-            "state": signed_state,
-            "auth_chain": signed_auth,
-        })
+        raise RuntimeError("Failed to send to any server.")
 
     @defer.inlineCallbacks
     def send_invite(self, destination, room_id, event_id, pdu):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d4f2c09a2..f38aeba7cc 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .persistence import TransactionActions
 from .units import Transaction
 
+from synapse.api.errors import HttpResponseException
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 
@@ -238,9 +239,14 @@ class TransactionQueue(object):
                             del p["age_ts"]
                 return data
 
-            code, response = yield self.transport_layer.send_transaction(
-                transaction, json_data_cb
-            )
+            try:
+                response = yield self.transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
 
             logger.info("TX [%s] got %d response", destination, code)
 
@@ -274,8 +280,7 @@ class TransactionQueue(object):
                     pass
 
             logger.debug("TX [%s] Yielded to callbacks", destination)
-
-        except Exception as e:
+        except RuntimeError as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
             logger.warn(
@@ -283,6 +288,14 @@ class TransactionQueue(object):
                 destination,
                 e,
             )
+        except Exception as e:
+            # We capture this here as there as nothing actually listens
+            # for this finishing functions deferred.
+            logger.exception(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
 
             self.set_retrying(destination, retry_interval)
 
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4cb1dea2de..8b137e7128 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.util.logutils import log_function
 
 import logging
-import json
 
 
 logger = logging.getLogger(__name__)
@@ -129,7 +128,7 @@ class TransportLayerClient(object):
         # generated by the json_data_callback.
         json_data = transaction.get_dict()
 
-        code, response = yield self.client.put_json(
+        response = yield self.client.put_json(
             transaction.destination,
             path=PREFIX + "/send/%s/" % transaction.transaction_id,
             data=json_data,
@@ -137,95 +136,86 @@ class TransportLayerClient(object):
         )
 
         logger.debug(
-            "send_data dest=%s, txid=%s, got response: %d",
-            transaction.destination, transaction.transaction_id, code
+            "send_data dest=%s, txid=%s, got response: 200",
+            transaction.destination, transaction.transaction_id,
         )
 
-        defer.returnValue((code, response))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def make_query(self, destination, query_type, args, retry_on_dns_fail):
         path = PREFIX + "/query/%s" % query_type
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
             args=args,
             retry_on_dns_fail=retry_on_dns_fail,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
         path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
             retry_on_dns_fail=retry_on_dns_fail,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def send_join(self, destination, room_id, event_id, content):
         path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.put_json(
+        response = yield self.client.put_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_join", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def send_invite(self, destination, room_id, event_id, content):
         path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.put_json(
+        response = yield self.client.put_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_invite", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def get_event_auth(self, destination, room_id, event_id):
         path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def send_query_auth(self, destination, room_id, event_id, content):
         path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.post_json(
+        content = yield self.client.post_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_invite", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(content)