| diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index e2de7fce91..4fa9d1a03c 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import synapse.util.logcontext
 from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException
@@ -35,7 +35,8 @@ class NotRetryingDestination(Exception):
 
 
 @defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, **kwargs):
+def get_retry_limiter(destination, clock, store, ignore_backoff=False,
+                      **kwargs):
     """For a given destination check if we have previously failed to
     send a request there and are waiting before retrying the destination.
     If we are not ready to retry the destination, this will raise a
@@ -43,6 +44,14 @@ def get_retry_limiter(destination, clock, store, **kwargs):
     that will mark the destination as down if an exception is thrown (excluding
     CodeMessageException with code < 500)
 
+    Args:
+        destination (str): name of homeserver
+        clock (synapse.util.clock): timing source
+        store (synapse.storage.transactions.TransactionStore): datastore
+        ignore_backoff (bool): true to ignore the historical backoff data and
+            try the request anyway. We will still update the next
+            retry_interval on success/failure.
+
     Example usage:
 
         try:
@@ -66,7 +75,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
 
         now = int(clock.time_msec())
 
-        if retry_last_ts + retry_interval > now:
+        if not ignore_backoff and retry_last_ts + retry_interval > now:
             raise NotRetryingDestination(
                 retry_last_ts=retry_last_ts,
                 retry_interval=retry_interval,
@@ -88,7 +97,7 @@ class RetryDestinationLimiter(object):
     def __init__(self, destination, clock, store, retry_interval,
                  min_retry_interval=10 * 60 * 1000,
                  max_retry_interval=24 * 60 * 60 * 1000,
-                 multiplier_retry_interval=5,):
+                 multiplier_retry_interval=5, backoff_on_404=False):
         """Marks the destination as "down" if an exception is thrown in the
         context, except for CodeMessageException with code < 500.
 
@@ -107,6 +116,7 @@ class RetryDestinationLimiter(object):
                 a failed request, in milliseconds.
             multiplier_retry_interval (int): The multiplier to use to increase
                 the retry interval after a failed request.
+            backoff_on_404 (bool): Back off if we get a 404
         """
         self.clock = clock
         self.store = store
@@ -116,20 +126,44 @@ class RetryDestinationLimiter(object):
         self.min_retry_interval = min_retry_interval
         self.max_retry_interval = max_retry_interval
         self.multiplier_retry_interval = multiplier_retry_interval
+        self.backoff_on_404 = backoff_on_404
 
     def __enter__(self):
         pass
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         valid_err_code = False
-        if exc_type is not None and issubclass(exc_type, CodeMessageException):
-            valid_err_code = exc_val.code != 429 and 0 <= exc_val.code < 500
+        if exc_type is None:
+            valid_err_code = True
+        elif not issubclass(exc_type, Exception):
+            # avoid treating exceptions which don't derive from Exception as
+            # failures; this is mostly so as not to catch defer._DefGen.
+            valid_err_code = True
+        elif issubclass(exc_type, CodeMessageException):
+            # Some error codes are perfectly fine for some APIs, whereas other
+            # APIs may expect to never received e.g. a 404. It's important to
+            # handle 404 as some remote servers will return a 404 when the HS
+            # has been decommissioned.
+            # If we get a 401, then we should probably back off since they
+            # won't accept our requests for at least a while.
+            # 429 is us being aggresively rate limited, so lets rate limit
+            # ourselves.
+            if exc_val.code == 404 and self.backoff_on_404:
+                valid_err_code = False
+            elif exc_val.code in (401, 429):
+                valid_err_code = False
+            elif exc_val.code < 500:
+                valid_err_code = True
+            else:
+                valid_err_code = False
 
-        if exc_type is None or valid_err_code:
+        if valid_err_code:
             # We connected successfully.
             if not self.retry_interval:
                 return
 
+            logger.debug("Connection to %s was successful; clearing backoff",
+                         self.destination)
             retry_last_ts = 0
             self.retry_interval = 0
         else:
@@ -143,6 +177,10 @@ class RetryDestinationLimiter(object):
             else:
                 self.retry_interval = self.min_retry_interval
 
+            logger.debug(
+                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
+                self.destination, exc_type, exc_val, self.retry_interval
+            )
             retry_last_ts = int(self.clock.time_msec())
 
         @defer.inlineCallbacks
@@ -156,4 +194,5 @@ class RetryDestinationLimiter(object):
                     "Failed to store set_destination_retry_timings",
                 )
 
-        store_retry_timings()
+        # we deliberately do this in the background.
+        synapse.util.logcontext.preserve_fn(store_retry_timings)()
 |