summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-11-22 17:47:29 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2019-02-13 14:22:58 +0000
commitc400d9dcca4bdb75dbe25a0656cde0a6703de741 (patch)
treee12138df4c3fd8e7f9be46019d1da078826c787f
parentDon't verify stuff (diff)
downloadsynapse-c400d9dcca4bdb75dbe25a0656cde0a6703de741.tar.xz
Add backchatter
-rw-r--r--synapse/federation/federation_server.py24
-rw-r--r--synapse/federation/transaction_queue.py47
-rw-r--r--synapse/federation/transport/server.py8
3 files changed, 72 insertions, 7 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e62bdf5bbe..b878023058 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -226,11 +226,11 @@ class FederationServer(FederationBase):
                     thread_id, new_thread = pdu_to_thread[pdu.event_id]
                     logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
                     try:
-                        yield self._handle_received_pdu(
+                        ret = yield self._handle_received_pdu(
                             origin, pdu, thread_id=thread_id,
                             new_thread=new_thread
                         )
-                        pdu_results[event_id] = {}
+                        pdu_results[event_id] = ret
                     except FederationError as e:
                         logger.warn("Error handling PDU %s: %s", event_id, e)
                         pdu_results[event_id] = {"error": str(e)}
@@ -259,7 +259,7 @@ class FederationServer(FederationBase):
             "pdus": pdu_results,
         }
 
-        logger.debug("Returning: %s", str(response))
+        logger.info("Returning: %s", str(response))
 
         yield self.transaction_actions.set_response(
             origin,
@@ -627,11 +627,29 @@ class FederationServer(FederationBase):
                 affected=pdu.event_id,
             )
 
+        destinations = pdu.unsigned.get("destinations", {})
+
+        costs = yield self.store.get_destination_healths(list(destinations))
+
+        logger.info("Destinations: %s", destinations)
+        logger.info("Costs: %s", costs)
+
+        dont_relay = set()
+        for dest, their_cost in destinations.items():
+            our_cost = costs.get(dest)
+            if our_cost and their_cost and their_cost < our_cost:
+                dont_relay.add(dest)
+
+        if destinations:
+            pdu.unsigned["destinations"] = {d: c for d, c in destinations.items() if d not in dont_relay}
+
         yield self.handler.on_receive_pdu(
             origin, pdu, sent_to_us_directly=True,
             thread_id=thread_id, new_thread=new_thread,
         )
 
+        defer.returnValue({"did_not_relay": list(dont_relay)})
+
     def __str__(self):
         return "<ReplicationLayer(%s)>" % self.server_name
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 52c483139c..b65254cfac 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 
 import synapse.metrics
 from synapse.api.errors import FederationDeniedError, HttpResponseException
+from synapse.events import FrozenEvent
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 from synapse.metrics import (
     LaterGauge,
@@ -692,6 +693,10 @@ class TransactionQueue(object):
         logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
 
         if code == 200:
+            logger.info(
+                "TX [%s] {%s} got response json %s",
+                destination, txn_id, response
+            )
             pdu_results = response.get("pdus", {})
             for p in pdus:
                 yield self._pdu_send_result(
@@ -705,6 +710,7 @@ class TransactionQueue(object):
 
         defer.returnValue(success)
 
+    @defer.inlineCallbacks
     def _pdu_send_result(self, destination, txn_id, pdu, response):
         """Gets called after sending the event in a transaction, with the
         result for the event from the remote server.
@@ -715,7 +721,35 @@ class TransactionQueue(object):
                 "TX [%s] {%s} Remote returned error for %s: %s",
                 destination, txn_id, pdu.event_id, response,
             )
+            pdu_logger.info(
+                "SendErrorPDU",
+                extra={
+                    "event_id": pdu.event_id, "room_id": pdu.room_id,
+                    "destination": destination,
+                    "server": self.server_name,
+                },
+            )
+
+            new_destinations = set(pdu.unsigned.get("destinations", []))
+            new_destinations.discard(destination)
+            yield self._send_pdu(pdu, list(new_destinations))
+        elif "did_not_relay" in response and response["did_not_relay"]:
+            new_destinations = set(response["did_not_relay"])
+            new_destinations.discard(destination)
+
+            pdu_logger.info(
+                "DidNotRelayPDU",
+                extra={
+                    "event_id": pdu.event_id, "room_id": pdu.room_id,
+                    "destination": destination,
+                    "new_destinations": json.dumps(list(new_destinations)),
+                    "server": self.server_name,
+                },
+            )
+
+            yield self._send_pdu(pdu, list(new_destinations))
 
+    @defer.inlineCallbacks
     def _pdu_send_txn_failed(self, destination, txn_id, pdu):
         """Gets called when sending a transaction failed (after retries)
         """
@@ -724,3 +758,16 @@ class TransactionQueue(object):
             "TX [%s] {%s} Failed to send event %s",
             destination, txn_id, pdu.event_id,
         )
+
+        pdu_logger.info(
+            "SendFailPDU",
+            extra={
+                "event_id": pdu.event_id, "room_id": pdu.room_id,
+                "destination": destination,
+                "server": self.server_name,
+            },
+        )
+
+        new_destinations = set(pdu.unsigned.get("destinations", []))
+        new_destinations.discard(destination)
+        yield self._send_pdu(pdu, list(new_destinations))
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index e18a567d01..0e7690e285 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -120,10 +120,10 @@ class Authenticator(object):
         ):
             raise FederationDeniedError(origin)
 
-        if not json_request["signatures"]:
-            raise NoAuthenticationError(
-                401, "Missing Authorization headers", Codes.UNAUTHORIZED,
-            )
+        # if not json_request["signatures"]:
+        #     raise NoAuthenticationError(
+        #         401, "Missing Authorization headers", Codes.UNAUTHORIZED,
+        #     )
 
         # yield self.keyring.verify_json_for_server(origin, json_request)