summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/federation/transaction_queue.py331
1 files changed, 173 insertions, 158 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5c7245d383..f8d3fffe95 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -170,44 +170,55 @@ class TransactionQueue(object):
 
     @defer.inlineCallbacks
     def _attempt_new_transaction(self, destination):
-        yield run_on_reactor()
-        while True:
-            # list of (pending_pdu, deferred, order)
-            if destination in self.pending_transactions:
-                # XXX: pending_transactions can get stuck on by a never-ending
-                # request at which point pending_pdus_by_dest just keeps growing.
-                # we need application-layer timeouts of some flavour of these
-                # requests
-                logger.debug(
-                    "TX [%s] Transaction already in progress",
-                    destination
-                )
-                return
+        # list of (pending_pdu, deferred, order)
+        if destination in self.pending_transactions:
+            # XXX: pending_transactions can get stuck on by a never-ending
+            # request at which point pending_pdus_by_dest just keeps growing.
+            # we need application-layer timeouts of some flavour of these
+            # requests
+            logger.debug(
+                "TX [%s] Transaction already in progress",
+                destination
+            )
+            return
 
-            pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
-            pending_edus = self.pending_edus_by_dest.pop(destination, [])
-            pending_failures = self.pending_failures_by_dest.pop(destination, [])
+        try:
+            self.pending_transactions[destination] = 1
 
-            device_message_edus, device_stream_id = (
-                yield self._get_new_device_messages(destination)
-            )
+            yield run_on_reactor()
 
-            pending_edus.extend(device_message_edus)
+            while True:
+                    pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+                    pending_edus = self.pending_edus_by_dest.pop(destination, [])
+                    pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
-            if pending_pdus:
-                logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                             destination, len(pending_pdus))
+                    device_message_edus, device_stream_id = (
+                        yield self._get_new_device_messages(destination)
+                    )
 
-            if not pending_pdus and not pending_edus and not pending_failures:
-                logger.debug("TX [%s] Nothing to send", destination)
-                self.last_device_stream_id_by_dest[destination] = device_stream_id
-                return
+                    pending_edus.extend(device_message_edus)
 
-            yield self._send_new_transaction(
-                destination, pending_pdus, pending_edus, pending_failures,
-                device_stream_id,
-                should_delete_from_device_stream=bool(device_message_edus)
-            )
+                    if pending_pdus:
+                        logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                                     destination, len(pending_pdus))
+
+                    if not pending_pdus and not pending_edus and not pending_failures:
+                        logger.debug("TX [%s] Nothing to send", destination)
+                        self.last_device_stream_id_by_dest[destination] = (
+                            device_stream_id
+                        )
+                        return
+
+                    success = yield self._send_new_transaction(
+                        destination, pending_pdus, pending_edus, pending_failures,
+                        device_stream_id,
+                        should_delete_from_device_stream=bool(device_message_edus)
+                    )
+                    if not success:
+                        break
+        finally:
+            # We want to be *very* sure we delete this after we stop processing
+            self.pending_transactions.pop(destination, None)
 
     @defer.inlineCallbacks
     def _get_new_device_messages(self, destination):
@@ -233,149 +244,153 @@ class TransactionQueue(object):
                               pending_failures, device_stream_id,
                               should_delete_from_device_stream):
 
-            # Sort based on the order field
-            pending_pdus.sort(key=lambda t: t[1])
-            pdus = [x[0] for x in pending_pdus]
-            edus = pending_edus
-            failures = [x.get_dict() for x in pending_failures]
+        # Sort based on the order field
+        pending_pdus.sort(key=lambda t: t[1])
+        pdus = [x[0] for x in pending_pdus]
+        edus = pending_edus
+        failures = [x.get_dict() for x in pending_failures]
 
-            try:
-                self.pending_transactions[destination] = 1
+        success = True
 
-                logger.debug("TX [%s] _attempt_new_transaction", destination)
+        try:
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-                txn_id = str(self._next_txn_id)
+            txn_id = str(self._next_txn_id)
 
-                limiter = yield get_retry_limiter(
-                    destination,
-                    self.clock,
-                    self.store,
-                )
-
-                logger.debug(
-                    "TX [%s] {%s} Attempting new transaction"
-                    " (pdus: %d, edus: %d, failures: %d)",
-                    destination, txn_id,
-                    len(pdus),
-                    len(edus),
-                    len(failures)
-                )
+            limiter = yield get_retry_limiter(
+                destination,
+                self.clock,
+                self.store,
+            )
 
-                logger.debug("TX [%s] Persisting transaction...", destination)
+            logger.debug(
+                "TX [%s] {%s} Attempting new transaction"
+                " (pdus: %d, edus: %d, failures: %d)",
+                destination, txn_id,
+                len(pdus),
+                len(edus),
+                len(failures)
+            )
 
-                transaction = Transaction.create_new(
-                    origin_server_ts=int(self.clock.time_msec()),
-                    transaction_id=txn_id,
-                    origin=self.server_name,
-                    destination=destination,
-                    pdus=pdus,
-                    edus=edus,
-                    pdu_failures=failures,
-                )
+            logger.debug("TX [%s] Persisting transaction...", destination)
 
-                self._next_txn_id += 1
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self.clock.time_msec()),
+                transaction_id=txn_id,
+                origin=self.server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
+                pdu_failures=failures,
+            )
 
-                yield self.transaction_actions.prepare_to_send(transaction)
+            self._next_txn_id += 1
 
-                logger.debug("TX [%s] Persisted transaction", destination)
-                logger.info(
-                    "TX [%s] {%s} Sending transaction [%s],"
-                    " (PDUs: %d, EDUs: %d, failures: %d)",
-                    destination, txn_id,
-                    transaction.transaction_id,
-                    len(pdus),
-                    len(edus),
-                    len(failures),
-                )
+            yield self.transaction_actions.prepare_to_send(transaction)
 
-                with limiter:
-                    # Actually send the transaction
-
-                    # FIXME (erikj): This is a bit of a hack to make the Pdu age
-                    # keys work
-                    def json_data_cb():
-                        data = transaction.get_dict()
-                        now = int(self.clock.time_msec())
-                        if "pdus" in data:
-                            for p in data["pdus"]:
-                                if "age_ts" in p:
-                                    unsigned = p.setdefault("unsigned", {})
-                                    unsigned["age"] = now - int(p["age_ts"])
-                                    del p["age_ts"]
-                        return data
-
-                    try:
-                        response = yield self.transport_layer.send_transaction(
-                            transaction, json_data_cb
-                        )
-                        code = 200
-
-                        if response:
-                            for e_id, r in response.get("pdus", {}).items():
-                                if "error" in r:
-                                    logger.warn(
-                                        "Transaction returned error for %s: %s",
-                                        e_id, r,
-                                    )
-                    except HttpResponseException as e:
-                        code = e.code
-                        response = e.response
+            logger.debug("TX [%s] Persisted transaction", destination)
+            logger.info(
+                "TX [%s] {%s} Sending transaction [%s],"
+                " (PDUs: %d, EDUs: %d, failures: %d)",
+                destination, txn_id,
+                transaction.transaction_id,
+                len(pdus),
+                len(edus),
+                len(failures),
+            )
 
-                    logger.info(
-                        "TX [%s] {%s} got %d response",
-                        destination, txn_id, code
+            with limiter:
+                # Actually send the transaction
+
+                # FIXME (erikj): This is a bit of a hack to make the Pdu age
+                # keys work
+                def json_data_cb():
+                    data = transaction.get_dict()
+                    now = int(self.clock.time_msec())
+                    if "pdus" in data:
+                        for p in data["pdus"]:
+                            if "age_ts" in p:
+                                unsigned = p.setdefault("unsigned", {})
+                                unsigned["age"] = now - int(p["age_ts"])
+                                del p["age_ts"]
+                    return data
+
+                try:
+                    response = yield self.transport_layer.send_transaction(
+                        transaction, json_data_cb
                     )
+                    code = 200
+
+                    if response:
+                        for e_id, r in response.get("pdus", {}).items():
+                            if "error" in r:
+                                logger.warn(
+                                    "Transaction returned error for %s: %s",
+                                    e_id, r,
+                                )
+                except HttpResponseException as e:
+                    code = e.code
+                    response = e.response
 
-                    logger.debug("TX [%s] Sent transaction", destination)
-                    logger.debug("TX [%s] Marking as delivered...", destination)
-
-                yield self.transaction_actions.delivered(
-                    transaction, code, response
+                logger.info(
+                    "TX [%s] {%s} got %d response",
+                    destination, txn_id, code
                 )
 
-                logger.debug("TX [%s] Marked as delivered", destination)
+                logger.debug("TX [%s] Sent transaction", destination)
+                logger.debug("TX [%s] Marking as delivered...", destination)
 
-                if code != 200:
-                    for p in pdus:
-                        logger.info(
-                            "Failed to send event %s to %s", p.event_id, destination
-                        )
-                else:
-                    # Remove the acknowledged device messages from the database
-                    if should_delete_from_device_stream:
-                        yield self.store.delete_device_msgs_for_remote(
-                            destination, device_stream_id
-                        )
-                    self.last_device_stream_id_by_dest[destination] = device_stream_id
-            except NotRetryingDestination:
-                logger.info(
-                    "TX [%s] not ready for retry yet - "
-                    "dropping transaction for now",
-                    destination,
-                )
-            except RuntimeError as e:
-                # We capture this here as there as nothing actually listens
-                # for this finishing functions deferred.
-                logger.warn(
-                    "TX [%s] Problem in _attempt_transaction: %s",
-                    destination,
-                    e,
-                )
+            yield self.transaction_actions.delivered(
+                transaction, code, response
+            )
 
-                for p in pdus:
-                    logger.info("Failed to send event %s to %s", p.event_id, destination)
-            except Exception as e:
-                # We capture this here as there as nothing actually listens
-                # for this finishing functions deferred.
-                logger.warn(
-                    "TX [%s] Problem in _attempt_transaction: %s",
-                    destination,
-                    e,
-                )
+            logger.debug("TX [%s] Marked as delivered", destination)
 
+            if code != 200:
                 for p in pdus:
-                    logger.info("Failed to send event %s to %s", p.event_id, destination)
+                    logger.info(
+                        "Failed to send event %s to %s", p.event_id, destination
+                    )
+                success = False
+            else:
+                # Remove the acknowledged device messages from the database
+                if should_delete_from_device_stream:
+                    yield self.store.delete_device_msgs_for_remote(
+                        destination, device_stream_id
+                    )
+                self.last_device_stream_id_by_dest[destination] = device_stream_id
+        except NotRetryingDestination:
+            logger.info(
+                "TX [%s] not ready for retry yet - "
+                "dropping transaction for now",
+                destination,
+            )
+            success = False
+        except RuntimeError as e:
+            # We capture this here as there as nothing actually listens
+            # for this finishing functions deferred.
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
+
+            success = False
+
+            for p in pdus:
+                logger.info("Failed to send event %s to %s", p.event_id, destination)
+        except Exception as e:
+            # We capture this here as there as nothing actually listens
+            # for this finishing functions deferred.
+            logger.warn(
+                "TX [%s] Problem in _attempt_transaction: %s",
+                destination,
+                e,
+            )
+
+            success = False
+
+            for p in pdus:
+                logger.info("Failed to send event %s to %s", p.event_id, destination)
 
-            finally:
-                # We want to be *very* sure we delete this after we stop processing
-                self.pending_transactions.pop(destination, None)
+        defer.returnValue(success)