summary refs log tree commit diff
path: root/synapse/storage/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--synapse/storage/_base.py95
1 files changed, 83 insertions, 12 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index bae50e7d1f..76ed7d06fb 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -17,6 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.util.logutils import log_function
 
 import collections
 import copy
@@ -25,6 +26,44 @@ import json
 
 logger = logging.getLogger(__name__)
 
+sql_logger = logging.getLogger("synapse.storage.SQL")
+
+
+class LoggingTransaction(object):
+    """An object that almost-transparently proxies for the 'txn' object
+    passed to the constructor. Adds logging to the .execute() method."""
+    __slots__ = ["txn"]
+
+    def __init__(self, txn):
+        object.__setattr__(self, "txn", txn)
+
+    def __getattribute__(self, name):
+        if name == "execute":
+            return object.__getattribute__(self, "execute")
+
+        return getattr(object.__getattribute__(self, "txn"), name)
+
+    def __setattr__(self, name, value):
+        setattr(object.__getattribute__(self, "txn"), name, value)
+
+    def execute(self, sql, *args, **kwargs):
+        # TODO(paul): Maybe use 'info' and 'debug' for values?
+        sql_logger.debug("[SQL] %s", sql)
+        try:
+            if args and args[0]:
+                values = args[0]
+                sql_logger.debug("[SQL values] " +
+                    ", ".join(("<%s>",) * len(values)), *values)
+        except:
+            # Don't let logging failures stop SQL from working
+            pass
+
+        # TODO(paul): Here would be an excellent place to put some timing
+        #   measurements, and log (warning?) slow queries.
+        return object.__getattribute__(self, "txn").execute(
+            sql, *args, **kwargs
+        )
+
 
 class SQLBaseStore(object):
 
@@ -34,6 +73,13 @@ class SQLBaseStore(object):
         self.event_factory = hs.get_event_factory()
         self._clock = hs.get_clock()
 
+    def runInteraction(self, func, *args, **kwargs):
+        """Wraps the .runInteraction() method on the underlying db_pool."""
+        def inner_func(txn, *args, **kwargs):
+            return func(LoggingTransaction(txn), *args, **kwargs)
+
+        return self._db_pool.runInteraction(inner_func, *args, **kwargs)
+
     def cursor_to_dict(self, cursor):
         """Converts a SQL cursor into an list of dicts.
 
@@ -59,11 +105,6 @@ class SQLBaseStore(object):
         Returns:
             The result of decoder(results)
         """
-        logger.debug(
-            "[SQL] %s  Args=%s Func=%s",
-            query, args, decoder.__name__ if decoder else None
-        )
-
         def interaction(txn):
             cursor = txn.execute(query, args)
             if decoder:
@@ -71,7 +112,7 @@ class SQLBaseStore(object):
             else:
                 return cursor.fetchall()
 
-        return self._db_pool.runInteraction(interaction)
+        return self.runInteraction(interaction)
 
     def _execute_and_decode(self, query, *args):
         return self._execute(self.cursor_to_dict, query, *args)
@@ -87,10 +128,11 @@ class SQLBaseStore(object):
             values : dict of new column names and values for them
             or_replace : bool; if True performs an INSERT OR REPLACE
         """
-        return self._db_pool.runInteraction(
+        return self.runInteraction(
             self._simple_insert_txn, table, values, or_replace=or_replace
         )
 
+    @log_function
     def _simple_insert_txn(self, txn, table, values, or_replace=False):
         sql = "%s INTO %s (%s) VALUES(%s)" % (
             ("INSERT OR REPLACE" if or_replace else "INSERT"),
@@ -98,6 +140,12 @@ class SQLBaseStore(object):
             ", ".join(k for k in values),
             ", ".join("?" for k in values)
         )
+
+        logger.debug(
+            "[SQL] %s  Args=%s Func=%s",
+            sql, values.values(),
+        )
+
         txn.execute(sql, values.values())
         return txn.lastrowid
 
@@ -164,7 +212,7 @@ class SQLBaseStore(object):
             txn.execute(sql, keyvalues.values())
             return txn.fetchall()
 
-        res = yield self._db_pool.runInteraction(func)
+        res = yield self.runInteraction(func)
 
         defer.returnValue([r[0] for r in res])
 
@@ -187,7 +235,7 @@ class SQLBaseStore(object):
             txn.execute(sql, keyvalues.values())
             return self.cursor_to_dict(txn)
 
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _simple_update_one(self, table, keyvalues, updatevalues,
                            retcols=None):
@@ -255,7 +303,7 @@ class SQLBaseStore(object):
                     raise StoreError(500, "More than one row matched")
 
             return ret
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _simple_delete_one(self, table, keyvalues):
         """Executes a DELETE query on the named table, expecting to delete a
@@ -276,7 +324,7 @@ class SQLBaseStore(object):
                 raise StoreError(404, "No row found")
             if txn.rowcount > 1:
                 raise StoreError(500, "more than one row matched")
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _simple_max_id(self, table):
         """Executes a SELECT query on the named table, expecting to return the
@@ -294,7 +342,7 @@ class SQLBaseStore(object):
                 return 0
             return max_id
 
-        return self._db_pool.runInteraction(func)
+        return self.runInteraction(func)
 
     def _parse_event_from_row(self, row_dict):
         d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
@@ -307,11 +355,34 @@ class SQLBaseStore(object):
         d["content"] = json.loads(d["content"])
         del d["unrecognized_keys"]
 
+        if "age_ts" not in d:
+            # For compatibility
+            d["age_ts"] = d["ts"] if "ts" in d else 0
+
         return self.event_factory.create_event(
             etype=d["type"],
             **d
         )
 
+    def _parse_events(self, rows):
+        return self.runInteraction(self._parse_events_txn, rows)
+
+    def _parse_events_txn(self, txn, rows):
+        events = [self._parse_event_from_row(r) for r in rows]
+
+        sql = "SELECT * FROM events WHERE event_id = ?"
+
+        for ev in events:
+           if hasattr(ev, "prev_state"):
+                # Load previous state_content. 
+                # TODO: Should we be pulling this out above?
+                cursor = txn.execute(sql, (ev.prev_state,))
+                prevs = self.cursor_to_dict(cursor)
+                if prevs:
+                    prev = self._parse_event_from_row(prevs[0])
+                    ev.prev_content = prev.content
+
+        return events
 
 class Table(object):
     """ A base class used to store information about a particular table.