diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ceff99c16d..0df1b46edc 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -301,10 +301,12 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
- self._event_fetch_lock = threading.Condition()
+ self._event_fetch_lock = threading.Lock()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
+ self._pending_ds = []
+
self.database_engine = hs.database_engine
self._stream_id_gen = StreamIdGenerator()
@@ -344,8 +346,7 @@ class SQLBaseStore(object):
self._clock.looping_call(loop, 10000)
- @contextlib.contextmanager
- def _new_transaction(self, conn, desc, after_callbacks):
+ def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs):
start = time.time() * 1000
txn_id = self._TXN_ID
@@ -366,6 +367,9 @@ class SQLBaseStore(object):
txn = LoggingTransaction(
txn, name, self.database_engine, after_callbacks
)
+ r = func(txn, *args, **kwargs)
+ conn.commit()
+ return r
except self.database_engine.module.OperationalError as e:
# This can happen if the database disappears mid
# transaction.
@@ -398,17 +402,6 @@ class SQLBaseStore(object):
)
continue
raise
-
- try:
- yield txn
- conn.commit()
- return
- except:
- try:
- conn.rollback()
- except:
- pass
- raise
except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
@@ -440,8 +433,9 @@ class SQLBaseStore(object):
conn.reconnect()
current_context.copy_to(context)
- with self._new_transaction(conn, desc, after_callbacks) as txn:
- return func(txn, *args, **kwargs)
+ return self._new_transaction(
+ conn, desc, after_callbacks, func, *args, **kwargs
+ )
result = yield preserve_context_over_fn(
self._db_pool.runWithConnection,
|