From 39e21ea51cf94f436f1f845f24c1b04f11b92c6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jul 2015 13:57:29 +0100 Subject: Add support for using keyword arguments with cached functions --- synapse/storage/_base.py | 40 ++++++++++++++++++++++++++++++++++------ synapse/storage/keys.py | 5 ++--- synapse/storage/push_rule.py | 8 +++----- synapse/storage/receipts.py | 5 ++--- synapse/storage/room.py | 5 ++--- synapse/storage/state.py | 5 ++--- 6 files changed, 45 insertions(+), 23 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8f812f0fd7..f1265541ba 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -27,6 +27,7 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools +import inspect import sys import time import threading @@ -141,13 +142,28 @@ class CacheDescriptor(object): which can be used to insert values into the cache specifically, without calling the calculation function. """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=False): + def __init__(self, orig, max_entries=1000, num_args=1, lru=False, + inlineCallbacks=False): self.orig = orig + if inlineCallbacks: + self.function_to_call = defer.inlineCallbacks(orig) + else: + self.function_to_call = orig + self.max_entries = max_entries self.num_args = num_args self.lru = lru + self.arg_names = inspect.getargspec(orig).args[1:num_args+1] + + if len(self.arg_names) < self.num_args: + raise Exception( + "Not enough explicit positional arguments to key off of for %r." + " (@cached cannot key off of *args or **kwars)" + % (orig.__name__,) + ) + def __get__(self, obj, objtype=None): cache = Cache( name=self.orig.__name__, @@ -158,11 +174,13 @@ class CacheDescriptor(object): @functools.wraps(self.orig) @defer.inlineCallbacks - def wrapped(*keyargs): + def wrapped(*args, **kwargs): + arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) + keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] try: - cached_result = cache.get(*keyargs[:self.num_args]) + cached_result = cache.get(*keyargs) if DEBUG_CACHES: - actual_result = yield self.orig(obj, *keyargs) + actual_result = yield self.function_to_call(obj, *args, **kwargs) if actual_result != cached_result: logger.error( "Stale cache entry %s%r: cached: %r, actual %r", @@ -177,9 +195,9 @@ class CacheDescriptor(object): # while the SELECT is executing (SYN-369) sequence = cache.sequence - ret = yield self.orig(obj, *keyargs) + ret = yield self.function_to_call(obj, *args, **kwargs) - cache.update(sequence, *keyargs[:self.num_args] + (ret,)) + cache.update(sequence, *(keyargs + [ret])) defer.returnValue(ret) @@ -201,6 +219,16 @@ def cached(max_entries=1000, num_args=1, lru=False): ) +def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False): + return lambda orig: CacheDescriptor( + orig, + max_entries=max_entries, + num_args=num_args, + lru=lru, + inlineCallbacks=True, + ) + + class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object passed to the constructor. Adds logging and metrics to the .execute() diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 940a5f7e08..e3f98f0cde 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from _base import SQLBaseStore, cached +from _base import SQLBaseStore, cachedInlineCallbacks from twisted.internet import defer @@ -71,8 +71,7 @@ class KeyStore(SQLBaseStore): desc="store_server_certificate", ) - @cached() - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_all_server_verify_keys(self, server_name): rows = yield self._simple_select_list( table="server_signature_keys", diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 4cac118d17..a220f3632e 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore, cachedInlineCallbacks from twisted.internet import defer import logging @@ -23,8 +23,7 @@ logger = logging.getLogger(__name__) class PushRuleStore(SQLBaseStore): - @cached() - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_push_rules_for_user(self, user_name): rows = yield self._simple_select_list( table=PushRuleTable.table_name, @@ -41,8 +40,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rows) - @cached() - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_push_rules_enabled_for_user(self, user_name): results = yield self._simple_select_list( table=PushRuleEnableTable.table_name, diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 7a6af98d98..b79d6683ca 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore, cachedInlineCallbacks from twisted.internet import defer @@ -128,8 +128,7 @@ class ReceiptsStore(SQLBaseStore): def get_max_receipt_stream_id(self): return self._receipts_id_gen.get_max_token(self) - @cached - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_graph_receipts_for_room(self, room_id): """Get receipts for sending to remote servers. """ diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 4612a8aa83..dd5bc2c8fb 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore, cachedInlineCallbacks import collections import logging @@ -186,8 +186,7 @@ class RoomStore(SQLBaseStore): } ) - @cached() - @defer.inlineCallbacks + @cachedInlineCallbacks() def get_room_name_and_aliases(self, room_id): def f(txn): sql = ( diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 47bec65497..55c6d52890 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore, cached, cachedInlineCallbacks from twisted.internet import defer @@ -189,8 +189,7 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) - @cached(num_args=3) - @defer.inlineCallbacks + @cachedInlineCallbacks(num_args=3) def get_current_state_for_key(self, room_id, event_type, state_key): def f(txn): sql = ( -- cgit 1.5.1 From a89559d79714b8dd07ec5a73f2629d003fcad92c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Aug 2015 15:39:47 +0100 Subject: Use LRU cache by default --- synapse/storage/_base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8f812f0fd7..2a601e37e3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -57,7 +57,7 @@ cache_counter = metrics.register_cache( class Cache(object): - def __init__(self, name, max_entries=1000, keylen=1, lru=False): + def __init__(self, name, max_entries=1000, keylen=1, lru=True): if lru: self.cache = LruCache(max_size=max_entries) self.max_entries = None @@ -141,7 +141,7 @@ class CacheDescriptor(object): which can be used to insert values into the cache specifically, without calling the calculation function. """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=False): + def __init__(self, orig, max_entries=1000, num_args=1, lru=True): self.orig = orig self.max_entries = max_entries @@ -192,7 +192,7 @@ class CacheDescriptor(object): return wrapped -def cached(max_entries=1000, num_args=1, lru=False): +def cached(max_entries=1000, num_args=1, lru=True): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, -- cgit 1.5.1 From 1e62a3d3a9d09fbd856b4abd733f4fb687f746e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Aug 2015 15:40:40 +0100 Subject: Up the cache size for 'get_joined_hosts_for_room' and 'get_users_in_room' --- synapse/storage/roommember.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4db07f6fb4..55dd3f6cfb 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -78,7 +78,7 @@ class RoomMemberStore(SQLBaseStore): lambda events: events[0] if events else None ) - @cached() + @cached(max_entries=5000) def get_users_in_room(self, room_id): def f(txn): @@ -154,7 +154,7 @@ class RoomMemberStore(SQLBaseStore): RoomsForUser(**r) for r in self.cursor_to_dict(txn) ] - @cached() + @cached(max_entries=5000) def get_joined_hosts_for_room(self, room_id): return self.runInteraction( "get_joined_hosts_for_room", -- cgit 1.5.1 From 7eea3e356ff58168f3525879a8eb684f0681ee68 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Aug 2015 13:33:34 +0100 Subject: Make @cached cache deferreds rather than the deferreds' values --- synapse/storage/_base.py | 21 ++++++++------------- synapse/util/async.py | 9 +++++++-- tests/storage/test__base.py | 11 +++++++---- 3 files changed, 22 insertions(+), 19 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index f1265541ba..8604d38c3e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,6 +15,7 @@ import logging from synapse.api.errors import StoreError +from synapse.util.async import ObservableDeferred from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -173,33 +174,27 @@ class CacheDescriptor(object): ) @functools.wraps(self.orig) - @defer.inlineCallbacks def wrapped(*args, **kwargs): arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] try: cached_result = cache.get(*keyargs) - if DEBUG_CACHES: - actual_result = yield self.function_to_call(obj, *args, **kwargs) - if actual_result != cached_result: - logger.error( - "Stale cache entry %s%r: cached: %r, actual %r", - self.orig.__name__, keyargs, - cached_result, actual_result, - ) - raise ValueError("Stale cache entry") - defer.returnValue(cached_result) + return cached_result.observe() except KeyError: # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated # while the SELECT is executing (SYN-369) sequence = cache.sequence - ret = yield self.function_to_call(obj, *args, **kwargs) + ret = defer.maybeDeferred( + self.function_to_call, + obj, *args, **kwargs + ) + ret = ObservableDeferred(ret, consumeErrors=False) cache.update(sequence, *(keyargs + [ret])) - defer.returnValue(ret) + return ret.observe() wrapped.invalidate = cache.invalidate wrapped.invalidate_all = cache.invalidate_all diff --git a/synapse/util/async.py b/synapse/util/async.py index 5a1d545c96..7bf2d38bb8 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -51,7 +51,7 @@ class ObservableDeferred(object): object.__setattr__(self, "_observers", set()) def callback(r): - self._result = (True, r) + object.__setattr__(self, "_result", (True, r)) while self._observers: try: self._observers.pop().callback(r) @@ -60,7 +60,7 @@ class ObservableDeferred(object): return r def errback(f): - self._result = (False, f) + object.__setattr__(self, "_result", (False, f)) while self._observers: try: self._observers.pop().errback(f) @@ -97,3 +97,8 @@ class ObservableDeferred(object): def __setattr__(self, name, value): setattr(self._deferred, name, value) + + def __repr__(self): + return "" % ( + id(self), self._result, self._deferred, + ) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 8c3d2952bd..8fa305d18a 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -17,6 +17,8 @@ from tests import unittest from twisted.internet import defer +from synapse.util.async import ObservableDeferred + from synapse.storage._base import Cache, cached @@ -178,19 +180,20 @@ class CacheDecoratorTestCase(unittest.TestCase): self.assertTrue(callcount[0] >= 14, msg="Expected callcount >= 14, got %d" % (callcount[0])) - @defer.inlineCallbacks def test_prefill(self): callcount = [0] + d = defer.succeed(123) + class A(object): @cached() def func(self, key): callcount[0] += 1 - return key + return d a = A() - a.func.prefill("foo", 123) + a.func.prefill("foo", ObservableDeferred(d)) - self.assertEquals((yield a.func("foo")), 123) + self.assertEquals(a.func("foo").result, d.result) self.assertEquals(callcount[0], 0) -- cgit 1.5.1 From 433314cc34295ffefca349b9fc6914d81f2521e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Aug 2015 14:01:05 +0100 Subject: Re-implement DEBUG_CACHES flag --- synapse/storage/_base.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7b2be6745e..8384299a13 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -178,8 +178,23 @@ class CacheDescriptor(object): arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] try: - cached_result = cache.get(*keyargs) - return cached_result.observe() + cached_result_d = cache.get(*keyargs) + + if DEBUG_CACHES: + + @defer.inlineCallbacks + def check_result(cached_result): + actual_result = yield self.function_to_call(obj, *args, **kwargs) + if actual_result != cached_result: + logger.error( + "Stale cache entry %s%r: cached: %r, actual %r", + self.orig.__name__, keyargs, + cached_result, actual_result, + ) + raise ValueError("Stale cache entry") + cached_result_d.observe().addCallback(check_result) + + return cached_result_d.observe() except KeyError: # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated -- cgit 1.5.1 From b811c9857491c0569ae367708721fbbaebf3adab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Aug 2015 14:01:27 +0100 Subject: Remove failed deferreds from cache --- synapse/storage/_base.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8384299a13..99c0948754 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -205,8 +205,14 @@ class CacheDescriptor(object): self.function_to_call, obj, *args, **kwargs ) - ret = ObservableDeferred(ret, consumeErrors=False) + def onErr(f): + cache.invalidate(*keyargs) + return f + + ret.addErrback(onErr) + + ret = ObservableDeferred(ret, consumeErrors=False) cache.update(sequence, *(keyargs + [ret])) return ret.observe() -- cgit 1.5.1 From 63b1eaf32c57f0e1bae9b6a3768a560165637aee Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 6 Aug 2015 14:02:50 +0100 Subject: Docs --- synapse/storage/_base.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 99c0948754..0872a438f1 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -132,6 +132,9 @@ class Cache(object): class CacheDescriptor(object): """ A method decorator that applies a memoizing cache around the function. + This caches deferreds, rather than the results themselves. Deferreds that + fail are removed from the cache. + The function is presumed to take zero or more arguments, which are used in a tuple as the key for the cache. Hits are served directly from the cache; misses use the function body to generate the value. -- cgit 1.5.1 From 0db40d3e939506e22b927214bb0d3eaad40730ab Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 7 Aug 2015 17:22:11 +0100 Subject: Don't complain about extra .pyc files we find while hunting for database schemas --- synapse/storage/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 71d5d92500..973e78e047 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -354,6 +354,11 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, ) logger.debug("Running script %s", relative_path) module.run_upgrade(cur, database_engine) + elif ext == ".pyc": + # Sometimes .pyc files turn up anyway even though we've + # disabled their generation; e.g. from distribution package + # installers. Silently skip it + pass elif ext == ".sql": # A plain old .sql file, just read and execute it logger.debug("Applying schema %s", relative_path) -- cgit 1.5.1 From 20addfa358e4f72b9f0c25d48c1e3ecfc08a68b2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2015 11:52:21 +0100 Subject: Change Cache to not use *args in its interface --- synapse/storage/__init__.py | 4 +-- synapse/storage/_base.py | 61 +++++++++++++++++++------------------ synapse/storage/directory.py | 4 +-- synapse/storage/event_federation.py | 4 +-- synapse/storage/events.py | 21 +++++++------ synapse/storage/keys.py | 2 +- synapse/storage/presence.py | 4 +-- synapse/storage/push_rule.py | 16 +++++----- synapse/storage/registration.py | 2 +- synapse/storage/roommember.py | 6 ++-- tests/storage/test__base.py | 12 ++++---- 11 files changed, 69 insertions(+), 67 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 71d5d92500..1a6a8a3762 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -99,7 +99,7 @@ class DataStore(RoomMemberStore, RoomStore, key = (user.to_string(), access_token, device_id, ip) try: - last_seen = self.client_ip_last_seen.get(*key) + last_seen = self.client_ip_last_seen.get(key) except KeyError: last_seen = None @@ -107,7 +107,7 @@ class DataStore(RoomMemberStore, RoomStore, if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: defer.returnValue(None) - self.client_ip_last_seen.prefill(*key + (now,)) + self.client_ip_last_seen.prefill(key, now) # It's safe not to lock here: a) no unique constraint, # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0872a438f1..e07cf3b58a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -57,6 +57,9 @@ cache_counter = metrics.register_cache( ) +_CacheSentinel = object() + + class Cache(object): def __init__(self, name, max_entries=1000, keylen=1, lru=True): @@ -83,41 +86,38 @@ class Cache(object): "Cache objects can only be accessed from the main thread" ) - def get(self, *keyargs): - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) - - if keyargs in self.cache: + def get(self, keyargs, default=_CacheSentinel): + val = self.cache.get(keyargs, _CacheSentinel) + if val is not _CacheSentinel: cache_counter.inc_hits(self.name) - return self.cache[keyargs] + return val cache_counter.inc_misses(self.name) - raise KeyError() - def update(self, sequence, *args): + if default is _CacheSentinel: + raise KeyError() + else: + return default + + def update(self, sequence, keyargs, value): self.check_thread() if self.sequence == sequence: # Only update the cache if the caches sequence number matches the # number that the cache had before the SELECT was started (SYN-369) - self.prefill(*args) - - def prefill(self, *args): # because I can't *keyargs, value - keyargs = args[:-1] - value = args[-1] - - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) + self.prefill(keyargs, value) + def prefill(self, keyargs, value): if self.max_entries is not None: while len(self.cache) >= self.max_entries: self.cache.popitem(last=False) self.cache[keyargs] = value - def invalidate(self, *keyargs): + def invalidate(self, keyargs): self.check_thread() - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) + if not isinstance(keyargs, tuple): + raise ValueError("keyargs must be a tuple.") + # Increment the sequence number so that any SELECT statements that # raced with the INSERT don't update the cache (SYN-369) self.sequence += 1 @@ -168,20 +168,21 @@ class CacheDescriptor(object): % (orig.__name__,) ) - def __get__(self, obj, objtype=None): - cache = Cache( + self.cache = Cache( name=self.orig.__name__, max_entries=self.max_entries, keylen=self.num_args, lru=self.lru, ) + def __get__(self, obj, objtype=None): + @functools.wraps(self.orig) def wrapped(*args, **kwargs): arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) - keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] + keyargs = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names) try: - cached_result_d = cache.get(*keyargs) + cached_result_d = self.cache.get(keyargs) if DEBUG_CACHES: @@ -202,7 +203,7 @@ class CacheDescriptor(object): # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated # while the SELECT is executing (SYN-369) - sequence = cache.sequence + sequence = self.cache.sequence ret = defer.maybeDeferred( self.function_to_call, @@ -210,19 +211,19 @@ class CacheDescriptor(object): ) def onErr(f): - cache.invalidate(*keyargs) + self.cache.invalidate(keyargs) return f ret.addErrback(onErr) - ret = ObservableDeferred(ret, consumeErrors=False) - cache.update(sequence, *(keyargs + [ret])) + ret = ObservableDeferred(ret, consumeErrors=True) + self.cache.update(sequence, keyargs, ret) return ret.observe() - wrapped.invalidate = cache.invalidate - wrapped.invalidate_all = cache.invalidate_all - wrapped.prefill = cache.prefill + wrapped.invalidate = self.cache.invalidate + wrapped.invalidate_all = self.cache.invalidate_all + wrapped.prefill = self.cache.prefill obj.__dict__[self.orig.__name__] = wrapped diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 2b2bdf8615..f3947bbe89 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -104,7 +104,7 @@ class DirectoryStore(SQLBaseStore): }, desc="create_room_alias_association", ) - self.get_aliases_for_room.invalidate(room_id) + self.get_aliases_for_room.invalidate((room_id,)) @defer.inlineCallbacks def delete_room_alias(self, room_alias): @@ -114,7 +114,7 @@ class DirectoryStore(SQLBaseStore): room_alias, ) - self.get_aliases_for_room.invalidate(room_id) + self.get_aliases_for_room.invalidate((room_id,)) defer.returnValue(room_id) def _delete_room_alias_txn(self, txn, room_alias): diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 45b86c94e8..910b6598a7 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -362,7 +362,7 @@ class EventFederationStore(SQLBaseStore): for room_id in events_by_room: txn.call_after( - self.get_latest_event_ids_in_room.invalidate, room_id + self.get_latest_event_ids_in_room.invalidate, (room_id,) ) def get_backfill_events(self, room_id, event_list, limit): @@ -505,4 +505,4 @@ class EventFederationStore(SQLBaseStore): query = "DELETE FROM event_forward_extremities WHERE room_id = ?" txn.execute(query, (room_id,)) - txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id) + txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ed7ea38804..5b64918024 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -162,8 +162,8 @@ class EventsStore(SQLBaseStore): if current_state: txn.call_after(self.get_current_state_for_key.invalidate_all) txn.call_after(self.get_rooms_for_user.invalidate_all) - txn.call_after(self.get_users_in_room.invalidate, event.room_id) - txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) + txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) + txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) txn.call_after(self.get_room_name_and_aliases, event.room_id) self._simple_delete_txn( @@ -430,13 +430,13 @@ class EventsStore(SQLBaseStore): if not context.rejected: txn.call_after( self.get_current_state_for_key.invalidate, - event.room_id, event.type, event.state_key - ) + (event.room_id, event.type, event.state_key,) + ) if event.type in [EventTypes.Name, EventTypes.Aliases]: txn.call_after( self.get_room_name_and_aliases.invalidate, - event.room_id + (event.room_id,) ) self._simple_upsert_txn( @@ -567,8 +567,9 @@ class EventsStore(SQLBaseStore): def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, - get_prev_content) + self._get_event_cache.invalidate( + (event_id, check_redacted, get_prev_content) + ) def _get_event_txn(self, txn, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False): @@ -589,7 +590,7 @@ class EventsStore(SQLBaseStore): for event_id in events: try: ret = self._get_event_cache.get( - event_id, check_redacted, get_prev_content + (event_id, check_redacted, get_prev_content,) ) if allow_rejected or not ret.rejected_reason: @@ -822,7 +823,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["prev_content"] = prev.get_dict()["content"] self._get_event_cache.prefill( - ev.event_id, check_redacted, get_prev_content, ev + (ev.event_id, check_redacted, get_prev_content), ev ) defer.returnValue(ev) @@ -879,7 +880,7 @@ class EventsStore(SQLBaseStore): ev.unsigned["prev_content"] = prev.get_dict()["content"] self._get_event_cache.prefill( - ev.event_id, check_redacted, get_prev_content, ev + (ev.event_id, check_redacted, get_prev_content), ev ) return ev diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index e3f98f0cde..49b8e37cfd 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -131,7 +131,7 @@ class KeyStore(SQLBaseStore): desc="store_server_verify_key", ) - self.get_all_server_verify_keys.invalidate(server_name) + self.get_all_server_verify_keys.invalidate((server_name,)) def store_server_keys_json(self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes): diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index fefcf6bce0..576cf670cc 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -98,7 +98,7 @@ class PresenceStore(SQLBaseStore): updatevalues={"accepted": True}, desc="set_presence_list_accepted", ) - self.get_presence_list_accepted.invalidate(observer_localpart) + self.get_presence_list_accepted.invalidate((observer_localpart,)) defer.returnValue(result) def get_presence_list(self, observer_localpart, accepted=None): @@ -133,4 +133,4 @@ class PresenceStore(SQLBaseStore): "observed_user_id": observed_userid}, desc="del_presence_list", ) - self.get_presence_list_accepted.invalidate(observer_localpart) + self.get_presence_list_accepted.invalidate((observer_localpart,)) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index a220f3632e..9b88ca7b39 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -151,11 +151,11 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) txn.call_after( - self.get_push_rules_for_user.invalidate, user_name + self.get_push_rules_for_user.invalidate, (user_name,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, user_name + self.get_push_rules_enabled_for_user.invalidate, (user_name,) ) self._simple_insert_txn( @@ -187,10 +187,10 @@ class PushRuleStore(SQLBaseStore): new_rule['priority'] = new_prio txn.call_after( - self.get_push_rules_for_user.invalidate, user_name + self.get_push_rules_for_user.invalidate, (user_name,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, user_name + self.get_push_rules_enabled_for_user.invalidate, (user_name,) ) self._simple_insert_txn( @@ -216,8 +216,8 @@ class PushRuleStore(SQLBaseStore): desc="delete_push_rule", ) - self.get_push_rules_for_user.invalidate(user_name) - self.get_push_rules_enabled_for_user.invalidate(user_name) + self.get_push_rules_for_user.invalidate((user_name,)) + self.get_push_rules_enabled_for_user.invalidate((user_name,)) @defer.inlineCallbacks def set_push_rule_enabled(self, user_name, rule_id, enabled): @@ -238,10 +238,10 @@ class PushRuleStore(SQLBaseStore): {'id': new_id}, ) txn.call_after( - self.get_push_rules_for_user.invalidate, user_name + self.get_push_rules_for_user.invalidate, (user_name,) ) txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, user_name + self.get_push_rules_enabled_for_user.invalidate, (user_name,) ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 90e2606be2..4eaa088b36 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -131,7 +131,7 @@ class RegistrationStore(SQLBaseStore): user_id ) for r in rows: - self.get_user_by_token.invalidate(r) + self.get_user_by_token.invalidate((r,)) @cached() def get_user_by_token(self, token): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 55dd3f6cfb..9f14f38f24 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -54,9 +54,9 @@ class RoomMemberStore(SQLBaseStore): ) for event in events: - txn.call_after(self.get_rooms_for_user.invalidate, event.state_key) - txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) - txn.call_after(self.get_users_in_room.invalidate, event.room_id) + txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,)) + txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,)) + txn.call_after(self.get_users_in_room.invalidate, (event.room_id,)) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 8fa305d18a..abee2f631d 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -42,12 +42,12 @@ class CacheTestCase(unittest.TestCase): self.assertEquals(self.cache.get("foo"), 123) def test_invalidate(self): - self.cache.prefill("foo", 123) - self.cache.invalidate("foo") + self.cache.prefill(("foo",), 123) + self.cache.invalidate(("foo",)) failed = False try: - self.cache.get("foo") + self.cache.get(("foo",)) except KeyError: failed = True @@ -141,7 +141,7 @@ class CacheDecoratorTestCase(unittest.TestCase): self.assertEquals(callcount[0], 1) - a.func.invalidate("foo") + a.func.invalidate(("foo",)) yield a.func("foo") @@ -153,7 +153,7 @@ class CacheDecoratorTestCase(unittest.TestCase): def func(self, key): return key - A().func.invalidate("what") + A().func.invalidate(("what",)) @defer.inlineCallbacks def test_max_entries(self): @@ -193,7 +193,7 @@ class CacheDecoratorTestCase(unittest.TestCase): a = A() - a.func.prefill("foo", ObservableDeferred(d)) + a.func.prefill(("foo",), ObservableDeferred(d)) self.assertEquals(a.func("foo").result, d.result) self.assertEquals(callcount[0], 0) -- cgit 1.5.1 From 3213ff630ca6f7f624d65dbab55e2c4c65fe80ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2015 19:13:45 +0100 Subject: Remove unnecessary cache --- synapse/storage/state.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 55c6d52890..7ce51b9bdc 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached, cachedInlineCallbacks +from ._base import SQLBaseStore, cachedInlineCallbacks from twisted.internet import defer @@ -91,7 +91,6 @@ class StateStore(SQLBaseStore): defer.returnValue(dict(state_list)) - @cached(num_args=1) def _fetch_events_for_group(self, key, events): return self._get_events( events, get_prev_content=False -- cgit 1.5.1 From 62126c996c5bc8d86d3e15c015a68e8e60622a55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2015 19:17:58 +0100 Subject: Propogate stale cache errors to calling functions --- synapse/storage/_base.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0872a438f1..524a003153 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -183,8 +183,8 @@ class CacheDescriptor(object): try: cached_result_d = cache.get(*keyargs) + observed = cached_result_d.observe() if DEBUG_CACHES: - @defer.inlineCallbacks def check_result(cached_result): actual_result = yield self.function_to_call(obj, *args, **kwargs) @@ -195,9 +195,10 @@ class CacheDescriptor(object): cached_result, actual_result, ) raise ValueError("Stale cache entry") - cached_result_d.observe().addCallback(check_result) + defer.returnValue(cached_result) + observed.addCallback(check_result) - return cached_result_d.observe() + return observed except KeyError: # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated -- cgit 1.5.1 From 9c5385b53ad6d8ac21227dbc3f150c7d2511b061 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Aug 2015 19:26:38 +0100 Subject: s/observed/observer/ --- synapse/storage/_base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 524a003153..5997603b3c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -183,7 +183,7 @@ class CacheDescriptor(object): try: cached_result_d = cache.get(*keyargs) - observed = cached_result_d.observe() + observer = cached_result_d.observe() if DEBUG_CACHES: @defer.inlineCallbacks def check_result(cached_result): @@ -196,9 +196,9 @@ class CacheDescriptor(object): ) raise ValueError("Stale cache entry") defer.returnValue(cached_result) - observed.addCallback(check_result) + observer.addCallback(check_result) - return observed + return observer except KeyError: # Get the sequence number of the cache before reading from the # database so that we can tell if the cache is invalidated -- cgit 1.5.1 From 2cd6cb9f65e64ea49e0a8ba7c4851c79821243ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Aug 2015 10:38:47 +0100 Subject: Rename keyargs to args in Cache --- synapse/storage/_base.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a088a6175c..1e2d436660 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,8 +86,8 @@ class Cache(object): "Cache objects can only be accessed from the main thread" ) - def get(self, keyargs, default=_CacheSentinel): - val = self.cache.get(keyargs, _CacheSentinel) + def get(self, key, default=_CacheSentinel): + val = self.cache.get(key, _CacheSentinel) if val is not _CacheSentinel: cache_counter.inc_hits(self.name) return val @@ -99,29 +99,29 @@ class Cache(object): else: return default - def update(self, sequence, keyargs, value): + def update(self, sequence, key, value): self.check_thread() if self.sequence == sequence: # Only update the cache if the caches sequence number matches the # number that the cache had before the SELECT was started (SYN-369) - self.prefill(keyargs, value) + self.prefill(key, value) - def prefill(self, keyargs, value): + def prefill(self, key, value): if self.max_entries is not None: while len(self.cache) >= self.max_entries: self.cache.popitem(last=False) - self.cache[keyargs] = value + self.cache[key] = value - def invalidate(self, keyargs): + def invalidate(self, key): self.check_thread() - if not isinstance(keyargs, tuple): + if not isinstance(key, tuple): raise ValueError("keyargs must be a tuple.") # Increment the sequence number so that any SELECT statements that # raced with the INSERT don't update the cache (SYN-369) self.sequence += 1 - self.cache.pop(keyargs, None) + self.cache.pop(key, None) def invalidate_all(self): self.check_thread() -- cgit 1.5.1 From 86eaaa885bb4f3d75990874a76c53b2a0870d639 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Aug 2015 13:44:44 +0100 Subject: Rename keyargs to args in CacheDescriptor --- synapse/storage/_base.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1e2d436660..e76ee2779a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -180,9 +180,9 @@ class CacheDescriptor(object): @functools.wraps(self.orig) def wrapped(*args, **kwargs): arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) - keyargs = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names) + cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names) try: - cached_result_d = self.cache.get(keyargs) + cached_result_d = self.cache.get(cache_key) observer = cached_result_d.observe() if DEBUG_CACHES: @@ -192,7 +192,7 @@ class CacheDescriptor(object): if actual_result != cached_result: logger.error( "Stale cache entry %s%r: cached: %r, actual %r", - self.orig.__name__, keyargs, + self.orig.__name__, cache_key, cached_result, actual_result, ) raise ValueError("Stale cache entry") @@ -212,13 +212,13 @@ class CacheDescriptor(object): ) def onErr(f): - self.cache.invalidate(keyargs) + self.cache.invalidate(cache_key) return f ret.addErrback(onErr) ret = ObservableDeferred(ret, consumeErrors=True) - self.cache.update(sequence, keyargs, ret) + self.cache.update(sequence, cache_key, ret) return ret.observe() -- cgit 1.5.1 From 559c51debcf5402cd5999aa1afb854b34e9de363 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 10 Aug 2015 14:07:08 +0100 Subject: Use TypeError instead of ValueError and give a nicer error mesasge when someone calls Cache.invalidate with the wrong type. --- synapse/storage/_base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e76ee2779a..73eea157a4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -116,7 +116,9 @@ class Cache(object): def invalidate(self, key): self.check_thread() if not isinstance(key, tuple): - raise ValueError("keyargs must be a tuple.") + raise TypeError( + "The cache key must be a tuple not %r" % (type(key),) + ) # Increment the sequence number so that any SELECT statements that # raced with the INSERT don't update the cache (SYN-369) -- cgit 1.5.1