summary refs log tree commit diff
path: root/synapse/storage/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--synapse/storage/_base.py26
1 files changed, 10 insertions, 16 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ceff99c16d..0df1b46edc 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -301,10 +301,12 @@ class SQLBaseStore(object):
         self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
                                       max_entries=hs.config.event_cache_size)
 
-        self._event_fetch_lock = threading.Condition()
+        self._event_fetch_lock = threading.Lock()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
 
+        self._pending_ds = []
+
         self.database_engine = hs.database_engine
 
         self._stream_id_gen = StreamIdGenerator()
@@ -344,8 +346,7 @@ class SQLBaseStore(object):
 
         self._clock.looping_call(loop, 10000)
 
-    @contextlib.contextmanager
-    def _new_transaction(self, conn, desc, after_callbacks):
+    def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs):
         start = time.time() * 1000
         txn_id = self._TXN_ID
 
@@ -366,6 +367,9 @@ class SQLBaseStore(object):
                     txn = LoggingTransaction(
                         txn, name, self.database_engine, after_callbacks
                     )
+                    r = func(txn, *args, **kwargs)
+                    conn.commit()
+                    return r
                 except self.database_engine.module.OperationalError as e:
                     # This can happen if the database disappears mid
                     # transaction.
@@ -398,17 +402,6 @@ class SQLBaseStore(object):
                                 )
                             continue
                     raise
-
-                try:
-                    yield txn
-                    conn.commit()
-                    return
-                except:
-                    try:
-                        conn.rollback()
-                    except:
-                        pass
-                    raise
         except Exception as e:
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
@@ -440,8 +433,9 @@ class SQLBaseStore(object):
                     conn.reconnect()
 
                 current_context.copy_to(context)
-                with self._new_transaction(conn, desc, after_callbacks) as txn:
-                    return func(txn, *args, **kwargs)
+                return self._new_transaction(
+                    conn, desc, after_callbacks, func, *args, **kwargs
+                )
 
         result = yield preserve_context_over_fn(
             self._db_pool.runWithConnection,