summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_server.py11
-rw-r--r--synapse/federation/send_queue.py63
-rw-r--r--synapse/federation/transaction_queue.py57
-rw-r--r--synapse/federation/transport/server.py9
-rw-r--r--synapse/federation/units.py1
5 files changed, 28 insertions, 113 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 10e71c78ce..bf89d568af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -24,6 +24,7 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
+from twisted.python import failure
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
@@ -186,8 +187,12 @@ class FederationServer(FederationBase):
                     logger.warn("Error handling PDU %s: %s", event_id, e)
                     pdu_results[event_id] = {"error": str(e)}
                 except Exception as e:
+                    f = failure.Failure()
                     pdu_results[event_id] = {"error": str(e)}
-                    logger.exception("Failed to handle PDU %s", event_id)
+                    logger.error(
+                        "Failed to handle PDU %s: %s",
+                        event_id, f.getTraceback().rstrip(),
+                    )
 
         yield async.concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(),
@@ -202,10 +207,6 @@ class FederationServer(FederationBase):
                     edu.content
                 )
 
-        pdu_failures = getattr(transaction, "pdu_failures", [])
-        for failure in pdu_failures:
-            logger.info("Got failure %r", failure)
-
         response = {
             "pdus": pdu_results,
         }
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 5157c3860d..0bb468385d 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
-        self.failures = SortedDict()  # stream position -> (destination, Failure)
-
         self.device_messages = SortedDict()  # stream position -> destination
 
         self.pos = 1
@@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object):
 
         for queue_name in [
             "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
-            "edus", "failures", "device_messages", "pos_time",
+            "edus", "device_messages", "pos_time",
         ]:
             register(queue_name, getattr(self, queue_name))
 
@@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of failure map
-            keys = self.failures.keys()
-            i = self.failures.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.failures[key]
-
             # Delete things out of device map
             keys = self.device_messages.keys()
             i = self.device_messages.bisect_left(position_to_delete)
@@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
-    def send_failure(self, failure, destination):
-        """As per TransactionQueue"""
-        pos = self._next_pos()
-
-        self.failures[pos] = (destination, str(failure))
-        self.notifier.on_new_replication_data()
-
     def send_device_messages(self, destination):
         """As per TransactionQueue"""
         pos = self._next_pos()
@@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed failures
-        i = self.failures.bisect_right(from_token)
-        j = self.failures.bisect_right(to_token) + 1
-        failures = self.failures.items()[i:j]
-
-        for (pos, (destination, failure)) in failures:
-            rows.append((pos, FailureRow(
-                destination=destination,
-                failure=failure,
-            )))
-
         # Fetch changed device messages
         i = self.device_messages.bisect_right(from_token)
         j = self.device_messages.bisect_right(to_token) + 1
@@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
-    "destination",  # str
-    "failure",
-))):
-    """Streams failures to a remote server. Failures are issued when there was
-    something wrong with a transaction the remote sent us, e.g. it included
-    an event that was invalid.
-    """
-
-    TypeId = "f"
-
-    @staticmethod
-    def from_data(data):
-        return FailureRow(
-            destination=data["destination"],
-            failure=data["failure"],
-        )
-
-    def to_data(self):
-        return {
-            "destination": self.destination,
-            "failure": self.failure,
-        }
-
-    def add_to_buffer(self, buff):
-        buff.failures.setdefault(self.destination, []).append(self.failure)
-
-
 class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
     "destination",  # str
 ))):
@@ -471,7 +417,6 @@ TypeToRow = {
         PresenceRow,
         KeyedEduRow,
         EduRow,
-        FailureRow,
         DeviceRow,
     )
 }
@@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
     "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
-    "failures",  # dict of destination -> [failures]
     "device_destinations",  # set of destinations
 ))
 
@@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows):
         presence=[],
         keyed_edus={},
         edus={},
-        failures={},
         device_destinations=set(),
     )
 
@@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows):
                 edu.destination, edu.edu_type, edu.content, key=None,
             )
 
-    for destination, failure_list in iteritems(buff.failures):
-        for failure in failure_list:
-            transaction_queue.send_failure(destination, failure)
-
     for destination in buff.device_destinations:
         transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5a956ecfb3..78f9d40a3a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -30,7 +30,8 @@ from synapse.metrics import (
     sent_edus_counter,
     sent_transactions_counter,
 )
-from synapse.util import PreserveLoggingContext, logcontext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
@@ -115,9 +116,6 @@ class TransactionQueue(object):
             ),
         )
 
-        # destination -> list of tuple(failure, deferred)
-        self.pending_failures_by_dest = {}
-
         # destination -> stream_id of last successfully sent to-device message.
         # NB: may be a long or an int.
         self.last_device_stream_id_by_dest = {}
@@ -165,10 +163,11 @@ class TransactionQueue(object):
         if self._is_processing:
             return
 
-        # fire off a processing loop in the background. It's likely it will
-        # outlast the current request, so run it in the sentinel logcontext.
-        with PreserveLoggingContext():
-            self._process_event_queue_loop()
+        # fire off a processing loop in the background
+        run_as_background_process(
+            "process_event_queue_for_federation",
+            self._process_event_queue_loop,
+        )
 
     @defer.inlineCallbacks
     def _process_event_queue_loop(self):
@@ -380,19 +379,6 @@ class TransactionQueue(object):
 
         self._attempt_new_transaction(destination)
 
-    def send_failure(self, failure, destination):
-        if destination == self.server_name or destination == "localhost":
-            return
-
-        if not self.can_send_to(destination):
-            return
-
-        self.pending_failures_by_dest.setdefault(
-            destination, []
-        ).append(failure)
-
-        self._attempt_new_transaction(destination)
-
     def send_device_messages(self, destination):
         if destination == self.server_name or destination == "localhost":
             return
@@ -432,14 +418,11 @@ class TransactionQueue(object):
 
         logger.debug("TX [%s] Starting transaction loop", destination)
 
-        # Drop the logcontext before starting the transaction. It doesn't
-        # really make sense to log all the outbound transactions against
-        # whatever path led us to this point: that's pretty arbitrary really.
-        #
-        # (this also means we can fire off _perform_transaction without
-        # yielding)
-        with logcontext.PreserveLoggingContext():
-            self._transaction_transmission_loop(destination)
+        run_as_background_process(
+            "federation_transaction_transmission_loop",
+            self._transaction_transmission_loop,
+            destination,
+        )
 
     @defer.inlineCallbacks
     def _transaction_transmission_loop(self, destination):
@@ -470,7 +453,6 @@ class TransactionQueue(object):
                 pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
                 pending_edus = self.pending_edus_by_dest.pop(destination, [])
                 pending_presence = self.pending_presence_by_dest.pop(destination, {})
-                pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
                 pending_edus.extend(
                     self.pending_edus_keyed_by_dest.pop(destination, {}).values()
@@ -498,7 +480,7 @@ class TransactionQueue(object):
                     logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                                  destination, len(pending_pdus))
 
-                if not pending_pdus and not pending_edus and not pending_failures:
+                if not pending_pdus and not pending_edus:
                     logger.debug("TX [%s] Nothing to send", destination)
                     self.last_device_stream_id_by_dest[destination] = (
                         device_stream_id
@@ -508,7 +490,7 @@ class TransactionQueue(object):
                 # END CRITICAL SECTION
 
                 success = yield self._send_new_transaction(
-                    destination, pending_pdus, pending_edus, pending_failures,
+                    destination, pending_pdus, pending_edus,
                 )
                 if success:
                     sent_transactions_counter.inc()
@@ -585,14 +567,12 @@ class TransactionQueue(object):
 
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
-    def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures):
+    def _send_new_transaction(self, destination, pending_pdus, pending_edus):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
         pdus = [x[0] for x in pending_pdus]
         edus = pending_edus
-        failures = [x.get_dict() for x in pending_failures]
 
         success = True
 
@@ -602,11 +582,10 @@ class TransactionQueue(object):
 
         logger.debug(
             "TX [%s] {%s} Attempting new transaction"
-            " (pdus: %d, edus: %d, failures: %d)",
+            " (pdus: %d, edus: %d)",
             destination, txn_id,
             len(pdus),
             len(edus),
-            len(failures)
         )
 
         logger.debug("TX [%s] Persisting transaction...", destination)
@@ -618,7 +597,6 @@ class TransactionQueue(object):
             destination=destination,
             pdus=pdus,
             edus=edus,
-            pdu_failures=failures,
         )
 
         self._next_txn_id += 1
@@ -628,12 +606,11 @@ class TransactionQueue(object):
         logger.debug("TX [%s] Persisted transaction", destination)
         logger.info(
             "TX [%s] {%s} Sending transaction [%s],"
-            " (PDUs: %d, EDUs: %d, failures: %d)",
+            " (PDUs: %d, EDUs: %d)",
             destination, txn_id,
             transaction.transaction_id,
             len(pdus),
             len(edus),
-            len(failures),
         )
 
         # Actually send the transaction
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c9beca27c2..eae5f2b427 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -165,7 +165,7 @@ def _parse_auth_header(header_bytes):
         param_dict = dict(kv.split("=") for kv in params)
 
         def strip_quotes(value):
-            if value.startswith(b"\""):
+            if value.startswith("\""):
                 return value[1:-1]
             else:
                 return value
@@ -283,11 +283,10 @@ class FederationSendServlet(BaseFederationServlet):
             )
 
             logger.info(
-                "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
+                "Received txn %s from %s. (PDUs: %d, EDUs: %d)",
                 transaction_id, origin,
                 len(transaction_data.get("pdus", [])),
                 len(transaction_data.get("edus", [])),
-                len(transaction_data.get("failures", [])),
             )
 
             # We should ideally be getting this from the security layer.
@@ -404,10 +403,10 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
 
 
 class FederationSendLeaveServlet(BaseFederationServlet):
-    PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
+    PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
 
     @defer.inlineCallbacks
-    def on_PUT(self, origin, content, query, room_id, txid):
+    def on_PUT(self, origin, content, query, room_id, event_id):
         content = yield self.handler.on_send_leave_request(origin, content)
         defer.returnValue((200, content))
 
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index bb1b3b13f7..c5ab14314e 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject):
         "previous_ids",
         "pdus",
         "edus",
-        "pdu_failures",
     ]
 
     internal_keys = [