diff options
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r-- | synapse/storage/_base.py | 95 |
1 files changed, 83 insertions, 12 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index bae50e7d1f..76ed7d06fb 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,6 +17,7 @@ import logging from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.util.logutils import log_function import collections import copy @@ -25,6 +26,44 @@ import json logger = logging.getLogger(__name__) +sql_logger = logging.getLogger("synapse.storage.SQL") + + +class LoggingTransaction(object): + """An object that almost-transparently proxies for the 'txn' object + passed to the constructor. Adds logging to the .execute() method.""" + __slots__ = ["txn"] + + def __init__(self, txn): + object.__setattr__(self, "txn", txn) + + def __getattribute__(self, name): + if name == "execute": + return object.__getattribute__(self, "execute") + + return getattr(object.__getattribute__(self, "txn"), name) + + def __setattr__(self, name, value): + setattr(object.__getattribute__(self, "txn"), name, value) + + def execute(self, sql, *args, **kwargs): + # TODO(paul): Maybe use 'info' and 'debug' for values? + sql_logger.debug("[SQL] %s", sql) + try: + if args and args[0]: + values = args[0] + sql_logger.debug("[SQL values] " + + ", ".join(("<%s>",) * len(values)), *values) + except: + # Don't let logging failures stop SQL from working + pass + + # TODO(paul): Here would be an excellent place to put some timing + # measurements, and log (warning?) slow queries. + return object.__getattribute__(self, "txn").execute( + sql, *args, **kwargs + ) + class SQLBaseStore(object): @@ -34,6 +73,13 @@ class SQLBaseStore(object): self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() + def runInteraction(self, func, *args, **kwargs): + """Wraps the .runInteraction() method on the underlying db_pool.""" + def inner_func(txn, *args, **kwargs): + return func(LoggingTransaction(txn), *args, **kwargs) + + return self._db_pool.runInteraction(inner_func, *args, **kwargs) + def cursor_to_dict(self, cursor): """Converts a SQL cursor into an list of dicts. @@ -59,11 +105,6 @@ class SQLBaseStore(object): Returns: The result of decoder(results) """ - logger.debug( - "[SQL] %s Args=%s Func=%s", - query, args, decoder.__name__ if decoder else None - ) - def interaction(txn): cursor = txn.execute(query, args) if decoder: @@ -71,7 +112,7 @@ class SQLBaseStore(object): else: return cursor.fetchall() - return self._db_pool.runInteraction(interaction) + return self.runInteraction(interaction) def _execute_and_decode(self, query, *args): return self._execute(self.cursor_to_dict, query, *args) @@ -87,10 +128,11 @@ class SQLBaseStore(object): values : dict of new column names and values for them or_replace : bool; if True performs an INSERT OR REPLACE """ - return self._db_pool.runInteraction( + return self.runInteraction( self._simple_insert_txn, table, values, or_replace=or_replace ) + @log_function def _simple_insert_txn(self, txn, table, values, or_replace=False): sql = "%s INTO %s (%s) VALUES(%s)" % ( ("INSERT OR REPLACE" if or_replace else "INSERT"), @@ -98,6 +140,12 @@ class SQLBaseStore(object): ", ".join(k for k in values), ", ".join("?" for k in values) ) + + logger.debug( + "[SQL] %s Args=%s Func=%s", + sql, values.values(), + ) + txn.execute(sql, values.values()) return txn.lastrowid @@ -164,7 +212,7 @@ class SQLBaseStore(object): txn.execute(sql, keyvalues.values()) return txn.fetchall() - res = yield self._db_pool.runInteraction(func) + res = yield self.runInteraction(func) defer.returnValue([r[0] for r in res]) @@ -187,7 +235,7 @@ class SQLBaseStore(object): txn.execute(sql, keyvalues.values()) return self.cursor_to_dict(txn) - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _simple_update_one(self, table, keyvalues, updatevalues, retcols=None): @@ -255,7 +303,7 @@ class SQLBaseStore(object): raise StoreError(500, "More than one row matched") return ret - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _simple_delete_one(self, table, keyvalues): """Executes a DELETE query on the named table, expecting to delete a @@ -276,7 +324,7 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _simple_max_id(self, table): """Executes a SELECT query on the named table, expecting to return the @@ -294,7 +342,7 @@ class SQLBaseStore(object): return 0 return max_id - return self._db_pool.runInteraction(func) + return self.runInteraction(func) def _parse_event_from_row(self, row_dict): d = copy.deepcopy({k: v for k, v in row_dict.items() if v}) @@ -307,11 +355,34 @@ class SQLBaseStore(object): d["content"] = json.loads(d["content"]) del d["unrecognized_keys"] + if "age_ts" not in d: + # For compatibility + d["age_ts"] = d["ts"] if "ts" in d else 0 + return self.event_factory.create_event( etype=d["type"], **d ) + def _parse_events(self, rows): + return self.runInteraction(self._parse_events_txn, rows) + + def _parse_events_txn(self, txn, rows): + events = [self._parse_event_from_row(r) for r in rows] + + sql = "SELECT * FROM events WHERE event_id = ?" + + for ev in events: + if hasattr(ev, "prev_state"): + # Load previous state_content. + # TODO: Should we be pulling this out above? + cursor = txn.execute(sql, (ev.prev_state,)) + prevs = self.cursor_to_dict(cursor) + if prevs: + prev = self._parse_event_from_row(prevs[0]) + ev.prev_content = prev.content + + return events class Table(object): """ A base class used to store information about a particular table. |