summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py142
-rw-r--r--synapse/federation/federation_server.py141
-rw-r--r--synapse/federation/replication.py2
-rw-r--r--synapse/federation/transport/client.py16
-rw-r--r--synapse/federation/transport/server.py21
5 files changed, 296 insertions, 26 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c80f4c61bc..e1539bd0e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -20,6 +20,13 @@ from .units import Edu
 
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
+from synapse.events.utils import prune_event
+
+from syutil.jsonutil import encode_canonical_json
+
+from synapse.crypto.event_signing import check_event_content_hash
+
+from synapse.api.errors import SynapseError
 
 import logging
 
@@ -126,6 +133,11 @@ class FederationClient(object):
             for p in transaction_data["pdus"]
         ]
 
+        for i, pdu in enumerate(pdus):
+            pdus[i] = yield self._check_sigs_and_hash(pdu)
+
+            # FIXME: We should handle signature failures more gracefully.
+
         defer.returnValue(pdus)
 
     @defer.inlineCallbacks
@@ -159,6 +171,22 @@ class FederationClient(object):
                 transaction_data = yield self.transport_layer.get_event(
                     destination, event_id
                 )
+
+                logger.debug("transaction_data %r", transaction_data)
+
+                pdu_list = [
+                    self.event_from_pdu_json(p, outlier=outlier)
+                    for p in transaction_data["pdus"]
+                ]
+
+                if pdu_list:
+                    pdu = pdu_list[0]
+
+                    # Check signatures are correct.
+                    pdu = yield self._check_sigs_and_hash(pdu)
+
+                    break
+
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
@@ -166,18 +194,6 @@ class FederationClient(object):
                 )
                 continue
 
-            logger.debug("transaction_data %r", transaction_data)
-
-            pdu_list = [
-                self.event_from_pdu_json(p, outlier=outlier)
-                for p in transaction_data["pdus"]
-            ]
-
-            if pdu_list:
-                pdu = pdu_list[0]
-                # TODO: We need to check signatures here
-                break
-
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
@@ -208,6 +224,16 @@ class FederationClient(object):
             for p in result.get("auth_chain", [])
         ]
 
+        for i, pdu in enumerate(pdus):
+            pdus[i] = yield self._check_sigs_and_hash(pdu)
+
+            # FIXME: We should handle signature failures more gracefully.
+
+        for i, pdu in enumerate(auth_chain):
+            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+
+            # FIXME: We should handle signature failures more gracefully.
+
         defer.returnValue((pdus, auth_chain))
 
     @defer.inlineCallbacks
@@ -222,6 +248,11 @@ class FederationClient(object):
             for p in res["auth_chain"]
         ]
 
+        for i, pdu in enumerate(auth_chain):
+            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+
+            # FIXME: We should handle signature failures more gracefully.
+
         auth_chain.sort(key=lambda e: e.depth)
 
         defer.returnValue(auth_chain)
@@ -260,6 +291,16 @@ class FederationClient(object):
             for p in content.get("auth_chain", [])
         ]
 
+        for i, pdu in enumerate(state):
+            state[i] = yield self._check_sigs_and_hash(pdu)
+
+            # FIXME: We should handle signature failures more gracefully.
+
+        for i, pdu in enumerate(auth_chain):
+            auth_chain[i] = yield self._check_sigs_and_hash(pdu)
+
+            # FIXME: We should handle signature failures more gracefully.
+
         auth_chain.sort(key=lambda e: e.depth)
 
         defer.returnValue({
@@ -281,7 +322,48 @@ class FederationClient(object):
 
         logger.debug("Got response to send_invite: %s", pdu_dict)
 
-        defer.returnValue(self.event_from_pdu_json(pdu_dict))
+        pdu = self.event_from_pdu_json(pdu_dict)
+
+        # Check signatures are correct.
+        pdu = yield self._check_sigs_and_hash(pdu)
+
+        # FIXME: We should handle signature failures more gracefully.
+
+        defer.returnValue(pdu)
+
+    @defer.inlineCallbacks
+    def query_auth(self, destination, room_id, event_id, local_auth):
+        """
+        Params:
+            destination (str)
+            event_it (str)
+            local_auth (list)
+        """
+        time_now = self._clock.time_msec()
+
+        send_content = {
+            "auth_chain": [e.get_pdu_json(time_now) for e in local_auth],
+        }
+
+        code, content = yield self.transport_layer.send_query_auth(
+            destination=destination,
+            room_id=room_id,
+            event_id=event_id,
+            content=send_content,
+        )
+
+        auth_chain = [
+            (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+            for e in content["auth_chain"]
+        ]
+
+        ret = {
+            "auth_chain": auth_chain,
+            "rejects": content.get("rejects", []),
+            "missing": content.get("missing", []),
+        }
+
+        defer.returnValue(ret)
 
     def event_from_pdu_json(self, pdu_json, outlier=False):
         event = FrozenEvent(
@@ -291,3 +373,37 @@ class FederationClient(object):
         event.internal_metadata.outlier = outlier
 
         return event
+
+    @defer.inlineCallbacks
+    def _check_sigs_and_hash(self, pdu):
+        """Throws a SynapseError if the PDU does not have the correct
+        signatures.
+
+        Returns:
+            FrozenEvent: Either the given event or it redacted if it failed the
+            content hash check.
+        """
+        # Check signatures are correct.
+        redacted_event = prune_event(pdu)
+        redacted_pdu_json = redacted_event.get_pdu_json()
+
+        try:
+            yield self.keyring.verify_json_for_server(
+                pdu.origin, redacted_pdu_json
+            )
+        except SynapseError:
+            logger.warn(
+                "Signature check failed for %s redacted to %s",
+                encode_canonical_json(pdu.get_pdu_json()),
+                encode_canonical_json(redacted_pdu_json),
+            )
+            raise
+
+        if not check_event_content_hash(pdu):
+            logger.warn(
+                "Event content has been tampered, redacting %s, %s",
+                pdu.event_id, encode_canonical_json(pdu.get_dict())
+            )
+            defer.returnValue(redacted_event)
+
+        defer.returnValue(pdu)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 0597725ce7..5fbd8b19de 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -21,6 +21,13 @@ from .units import Transaction, Edu
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.events import FrozenEvent
+from synapse.events.utils import prune_event
+
+from syutil.jsonutil import encode_canonical_json
+
+from synapse.crypto.event_signing import check_event_content_hash
+
+from synapse.api.errors import FederationError, SynapseError
 
 import logging
 
@@ -97,8 +104,10 @@ class FederationServer(object):
         response = yield self.transaction_actions.have_responded(transaction)
 
         if response:
-            logger.debug("[%s] We've already responed to this request",
-                         transaction.transaction_id)
+            logger.debug(
+                "[%s] We've already responed to this request",
+                transaction.transaction_id
+            )
             defer.returnValue(response)
             return
 
@@ -221,6 +230,54 @@ class FederationServer(object):
             "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
         }))
 
+    @defer.inlineCallbacks
+    def on_query_auth_request(self, origin, content, event_id):
+        """
+        Content is a dict with keys::
+            auth_chain (list): A list of events that give the auth chain.
+            missing (list): A list of event_ids indicating what the other
+              side (`origin`) think we're missing.
+            rejects (dict): A mapping from event_id to a 2-tuple of reason
+              string and a proof (or None) of why the event was rejected.
+              The keys of this dict give the list of events the `origin` has
+              rejected.
+
+        Args:
+            origin (str)
+            content (dict)
+            event_id (str)
+
+        Returns:
+            Deferred: Results in `dict` with the same format as `content`
+        """
+        auth_chain = [
+            (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+            for e in content["auth_chain"]
+        ]
+
+        missing = [
+            (yield self._check_sigs_and_hash(self.event_from_pdu_json(e)))
+            for e in content.get("missing", [])
+        ]
+
+        ret = yield self.handler.on_query_auth(
+            origin, event_id, auth_chain, content.get("rejects", []), missing
+        )
+
+        time_now = self._clock.time_msec()
+        send_content = {
+            "auth_chain": [
+                e.get_pdu_json(time_now)
+                for e in ret["auth_chain"]
+            ],
+            "rejects": ret.get("rejects", []),
+            "missing": ret.get("missing", []),
+        }
+
+        defer.returnValue(
+            (200, send_content)
+        )
+
     @log_function
     def _get_persisted_pdu(self, origin, event_id, do_auth=True):
         """ Get a PDU from the database with given origin and id.
@@ -253,6 +310,9 @@ class FederationServer(object):
             origin, pdu.event_id, do_auth=False
         )
 
+        # FIXME: Currently we fetch an event again when we already have it
+        # if it has been marked as an outlier.
+
         already_seen = (
             existing and (
                 not existing.internal_metadata.is_outlier()
@@ -264,14 +324,27 @@ class FederationServer(object):
             defer.returnValue({})
             return
 
+        # Check signature.
+        try:
+            pdu = yield self._check_sigs_and_hash(pdu)
+        except SynapseError as e:
+            raise FederationError(
+                "ERROR",
+                e.code,
+                e.msg,
+                affected=pdu.event_id,
+            )
+
         state = None
 
         auth_chain = []
 
         have_seen = yield self.store.have_events(
-            [e for e, _ in pdu.prev_events]
+            [ev for ev, _ in pdu.prev_events]
         )
 
+        fetch_state = False
+
         # Get missing pdus if necessary.
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
@@ -308,19 +381,29 @@ class FederationServer(object):
                                 logger.debug("Processed pdu %s", event_id)
                             else:
                                 logger.warn("Failed to get PDU %s", event_id)
+                                fetch_state = True
                         except:
                             # TODO(erikj): Do some more intelligent retries.
                             logger.exception("Failed to get PDU")
+                            fetch_state = True
             else:
-                # We need to get the state at this event, since we have reached
-                # a backward extremity edge.
-                logger.debug(
-                    "_handle_new_pdu getting state for %s",
-                    pdu.room_id
-                )
-                state, auth_chain = yield self.get_state_for_room(
-                    origin, pdu.room_id, pdu.event_id,
-                )
+                prevs = {e_id for e_id, _ in pdu.prev_events}
+                seen = set(have_seen.keys())
+                if prevs - seen:
+                    fetch_state = True
+        else:
+            fetch_state = True
+
+        if fetch_state:
+            # We need to get the state at this event, since we haven't
+            # processed all the prev events.
+            logger.debug(
+                "_handle_new_pdu getting state for %s",
+                pdu.room_id
+            )
+            state, auth_chain = yield self.get_state_for_room(
+                origin, pdu.room_id, pdu.event_id,
+            )
 
         ret = yield self.handler.on_receive_pdu(
             origin,
@@ -343,3 +426,37 @@ class FederationServer(object):
         event.internal_metadata.outlier = outlier
 
         return event
+
+    @defer.inlineCallbacks
+    def _check_sigs_and_hash(self, pdu):
+        """Throws a SynapseError if the PDU does not have the correct
+        signatures.
+
+        Returns:
+            FrozenEvent: Either the given event or it redacted if it failed the
+            content hash check.
+        """
+        # Check signatures are correct.
+        redacted_event = prune_event(pdu)
+        redacted_pdu_json = redacted_event.get_pdu_json()
+
+        try:
+            yield self.keyring.verify_json_for_server(
+                pdu.origin, redacted_pdu_json
+            )
+        except SynapseError:
+            logger.warn(
+                "Signature check failed for %s redacted to %s",
+                encode_canonical_json(pdu.get_pdu_json()),
+                encode_canonical_json(redacted_pdu_json),
+            )
+            raise
+
+        if not check_event_content_hash(pdu):
+            logger.warn(
+                "Event content has been tampered, redacting %s, %s",
+                pdu.event_id, encode_canonical_json(pdu.get_dict())
+            )
+            defer.returnValue(redacted_event)
+
+        defer.returnValue(pdu)
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 9ef4834927..e442c6c5d5 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -51,6 +51,8 @@ class ReplicationLayer(FederationClient, FederationServer):
     def __init__(self, hs, transport_layer):
         self.server_name = hs.hostname
 
+        self.keyring = hs.get_keyring()
+
         self.transport_layer = transport_layer
         self.transport_layer.register_received_handler(self)
         self.transport_layer.register_request_handler(self)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index e634a3a213..4cb1dea2de 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -213,3 +213,19 @@ class TransportLayerClient(object):
         )
 
         defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
+    def send_query_auth(self, destination, room_id, event_id, content):
+        path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
+
+        code, content = yield self.client.post_json(
+            destination=destination,
+            path=path,
+            data=content,
+        )
+
+        if not 200 <= code < 300:
+            raise RuntimeError("Got %d from send_invite", code)
+
+        defer.returnValue(json.loads(content))
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a380a6910b..9c9f8d525b 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -42,7 +42,7 @@ class TransportLayerServer(object):
         content = None
         origin = None
 
-        if request.method == "PUT":
+        if request.method in ["PUT", "POST"]:
             # TODO: Handle other method types? other content types?
             try:
                 content_bytes = request.content.read()
@@ -234,6 +234,16 @@ class TransportLayerServer(object):
                 )
             )
         )
+        self.server.register_path(
+            "POST",
+            re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"),
+            self._with_authentication(
+                lambda origin, content, query, context, event_id:
+                self._on_query_auth_request(
+                    origin, content, event_id,
+                )
+            )
+        )
 
     @defer.inlineCallbacks
     @log_function
@@ -325,3 +335,12 @@ class TransportLayerServer(object):
         )
 
         defer.returnValue((200, content))
+
+    @defer.inlineCallbacks
+    @log_function
+    def _on_query_auth_request(self, origin, content, event_id):
+        new_content = yield self.request_handler.on_query_auth_request(
+            origin, content, event_id
+        )
+
+        defer.returnValue((200, new_content))