summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/async.py7
-rw-r--r--synapse/util/caches/__init__.py11
-rw-r--r--synapse/util/caches/descriptors.py114
-rw-r--r--synapse/util/caches/dictionary_cache.py6
-rw-r--r--synapse/util/caches/expiringcache.py41
-rw-r--r--synapse/util/caches/lrucache.py63
-rw-r--r--synapse/util/caches/treecache.py17
-rw-r--r--synapse/util/debug.py71
-rw-r--r--synapse/util/retryutils.py21
9 files changed, 208 insertions, 143 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 83875edc85..35380bf8ed 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -192,8 +192,11 @@ class Linearizer(object):
             logger.info(
                 "Waiting to acquire linearizer lock %r for key %r", self.name, key
             )
-            with PreserveLoggingContext():
-                yield current_defer
+            try:
+                with PreserveLoggingContext():
+                    yield current_defer
+            except:
+                logger.exception("Unexpected exception in Linearizer")
 
         logger.info("Acquired linearizer lock %r for key %r", self.name, key)
 
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index ebd715c5dc..8a7774a88e 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -40,8 +40,8 @@ def register_cache(name, cache):
     )
 
 
-_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
-caches_by_name["string_cache"] = _string_cache
+_string_cache = LruCache(int(100000 * CACHE_SIZE_FACTOR))
+_stirng_cache_metrics = register_cache("string_cache", _string_cache)
 
 
 KNOWN_KEYS = {
@@ -69,7 +69,12 @@ KNOWN_KEYS = {
 def intern_string(string):
     """Takes a (potentially) unicode string and interns using custom cache
     """
-    return _string_cache.setdefault(string, string)
+    new_str = _string_cache.setdefault(string, string)
+    if new_str is string:
+        _stirng_cache_metrics.inc_hits()
+    else:
+        _stirng_cache_metrics.inc_misses()
+    return new_str
 
 
 def intern_dict(dictionary):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 8dba61d49f..998de70d29 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,7 +17,7 @@ import logging
 from synapse.util.async import ObservableDeferred
 from synapse.util import unwrapFirstError
 from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.treecache import TreeCache
+from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 from synapse.util.logcontext import (
     PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
 )
@@ -42,6 +42,25 @@ _CacheSentinel = object()
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
 
+class CacheEntry(object):
+    __slots__ = [
+        "deferred", "sequence", "callbacks", "invalidated"
+    ]
+
+    def __init__(self, deferred, sequence, callbacks):
+        self.deferred = deferred
+        self.sequence = sequence
+        self.callbacks = set(callbacks)
+        self.invalidated = False
+
+    def invalidate(self):
+        if not self.invalidated:
+            self.invalidated = True
+            for callback in self.callbacks:
+                callback()
+            self.callbacks.clear()
+
+
 class Cache(object):
     __slots__ = (
         "cache",
@@ -51,12 +70,16 @@ class Cache(object):
         "sequence",
         "thread",
         "metrics",
+        "_pending_deferred_cache",
     )
 
-    def __init__(self, name, max_entries=1000, keylen=1, tree=False):
+    def __init__(self, name, max_entries=1000, keylen=1, tree=False, iterable=False):
         cache_type = TreeCache if tree else dict
+        self._pending_deferred_cache = cache_type()
+
         self.cache = LruCache(
-            max_size=max_entries, keylen=keylen, cache_type=cache_type
+            max_size=max_entries, keylen=keylen, cache_type=cache_type,
+            size_callback=(lambda d: len(d.result)) if iterable else None,
         )
 
         self.name = name
@@ -76,7 +99,15 @@ class Cache(object):
                 )
 
     def get(self, key, default=_CacheSentinel, callback=None):
-        val = self.cache.get(key, _CacheSentinel, callback=callback)
+        callbacks = [callback] if callback else []
+        val = self._pending_deferred_cache.get(key, _CacheSentinel)
+        if val is not _CacheSentinel:
+            if val.sequence == self.sequence:
+                val.callbacks.update(callbacks)
+                self.metrics.inc_hits()
+                return val.deferred
+
+        val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
         if val is not _CacheSentinel:
             self.metrics.inc_hits()
             return val
@@ -88,15 +119,39 @@ class Cache(object):
         else:
             return default
 
-    def update(self, sequence, key, value, callback=None):
+    def set(self, key, value, callback=None):
+        callbacks = [callback] if callback else []
         self.check_thread()
-        if self.sequence == sequence:
-            # Only update the cache if the caches sequence number matches the
-            # number that the cache had before the SELECT was started (SYN-369)
-            self.prefill(key, value, callback=callback)
+        entry = CacheEntry(
+            deferred=value,
+            sequence=self.sequence,
+            callbacks=callbacks,
+        )
+
+        entry.callbacks.update(callbacks)
+
+        existing_entry = self._pending_deferred_cache.pop(key, None)
+        if existing_entry:
+            existing_entry.invalidate()
+
+        self._pending_deferred_cache[key] = entry
+
+        def shuffle(result):
+            if self.sequence == entry.sequence:
+                existing_entry = self._pending_deferred_cache.pop(key, None)
+                if existing_entry is entry:
+                    self.cache.set(key, entry.deferred, entry.callbacks)
+                else:
+                    entry.invalidate()
+            else:
+                entry.invalidate()
+            return result
+
+        entry.deferred.addCallback(shuffle)
 
     def prefill(self, key, value, callback=None):
-        self.cache.set(key, value, callback=callback)
+        callbacks = [callback] if callback else []
+        self.cache.set(key, value, callbacks=callbacks)
 
     def invalidate(self, key):
         self.check_thread()
@@ -108,6 +163,10 @@ class Cache(object):
         # Increment the sequence number so that any SELECT statements that
         # raced with the INSERT don't update the cache (SYN-369)
         self.sequence += 1
+        entry = self._pending_deferred_cache.pop(key, None)
+        if entry:
+            entry.invalidate()
+
         self.cache.pop(key, None)
 
     def invalidate_many(self, key):
@@ -119,6 +178,11 @@ class Cache(object):
         self.sequence += 1
         self.cache.del_multi(key)
 
+        entry_dict = self._pending_deferred_cache.pop(key, None)
+        if entry_dict is not None:
+            for entry in iterate_tree_cache_entry(entry_dict):
+                entry.invalidate()
+
     def invalidate_all(self):
         self.check_thread()
         self.sequence += 1
@@ -155,7 +219,7 @@ class CacheDescriptor(object):
 
     """
     def __init__(self, orig, max_entries=1000, num_args=1, tree=False,
-                 inlineCallbacks=False, cache_context=False):
+                 inlineCallbacks=False, cache_context=False, iterable=False):
         max_entries = int(max_entries * CACHE_SIZE_FACTOR)
 
         self.orig = orig
@@ -169,6 +233,8 @@ class CacheDescriptor(object):
         self.num_args = num_args
         self.tree = tree
 
+        self.iterable = iterable
+
         all_args = inspect.getargspec(orig)
         self.arg_names = all_args.args[1:num_args + 1]
 
@@ -203,6 +269,7 @@ class CacheDescriptor(object):
             max_entries=self.max_entries,
             keylen=self.num_args,
             tree=self.tree,
+            iterable=self.iterable,
         )
 
         @functools.wraps(self.orig)
@@ -243,11 +310,6 @@ class CacheDescriptor(object):
 
                 return preserve_context_over_deferred(observer)
             except KeyError:
-                # Get the sequence number of the cache before reading from the
-                # database so that we can tell if the cache is invalidated
-                # while the SELECT is executing (SYN-369)
-                sequence = cache.sequence
-
                 ret = defer.maybeDeferred(
                     preserve_context_over_fn,
                     self.function_to_call,
@@ -261,7 +323,7 @@ class CacheDescriptor(object):
                 ret.addErrback(onErr)
 
                 ret = ObservableDeferred(ret, consumeErrors=True)
-                cache.update(sequence, cache_key, ret, callback=invalidate_callback)
+                cache.set(cache_key, ret, callback=invalidate_callback)
 
                 return preserve_context_over_deferred(ret.observe())
 
@@ -359,7 +421,6 @@ class CacheListDescriptor(object):
                     missing.append(arg)
 
             if missing:
-                sequence = cache.sequence
                 args_to_call = dict(arg_dict)
                 args_to_call[self.list_name] = missing
 
@@ -382,8 +443,8 @@ class CacheListDescriptor(object):
 
                     key = list(keyargs)
                     key[self.list_pos] = arg
-                    cache.update(
-                        sequence, tuple(key), observer,
+                    cache.set(
+                        tuple(key), observer,
                         callback=invalidate_callback
                     )
 
@@ -417,21 +478,29 @@ class CacheListDescriptor(object):
 
 
 class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
+    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
+    # which namedtuple does for us (i.e. two _CacheContext are the same if
+    # their caches and keys match). This is important in particular to
+    # dedupe when we add callbacks to lru cache nodes, otherwise the number
+    # of callbacks would grow.
     def invalidate(self):
         self.cache.invalidate(self.key)
 
 
-def cached(max_entries=1000, num_args=1, tree=False, cache_context=False):
+def cached(max_entries=1000, num_args=1, tree=False, cache_context=False,
+           iterable=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
         num_args=num_args,
         tree=tree,
         cache_context=cache_context,
+        iterable=iterable,
     )
 
 
-def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False):
+def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False,
+                          iterable=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
@@ -439,6 +508,7 @@ def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_contex
         tree=tree,
         inlineCallbacks=True,
         cache_context=cache_context,
+        iterable=iterable,
     )
 
 
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index b0ca1bb79d..cb6933c61c 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -23,7 +23,9 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value"))
+class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))):
+    def __len__(self):
+        return len(self.value)
 
 
 class DictionaryCache(object):
@@ -32,7 +34,7 @@ class DictionaryCache(object):
     """
 
     def __init__(self, name, max_entries=1000):
-        self.cache = LruCache(max_size=max_entries)
+        self.cache = LruCache(max_size=max_entries, size_callback=len)
 
         self.name = name
         self.sequence = 0
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 080388958f..2987c38a2d 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -15,6 +15,7 @@
 
 from synapse.util.caches import register_cache
 
+from collections import OrderedDict
 import logging
 
 
@@ -23,7 +24,7 @@ logger = logging.getLogger(__name__)
 
 class ExpiringCache(object):
     def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
-                 reset_expiry_on_get=False):
+                 reset_expiry_on_get=False, iterable=False):
         """
         Args:
             cache_name (str): Name of this cache, used for logging.
@@ -36,6 +37,8 @@ class ExpiringCache(object):
                 evicted based on time.
             reset_expiry_on_get (bool): If true, will reset the expiry time for
                 an item on access. Defaults to False.
+            iterable (bool): If true, the size is calculated by summing the
+                sizes of all entries, rather than the number of entries.
 
         """
         self._cache_name = cache_name
@@ -47,9 +50,13 @@ class ExpiringCache(object):
 
         self._reset_expiry_on_get = reset_expiry_on_get
 
-        self._cache = {}
+        self._cache = OrderedDict()
 
-        self.metrics = register_cache(cache_name, self._cache)
+        self.metrics = register_cache(cache_name, self)
+
+        self.iterable = iterable
+
+        self._size_estimate = 0
 
     def start(self):
         if not self._expiry_ms:
@@ -65,15 +72,14 @@ class ExpiringCache(object):
         now = self._clock.time_msec()
         self._cache[key] = _CacheEntry(now, value)
 
-        # Evict if there are now too many items
-        if self._max_len and len(self._cache.keys()) > self._max_len:
-            sorted_entries = sorted(
-                self._cache.items(),
-                key=lambda item: item[1].time,
-            )
+        if self.iterable:
+            self._size_estimate += len(value)
 
-            for k, _ in sorted_entries[self._max_len:]:
-                self._cache.pop(k)
+        # Evict if there are now too many items
+        while self._max_len and len(self) > self._max_len:
+            _key, value = self._cache.popitem(last=False)
+            if self.iterable:
+                self._size_estimate -= len(value.value)
 
     def __getitem__(self, key):
         try:
@@ -99,7 +105,7 @@ class ExpiringCache(object):
             # zero expiry time means don't expire. This should never get called
             # since we have this check in start too.
             return
-        begin_length = len(self._cache)
+        begin_length = len(self)
 
         now = self._clock.time_msec()
 
@@ -110,15 +116,20 @@ class ExpiringCache(object):
                 keys_to_delete.add(key)
 
         for k in keys_to_delete:
-            self._cache.pop(k)
+            value = self._cache.pop(k)
+            if self.iterable:
+                self._size_estimate -= len(value.value)
 
         logger.debug(
             "[%s] _prune_cache before: %d, after len: %d",
-            self._cache_name, begin_length, len(self._cache)
+            self._cache_name, begin_length, len(self)
         )
 
     def __len__(self):
-        return len(self._cache)
+        if self.iterable:
+            return self._size_estimate
+        else:
+            return len(self._cache)
 
 
 class _CacheEntry(object):
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 9c4c679175..cf5fbb679c 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -49,7 +49,7 @@ class LruCache(object):
     Can also set callbacks on objects when getting/setting which are fired
     when that key gets invalidated/evicted.
     """
-    def __init__(self, max_size, keylen=1, cache_type=dict):
+    def __init__(self, max_size, keylen=1, cache_type=dict, size_callback=None):
         cache = cache_type()
         self.cache = cache  # Used for introspection.
         list_root = _Node(None, None, None, None)
@@ -58,6 +58,12 @@ class LruCache(object):
 
         lock = threading.Lock()
 
+        def evict():
+            while cache_len() > max_size:
+                todelete = list_root.prev_node
+                delete_node(todelete)
+                cache.pop(todelete.key, None)
+
         def synchronized(f):
             @wraps(f)
             def inner(*args, **kwargs):
@@ -66,6 +72,16 @@ class LruCache(object):
 
             return inner
 
+        cached_cache_len = [0]
+        if size_callback is not None:
+            def cache_len():
+                return cached_cache_len[0]
+        else:
+            def cache_len():
+                return len(cache)
+
+        self.len = synchronized(cache_len)
+
         def add_node(key, value, callbacks=set()):
             prev_node = list_root
             next_node = prev_node.next_node
@@ -74,6 +90,9 @@ class LruCache(object):
             next_node.prev_node = node
             cache[key] = node
 
+            if size_callback:
+                cached_cache_len[0] += size_callback(node.value)
+
         def move_node_to_front(node):
             prev_node = node.prev_node
             next_node = node.next_node
@@ -92,23 +111,25 @@ class LruCache(object):
             prev_node.next_node = next_node
             next_node.prev_node = prev_node
 
+            if size_callback:
+                cached_cache_len[0] -= size_callback(node.value)
+
             for cb in node.callbacks:
                 cb()
             node.callbacks.clear()
 
         @synchronized
-        def cache_get(key, default=None, callback=None):
+        def cache_get(key, default=None, callbacks=[]):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
-                if callback:
-                    node.callbacks.add(callback)
+                node.callbacks.update(callbacks)
                 return node.value
             else:
                 return default
 
         @synchronized
-        def cache_set(key, value, callback=None):
+        def cache_set(key, value, callbacks=[]):
             node = cache.get(key, None)
             if node is not None:
                 if value != node.value:
@@ -116,21 +137,18 @@ class LruCache(object):
                         cb()
                     node.callbacks.clear()
 
-                if callback:
-                    node.callbacks.add(callback)
+                    if size_callback:
+                        cached_cache_len[0] -= size_callback(node.value)
+                        cached_cache_len[0] += size_callback(value)
+
+                node.callbacks.update(callbacks)
 
                 move_node_to_front(node)
                 node.value = value
             else:
-                if callback:
-                    callbacks = set([callback])
-                else:
-                    callbacks = set()
-                add_node(key, value, callbacks)
-                if len(cache) > max_size:
-                    todelete = list_root.prev_node
-                    delete_node(todelete)
-                    cache.pop(todelete.key, None)
+                add_node(key, value, set(callbacks))
+
+            evict()
 
         @synchronized
         def cache_set_default(key, value):
@@ -139,10 +157,7 @@ class LruCache(object):
                 return node.value
             else:
                 add_node(key, value)
-                if len(cache) > max_size:
-                    todelete = list_root.prev_node
-                    delete_node(todelete)
-                    cache.pop(todelete.key, None)
+                evict()
                 return value
 
         @synchronized
@@ -174,10 +189,8 @@ class LruCache(object):
                 for cb in node.callbacks:
                     cb()
             cache.clear()
-
-        @synchronized
-        def cache_len():
-            return len(cache)
+            if size_callback:
+                cached_cache_len[0] = 0
 
         @synchronized
         def cache_contains(key):
@@ -190,7 +203,7 @@ class LruCache(object):
         self.pop = cache_pop
         if cache_type is TreeCache:
             self.del_multi = cache_del_multi
-        self.len = cache_len
+        self.len = synchronized(cache_len)
         self.contains = cache_contains
         self.clear = cache_clear
 
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index c31585aea3..fcc341a6b7 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -65,12 +65,27 @@ class TreeCache(object):
         return popped
 
     def values(self):
-        return [e.value for e in self.root.values()]
+        return list(iterate_tree_cache_entry(self.root))
 
     def __len__(self):
         return self.size
 
 
+def iterate_tree_cache_entry(d):
+    """Helper function to iterate over the leaves of a tree, i.e. a dict of that
+    can contain dicts.
+    """
+    if isinstance(d, dict):
+        for value_d in d.itervalues():
+            for value in iterate_tree_cache_entry(value_d):
+                yield value
+    else:
+        if isinstance(d, _Entry):
+            yield d.value
+        else:
+            yield d
+
+
 class _Entry(object):
     __slots__ = ["value"]
 
diff --git a/synapse/util/debug.py b/synapse/util/debug.py
deleted file mode 100644
index dc49162e6a..0000000000
--- a/synapse/util/debug.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from twisted.internet import defer, reactor
-from functools import wraps
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-
-
-def debug_deferreds():
-    """Cause all deferreds to wait for a reactor tick before running their
-    callbacks. This increases the chance of getting a stack trace out of
-    a defer.inlineCallback since the code waiting on the deferred will get
-    a chance to add an errback before the deferred runs."""
-
-    # Helper method for retrieving and restoring the current logging context
-    # around a callback.
-    def with_logging_context(fn):
-        context = LoggingContext.current_context()
-
-        def restore_context_callback(x):
-            with PreserveLoggingContext(context):
-                return fn(x)
-
-        return restore_context_callback
-
-    # We are going to modify the __init__ method of defer.Deferred so we
-    # need to get a copy of the old method so we can still call it.
-    old__init__ = defer.Deferred.__init__
-
-    # We need to create a deferred to bounce the callbacks through the reactor
-    # but we don't want to add a callback when we create that deferred so we
-    # we create a new type of deferred that uses the old __init__ method.
-    # This is safe as long as the old __init__ method doesn't invoke an
-    # __init__ using super.
-    class Bouncer(defer.Deferred):
-        __init__ = old__init__
-
-    # We'll add this as a callback to all Deferreds. Twisted will wait until
-    # the bouncer deferred resolves before calling the callbacks of the
-    # original deferred.
-    def bounce_callback(x):
-        bouncer = Bouncer()
-        reactor.callLater(0, with_logging_context(bouncer.callback), x)
-        return bouncer
-
-    # We'll add this as an errback to all Deferreds. Twisted will wait until
-    # the bouncer deferred resolves before calling the errbacks of the
-    # original deferred.
-    def bounce_errback(x):
-        bouncer = Bouncer()
-        reactor.callLater(0, with_logging_context(bouncer.errback), x)
-        return bouncer
-
-    @wraps(old__init__)
-    def new__init__(self, *args, **kargs):
-        old__init__(self, *args, **kargs)
-        self.addCallbacks(bounce_callback, bounce_errback)
-
-    defer.Deferred.__init__ = new__init__
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index e2de7fce91..153ef001ad 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -88,7 +88,7 @@ class RetryDestinationLimiter(object):
     def __init__(self, destination, clock, store, retry_interval,
                  min_retry_interval=10 * 60 * 1000,
                  max_retry_interval=24 * 60 * 60 * 1000,
-                 multiplier_retry_interval=5,):
+                 multiplier_retry_interval=5, backoff_on_404=False):
         """Marks the destination as "down" if an exception is thrown in the
         context, except for CodeMessageException with code < 500.
 
@@ -107,6 +107,7 @@ class RetryDestinationLimiter(object):
                 a failed request, in milliseconds.
             multiplier_retry_interval (int): The multiplier to use to increase
                 the retry interval after a failed request.
+            backoff_on_404 (bool): Back off if we get a 404
         """
         self.clock = clock
         self.store = store
@@ -116,6 +117,7 @@ class RetryDestinationLimiter(object):
         self.min_retry_interval = min_retry_interval
         self.max_retry_interval = max_retry_interval
         self.multiplier_retry_interval = multiplier_retry_interval
+        self.backoff_on_404 = backoff_on_404
 
     def __enter__(self):
         pass
@@ -123,7 +125,22 @@ class RetryDestinationLimiter(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         valid_err_code = False
         if exc_type is not None and issubclass(exc_type, CodeMessageException):
-            valid_err_code = exc_val.code != 429 and 0 <= exc_val.code < 500
+            # Some error codes are perfectly fine for some APIs, whereas other
+            # APIs may expect to never received e.g. a 404. It's important to
+            # handle 404 as some remote servers will return a 404 when the HS
+            # has been decommissioned.
+            # If we get a 401, then we should probably back off since they
+            # won't accept our requests for at least a while.
+            # 429 is us being aggresively rate limited, so lets rate limit
+            # ourselves.
+            if exc_val.code == 404 and self.backoff_on_404:
+                valid_err_code = False
+            elif exc_val.code in (401, 429):
+                valid_err_code = False
+            elif exc_val.code < 500:
+                valid_err_code = True
+            else:
+                valid_err_code = False
 
         if exc_type is None or valid_err_code:
             # We connected successfully.