From bbfe4e996c9b9729f19d5b104dc6abfe120531b4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 17 May 2017 14:31:23 +0100 Subject: Make get_state_groups_from_groups faster. Most of the time was spent copying a dict to filter out sentinel values that indicated that keys did not exist in the dict. The sentinel values were added to ensure that we cached the non-existence of keys. By updating DictionaryCache to keep track of which keys were known to not exist itself we can remove a dictionary copy. --- synapse/util/caches/dictionary_cache.py | 57 ++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 11 deletions(-) (limited to 'synapse/util/caches') diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index cb6933c61c..d4105822b3 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -23,7 +23,17 @@ import logging logger = logging.getLogger(__name__) -class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))): +class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "value"))): + """Returned when getting an entry from the cache + + Attributes: + full (bool): Whether the cache has the full or dict or just some keys. + If not full then not all requested keys will necessarily be present + in `value` + known_absent (set): Keys that were looked up in the dict and were not + there. + value (dict): The full or partial dict value + """ def __len__(self): return len(self.value) @@ -58,21 +68,31 @@ class DictionaryCache(object): ) def get(self, key, dict_keys=None): + """Fetch an entry out of the cache + + Args: + key + dict_key(list): If given a set of keys then return only those keys + that exist in the cache. + + Returns: + DictionaryEntry + """ entry = self.cache.get(key, self.sentinel) if entry is not self.sentinel: self.metrics.inc_hits() if dict_keys is None: - return DictionaryEntry(entry.full, dict(entry.value)) + return DictionaryEntry(entry.full, entry.known_absent, dict(entry.value)) else: - return DictionaryEntry(entry.full, { + return DictionaryEntry(entry.full, entry.known_absent, { k: entry.value[k] for k in dict_keys if k in entry.value }) self.metrics.inc_misses() - return DictionaryEntry(False, {}) + return DictionaryEntry(False, set(), {}) def invalidate(self, key): self.check_thread() @@ -87,19 +107,34 @@ class DictionaryCache(object): self.sequence += 1 self.cache.clear() - def update(self, sequence, key, value, full=False): + def update(self, sequence, key, value, full=False, known_absent=None): + """Updates the entry in the cache + + Args: + sequence + key + value (dict): The value to update the cache with. + full (bool): Whether the given value is the full dict, or just a + partial subset there of. If not full then any existing entries + for the key will be updated. + known_absent (set): Set of keys that we know don't exist in the full + dict. + """ self.check_thread() if self.sequence == sequence: # Only update the cache if the caches sequence number matches the # number that the cache had before the SELECT was started (SYN-369) + if known_absent is None: + known_absent = set() if full: - self._insert(key, value) + self._insert(key, value, known_absent) else: - self._update_or_insert(key, value) + self._update_or_insert(key, value, known_absent) - def _update_or_insert(self, key, value): - entry = self.cache.setdefault(key, DictionaryEntry(False, {})) + def _update_or_insert(self, key, value, known_absent): + entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {})) entry.value.update(value) + entry.known_absent.update(known_absent) - def _insert(self, key, value): - self.cache[key] = DictionaryEntry(True, value) + def _insert(self, key, value, known_absent): + self.cache[key] = DictionaryEntry(True, known_absent, value) -- cgit 1.4.1 From e3417a06e23c532e6502bdcdcaedac826e231d69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 15:04:42 +0100 Subject: Update list cache to handle one arg case We update the normal cache descriptors to handle caches with a single argument specially so that the key wasn't a 1-tuple. We need to update the cache list to be aware of this. --- synapse/util/caches/descriptors.py | 50 +++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 17 deletions(-) (limited to 'synapse/util/caches') diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 48dcbafeef..77a0d8e35d 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -404,6 +404,7 @@ class CacheDescriptor(_CacheDescriptorBase): wrapped.invalidate_all = cache.invalidate_all wrapped.cache = cache + wrapped.num_args = self.num_args obj.__dict__[self.orig.__name__] = wrapped @@ -451,8 +452,9 @@ class CacheListDescriptor(_CacheDescriptorBase): ) def __get__(self, obj, objtype=None): - - cache = getattr(obj, self.cached_method_name).cache + cached_method = getattr(obj, self.cached_method_name) + cache = cached_method.cache + num_args = cached_method.num_args @functools.wraps(self.orig) def wrapped(*args, **kwargs): @@ -470,11 +472,14 @@ class CacheListDescriptor(_CacheDescriptorBase): cached_defers = {} missing = [] for arg in list_args: - key = list(keyargs) - key[self.list_pos] = arg - try: - res = cache.get(tuple(key), callback=invalidate_callback) + if num_args == 1: + res = cache.get(arg, callback=invalidate_callback) + else: + key = list(keyargs) + key[self.list_pos] = arg + res = cache.get(tuple(key), callback=invalidate_callback) + if not isinstance(res, ObservableDeferred): results[arg] = res elif not res.has_succeeded(): @@ -505,17 +510,28 @@ class CacheListDescriptor(_CacheDescriptorBase): observer = ObservableDeferred(observer) - key = list(keyargs) - key[self.list_pos] = arg - cache.set( - tuple(key), observer, - callback=invalidate_callback - ) - - def invalidate(f, key): - cache.invalidate(key) - return f - observer.addErrback(invalidate, tuple(key)) + if num_args == 1: + cache.set( + arg, observer, + callback=invalidate_callback + ) + + def invalidate(f, key): + cache.invalidate(key) + return f + observer.addErrback(invalidate, arg) + else: + key = list(keyargs) + key[self.list_pos] = arg + cache.set( + tuple(key), observer, + callback=invalidate_callback + ) + + def invalidate(f, key): + cache.invalidate(key) + return f + observer.addErrback(invalidate, tuple(key)) res = observer.observe() res.addCallback(lambda r, arg: (arg, r), arg) -- cgit 1.4.1 From bd7bb5df717810eec0ae56d558a8413003d2ecaa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 15:12:19 +0100 Subject: Pull out if statement from for loop --- synapse/util/caches/descriptors.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) (limited to 'synapse/util/caches') diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 77a0d8e35d..cbdff86596 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -471,14 +471,22 @@ class CacheListDescriptor(_CacheDescriptorBase): results = {} cached_defers = {} missing = [] + + # If the cache takes a single arg then that is used as the key, + # otherwise a tuple is used. + if num_args == 1: + def cache_get(arg): + return cache.get(arg, callback=invalidate_callback) + else: + key = list(keyargs) + + def cache_get(arg): + key[self.list_pos] = arg + return cache.get(tuple(key), callback=invalidate_callback) + for arg in list_args: try: - if num_args == 1: - res = cache.get(arg, callback=invalidate_callback) - else: - key = list(keyargs) - key[self.list_pos] = arg - res = cache.get(tuple(key), callback=invalidate_callback) + res = cache_get(arg) if not isinstance(res, ObservableDeferred): results[arg] = res -- cgit 1.4.1 From 304880d18545b59a51c5d4b928e563c6d1514fdc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 31 May 2017 15:46:36 +0100 Subject: Add stream change cache --- synapse/storage/__init__.py | 12 ++++++++++++ synapse/storage/events.py | 4 ++++ synapse/storage/user_directory.py | 4 +++- synapse/util/caches/stream_change_cache.py | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) (limited to 'synapse/util/caches') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 11655bf60f..3c88ba9860 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -223,6 +223,18 @@ class DataStore(RoomMemberStore, RoomStore, "DeviceListFederationStreamChangeCache", device_list_max, ) + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dfb57f9d12..77861488d2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -755,6 +755,10 @@ class EventsStore(SQLBaseStore): ] ) + self._curr_state_delta_stream_cache.enttity_has_changed( + room_id, max_stream_order, + ) + # Invalidate the various caches # Figure out the changes of membership to invalidate the diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index b1957cb873..15b8ea0460 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -204,7 +204,9 @@ class UserDirectoryStore(SQLBaseStore): ) def get_current_state_deltas(self, prev_stream_id): - # TODO: Add stream change cache + if not self._curr_state_delta_stream_cache.has_any_entity_changed(prev_stream_id): + return [] + # TODO: Add limit sql = """ SELECT stream_id, room_id, type, state_key, event_id, prev_event_id diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 70fe00ce0b..c498aee46c 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -89,6 +89,21 @@ class StreamChangeCache(object): return result + def has_any_entity_changed(self, stream_pos): + """Returns if any entity has changed + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + self.metrics.inc_hits() + if stream_pos >= max(self._cache): + return False + else: + return True + else: + self.metrics.inc_misses() + return True + def get_all_entities_changed(self, stream_pos): """Returns all entites that have had new things since the given position. If the position is too old it will return None. -- cgit 1.4.1 From eed59dcc1efdda95ea2deaad6dd8b70e5d346a6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 11:39:35 +0100 Subject: Fix has_any_entity_changed Occaisonally has_any_entity_changed would throw the error: "Set changed size during iteration" when taking the max of the `sorteddict`. While its uncertain how that happens, its quite inefficient to iterate over the entire dict anyway so we change to using the more traditional `bisect_*` functions. --- synapse/util/caches/stream_change_cache.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/util/caches') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index c498aee46c..b723e33641 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -96,10 +96,10 @@ class StreamChangeCache(object): if stream_pos >= self._earliest_known_stream_pos: self.metrics.inc_hits() - if stream_pos >= max(self._cache): - return False - else: - return True + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + return len(keys[i:]) > 0 else: self.metrics.inc_misses() return True -- cgit 1.4.1 From efc2b7db95c78f658d3719862702b85d5d9d4a76 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 13:35:15 +0100 Subject: Rewrite conditional --- synapse/util/caches/stream_change_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util/caches') diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b723e33641..609625b322 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -99,7 +99,7 @@ class StreamChangeCache(object): keys = self._cache.keys() i = keys.bisect_right(stream_pos) - return len(keys[i:]) > 0 + return i < len(keys) else: self.metrics.inc_misses() return True -- cgit 1.4.1 From c72058bcc6688e3ed9fa600f77f5190441107318 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Jun 2017 14:08:33 +0100 Subject: Use an ExpiringCache for storing registration sessions This is because pruning them was a significant performance drain on matrix.org --- synapse/handlers/auth.py | 21 ++++++++++----------- synapse/util/caches/expiringcache.py | 3 +++ 2 files changed, 13 insertions(+), 11 deletions(-) (limited to 'synapse/util/caches') diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index e7a1bb7246..b00446bec0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -21,6 +21,7 @@ from synapse.api.constants import LoginType from synapse.types import UserID from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError from synapse.util.async import run_on_reactor +from synapse.util.caches.expiringcache import ExpiringCache from twisted.web.client import PartialDownloadError @@ -52,7 +53,15 @@ class AuthHandler(BaseHandler): LoginType.DUMMY: self._check_dummy_auth, } self.bcrypt_rounds = hs.config.bcrypt_rounds - self.sessions = {} + + # This is not a cache per se, but a store of all current sessions that + # expire after N hours + self.sessions = ExpiringCache( + cache_name="register_sessions", + clock=hs.get_clock(), + expiry_ms=self.SESSION_EXPIRE_MS, + reset_expiry_on_get=True, + ) account_handler = _AccountHandler( hs, check_user_exists=self.check_user_exists @@ -617,16 +626,6 @@ class AuthHandler(BaseHandler): logger.debug("Saving session %s", session) session["last_used"] = self.hs.get_clock().time_msec() self.sessions[session["id"]] = session - self._prune_sessions() - - def _prune_sessions(self): - for sid, sess in self.sessions.items(): - last_used = 0 - if 'last_used' in sess: - last_used = sess['last_used'] - now = self.hs.get_clock().time_msec() - if last_used < now - AuthHandler.SESSION_EXPIRE_MS: - del self.sessions[sid] def hash(self, password): """Computes a secure hash of password. diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index cbdde34a57..6ad53a6390 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -94,6 +94,9 @@ class ExpiringCache(object): return entry.value + def __contains__(self, key): + return key in self._cache + def get(self, key, default=None): try: return self[key] -- cgit 1.4.1 From b5e8d529e610352e0eb3887fac67a8ca5897bec1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Jul 2017 09:56:44 +0100 Subject: Define CACHE_SIZE_FACTOR once --- synapse/state.py | 5 +---- synapse/storage/_base.py | 5 +---- synapse/storage/client_ips.py | 6 ++---- synapse/util/caches/descriptors.py | 5 +---- synapse/util/caches/stream_change_cache.py | 6 +----- 5 files changed, 6 insertions(+), 21 deletions(-) (limited to 'synapse/util/caches') diff --git a/synapse/state.py b/synapse/state.py index 5b386e3183..390799fbd5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -24,13 +24,13 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.events.snapshot import EventContext from synapse.util.async import Linearizer +from synapse.util.caches import CACHE_SIZE_FACTOR from collections import namedtuple from frozendict import frozendict import logging import hashlib -import os logger = logging.getLogger(__name__) @@ -38,9 +38,6 @@ logger = logging.getLogger(__name__) KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - SIZE_OF_CACHE = int(100000 * CACHE_SIZE_FACTOR) EVICTION_TIMEOUT_SECONDS = 60 * 60 diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 51730a88bf..6f54036d67 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -16,6 +16,7 @@ import logging from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine @@ -27,10 +28,6 @@ from twisted.internet import defer import sys import time import threading -import os - - -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) logger = logging.getLogger(__name__) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 88a5eb232f..fc468ea185 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -20,7 +20,8 @@ from twisted.internet import defer, reactor from ._base import Cache from . import background_updates -import os +from synapse.util.caches import CACHE_SIZE_FACTOR + logger = logging.getLogger(__name__) @@ -30,9 +31,6 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index cbdff86596..af65bfe7b8 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -16,6 +16,7 @@ import logging from synapse.util.async import ObservableDeferred from synapse.util import unwrapFirstError, logcontext +from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry from synapse.util.stringutils import to_ascii @@ -25,7 +26,6 @@ from . import register_cache from twisted.internet import defer from collections import namedtuple -import os import functools import inspect import threading @@ -37,9 +37,6 @@ logger = logging.getLogger(__name__) _CacheSentinel = object() -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - class CacheEntry(object): __slots__ = [ "deferred", "sequence", "callbacks", "invalidated" diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 609625b322..941d873ab8 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,20 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import register_cache +from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR from blist import sorteddict import logging -import os logger = logging.getLogger(__name__) -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) - - class StreamChangeCache(object): """Keeps track of the stream positions of the latest change in a set of entities. -- cgit 1.4.1 From 495f075b418abebfd2e4519e893cb51ad9821f55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Jul 2017 09:58:32 +0100 Subject: Increase default cache factor size. --- synapse/util/caches/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/util/caches') diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 4a83c46d98..4adae96681 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -16,7 +16,7 @@ import synapse.metrics import os -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) +CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) metrics = synapse.metrics.get_metrics_for("synapse.util.caches") -- cgit 1.4.1