summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/crypto/keyring.py108
-rw-r--r--synapse/federation/transaction_queue.py66
-rw-r--r--synapse/util/retryutils.py10
3 files changed, 95 insertions, 89 deletions
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index ea00c830c0..828aced44a 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -101,63 +101,63 @@ class Keyring(object):
                 server_name, self.hs.tls_context_factory
             )
 
-            # Check the response.
+        # Check the response.
 
-            x509_certificate_bytes = crypto.dump_certificate(
-                crypto.FILETYPE_ASN1, tls_certificate
-            )
+        x509_certificate_bytes = crypto.dump_certificate(
+            crypto.FILETYPE_ASN1, tls_certificate
+        )
 
-            if ("signatures" not in response
-                    or server_name not in response["signatures"]):
-                raise ValueError("Key response not signed by remote server")
-
-            if "tls_certificate" not in response:
-                raise ValueError("Key response missing TLS certificate")
-
-            tls_certificate_b64 = response["tls_certificate"]
-
-            if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
-                raise ValueError("TLS certificate doesn't match")
-
-            verify_keys = {}
-            for key_id, key_base64 in response["verify_keys"].items():
-                if is_signing_algorithm_supported(key_id):
-                    key_bytes = decode_base64(key_base64)
-                    verify_key = decode_verify_key_bytes(key_id, key_bytes)
-                    verify_keys[key_id] = verify_key
-
-            for key_id in response["signatures"][server_name]:
-                if key_id not in response["verify_keys"]:
-                    raise ValueError(
-                        "Key response must include verification keys for all"
-                        " signatures"
-                    )
-                if key_id in verify_keys:
-                    verify_signed_json(
-                        response,
-                        server_name,
-                        verify_keys[key_id]
-                    )
-
-            # Cache the result in the datastore.
-
-            time_now_ms = self.clock.time_msec()
-
-            yield self.store.store_server_certificate(
-                server_name,
-                server_name,
-                time_now_ms,
-                tls_certificate,
-            )
+        if ("signatures" not in response
+                or server_name not in response["signatures"]):
+            raise ValueError("Key response not signed by remote server")
+
+        if "tls_certificate" not in response:
+            raise ValueError("Key response missing TLS certificate")
 
-            for key_id, key in verify_keys.items():
-                yield self.store.store_server_verify_key(
-                    server_name, server_name, time_now_ms, key
+        tls_certificate_b64 = response["tls_certificate"]
+
+        if encode_base64(x509_certificate_bytes) != tls_certificate_b64:
+            raise ValueError("TLS certificate doesn't match")
+
+        verify_keys = {}
+        for key_id, key_base64 in response["verify_keys"].items():
+            if is_signing_algorithm_supported(key_id):
+                key_bytes = decode_base64(key_base64)
+                verify_key = decode_verify_key_bytes(key_id, key_bytes)
+                verify_keys[key_id] = verify_key
+
+        for key_id in response["signatures"][server_name]:
+            if key_id not in response["verify_keys"]:
+                raise ValueError(
+                    "Key response must include verification keys for all"
+                    " signatures"
+                )
+            if key_id in verify_keys:
+                verify_signed_json(
+                    response,
+                    server_name,
+                    verify_keys[key_id]
                 )
 
-            for key_id in key_ids:
-                if key_id in verify_keys:
-                    defer.returnValue(verify_keys[key_id])
-                    return
+        # Cache the result in the datastore.
+
+        time_now_ms = self.clock.time_msec()
+
+        yield self.store.store_server_certificate(
+            server_name,
+            server_name,
+            time_now_ms,
+            tls_certificate,
+        )
+
+        for key_id, key in verify_keys.items():
+            yield self.store.store_server_verify_key(
+                server_name, server_name, time_now_ms, key
+            )
+
+        for key_id in key_ids:
+            if key_id in verify_keys:
+                defer.returnValue(verify_keys[key_id])
+                return
 
-            raise ValueError("No verification key found for given key ids")
+        raise ValueError("No verification key found for given key ids")
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 7d02afe163..dc0f30cb64 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -167,15 +167,6 @@ class TransactionQueue(object):
             logger.info("TX [%s] Nothing to send", destination)
             return
 
-        logger.debug(
-            "TX [%s] Attempting new transaction"
-            " (pdus: %d, edus: %d, failures: %d)",
-            destination,
-            len(pending_pdus),
-            len(pending_edus),
-            len(pending_failures)
-        )
-
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[2])
 
@@ -194,32 +185,41 @@ class TransactionQueue(object):
                 self.store,
             )
 
-            with limiter:
-                self.pending_transactions[destination] = 1
+            logger.debug(
+                "TX [%s] Attempting new transaction"
+                " (pdus: %d, edus: %d, failures: %d)",
+                destination,
+                len(pending_pdus),
+                len(pending_edus),
+                len(pending_failures)
+            )
+
+            self.pending_transactions[destination] = 1
 
-                logger.debug("TX [%s] Persisting transaction...", destination)
+            logger.debug("TX [%s] Persisting transaction...", destination)
 
-                transaction = Transaction.create_new(
-                    origin_server_ts=int(self._clock.time_msec()),
-                    transaction_id=str(self._next_txn_id),
-                    origin=self.server_name,
-                    destination=destination,
-                    pdus=pdus,
-                    edus=edus,
-                    pdu_failures=failures,
-                )
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self._clock.time_msec()),
+                transaction_id=str(self._next_txn_id),
+                origin=self.server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
+                pdu_failures=failures,
+            )
 
-                self._next_txn_id += 1
+            self._next_txn_id += 1
 
-                yield self.transaction_actions.prepare_to_send(transaction)
+            yield self.transaction_actions.prepare_to_send(transaction)
 
-                logger.debug("TX [%s] Persisted transaction", destination)
-                logger.info(
-                    "TX [%s] Sending transaction [%s]",
-                    destination,
-                    transaction.transaction_id,
-                )
+            logger.debug("TX [%s] Persisted transaction", destination)
+            logger.info(
+                "TX [%s] Sending transaction [%s]",
+                destination,
+                transaction.transaction_id,
+            )
 
+            with limiter:
                 # Actually send the transaction
 
                 # FIXME (erikj): This is a bit of a hack to make the Pdu age
@@ -249,11 +249,11 @@ class TransactionQueue(object):
                 logger.debug("TX [%s] Sent transaction", destination)
                 logger.debug("TX [%s] Marking as delivered...", destination)
 
-                yield self.transaction_actions.delivered(
-                    transaction, code, response
-                )
+            yield self.transaction_actions.delivered(
+                transaction, code, response
+            )
 
-                logger.debug("TX [%s] Marked as delivered", destination)
+            logger.debug("TX [%s] Marked as delivered", destination)
 
             logger.debug("TX [%s] Yielding to callbacks...", destination)
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 888b7ef2e9..d190100c8c 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -15,6 +15,8 @@
 
 from twisted.internet import defer
 
+from synapse.api.errors import CodeMessageException
+
 import logging
 
 
@@ -67,7 +69,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
 class RetryDestinationLimiter(object):
     def __init__(self, destination, clock, store, retry_interval,
                  min_retry_interval=20000, max_retry_interval=60 * 60 * 1000,
-                 multiplier_retry_interval=2):
+                 multiplier_retry_interval=2,):
         self.clock = clock
         self.store = store
         self.destination = destination
@@ -87,7 +89,11 @@ class RetryDestinationLimiter(object):
                 failure.value
             )
 
-        if exc_type is None and exc_val is None and exc_tb is None:
+        valid_err_code = False
+        if exc_type is CodeMessageException:
+            valid_err_code = 0 <= exc_val.code < 500
+
+        if exc_type is None or valid_err_code:
             # We connected successfully.
             if not self.retry_interval:
                 return