summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/api/errors.py8
-rw-r--r--synapse/federation/federation_client.py100
-rw-r--r--synapse/federation/transaction_queue.py23
-rw-r--r--synapse/federation/transport/client.py42
-rw-r--r--synapse/handlers/directory.py11
-rw-r--r--synapse/handlers/federation.py20
-rw-r--r--synapse/handlers/room.py12
-rw-r--r--synapse/http/matrixfederationclient.py42
8 files changed, 169 insertions, 89 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index ad478aa6b7..5041828f18 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -39,7 +39,7 @@ class Codes(object):
     TOO_LARGE = "M_TOO_LARGE"
 
 
-class CodeMessageException(Exception):
+class CodeMessageException(RuntimeError):
     """An exception with integer code and message string attributes."""
 
     def __init__(self, code, msg):
@@ -227,3 +227,9 @@ class FederationError(RuntimeError):
             "affected": self.affected,
             "source": self.source if self.source else self.affected,
         }
+
+
+class HttpResponseException(CodeMessageException):
+    def __init__(self, code, msg, response):
+        self.response = response
+        super(HttpResponseException, self).__init__(code, msg)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5fac629709..70c9a6f46b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Edu
 
+from synapse.api.errors import CodeMessageException
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 
@@ -180,7 +181,8 @@ class FederationClient(FederationBase):
                     pdu = yield self._check_sigs_and_hash(pdu)
 
                     break
-
+            except CodeMessageException:
+                raise
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
@@ -251,53 +253,79 @@ class FederationClient(FederationBase):
         defer.returnValue(signed_auth)
 
     @defer.inlineCallbacks
-    def make_join(self, destination, room_id, user_id):
-        ret = yield self.transport_layer.make_join(
-            destination, room_id, user_id
-        )
+    def make_join(self, destinations, room_id, user_id):
+        for destination in destinations:
+            try:
+                ret = yield self.transport_layer.make_join(
+                    destination, room_id, user_id
+                )
 
-        pdu_dict = ret["event"]
+                pdu_dict = ret["event"]
 
-        logger.debug("Got response to make_join: %s", pdu_dict)
+                logger.debug("Got response to make_join: %s", pdu_dict)
+
+                defer.returnValue(
+                    (destination, self.event_from_pdu_json(pdu_dict))
+                )
+                break
+            except CodeMessageException:
+                raise
+            except Exception as e:
+                logger.warn(
+                    "Failed to make_join via %s: %s",
+                    destination, e.message
+                )
 
-        defer.returnValue(self.event_from_pdu_json(pdu_dict))
+        raise RuntimeError("Failed to send to any server.")
 
     @defer.inlineCallbacks
-    def send_join(self, destination, pdu):
-        time_now = self._clock.time_msec()
-        _, content = yield self.transport_layer.send_join(
-            destination=destination,
-            room_id=pdu.room_id,
-            event_id=pdu.event_id,
-            content=pdu.get_pdu_json(time_now),
-        )
+    def send_join(self, destinations, pdu):
+        for destination in destinations:
+            try:
+                time_now = self._clock.time_msec()
+                _, content = yield self.transport_layer.send_join(
+                    destination=destination,
+                    room_id=pdu.room_id,
+                    event_id=pdu.event_id,
+                    content=pdu.get_pdu_json(time_now),
+                )
 
-        logger.debug("Got content: %s", content)
+                logger.debug("Got content: %s", content)
 
-        state = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("state", [])
-        ]
+                state = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("state", [])
+                ]
 
-        auth_chain = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("auth_chain", [])
-        ]
+                auth_chain = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("auth_chain", [])
+                ]
 
-        signed_state = yield self._check_sigs_and_hash_and_fetch(
-            destination, state, outlier=True
-        )
+                signed_state = yield self._check_sigs_and_hash_and_fetch(
+                    destination, state, outlier=True
+                )
 
-        signed_auth = yield self._check_sigs_and_hash_and_fetch(
-            destination, auth_chain, outlier=True
-        )
+                signed_auth = yield self._check_sigs_and_hash_and_fetch(
+                    destination, auth_chain, outlier=True
+                )
 
-        auth_chain.sort(key=lambda e: e.depth)
+                auth_chain.sort(key=lambda e: e.depth)
 
-        defer.returnValue({
-            "state": signed_state,
-            "auth_chain": signed_auth,
-        })
+                defer.returnValue({
+                    "state": signed_state,
+                    "auth_chain": signed_auth,
+                    "origin": destination,
+                })
+            except CodeMessageException:
+                raise
+            except Exception as e:
+                logger.warn(
+                    "Failed to send_join via %s: %s",
+                    destination, e.message
+                )
+
+        raise RuntimeError("Failed to send to any server.")
 
     @defer.inlineCallbacks
     def send_invite(self, destination, room_id, event_id, pdu):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d4f2c09a2..f38aeba7cc 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 from .persistence import TransactionActions
 from .units import Transaction
 
+from synapse.api.errors import HttpResponseException
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 
@@ -238,9 +239,14 @@ class TransactionQueue(object):
                             del p["age_ts"]
                 return data
 
-            code, response = yield self.transport_layer.send_transaction(
-                transaction, json_data_cb
-            )
+            try:
+                response = yield self.transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
 
             logger.info("TX [%s] got %d response", destination, code)
 
@@ -274,8 +280,7 @@ class TransactionQueue(object):
                     pass
 
             logger.debug("TX [%s] Yielded to callbacks", destination)
-
-        except Exception as e:
+        except RuntimeError as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.
             logger.warn(
@@ -283,6 +288,14 @@ class TransactionQueue(object):
                 destination,
                 e,
             )
+        except Exception as e:
+            # We capture this here as there as nothing actually listens
+            # for this finishing functions deferred.
+            logger.exception(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
 
             self.set_retrying(destination, retry_interval)
 
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4cb1dea2de..8b137e7128 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.util.logutils import log_function
 
 import logging
-import json
 
 
 logger = logging.getLogger(__name__)
@@ -129,7 +128,7 @@ class TransportLayerClient(object):
         # generated by the json_data_callback.
         json_data = transaction.get_dict()
 
-        code, response = yield self.client.put_json(
+        response = yield self.client.put_json(
             transaction.destination,
             path=PREFIX + "/send/%s/" % transaction.transaction_id,
             data=json_data,
@@ -137,95 +136,86 @@ class TransportLayerClient(object):
         )
 
         logger.debug(
-            "send_data dest=%s, txid=%s, got response: %d",
-            transaction.destination, transaction.transaction_id, code
+            "send_data dest=%s, txid=%s, got response: 200",
+            transaction.destination, transaction.transaction_id,
         )
 
-        defer.returnValue((code, response))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def make_query(self, destination, query_type, args, retry_on_dns_fail):
         path = PREFIX + "/query/%s" % query_type
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
             args=args,
             retry_on_dns_fail=retry_on_dns_fail,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
         path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
             retry_on_dns_fail=retry_on_dns_fail,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def send_join(self, destination, room_id, event_id, content):
         path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.put_json(
+        response = yield self.client.put_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_join", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def send_invite(self, destination, room_id, event_id, content):
         path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.put_json(
+        response = yield self.client.put_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_invite", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     @log_function
     def get_event_auth(self, destination, room_id, event_id):
         path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
 
-        response = yield self.client.get_json(
+        content = yield self.client.get_json(
             destination=destination,
             path=path,
         )
 
-        defer.returnValue(response)
+        defer.returnValue(content)
 
     @defer.inlineCallbacks
     @log_function
     def send_query_auth(self, destination, room_id, event_id, content):
         path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
 
-        code, content = yield self.client.post_json(
+        content = yield self.client.post_json(
             destination=destination,
             path=path,
             data=content,
         )
 
-        if not 200 <= code < 300:
-            raise RuntimeError("Got %d from send_invite", code)
-
-        defer.returnValue(json.loads(content))
+        defer.returnValue(content)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 58e9a91562..7b60921040 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -113,7 +113,16 @@ class DirectoryHandler(BaseHandler):
             )
 
         extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
-        servers = list(set(extra_servers) | set(servers))
+        servers = set(extra_servers) | set(servers)
+
+        # If this server is in the list of servers, return it first.
+        if self.server_name in servers:
+            servers = (
+                [self.server_name]
+                + [s for s in servers if s != self.server_name]
+            )
+        else:
+            servers = list(servers)
 
         defer.returnValue({
             "room_id": room_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2e2c23ef63..aba266c2bc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -273,7 +273,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
+    def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
         """ Attempts to join the `joinee` to the room `room_id` via the
         server `target_host`.
 
@@ -287,8 +287,8 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        pdu = yield self.replication_layer.make_join(
-            target_host,
+        origin, pdu = yield self.replication_layer.make_join(
+            target_hosts,
             room_id,
             joinee
         )
@@ -330,11 +330,17 @@ class FederationHandler(BaseHandler):
 
             new_event = builder.build()
 
+            # Try the host we successfully got a response to /make_join/
+            # request first.
+            target_hosts.remove(origin)
+            target_hosts.insert(0, origin)
+
             ret = yield self.replication_layer.send_join(
-                target_host,
+                target_hosts,
                 new_event
             )
 
+            origin = ret["origin"]
             state = ret["state"]
             auth_chain = ret["auth_chain"]
             auth_chain.sort(key=lambda e: e.depth)
@@ -371,7 +377,7 @@ class FederationHandler(BaseHandler):
                         if e.event_id in auth_ids
                     }
                     yield self._handle_new_event(
-                        target_host, e, auth_events=auth
+                        origin, e, auth_events=auth
                     )
                 except:
                     logger.exception(
@@ -391,7 +397,7 @@ class FederationHandler(BaseHandler):
                         if e.event_id in auth_ids
                     }
                     yield self._handle_new_event(
-                        target_host, e, auth_events=auth
+                        origin, e, auth_events=auth
                     )
                 except:
                     logger.exception(
@@ -406,7 +412,7 @@ class FederationHandler(BaseHandler):
             }
 
             yield self._handle_new_event(
-                target_host,
+                origin,
                 new_event,
                 state=state,
                 current_state=state,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 23821d321f..0369b907a5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler):
         if not hosts:
             raise SynapseError(404, "No known servers")
 
-        host = hosts[0]
-
         # If event doesn't include a display name, add one.
         yield self.distributor.fire(
             "collect_presencelike_data", joinee, content
@@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler):
         })
         event, context = yield self._create_new_client_event(builder)
 
-        yield self._do_join(event, context, room_host=host, do_auth=True)
+        yield self._do_join(event, context, room_hosts=hosts, do_auth=True)
 
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, context, room_host=None, do_auth=True):
+    def _do_join(self, event, context, room_hosts=None, do_auth=True):
         joinee = UserID.from_string(event.state_key)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
@@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler):
 
         if is_host_in_room:
             should_do_dance = False
-        elif room_host:  # TODO: Shouldn't this be remote_room_host?
+        elif room_hosts:  # TODO: Shouldn't this be remote_room_host?
             should_do_dance = True
         else:
             # TODO(markjh): get prev_state from snapshot
@@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler):
                 inviter = UserID.from_string(prev_state.user_id)
 
                 should_do_dance = not self.hs.is_mine(inviter)
-                room_host = inviter.domain
+                room_hosts = [inviter.domain]
             else:
                 # return the same error as join_room_alias does
                 raise SynapseError(404, "No known servers")
@@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler):
         if should_do_dance:
             handler = self.hs.get_handlers().federation_handler
             yield handler.do_invite_join(
-                room_host,
+                room_hosts,
                 room_id,
                 event.user_id,
                 event.get_dict()["content"],  # FIXME To get a non-frozen dict
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c7bf1b47b8..8559d06b7f 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -27,7 +27,9 @@ from synapse.util.logcontext import PreserveLoggingContext
 
 from syutil.jsonutil import encode_canonical_json
 
-from synapse.api.errors import CodeMessageException, SynapseError, Codes
+from synapse.api.errors import (
+    SynapseError, Codes, HttpResponseException,
+)
 
 from syutil.crypto.jsonsign import sign_json
 
@@ -163,13 +165,12 @@ class MatrixFederationHttpClient(object):
         )
 
         if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
             pass
         else:
             # :'(
             # Update transactions table?
-            raise CodeMessageException(
-                response.code, response.phrase
+            raise HttpResponseException(
+                response.code, response.phrase, response
             )
 
         defer.returnValue(response)
@@ -238,11 +239,20 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
         )
 
+        if 200 <= response.code < 300:
+            # We need to update the transactions table to say it was sent?
+            c_type = response.headers.getRawHeaders("Content-Type")
+
+            if "application/json" not in c_type:
+                raise RuntimeError(
+                    "Content-Type not application/json"
+                )
+
         logger.debug("Getting resp body")
         body = yield readBody(response)
         logger.debug("Got resp body")
 
-        defer.returnValue((response.code, body))
+        defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}):
@@ -275,11 +285,20 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
         )
 
+        if 200 <= response.code < 300:
+            # We need to update the transactions table to say it was sent?
+            c_type = response.headers.getRawHeaders("Content-Type")
+
+            if "application/json" not in c_type:
+                raise RuntimeError(
+                    "Content-Type not application/json"
+                )
+
         logger.debug("Getting resp body")
         body = yield readBody(response)
         logger.debug("Got resp body")
 
-        defer.returnValue((response.code, body))
+        defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
     def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
@@ -321,7 +340,18 @@ class MatrixFederationHttpClient(object):
             retry_on_dns_fail=retry_on_dns_fail
         )
 
+        if 200 <= response.code < 300:
+            # We need to update the transactions table to say it was sent?
+            c_type = response.headers.getRawHeaders("Content-Type")
+
+            if "application/json" not in c_type:
+                raise RuntimeError(
+                    "Content-Type not application/json"
+                )
+
+        logger.debug("Getting resp body")
         body = yield readBody(response)
+        logger.debug("Got resp body")
 
         defer.returnValue(json.loads(body))