summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2017-03-23 00:12:21 +0000
committerRichard van der Hoff <richard@matrix.org>2017-03-23 09:28:46 +0000
commit4bd597d9fcb8e6c6888ee3e8fa683ba812272997 (patch)
tree14560ff81374c7d41d1ed24af3f3f47e46f5373c /synapse
parentMatrixFederationHttpClient: clean up (diff)
downloadsynapse-4bd597d9fcb8e6c6888ee3e8fa683ba812272997.tar.xz
push federation retry limiter down to matrixfederationclient
rather than having to instrument everywhere we make a federation call,
make the MatrixFederationHttpClient manage the retry limiter.
Diffstat (limited to '')
-rw-r--r--synapse/crypto/keyring.py39
-rw-r--r--synapse/federation/federation_client.py33
-rw-r--r--synapse/federation/transaction_queue.py216
-rw-r--r--synapse/federation/transport/client.py1
-rw-r--r--synapse/handlers/e2e_keys.py32
-rw-r--r--synapse/http/matrixfederationclient.py228
-rw-r--r--synapse/util/retryutils.py16
7 files changed, 278 insertions, 287 deletions
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 80f27f8c53..c4bc4f4d31 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -15,7 +15,6 @@
 
 from synapse.crypto.keyclient import fetch_server_key
 from synapse.api.errors import SynapseError, Codes
-from synapse.util.retryutils import get_retry_limiter
 from synapse.util import unwrapFirstError
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import (
@@ -363,30 +362,24 @@ class Keyring(object):
     def get_keys_from_server(self, server_name_and_key_ids):
         @defer.inlineCallbacks
         def get_key(server_name, key_ids):
-            limiter = yield get_retry_limiter(
-                server_name,
-                self.clock,
-                self.store,
-            )
-            with limiter:
-                keys = None
-                try:
-                    keys = yield self.get_server_verify_key_v2_direct(
-                        server_name, key_ids
-                    )
-                except Exception as e:
-                    logger.info(
-                        "Unable to get key %r for %r directly: %s %s",
-                        key_ids, server_name,
-                        type(e).__name__, str(e.message),
-                    )
+            keys = None
+            try:
+                keys = yield self.get_server_verify_key_v2_direct(
+                    server_name, key_ids
+                )
+            except Exception as e:
+                logger.info(
+                    "Unable to get key %r for %r directly: %s %s",
+                    key_ids, server_name,
+                    type(e).__name__, str(e.message),
+                )
 
-                if not keys:
-                    keys = yield self.get_server_verify_key_v1_direct(
-                        server_name, key_ids
-                    )
+            if not keys:
+                keys = yield self.get_server_verify_key_v1_direct(
+                    server_name, key_ids
+                )
 
-                    keys = {server_name: keys}
+                keys = {server_name: keys}
 
             defer.returnValue(keys)
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5dcd4eecce..dc44727b36 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
 from synapse.events import FrozenEvent, builder
 import synapse.metrics
 
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
 
 import copy
 import itertools
@@ -234,31 +234,24 @@ class FederationClient(FederationBase):
                 continue
 
             try:
-                limiter = yield get_retry_limiter(
-                    destination,
-                    self._clock,
-                    self.store,
+                transaction_data = yield self.transport_layer.get_event(
+                    destination, event_id, timeout=timeout,
                 )
 
-                with limiter:
-                    transaction_data = yield self.transport_layer.get_event(
-                        destination, event_id, timeout=timeout,
-                    )
-
-                    logger.debug("transaction_data %r", transaction_data)
+                logger.debug("transaction_data %r", transaction_data)
 
-                    pdu_list = [
-                        self.event_from_pdu_json(p, outlier=outlier)
-                        for p in transaction_data["pdus"]
-                    ]
+                pdu_list = [
+                    self.event_from_pdu_json(p, outlier=outlier)
+                    for p in transaction_data["pdus"]
+                ]
 
-                    if pdu_list and pdu_list[0]:
-                        pdu = pdu_list[0]
+                if pdu_list and pdu_list[0]:
+                    pdu = pdu_list[0]
 
-                        # Check signatures are correct.
-                        signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
+                    # Check signatures are correct.
+                    signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
 
-                        break
+                    break
 
                 pdu_attempts[destination] = now
 
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c802dd67a3..d7ecefcc64 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import datetime
 
 from twisted.internet import defer
 
@@ -22,9 +22,7 @@ from .units import Transaction, Edu
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
 from synapse.util.logcontext import preserve_context_over_fn
-from synapse.util.retryutils import (
-    get_retry_limiter, NotRetryingDestination,
-)
+from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
 from synapse.handlers.presence import format_user_presence_state
@@ -312,13 +310,6 @@ class TransactionQueue(object):
             yield run_on_reactor()
 
             while True:
-                limiter = yield get_retry_limiter(
-                    destination,
-                    self.clock,
-                    self.store,
-                    backoff_on_404=True,  # If we get a 404 the other side has gone
-                )
-
                 device_message_edus, device_stream_id, dev_list_id = (
                     yield self._get_new_device_messages(destination)
                 )
@@ -374,7 +365,6 @@ class TransactionQueue(object):
 
                 success = yield self._send_new_transaction(
                     destination, pending_pdus, pending_edus, pending_failures,
-                    limiter=limiter,
                 )
                 if success:
                     # Remove the acknowledged device messages from the database
@@ -392,12 +382,24 @@ class TransactionQueue(object):
                     self.last_device_list_stream_id_by_dest[destination] = dev_list_id
                 else:
                     break
-        except NotRetryingDestination:
+        except NotRetryingDestination as e:
             logger.debug(
-                "TX [%s] not ready for retry yet - "
+                "TX [%s] not ready for retry yet (next retry at %s) - "
                 "dropping transaction for now",
                 destination,
+                datetime.datetime.fromtimestamp(
+                    (e.retry_last_ts + e.retry_interval) / 1000.0
+                ),
+            )
+        except Exception as e:
+            logger.warn(
+                "TX [%s] Failed to send transaction: %s",
+                destination,
+                e,
             )
+            for p in pending_pdus:
+                logger.info("Failed to send event %s to %s", p.event_id,
+                            destination)
         finally:
             # We want to be *very* sure we delete this after we stop processing
             self.pending_transactions.pop(destination, None)
@@ -437,7 +439,7 @@ class TransactionQueue(object):
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures, limiter):
+                              pending_failures):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
@@ -447,132 +449,104 @@ class TransactionQueue(object):
 
         success = True
 
-        try:
-            logger.debug("TX [%s] _attempt_new_transaction", destination)
+        logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-            txn_id = str(self._next_txn_id)
+        txn_id = str(self._next_txn_id)
 
-            logger.debug(
-                "TX [%s] {%s} Attempting new transaction"
-                " (pdus: %d, edus: %d, failures: %d)",
-                destination, txn_id,
-                len(pdus),
-                len(edus),
-                len(failures)
-            )
+        logger.debug(
+            "TX [%s] {%s} Attempting new transaction"
+            " (pdus: %d, edus: %d, failures: %d)",
+            destination, txn_id,
+            len(pdus),
+            len(edus),
+            len(failures)
+        )
 
-            logger.debug("TX [%s] Persisting transaction...", destination)
+        logger.debug("TX [%s] Persisting transaction...", destination)
 
-            transaction = Transaction.create_new(
-                origin_server_ts=int(self.clock.time_msec()),
-                transaction_id=txn_id,
-                origin=self.server_name,
-                destination=destination,
-                pdus=pdus,
-                edus=edus,
-                pdu_failures=failures,
-            )
+        transaction = Transaction.create_new(
+            origin_server_ts=int(self.clock.time_msec()),
+            transaction_id=txn_id,
+            origin=self.server_name,
+            destination=destination,
+            pdus=pdus,
+            edus=edus,
+            pdu_failures=failures,
+        )
 
-            self._next_txn_id += 1
+        self._next_txn_id += 1
 
-            yield self.transaction_actions.prepare_to_send(transaction)
+        yield self.transaction_actions.prepare_to_send(transaction)
 
-            logger.debug("TX [%s] Persisted transaction", destination)
-            logger.info(
-                "TX [%s] {%s} Sending transaction [%s],"
-                " (PDUs: %d, EDUs: %d, failures: %d)",
-                destination, txn_id,
-                transaction.transaction_id,
-                len(pdus),
-                len(edus),
-                len(failures),
-            )
+        logger.debug("TX [%s] Persisted transaction", destination)
+        logger.info(
+            "TX [%s] {%s} Sending transaction [%s],"
+            " (PDUs: %d, EDUs: %d, failures: %d)",
+            destination, txn_id,
+            transaction.transaction_id,
+            len(pdus),
+            len(edus),
+            len(failures),
+        )
 
-            with limiter:
-                # Actually send the transaction
-
-                # FIXME (erikj): This is a bit of a hack to make the Pdu age
-                # keys work
-                def json_data_cb():
-                    data = transaction.get_dict()
-                    now = int(self.clock.time_msec())
-                    if "pdus" in data:
-                        for p in data["pdus"]:
-                            if "age_ts" in p:
-                                unsigned = p.setdefault("unsigned", {})
-                                unsigned["age"] = now - int(p["age_ts"])
-                                del p["age_ts"]
-                    return data
-
-                try:
-                    response = yield self.transport_layer.send_transaction(
-                        transaction, json_data_cb
-                    )
-                    code = 200
-
-                    if response:
-                        for e_id, r in response.get("pdus", {}).items():
-                            if "error" in r:
-                                logger.warn(
-                                    "Transaction returned error for %s: %s",
-                                    e_id, r,
-                                )
-                except HttpResponseException as e:
-                    code = e.code
-                    response = e.response
-
-                    if e.code in (401, 404, 429) or 500 <= e.code:
-                        logger.info(
-                            "TX [%s] {%s} got %d response",
-                            destination, txn_id, code
+        # Actually send the transaction
+
+        # FIXME (erikj): This is a bit of a hack to make the Pdu age
+        # keys work
+        def json_data_cb():
+            data = transaction.get_dict()
+            now = int(self.clock.time_msec())
+            if "pdus" in data:
+                for p in data["pdus"]:
+                    if "age_ts" in p:
+                        unsigned = p.setdefault("unsigned", {})
+                        unsigned["age"] = now - int(p["age_ts"])
+                        del p["age_ts"]
+            return data
+
+        try:
+            response = yield self.transport_layer.send_transaction(
+                transaction, json_data_cb
+            )
+            code = 200
+
+            if response:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "Transaction returned error for %s: %s",
+                            e_id, r,
                         )
-                        raise e
+        except HttpResponseException as e:
+            code = e.code
+            response = e.response
 
+            if e.code in (401, 404, 429) or 500 <= e.code:
                 logger.info(
                     "TX [%s] {%s} got %d response",
                     destination, txn_id, code
                 )
+                raise e
 
-                logger.debug("TX [%s] Sent transaction", destination)
-                logger.debug("TX [%s] Marking as delivered...", destination)
-
-            yield self.transaction_actions.delivered(
-                transaction, code, response
-            )
+        logger.info(
+            "TX [%s] {%s} got %d response",
+            destination, txn_id, code
+        )
 
-            logger.debug("TX [%s] Marked as delivered", destination)
+        logger.debug("TX [%s] Sent transaction", destination)
+        logger.debug("TX [%s] Marking as delivered...", destination)
 
-            if code != 200:
-                for p in pdus:
-                    logger.info(
-                        "Failed to send event %s to %s", p.event_id, destination
-                    )
-                success = False
-        except RuntimeError as e:
-            # We capture this here as there as nothing actually listens
-            # for this finishing functions deferred.
-            logger.warn(
-                "TX [%s] Problem in _attempt_transaction: %s",
-                destination,
-                e,
-            )
+        yield self.transaction_actions.delivered(
+            transaction, code, response
+        )
 
-            success = False
+        logger.debug("TX [%s] Marked as delivered", destination)
 
+        if code != 200:
             for p in pdus:
-                logger.info("Failed to send event %s to %s", p.event_id, destination)
-        except Exception as e:
-            # We capture this here as there as nothing actually listens
-            # for this finishing functions deferred.
-            logger.warn(
-                "TX [%s] Problem in _attempt_transaction: %s",
-                destination,
-                e,
-            )
-
+                logger.info(
+                    "Failed to send event %s to %s", p.event_id, destination
+                )
             success = False
 
-            for p in pdus:
-                logger.info("Failed to send event %s to %s", p.event_id, destination)
-
         defer.returnValue(success)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index f49e8a2cc4..cc9bc7f14b 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -163,6 +163,7 @@ class TransportLayerClient(object):
             data=json_data,
             json_data_callback=json_data_callback,
             long_retries=True,
+            backoff_on_404=True,  # If we get a 404 the other side has gone
         )
 
         logger.debug(
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index e40495d1ab..a33135de67 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, CodeMessageException
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
 
@@ -121,15 +121,11 @@ class E2eKeysHandler(object):
         def do_remote_query(destination):
             destination_query = remote_queries_not_in_cache[destination]
             try:
-                limiter = yield get_retry_limiter(
-                    destination, self.clock, self.store
+                remote_result = yield self.federation.query_client_keys(
+                    destination,
+                    {"device_keys": destination_query},
+                    timeout=timeout
                 )
-                with limiter:
-                    remote_result = yield self.federation.query_client_keys(
-                        destination,
-                        {"device_keys": destination_query},
-                        timeout=timeout
-                    )
 
                 for user_id, keys in remote_result["device_keys"].items():
                     if user_id in destination_query:
@@ -239,18 +235,14 @@ class E2eKeysHandler(object):
         def claim_client_keys(destination):
             device_keys = remote_queries[destination]
             try:
-                limiter = yield get_retry_limiter(
-                    destination, self.clock, self.store
+                remote_result = yield self.federation.claim_client_keys(
+                    destination,
+                    {"one_time_keys": device_keys},
+                    timeout=timeout
                 )
-                with limiter:
-                    remote_result = yield self.federation.claim_client_keys(
-                        destination,
-                        {"one_time_keys": device_keys},
-                        timeout=timeout
-                    )
-                    for user_id, keys in remote_result["one_time_keys"].items():
-                        if user_id in device_keys:
-                            json_result[user_id] = keys
+                for user_id, keys in remote_result["one_time_keys"].items():
+                    if user_id in device_keys:
+                        json_result[user_id] = keys
             except CodeMessageException as e:
                 failures[destination] = {
                     "status": e.code, "message": e.message
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index f15903f862..b0885dc979 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import synapse.util.retryutils
 from twisted.internet import defer, reactor, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web.client import readBody, HTTPConnectionPool, Agent
@@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object):
             reactor, MatrixFederationEndpointFactory(hs), pool=pool
         )
         self.clock = hs.get_clock()
+        self._store = hs.get_datastore()
         self.version_string = hs.version_string
         self._next_id = 1
 
@@ -106,133 +106,143 @@ class MatrixFederationHttpClient(object):
     def _request(self, destination, method, path,
                  body_callback, headers_dict={}, param_bytes=b"",
                  query_bytes=b"", retry_on_dns_fail=True,
-                 timeout=None, long_retries=False):
+                 timeout=None, long_retries=False, backoff_on_404=False):
         """ Creates and sends a request to the given server
         Args:
             destination (str): The remote server to send the HTTP request to.
             method (str): HTTP method
             path (str): The HTTP path
+            backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
             Deferred: resolves with the http response object on success.
 
             Fails with ``HTTPRequestException``: if we get an HTTP response
-            code >= 300.
+                code >= 300.
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+                to retry this server.
         """
+        limiter = yield synapse.util.retryutils.get_retry_limiter(
+            destination,
+            self.clock,
+            self._store,
+            backoff_on_404=backoff_on_404,
+        )
+
         destination = destination.encode("ascii")
         path_bytes = path.encode("ascii")
+        with limiter:
+            headers_dict[b"User-Agent"] = [self.version_string]
+            headers_dict[b"Host"] = [destination]
 
-        headers_dict[b"User-Agent"] = [self.version_string]
-        headers_dict[b"Host"] = [destination]
+            url_bytes = self._create_url(
+                destination, path_bytes, param_bytes, query_bytes
+            )
 
-        url_bytes = self._create_url(
-            destination, path_bytes, param_bytes, query_bytes
-        )
+            txn_id = "%s-O-%s" % (method, self._next_id)
+            self._next_id = (self._next_id + 1) % (sys.maxint - 1)
 
-        txn_id = "%s-O-%s" % (method, self._next_id)
-        self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+            outbound_logger.info(
+                "{%s} [%s] Sending request: %s %s",
+                txn_id, destination, method, url_bytes
+            )
 
-        outbound_logger.info(
-            "{%s} [%s] Sending request: %s %s",
-            txn_id, destination, method, url_bytes
-        )
+            # XXX: Would be much nicer to retry only at the transaction-layer
+            # (once we have reliable transactions in place)
+            if long_retries:
+                retries_left = MAX_LONG_RETRIES
+            else:
+                retries_left = MAX_SHORT_RETRIES
 
-        # XXX: Would be much nicer to retry only at the transaction-layer
-        # (once we have reliable transactions in place)
-        if long_retries:
-            retries_left = MAX_LONG_RETRIES
-        else:
-            retries_left = MAX_SHORT_RETRIES
+            http_url_bytes = urlparse.urlunparse(
+                ("", "", path_bytes, param_bytes, query_bytes, "")
+            )
 
-        http_url_bytes = urlparse.urlunparse(
-            ("", "", path_bytes, param_bytes, query_bytes, "")
-        )
+            log_result = None
+            try:
+                while True:
+                    producer = None
+                    if body_callback:
+                        producer = body_callback(method, http_url_bytes, headers_dict)
+
+                    try:
+                        def send_request():
+                            request_deferred = preserve_context_over_fn(
+                                self.agent.request,
+                                method,
+                                url_bytes,
+                                Headers(headers_dict),
+                                producer
+                            )
+
+                            return self.clock.time_bound_deferred(
+                                request_deferred,
+                                time_out=timeout / 1000. if timeout else 60,
+                            )
+
+                        response = yield preserve_context_over_fn(send_request)
+
+                        log_result = "%d %s" % (response.code, response.phrase,)
+                        break
+                    except Exception as e:
+                        if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                            logger.warn(
+                                "DNS Lookup failed to %s with %s",
+                                destination,
+                                e
+                            )
+                            log_result = "DNS Lookup failed to %s with %s" % (
+                                destination, e
+                            )
+                            raise
 
-        log_result = None
-        try:
-            while True:
-                producer = None
-                if body_callback:
-                    producer = body_callback(method, http_url_bytes, headers_dict)
-
-                try:
-                    def send_request():
-                        request_deferred = preserve_context_over_fn(
-                            self.agent.request,
+                        logger.warn(
+                            "{%s} Sending request failed to %s: %s %s: %s - %s",
+                            txn_id,
+                            destination,
                             method,
                             url_bytes,
-                            Headers(headers_dict),
-                            producer
+                            type(e).__name__,
+                            _flatten_response_never_received(e),
                         )
 
-                        return self.clock.time_bound_deferred(
-                            request_deferred,
-                            time_out=timeout / 1000. if timeout else 60,
+                        log_result = "%s - %s" % (
+                            type(e).__name__, _flatten_response_never_received(e),
                         )
 
-                    response = yield preserve_context_over_fn(send_request)
-
-                    log_result = "%d %s" % (response.code, response.phrase,)
-                    break
-                except Exception as e:
-                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                        logger.warn(
-                            "DNS Lookup failed to %s with %s",
-                            destination,
-                            e
-                        )
-                        log_result = "DNS Lookup failed to %s with %s" % (
-                            destination, e
-                        )
-                        raise
-
-                    logger.warn(
-                        "{%s} Sending request failed to %s: %s %s: %s - %s",
-                        txn_id,
-                        destination,
-                        method,
-                        url_bytes,
-                        type(e).__name__,
-                        _flatten_response_never_received(e),
-                    )
-
-                    log_result = "%s - %s" % (
-                        type(e).__name__, _flatten_response_never_received(e),
-                    )
-
-                    if retries_left and not timeout:
-                        if long_retries:
-                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                            delay = min(delay, 60)
-                            delay *= random.uniform(0.8, 1.4)
+                        if retries_left and not timeout:
+                            if long_retries:
+                                delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+                                delay = min(delay, 60)
+                                delay *= random.uniform(0.8, 1.4)
+                            else:
+                                delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+                                delay = min(delay, 2)
+                                delay *= random.uniform(0.8, 1.4)
+
+                            yield sleep(delay)
+                            retries_left -= 1
                         else:
-                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                            delay = min(delay, 2)
-                            delay *= random.uniform(0.8, 1.4)
-
-                        yield sleep(delay)
-                        retries_left -= 1
-                    else:
-                        raise
-        finally:
-            outbound_logger.info(
-                "{%s} [%s] Result: %s",
-                txn_id,
-                destination,
-                log_result,
-            )
+                            raise
+            finally:
+                outbound_logger.info(
+                    "{%s} [%s] Result: %s",
+                    txn_id,
+                    destination,
+                    log_result,
+                )
 
-        if 200 <= response.code < 300:
-            pass
-        else:
-            # :'(
-            # Update transactions table?
-            body = yield preserve_context_over_fn(readBody, response)
-            raise HttpResponseException(
-                response.code, response.phrase, body
-            )
+            if 200 <= response.code < 300:
+                pass
+            else:
+                # :'(
+                # Update transactions table?
+                body = yield preserve_context_over_fn(readBody, response)
+                raise HttpResponseException(
+                    response.code, response.phrase, body
+                )
 
-        defer.returnValue(response)
+            defer.returnValue(response)
 
     def sign_request(self, destination, method, url_bytes, headers_dict,
                      content=None):
@@ -261,7 +271,7 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def put_json(self, destination, path, data={}, json_data_callback=None,
-                 long_retries=False, timeout=None):
+                 long_retries=False, timeout=None, backoff_on_404=False):
         """ Sends the specifed json data using PUT
 
         Args:
@@ -276,11 +286,17 @@ class MatrixFederationHttpClient(object):
                 retry for a short or long time.
             timeout(int): How long to try (in ms) the destination for before
                 giving up. None indicates no timeout.
+            backoff_on_404 (bool): True if we should count a 404 response as
+                a failure of the server (and should therefore back off future
+                requests)
 
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body. On a 4xx or 5xx error response a
             CodeMessageException is raised.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         if not json_data_callback:
@@ -303,6 +319,7 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
             timeout=timeout,
+            backoff_on_404=backoff_on_404,
         )
 
         if 200 <= response.code < 300:
@@ -332,6 +349,9 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body. On a 4xx or 5xx error response a
             CodeMessageException is raised.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         def body_callback(method, url_bytes, headers_dict):
@@ -377,6 +397,9 @@ class MatrixFederationHttpClient(object):
 
             The result of the deferred is a tuple of `(code, response)`,
             where `response` is a dict representing the decoded JSON body.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
         logger.debug("get_json args: %s", args)
 
@@ -426,6 +449,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``HTTPRequestException`` if we get an HTTP response code
             >= 300
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         encoded_args = {}
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 153ef001ad..7e5a952584 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -124,7 +124,13 @@ class RetryDestinationLimiter(object):
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         valid_err_code = False
-        if exc_type is not None and issubclass(exc_type, CodeMessageException):
+        if exc_type is None:
+            valid_err_code = True
+        elif not issubclass(exc_type, Exception):
+            # avoid treating exceptions which don't derive from Exception as
+            # failures; this is mostly so as not to catch defer._DefGen.
+            valid_err_code = True
+        elif issubclass(exc_type, CodeMessageException):
             # Some error codes are perfectly fine for some APIs, whereas other
             # APIs may expect to never received e.g. a 404. It's important to
             # handle 404 as some remote servers will return a 404 when the HS
@@ -142,11 +148,13 @@ class RetryDestinationLimiter(object):
             else:
                 valid_err_code = False
 
-        if exc_type is None or valid_err_code:
+        if valid_err_code:
             # We connected successfully.
             if not self.retry_interval:
                 return
 
+            logger.debug("Connection to %s was successful; clearing backoff",
+                         self.destination)
             retry_last_ts = 0
             self.retry_interval = 0
         else:
@@ -160,6 +168,10 @@ class RetryDestinationLimiter(object):
             else:
                 self.retry_interval = self.min_retry_interval
 
+            logger.debug(
+                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
+                self.destination, exc_type, exc_val, self.retry_interval
+            )
             retry_last_ts = int(self.clock.time_msec())
 
         @defer.inlineCallbacks