summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-02-05 14:28:03 +0000
committerKegan Dougal <kegan@matrix.org>2015-02-05 14:28:03 +0000
commit951690e54d5673431abefafc30c94af6e11341e3 (patch)
tree779471113f16dfbf7e67504731dd412101eee180 /synapse/federation/federation_client.py
parentFix user query checks. HS>AS pushing now works. (diff)
parentMerge branch 'federation_client_retries' of github.com:matrix-org/synapse int... (diff)
downloadsynapse-951690e54d5673431abefafc30c94af6e11341e3.tar.xz
Merge branch 'develop' into application-services
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py181
1 files changed, 86 insertions, 95 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index e1539bd0e0..70c9a6f46b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -16,17 +16,12 @@
 
 from twisted.internet import defer
 
+from .federation_base import FederationBase
 from .units import Edu
 
+from synapse.api.errors import CodeMessageException
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
-from synapse.events.utils import prune_event
-
-from syutil.jsonutil import encode_canonical_json
-
-from synapse.crypto.event_signing import check_event_content_hash
-
-from synapse.api.errors import SynapseError
 
 import logging
 
@@ -34,7 +29,7 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class FederationClient(object):
+class FederationClient(FederationBase):
     @log_function
     def send_pdu(self, pdu, destinations):
         """Informs the replication layer about a new PDU generated within the
@@ -186,7 +181,8 @@ class FederationClient(object):
                     pdu = yield self._check_sigs_and_hash(pdu)
 
                     break
-
+            except CodeMessageException:
+                raise
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
@@ -224,17 +220,17 @@ class FederationClient(object):
             for p in result.get("auth_chain", [])
         ]
 
-        for i, pdu in enumerate(pdus):
-            pdus[i] = yield self._check_sigs_and_hash(pdu)
-
-            # FIXME: We should handle signature failures more gracefully.
+        signed_pdus = yield self._check_sigs_and_hash_and_fetch(
+            destination, pdus, outlier=True
+        )
 
-        for i, pdu in enumerate(auth_chain):
-            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+        signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            destination, auth_chain, outlier=True
+        )
 
-            # FIXME: We should handle signature failures more gracefully.
+        signed_auth.sort(key=lambda e: e.depth)
 
-        defer.returnValue((pdus, auth_chain))
+        defer.returnValue((signed_pdus, signed_auth))
 
     @defer.inlineCallbacks
     @log_function
@@ -248,65 +244,88 @@ class FederationClient(object):
             for p in res["auth_chain"]
         ]
 
-        for i, pdu in enumerate(auth_chain):
-            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
-
-            # FIXME: We should handle signature failures more gracefully.
+        signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            destination, auth_chain, outlier=True
+        )
 
-        auth_chain.sort(key=lambda e: e.depth)
+        signed_auth.sort(key=lambda e: e.depth)
 
-        defer.returnValue(auth_chain)
+        defer.returnValue(signed_auth)
 
     @defer.inlineCallbacks
-    def make_join(self, destination, room_id, user_id):
-        ret = yield self.transport_layer.make_join(
-            destination, room_id, user_id
-        )
+    def make_join(self, destinations, room_id, user_id):
+        for destination in destinations:
+            try:
+                ret = yield self.transport_layer.make_join(
+                    destination, room_id, user_id
+                )
 
-        pdu_dict = ret["event"]
+                pdu_dict = ret["event"]
 
-        logger.debug("Got response to make_join: %s", pdu_dict)
+                logger.debug("Got response to make_join: %s", pdu_dict)
 
-        defer.returnValue(self.event_from_pdu_json(pdu_dict))
+                defer.returnValue(
+                    (destination, self.event_from_pdu_json(pdu_dict))
+                )
+                break
+            except CodeMessageException:
+                raise
+            except Exception as e:
+                logger.warn(
+                    "Failed to make_join via %s: %s",
+                    destination, e.message
+                )
 
-    @defer.inlineCallbacks
-    def send_join(self, destination, pdu):
-        time_now = self._clock.time_msec()
-        _, content = yield self.transport_layer.send_join(
-            destination=destination,
-            room_id=pdu.room_id,
-            event_id=pdu.event_id,
-            content=pdu.get_pdu_json(time_now),
-        )
+        raise RuntimeError("Failed to send to any server.")
 
-        logger.debug("Got content: %s", content)
+    @defer.inlineCallbacks
+    def send_join(self, destinations, pdu):
+        for destination in destinations:
+            try:
+                time_now = self._clock.time_msec()
+                _, content = yield self.transport_layer.send_join(
+                    destination=destination,
+                    room_id=pdu.room_id,
+                    event_id=pdu.event_id,
+                    content=pdu.get_pdu_json(time_now),
+                )
 
-        state = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("state", [])
-        ]
+                logger.debug("Got content: %s", content)
 
-        auth_chain = [
-            self.event_from_pdu_json(p, outlier=True)
-            for p in content.get("auth_chain", [])
-        ]
+                state = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("state", [])
+                ]
 
-        for i, pdu in enumerate(state):
-            state[i] = yield self._check_sigs_and_hash(pdu)
+                auth_chain = [
+                    self.event_from_pdu_json(p, outlier=True)
+                    for p in content.get("auth_chain", [])
+                ]
 
-            # FIXME: We should handle signature failures more gracefully.
+                signed_state = yield self._check_sigs_and_hash_and_fetch(
+                    destination, state, outlier=True
+                )
 
-        for i, pdu in enumerate(auth_chain):
-            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+                signed_auth = yield self._check_sigs_and_hash_and_fetch(
+                    destination, auth_chain, outlier=True
+                )
 
-            # FIXME: We should handle signature failures more gracefully.
+                auth_chain.sort(key=lambda e: e.depth)
 
-        auth_chain.sort(key=lambda e: e.depth)
+                defer.returnValue({
+                    "state": signed_state,
+                    "auth_chain": signed_auth,
+                    "origin": destination,
+                })
+            except CodeMessageException:
+                raise
+            except Exception as e:
+                logger.warn(
+                    "Failed to send_join via %s: %s",
+                    destination, e.message
+                )
 
-        defer.returnValue({
-            "state": state,
-            "auth_chain": auth_chain,
-        })
+        raise RuntimeError("Failed to send to any server.")
 
     @defer.inlineCallbacks
     def send_invite(self, destination, room_id, event_id, pdu):
@@ -353,12 +372,18 @@ class FederationClient(object):
         )
 
         auth_chain = [
-            (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+            self.event_from_pdu_json(e)
             for e in content["auth_chain"]
         ]
 
+        signed_auth = yield self._check_sigs_and_hash_and_fetch(
+            destination, auth_chain, outlier=True
+        )
+
+        signed_auth.sort(key=lambda e: e.depth)
+
         ret = {
-            "auth_chain": auth_chain,
+            "auth_chain": signed_auth,
             "rejects": content.get("rejects", []),
             "missing": content.get("missing", []),
         }
@@ -373,37 +398,3 @@ class FederationClient(object):
         event.internal_metadata.outlier = outlier
 
         return event
-
-    @defer.inlineCallbacks
-    def _check_sigs_and_hash(self, pdu):
-        """Throws a SynapseError if the PDU does not have the correct
-        signatures.
-
-        Returns:
-            FrozenEvent: Either the given event or it redacted if it failed the
-            content hash check.
-        """
-        # Check signatures are correct.
-        redacted_event = prune_event(pdu)
-        redacted_pdu_json = redacted_event.get_pdu_json()
-
-        try:
-            yield self.keyring.verify_json_for_server(
-                pdu.origin, redacted_pdu_json
-            )
-        except SynapseError:
-            logger.warn(
-                "Signature check failed for %s redacted to %s",
-                encode_canonical_json(pdu.get_pdu_json()),
-                encode_canonical_json(redacted_pdu_json),
-            )
-            raise
-
-        if not check_event_content_hash(pdu):
-            logger.warn(
-                "Event content has been tampered, redacting %s, %s",
-                pdu.event_id, encode_canonical_json(pdu.get_dict())
-            )
-            defer.returnValue(redacted_event)
-
-        defer.returnValue(pdu)