diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 900575eb3c..7b065b195e 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
-
import os
-from six.moves import intern
import six
+from six.moves import intern
+
+from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 65a1042de1..f8a07df6b8 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -13,10 +13,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import functools
+import inspect
import logging
+import threading
+from collections import namedtuple
+import six
+from six import itervalues, string_types
+
+from twisted.internet import defer
+
+from synapse.util import logcontext, unwrapFirstError
from synapse.util.async import ObservableDeferred
-from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -24,17 +33,6 @@ from synapse.util.stringutils import to_ascii
from . import register_cache
-from twisted.internet import defer
-from collections import namedtuple
-
-import functools
-import inspect
-import threading
-
-from six import string_types, itervalues
-import six
-
-
logger = logging.getLogger(__name__)
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index bdc21e348f..6c0b5a4094 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -13,12 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches.lrucache import LruCache
-from collections import namedtuple
-from . import register_cache
-import threading
import logging
+import threading
+from collections import namedtuple
+from synapse.util.caches.lrucache import LruCache
+
+from . import register_cache
logger = logging.getLogger(__name__)
@@ -107,29 +108,28 @@ class DictionaryCache(object):
self.sequence += 1
self.cache.clear()
- def update(self, sequence, key, value, full=False, known_absent=None):
+ def update(self, sequence, key, value, fetched_keys=None):
"""Updates the entry in the cache
Args:
sequence
- key
- value (dict): The value to update the cache with.
- full (bool): Whether the given value is the full dict, or just a
- partial subset there of. If not full then any existing entries
- for the key will be updated.
- known_absent (set): Set of keys that we know don't exist in the full
- dict.
+ key (K)
+ value (dict[X,Y]): The value to update the cache with.
+ fetched_keys (None|set[X]): All of the dictionary keys which were
+ fetched from the database.
+
+ If None, this is the complete value for key K. Otherwise, it
+ is used to infer a list of keys which we know don't exist in
+ the full dict.
"""
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
- if known_absent is None:
- known_absent = set()
- if full:
- self._insert(key, value, known_absent)
+ if fetched_keys is None:
+ self._insert(key, value, set())
else:
- self._update_or_insert(key, value, known_absent)
+ self._update_or_insert(key, value, fetched_keys)
def _update_or_insert(self, key, value, known_absent):
# We pop and reinsert as we need to tell the cache the size may have
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index ff04c91955..465adc54a8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import register_cache
-
-from collections import OrderedDict
import logging
+from collections import OrderedDict
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
@@ -64,7 +64,10 @@ class ExpiringCache(object):
return
def f():
- self._prune_cache()
+ run_as_background_process(
+ "prune_cache_%s" % self._cache_name,
+ self._prune_cache,
+ )
self._clock.looping_call(f, self._expiry_ms / 2)
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 1c5a982094..b684f24e7b 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -14,8 +14,8 @@
# limitations under the License.
-from functools import wraps
import threading
+from functools import wraps
from synapse.util.caches.treecache import TreeCache
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 817118e30f..f2bde74dc5 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,12 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util import caches
-
+import logging
from sortedcontainers import SortedDict
-import logging
+from synapse.util import caches
logger = logging.getLogger(__name__)
@@ -75,13 +74,13 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- not_known_entities = set(entities) - set(self._entity_to_key)
+ changed_entities = {
+ self._cache[k] for k in self._cache.islice(
+ start=self._cache.bisect_right(stream_pos),
+ )
+ }
- result = (
- set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
- .intersection(entities)
- .union(not_known_entities)
- )
+ result = changed_entities.intersection(entities)
self.metrics.inc_hits()
else:
@@ -113,7 +112,8 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- return self._cache.values()[self._cache.bisect_right(stream_pos) :]
+ return [self._cache[k] for k in self._cache.islice(
+ start=self._cache.bisect_right(stream_pos))]
else:
return None
|