diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-03-26 10:07:59 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-03-26 10:07:59 +0000 |
commit | 4edcbcee3b9579f4b50ecb97e566edab1a7c4c8b (patch) | |
tree | 5ab27c5ebbfc762ee133223fb48e868b85474c79 /synapse/storage/_base.py | |
parent | Set the service ID as soon as it is known. (diff) | |
parent | Allow a choice of LRU behaviour for Cache() by using LruCache() or OrderedDict() (diff) | |
download | synapse-4edcbcee3b9579f4b50ecb97e566edab1a7c4c8b.tar.xz |
Merge branch 'develop' into application-services-txn-reliability
Conflicts: synapse/storage/__init__.py
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r-- | synapse/storage/_base.py | 242 |
1 files changed, 159 insertions, 83 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 40f2fc6d76..6fa63f052e 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -25,6 +25,7 @@ import synapse.metrics from twisted.internet import defer from collections import namedtuple, OrderedDict +import functools import simplejson as json import sys import time @@ -38,6 +39,8 @@ transaction_logger = logging.getLogger("synapse.storage.txn") metrics = synapse.metrics.get_metrics_for("synapse.storage") +sql_scheduling_timer = metrics.register_distribution("schedule_time") + sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"]) @@ -50,14 +53,57 @@ cache_counter = metrics.register_cache( ) -# TODO(paul): -# * more generic key management -# * consider other eviction strategies - LRU? -def cached(max_entries=1000): +class Cache(object): + + def __init__(self, name, max_entries=1000, keylen=1, lru=False): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries + + self.name = name + self.keylen = keylen + + caches_by_name[name] = self.cache + + def get(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if keyargs in self.cache: + cache_counter.inc_hits(self.name) + return self.cache[keyargs] + + cache_counter.inc_misses(self.name) + raise KeyError() + + def prefill(self, *args): # because I can't *keyargs, value + keyargs = args[:-1] + value = args[-1] + + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) + + self.cache[keyargs] = value + + def invalidate(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + self.cache.pop(keyargs, None) + + +def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. - The function is presumed to take one additional argument, which is used as - the key for the cache. Cache hits are served directly from the cache; + The function is presumed to take zero or more arguments, which are used in + a tuple as the key for the cache. Hits are served directly from the cache; misses use the function body to generate the value. The wrapped function has an additional member, a callable called @@ -68,33 +114,27 @@ def cached(max_entries=1000): calling the calculation function. """ def wrap(orig): - cache = OrderedDict() - name = orig.__name__ - - caches_by_name[name] = cache - - def prefill(key, value): - while len(cache) > max_entries: - cache.popitem(last=False) - - cache[key] = value + cache = Cache( + name=orig.__name__, + max_entries=max_entries, + keylen=num_args, + lru=lru, + ) + @functools.wraps(orig) @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: - cache_counter.inc_hits(name) - defer.returnValue(cache[key]) + def wrapped(self, *keyargs): + try: + defer.returnValue(cache.get(*keyargs)) + except KeyError: + ret = yield orig(self, *keyargs) - cache_counter.inc_misses(name) - ret = yield orig(self, key) - prefill(key, ret) - defer.returnValue(ret) + cache.prefill(*keyargs + (ret,)) - def invalidate(key): - cache.pop(key, None) + defer.returnValue(ret) - wrapped.invalidate = invalidate - wrapped.prefill = prefill + wrapped.invalidate = cache.invalidate + wrapped.prefill = cache.prefill return wrapped return wrap @@ -240,6 +280,8 @@ class SQLBaseStore(object): """Wraps the .runInteraction() method on the underlying db_pool.""" current_context = LoggingContext.current_context() + start_time = time.time() * 1000 + def inner_func(txn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) @@ -252,6 +294,7 @@ class SQLBaseStore(object): name = "%s-%x" % (desc, txn_id, ) + sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: return func(LoggingTransaction(txn, name), *args, **kwargs) @@ -314,7 +357,8 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False): + def _simple_insert(self, table, values, or_replace=False, or_ignore=False, + desc="_simple_insert"): """Executes an INSERT query on the named table. Args: @@ -323,7 +367,7 @@ class SQLBaseStore(object): or_replace : bool; if True performs an INSERT OR REPLACE """ return self.runInteraction( - "_simple_insert", + desc, self._simple_insert_txn, table, values, or_replace=or_replace, or_ignore=or_ignore, ) @@ -347,7 +391,7 @@ class SQLBaseStore(object): txn.execute(sql, values.values()) return txn.lastrowid - def _simple_upsert(self, table, keyvalues, values): + def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"): """ Args: table (str): The table to upsert into @@ -356,7 +400,7 @@ class SQLBaseStore(object): Returns: A deferred """ return self.runInteraction( - "_simple_upsert", + desc, self._simple_upsert_txn, table, keyvalues, values ) @@ -392,7 +436,7 @@ class SQLBaseStore(object): txn.execute(sql, allvalues.values()) def _simple_select_one(self, table, keyvalues, retcols, - allow_none=False): + allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it. @@ -404,12 +448,15 @@ class SQLBaseStore(object): allow_none : If true, return None instead of failing if the SELECT statement returns no rows """ - return self._simple_selectupdate_one( - table, keyvalues, retcols=retcols, allow_none=allow_none + return self.runInteraction( + desc, + self._simple_select_one_txn, + table, keyvalues, retcols, allow_none, ) def _simple_select_one_onecol(self, table, keyvalues, retcol, - allow_none=False): + allow_none=False, + desc="_simple_select_one_onecol"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it." @@ -419,7 +466,7 @@ class SQLBaseStore(object): retcol : string giving the name of the column to return """ return self.runInteraction( - "_simple_select_one_onecol", + desc, self._simple_select_one_onecol_txn, table, keyvalues, retcol, allow_none=allow_none, ) @@ -455,7 +502,8 @@ class SQLBaseStore(object): return [r[0] for r in txn.fetchall()] - def _simple_select_onecol(self, table, keyvalues, retcol): + def _simple_select_onecol(self, table, keyvalues, retcol, + desc="_simple_select_onecol"): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -468,12 +516,13 @@ class SQLBaseStore(object): Deferred: Results in a list """ return self.runInteraction( - "_simple_select_onecol", + desc, self._simple_select_onecol_txn, table, keyvalues, retcol ) - def _simple_select_list(self, table, keyvalues, retcols): + def _simple_select_list(self, table, keyvalues, retcols, + desc="_simple_select_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -484,7 +533,7 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ return self.runInteraction( - "_simple_select_list", + desc, self._simple_select_list_txn, table, keyvalues, retcols ) @@ -516,7 +565,7 @@ class SQLBaseStore(object): return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, - retcols=None): + desc="_simple_update_one"): """Executes an UPDATE query on the named table, setting new values for columns in a row matching the key values. @@ -534,56 +583,76 @@ class SQLBaseStore(object): get-and-set. This can be used to implement compare-and-set by putting the update column in the 'keyvalues' dict as well. """ - return self._simple_selectupdate_one(table, keyvalues, updatevalues, - retcols=retcols) + return self.runInteraction( + desc, + self._simple_update_one_txn, + table, keyvalues, updatevalues, + ) - def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, - retcols=None, allow_none=False): - """ Combined SELECT then UPDATE.""" - if retcols: - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k) for k in keyvalues) - ) + def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues): + update_sql = "UPDATE %s SET %s WHERE %s" % ( + table, + ", ".join("%s = ?" % (k,) for k in updatevalues), + " AND ".join("%s = ?" % (k,) for k in keyvalues) + ) - if updatevalues: - update_sql = "UPDATE %s SET %s WHERE %s" % ( - table, - ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) - ) + txn.execute( + update_sql, + updatevalues.values() + keyvalues.values() + ) + + if txn.rowcount == 0: + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + def _simple_select_one_txn(self, txn, table, keyvalues, retcols, + allow_none=False): + select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k) for k in keyvalues) + ) + + txn.execute(select_sql, keyvalues.values()) + row = txn.fetchone() + if not row: + if allow_none: + return None + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + return dict(zip(retcols, row)) + + def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, + retcols=None, allow_none=False, + desc="_simple_selectupdate_one"): + """ Combined SELECT then UPDATE.""" def func(txn): ret = None if retcols: - txn.execute(select_sql, keyvalues.values()) - - row = txn.fetchone() - if not row: - if allow_none: - return None - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - - ret = dict(zip(retcols, row)) + ret = self._simple_select_one_txn( + txn, + table=table, + keyvalues=keyvalues, + retcols=retcols, + allow_none=allow_none, + ) if updatevalues: - txn.execute( - update_sql, - updatevalues.values() + keyvalues.values() + self._simple_update_one_txn( + txn, + table=table, + keyvalues=keyvalues, + updatevalues=updatevalues, ) - if txn.rowcount == 0: - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - return ret - return self.runInteraction("_simple_selectupdate_one", func) + return self.runInteraction(desc, func) - def _simple_delete_one(self, table, keyvalues): + def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"): """Executes a DELETE query on the named table, expecting to delete a single row. @@ -602,9 +671,9 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self.runInteraction("_simple_delete_one", func) + return self.runInteraction(desc, func) - def _simple_delete(self, table, keyvalues): + def _simple_delete(self, table, keyvalues, desc="_simple_delete"): """Executes a DELETE query on the named table. Args: @@ -612,7 +681,7 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the row with """ - return self.runInteraction("_simple_delete", self._simple_delete_txn) + return self.runInteraction(desc, self._simple_delete_txn) def _simple_delete_txn(self, txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( @@ -782,6 +851,13 @@ class SQLBaseStore(object): return result[0] if result else None +class _RollbackButIsFineException(Exception): + """ This exception is used to rollback a transaction without implying + something went wrong. + """ + pass + + class Table(object): """ A base class used to store information about a particular table. """ |