summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-06-07 17:34:20 +0100
committerErik Johnston <erik@matrix.org>2017-06-07 17:39:36 +0100
commit197bd126f09b0df42b2cbb0bd7e121b04ab9d670 (patch)
treed9ff1eaeebc879138f1982c1f51e3e8667d85baa /synapse/storage
parentMerge branch 'release-v0.21.0' of github.com:matrix-org/synapse (diff)
downloadsynapse-197bd126f09b0df42b2cbb0bd7e121b04ab9d670.tar.xz
Fix bug where state_group tables got corrupted
This is due to the fact that we prefilled caches using txn.call_after,
which always gets called including on error.

We fix this by making txn.call_after only fire when a transaction
completes successfully, which is what we want most of the time anyway.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py3
-rw-r--r--synapse/storage/_base.py29
-rw-r--r--synapse/storage/events.py2
3 files changed, 24 insertions, 10 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d604e7668f..349f96e24b 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -225,7 +225,8 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
             database_engine=self.database_engine,
-            after_callbacks=[]
+            after_callbacks=[],
+            final_callbacks=[],
         )
         self._find_stream_orderings_for_times_txn(cur)
         cur.close()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 58b73af7d2..f214b9d4c4 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -52,13 +52,17 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name", "database_engine", "after_callbacks"]
+    __slots__ = [
+        "txn", "name", "database_engine", "after_callbacks", "final_callbacks",
+    ]
 
-    def __init__(self, txn, name, database_engine, after_callbacks):
+    def __init__(self, txn, name, database_engine, after_callbacks,
+                 final_callbacks):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
         object.__setattr__(self, "database_engine", database_engine)
         object.__setattr__(self, "after_callbacks", after_callbacks)
+        object.__setattr__(self, "final_callbacks", final_callbacks)
 
     def call_after(self, callback, *args, **kwargs):
         """Call the given callback on the main twisted thread after the
@@ -67,6 +71,9 @@ class LoggingTransaction(object):
         """
         self.after_callbacks.append((callback, args, kwargs))
 
+    def call_finally(self, callback, *args, **kwargs):
+        self.final_callbacks.append((callback, args, kwargs))
+
     def __getattr__(self, name):
         return getattr(self.txn, name)
 
@@ -217,8 +224,8 @@ class SQLBaseStore(object):
 
         self._clock.looping_call(loop, 10000)
 
-    def _new_transaction(self, conn, desc, after_callbacks, logging_context,
-                         func, *args, **kwargs):
+    def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
+                         logging_context, func, *args, **kwargs):
         start = time.time() * 1000
         txn_id = self._TXN_ID
 
@@ -237,7 +244,8 @@ class SQLBaseStore(object):
                 try:
                     txn = conn.cursor()
                     txn = LoggingTransaction(
-                        txn, name, self.database_engine, after_callbacks
+                        txn, name, self.database_engine, after_callbacks,
+                        final_callbacks,
                     )
                     r = func(txn, *args, **kwargs)
                     conn.commit()
@@ -298,6 +306,7 @@ class SQLBaseStore(object):
         start_time = time.time() * 1000
 
         after_callbacks = []
+        final_callbacks = []
 
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
@@ -309,7 +318,7 @@ class SQLBaseStore(object):
 
                 current_context.copy_to(context)
                 return self._new_transaction(
-                    conn, desc, after_callbacks, current_context,
+                    conn, desc, after_callbacks, final_callbacks, current_context,
                     func, *args, **kwargs
                 )
 
@@ -318,9 +327,13 @@ class SQLBaseStore(object):
                 result = yield self._db_pool.runWithConnection(
                     inner_func, *args, **kwargs
                 )
-        finally:
+
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
+        finally:
+            for after_callback, after_args, after_kwargs in final_callbacks:
+                after_callback(*after_args, **after_kwargs)
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -936,7 +949,7 @@ class SQLBaseStore(object):
             # __exit__ called after the transaction finishes.
             ctx = self._cache_id_gen.get_next()
             stream_id = ctx.__enter__()
-            txn.call_after(ctx.__exit__, None, None, None)
+            txn.call_finally(ctx.__exit__, None, None, None)
             txn.call_after(self.hs.get_notifier().on_new_replication_data)
 
             self._simple_insert_txn(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c4aeb48800..73283eb4c7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1419,7 +1419,7 @@ class EventsStore(SQLBaseStore):
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], None, self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
                 )
 
                 row_dict = {