diff options
author | Erik Johnston <erikj@jki.re> | 2018-10-01 14:20:30 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-01 14:20:30 +0200 |
commit | 8f5c23d0cddc371f4eadd5e7780cb3bd6d7a7732 (patch) | |
tree | ba09e9e63e6fa70f997da471e5327678dbcaddb4 | |
parent | Further reduce the size of the docker image (#3972) (diff) | |
parent | Correctly match 'dict.pop' api (diff) | |
download | synapse-8f5c23d0cddc371f4eadd5e7780cb3bd6d7a7732.tar.xz |
Merge pull request #3933 from matrix-org/erikj/destination_retry_cache
Add a five minute cache to get_destination_retry_timings
-rw-r--r-- | changelog.d/3933.misc | 1 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 23 | ||||
-rw-r--r-- | synapse/util/caches/expiringcache.py | 24 |
3 files changed, 45 insertions, 3 deletions
diff --git a/changelog.d/3933.misc b/changelog.d/3933.misc new file mode 100644 index 0000000000..6545871f55 --- /dev/null +++ b/changelog.d/3933.misc @@ -0,0 +1 @@ +Add a cache to get_destination_retry_timings diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index baf0379a68..ab54977a75 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util.caches.expiringcache import ExpiringCache from ._base import SQLBaseStore, db_to_json @@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple( ) ) +SENTINEL = object() + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. @@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore): self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) + self._destination_retry_cache = ExpiringCache( + cache_name="get_destination_retry_timings", + clock=self._clock, + expiry_ms=5 * 60 * 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore): """ pass + @defer.inlineCallbacks def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. @@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore): None if not retrying Otherwise a dict for the retry scheme """ - return self.runInteraction( + + result = self._destination_retry_cache.get(destination, SENTINEL) + if result is not SENTINEL: + defer.returnValue(result) + + result = yield self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) + # We don't hugely care about race conditions between getting and + # invalidating the cache, since we time out fairly quickly anyway. + self._destination_retry_cache[destination] = result + defer.returnValue(result) + def _get_destination_retry_timings(self, txn, destination): result = self._simple_select_one_txn( txn, @@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore): retry_interval (int) - how long until next retry in ms """ + self._destination_retry_cache.pop(destination) return self.runInteraction( "set_destination_retry_timings", self._set_destination_retry_timings, diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 9af4ec4aa8..f369780277 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,7 +16,7 @@ import logging from collections import OrderedDict -from six import itervalues +from six import iteritems, itervalues from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache @@ -24,6 +24,9 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) +SENTINEL = object() + + class ExpiringCache(object): def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, reset_expiry_on_get=False, iterable=False): @@ -95,6 +98,21 @@ class ExpiringCache(object): return entry.value + def pop(self, key, default=SENTINEL): + """Removes and returns the value with the given key from the cache. + + If the key isn't in the cache then `default` will be returned if + specified, otherwise `KeyError` will get raised. + + Identical functionality to `dict.pop(..)`. + """ + + value = self._cache.pop(key, default) + if value is SENTINEL: + raise KeyError(key) + + return value + def __contains__(self, key): return key in self._cache @@ -122,7 +140,7 @@ class ExpiringCache(object): keys_to_delete = set() - for key, cache_entry in self._cache.items(): + for key, cache_entry in iteritems(self._cache): if now - cache_entry.time > self._expiry_ms: keys_to_delete.add(key) @@ -146,6 +164,8 @@ class ExpiringCache(object): class _CacheEntry(object): + __slots__ = ["time", "value"] + def __init__(self, time, value): self.time = time self.value = value |