summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py256
1 files changed, 134 insertions, 122 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 6900b0121b..f8d3fffe95 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -209,11 +209,13 @@ class TransactionQueue(object):
                         )
                         return
 
-                    yield self._send_new_transaction(
+                    success = yield self._send_new_transaction(
                         destination, pending_pdus, pending_edus, pending_failures,
                         device_stream_id,
                         should_delete_from_device_stream=bool(device_message_edus)
                     )
+                    if not success:
+                        break
         finally:
             # We want to be *very* sure we delete this after we stop processing
             self.pending_transactions.pop(destination, None)
@@ -242,143 +244,153 @@ class TransactionQueue(object):
                               pending_failures, device_stream_id,
                               should_delete_from_device_stream):
 
-            # Sort based on the order field
-            pending_pdus.sort(key=lambda t: t[1])
-            pdus = [x[0] for x in pending_pdus]
-            edus = pending_edus
-            failures = [x.get_dict() for x in pending_failures]
+        # Sort based on the order field
+        pending_pdus.sort(key=lambda t: t[1])
+        pdus = [x[0] for x in pending_pdus]
+        edus = pending_edus
+        failures = [x.get_dict() for x in pending_failures]
 
-            try:
-                logger.debug("TX [%s] _attempt_new_transaction", destination)
+        success = True
 
-                txn_id = str(self._next_txn_id)
+        try:
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-                limiter = yield get_retry_limiter(
-                    destination,
-                    self.clock,
-                    self.store,
-                )
+            txn_id = str(self._next_txn_id)
 
-                logger.debug(
-                    "TX [%s] {%s} Attempting new transaction"
-                    " (pdus: %d, edus: %d, failures: %d)",
-                    destination, txn_id,
-                    len(pdus),
-                    len(edus),
-                    len(failures)
-                )
+            limiter = yield get_retry_limiter(
+                destination,
+                self.clock,
+                self.store,
+            )
 
-                logger.debug("TX [%s] Persisting transaction...", destination)
+            logger.debug(
+                "TX [%s] {%s} Attempting new transaction"
+                " (pdus: %d, edus: %d, failures: %d)",
+                destination, txn_id,
+                len(pdus),
+                len(edus),
+                len(failures)
+            )
 
-                transaction = Transaction.create_new(
-                    origin_server_ts=int(self.clock.time_msec()),
-                    transaction_id=txn_id,
-                    origin=self.server_name,
-                    destination=destination,
-                    pdus=pdus,
-                    edus=edus,
-                    pdu_failures=failures,
-                )
+            logger.debug("TX [%s] Persisting transaction...", destination)
 
-                self._next_txn_id += 1
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self.clock.time_msec()),
+                transaction_id=txn_id,
+                origin=self.server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
+                pdu_failures=failures,
+            )
 
-                yield self.transaction_actions.prepare_to_send(transaction)
+            self._next_txn_id += 1
 
-                logger.debug("TX [%s] Persisted transaction", destination)
-                logger.info(
-                    "TX [%s] {%s} Sending transaction [%s],"
-                    " (PDUs: %d, EDUs: %d, failures: %d)",
-                    destination, txn_id,
-                    transaction.transaction_id,
-                    len(pdus),
-                    len(edus),
-                    len(failures),
-                )
+            yield self.transaction_actions.prepare_to_send(transaction)
 
-                with limiter:
-                    # Actually send the transaction
-
-                    # FIXME (erikj): This is a bit of a hack to make the Pdu age
-                    # keys work
-                    def json_data_cb():
-                        data = transaction.get_dict()
-                        now = int(self.clock.time_msec())
-                        if "pdus" in data:
-                            for p in data["pdus"]:
-                                if "age_ts" in p:
-                                    unsigned = p.setdefault("unsigned", {})
-                                    unsigned["age"] = now - int(p["age_ts"])
-                                    del p["age_ts"]
-                        return data
-
-                    try:
-                        response = yield self.transport_layer.send_transaction(
-                            transaction, json_data_cb
-                        )
-                        code = 200
-
-                        if response:
-                            for e_id, r in response.get("pdus", {}).items():
-                                if "error" in r:
-                                    logger.warn(
-                                        "Transaction returned error for %s: %s",
-                                        e_id, r,
-                                    )
-                    except HttpResponseException as e:
-                        code = e.code
-                        response = e.response
+            logger.debug("TX [%s] Persisted transaction", destination)
+            logger.info(
+                "TX [%s] {%s} Sending transaction [%s],"
+                " (PDUs: %d, EDUs: %d, failures: %d)",
+                destination, txn_id,
+                transaction.transaction_id,
+                len(pdus),
+                len(edus),
+                len(failures),
+            )
 
-                    logger.info(
-                        "TX [%s] {%s} got %d response",
-                        destination, txn_id, code
+            with limiter:
+                # Actually send the transaction
+
+                # FIXME (erikj): This is a bit of a hack to make the Pdu age
+                # keys work
+                def json_data_cb():
+                    data = transaction.get_dict()
+                    now = int(self.clock.time_msec())
+                    if "pdus" in data:
+                        for p in data["pdus"]:
+                            if "age_ts" in p:
+                                unsigned = p.setdefault("unsigned", {})
+                                unsigned["age"] = now - int(p["age_ts"])
+                                del p["age_ts"]
+                    return data
+
+                try:
+                    response = yield self.transport_layer.send_transaction(
+                        transaction, json_data_cb
                     )
+                    code = 200
+
+                    if response:
+                        for e_id, r in response.get("pdus", {}).items():
+                            if "error" in r:
+                                logger.warn(
+                                    "Transaction returned error for %s: %s",
+                                    e_id, r,
+                                )
+                except HttpResponseException as e:
+                    code = e.code
+                    response = e.response
 
-                    logger.debug("TX [%s] Sent transaction", destination)
-                    logger.debug("TX [%s] Marking as delivered...", destination)
-
-                yield self.transaction_actions.delivered(
-                    transaction, code, response
+                logger.info(
+                    "TX [%s] {%s} got %d response",
+                    destination, txn_id, code
                 )
 
-                logger.debug("TX [%s] Marked as delivered", destination)
+                logger.debug("TX [%s] Sent transaction", destination)
+                logger.debug("TX [%s] Marking as delivered...", destination)
 
-                if code != 200:
-                    for p in pdus:
-                        logger.info(
-                            "Failed to send event %s to %s", p.event_id, destination
-                        )
-                else:
-                    # Remove the acknowledged device messages from the database
-                    if should_delete_from_device_stream:
-                        yield self.store.delete_device_msgs_for_remote(
-                            destination, device_stream_id
-                        )
-                    self.last_device_stream_id_by_dest[destination] = device_stream_id
-            except NotRetryingDestination:
-                logger.info(
-                    "TX [%s] not ready for retry yet - "
-                    "dropping transaction for now",
-                    destination,
-                )
-            except RuntimeError as e:
-                # We capture this here as there as nothing actually listens
-                # for this finishing functions deferred.
-                logger.warn(
-                    "TX [%s] Problem in _attempt_transaction: %s",
-                    destination,
-                    e,
-                )
+            yield self.transaction_actions.delivered(
+                transaction, code, response
+            )
 
-                for p in pdus:
-                    logger.info("Failed to send event %s to %s", p.event_id, destination)
-            except Exception as e:
-                # We capture this here as there as nothing actually listens
-                # for this finishing functions deferred.
-                logger.warn(
-                    "TX [%s] Problem in _attempt_transaction: %s",
-                    destination,
-                    e,
-                )
+            logger.debug("TX [%s] Marked as delivered", destination)
 
+            if code != 200:
                 for p in pdus:
-                    logger.info("Failed to send event %s to %s", p.event_id, destination)
+                    logger.info(
+                        "Failed to send event %s to %s", p.event_id, destination
+                    )
+                success = False
+            else:
+                # Remove the acknowledged device messages from the database
+                if should_delete_from_device_stream:
+                    yield self.store.delete_device_msgs_for_remote(
+                        destination, device_stream_id
+                    )
+                self.last_device_stream_id_by_dest[destination] = device_stream_id
+        except NotRetryingDestination:
+            logger.info(
+                "TX [%s] not ready for retry yet - "
+                "dropping transaction for now",
+                destination,
+            )
+            success = False
+        except RuntimeError as e:
+            # We capture this here as there as nothing actually listens
+            # for this finishing functions deferred.
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
+
+            success = False
+
+            for p in pdus:
+                logger.info("Failed to send event %s to %s", p.event_id, destination)
+        except Exception as e:
+            # We capture this here as there as nothing actually listens
+            # for this finishing functions deferred.
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
+
+            success = False
+
+            for p in pdus:
+                logger.info("Failed to send event %s to %s", p.event_id, destination)
+
+        defer.returnValue(success)