diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 97bf42469a..ceff99c16d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -26,6 +26,8 @@ from util.id_generators import IdGenerator, StreamIdGenerator
from twisted.internet import defer
from collections import namedtuple, OrderedDict
+
+import contextlib
import functools
import sys
import time
@@ -299,7 +301,7 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
- self._event_fetch_lock = threading.Lock()
+ self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
@@ -342,6 +344,84 @@ class SQLBaseStore(object):
self._clock.looping_call(loop, 10000)
+ @contextlib.contextmanager
+ def _new_transaction(self, conn, desc, after_callbacks):
+ start = time.time() * 1000
+ txn_id = self._TXN_ID
+
+ # We don't really need these to be unique, so lets stop it from
+ # growing really large.
+ self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
+
+ name = "%s-%x" % (desc, txn_id, )
+
+ transaction_logger.debug("[TXN START] {%s}", name)
+
+ try:
+ i = 0
+ N = 5
+ while True:
+ try:
+ txn = conn.cursor()
+ txn = LoggingTransaction(
+ txn, name, self.database_engine, after_callbacks
+ )
+ except self.database_engine.module.OperationalError as e:
+ # This can happen if the database disappears mid
+ # transaction.
+ logger.warn(
+ "[TXN OPERROR] {%s} %s %d/%d",
+ name, e, i, N
+ )
+ if i < N:
+ i += 1
+ try:
+ conn.rollback()
+ except self.database_engine.module.Error as e1:
+ logger.warn(
+ "[TXN EROLL] {%s} %s",
+ name, e1,
+ )
+ continue
+ raise
+ except self.database_engine.module.DatabaseError as e:
+ if self.database_engine.is_deadlock(e):
+ logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
+ if i < N:
+ i += 1
+ try:
+ conn.rollback()
+ except self.database_engine.module.Error as e1:
+ logger.warn(
+ "[TXN EROLL] {%s} %s",
+ name, e1,
+ )
+ continue
+ raise
+
+ try:
+ yield txn
+ conn.commit()
+ return
+ except:
+ try:
+ conn.rollback()
+ except:
+ pass
+ raise
+ except Exception as e:
+ logger.debug("[TXN FAIL] {%s} %s", name, e)
+ raise
+ finally:
+ end = time.time() * 1000
+ duration = end - start
+
+ transaction_logger.debug("[TXN END] {%s} %f", name, duration)
+
+ self._current_txn_total_time += duration
+ self._txn_perf_counters.update(desc, start, end)
+ sql_txn_timer.inc_by(duration, desc)
+
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
"""Wraps the .runInteraction() method on the underlying db_pool."""
@@ -353,83 +433,49 @@ class SQLBaseStore(object):
def inner_func(conn, *args, **kwargs):
with LoggingContext("runInteraction") as context:
+ sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
+
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
conn.reconnect()
current_context.copy_to(context)
- start = time.time() * 1000
- txn_id = self._TXN_ID
+ with self._new_transaction(conn, desc, after_callbacks) as txn:
+ return func(txn, *args, **kwargs)
- # We don't really need these to be unique, so lets stop it from
- # growing really large.
- self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
+ result = yield preserve_context_over_fn(
+ self._db_pool.runWithConnection,
+ inner_func, *args, **kwargs
+ )
- name = "%s-%x" % (desc, txn_id, )
+ for after_callback, after_args in after_callbacks:
+ after_callback(*after_args)
+ defer.returnValue(result)
+ @defer.inlineCallbacks
+ def runWithConnection(self, func, *args, **kwargs):
+ """Wraps the .runInteraction() method on the underlying db_pool."""
+ current_context = LoggingContext.current_context()
+
+ start_time = time.time() * 1000
+
+ def inner_func(conn, *args, **kwargs):
+ with LoggingContext("runWithConnection") as context:
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
- transaction_logger.debug("[TXN START] {%s}", name)
- try:
- i = 0
- N = 5
- while True:
- try:
- txn = conn.cursor()
- txn = LoggingTransaction(
- txn, name, self.database_engine, after_callbacks
- )
- return func(txn, *args, **kwargs)
- except self.database_engine.module.OperationalError as e:
- # This can happen if the database disappears mid
- # transaction.
- logger.warn(
- "[TXN OPERROR] {%s} %s %d/%d",
- name, e, i, N
- )
- if i < N:
- i += 1
- try:
- conn.rollback()
- except self.database_engine.module.Error as e1:
- logger.warn(
- "[TXN EROLL] {%s} %s",
- name, e1,
- )
- continue
- except self.database_engine.module.DatabaseError as e:
- if self.database_engine.is_deadlock(e):
- logger.warn("[TXN DEADLOCK] {%s} %d/%d", name, i, N)
- if i < N:
- i += 1
- try:
- conn.rollback()
- except self.database_engine.module.Error as e1:
- logger.warn(
- "[TXN EROLL] {%s} %s",
- name, e1,
- )
- continue
- raise
- except Exception as e:
- logger.debug("[TXN FAIL] {%s} %s", name, e)
- raise
- finally:
- end = time.time() * 1000
- duration = end - start
- transaction_logger.debug("[TXN END] {%s} %f", name, duration)
+ if self.database_engine.is_connection_closed(conn):
+ logger.debug("Reconnecting closed database connection")
+ conn.reconnect()
+
+ current_context.copy_to(context)
- self._current_txn_total_time += duration
- self._txn_perf_counters.update(desc, start, end)
- sql_txn_timer.inc_by(duration, desc)
+ return func(conn, *args, **kwargs)
result = yield preserve_context_over_fn(
self._db_pool.runWithConnection,
inner_func, *args, **kwargs
)
- for after_callback, after_args in after_callbacks:
- after_callback(*after_args)
defer.returnValue(result)
def cursor_to_dict(self, cursor):
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index a323028546..4a855ffd56 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -19,6 +19,8 @@ from ._base import IncorrectDatabaseSetup
class PostgresEngine(object):
+ single_threaded = False
+
def __init__(self, database_module):
self.module = database_module
self.module.extensions.register_type(self.module.extensions.UNICODE)
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index ff13d8006a..d18e2808d1 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -17,6 +17,8 @@ from synapse.storage import prepare_database, prepare_sqlite3_database
class Sqlite3Engine(object):
+ single_threaded = True
+
def __init__(self, database_module):
self.module = database_module
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 59af21a2ca..b4abd83260 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -504,23 +504,26 @@ class EventsStore(SQLBaseStore):
if not events:
defer.returnValue({})
- def do_fetch(txn):
+ def do_fetch(conn):
event_list = []
while True:
try:
with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
+ i = 0
+ while not self._event_fetch_list:
self._event_fetch_ongoing -= 1
return
+ event_list = self._event_fetch_list
+ self._event_fetch_list = []
+
event_id_lists = zip(*event_list)[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
- rows = self._fetch_event_rows(txn, event_ids)
+
+ with self._new_transaction(conn, "do_fetch", []) as txn:
+ rows = self._fetch_event_rows(txn, event_ids)
row_dict = {
r["event_id"]: r
@@ -528,22 +531,44 @@ class EventsStore(SQLBaseStore):
}
for ids, d in event_list:
- reactor.callFromThread(
- d.callback,
- [
- row_dict[i] for i in ids
- if i in row_dict
- ]
- )
+ def fire():
+ if not d.called:
+ d.callback(
+ [
+ row_dict[i]
+ for i in ids
+ if i in row_dict
+ ]
+ )
+ reactor.callFromThread(fire)
except Exception as e:
+ logger.exception("do_fetch")
for _, d in event_list:
- try:
+ if not d.called:
reactor.callFromThread(d.errback, e)
- except:
- pass
- def cb(rows):
- return defer.gatherResults([
+ with self._event_fetch_lock:
+ self._event_fetch_ongoing -= 1
+ return
+
+ events_d = defer.Deferred()
+ with self._event_fetch_lock:
+ self._event_fetch_list.append(
+ (events, events_d)
+ )
+
+ self._event_fetch_lock.notify_all()
+
+ # if self._event_fetch_ongoing < 5:
+ self._event_fetch_ongoing += 1
+ self.runWithConnection(
+ do_fetch
+ )
+
+ rows = yield events_d
+
+ res = yield defer.gatherResults(
+ [
self._get_event_from_row(
None,
row["internal_metadata"], row["json"], row["redacts"],
@@ -552,23 +577,9 @@ class EventsStore(SQLBaseStore):
rejected_reason=row["rejects"],
)
for row in rows
- ])
-
- d = defer.Deferred()
- d.addCallback(cb)
- with self._event_fetch_lock:
- self._event_fetch_list.append(
- (events, d)
- )
-
- if self._event_fetch_ongoing < 3:
- self._event_fetch_ongoing += 1
- self.runInteraction(
- "do_fetch",
- do_fetch
- )
-
- res = yield d
+ ],
+ consumeErrors=True
+ )
defer.returnValue({
e.event_id: e
|