diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 59af21a2ca..b4abd83260 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -504,23 +504,26 @@ class EventsStore(SQLBaseStore):
if not events:
defer.returnValue({})
- def do_fetch(txn):
+ def do_fetch(conn):
event_list = []
while True:
try:
with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
+ i = 0
+ while not self._event_fetch_list:
self._event_fetch_ongoing -= 1
return
+ event_list = self._event_fetch_list
+ self._event_fetch_list = []
+
event_id_lists = zip(*event_list)[0]
event_ids = [
item for sublist in event_id_lists for item in sublist
]
- rows = self._fetch_event_rows(txn, event_ids)
+
+ with self._new_transaction(conn, "do_fetch", []) as txn:
+ rows = self._fetch_event_rows(txn, event_ids)
row_dict = {
r["event_id"]: r
@@ -528,22 +531,44 @@ class EventsStore(SQLBaseStore):
}
for ids, d in event_list:
- reactor.callFromThread(
- d.callback,
- [
- row_dict[i] for i in ids
- if i in row_dict
- ]
- )
+ def fire():
+ if not d.called:
+ d.callback(
+ [
+ row_dict[i]
+ for i in ids
+ if i in row_dict
+ ]
+ )
+ reactor.callFromThread(fire)
except Exception as e:
+ logger.exception("do_fetch")
for _, d in event_list:
- try:
+ if not d.called:
reactor.callFromThread(d.errback, e)
- except:
- pass
- def cb(rows):
- return defer.gatherResults([
+ with self._event_fetch_lock:
+ self._event_fetch_ongoing -= 1
+ return
+
+ events_d = defer.Deferred()
+ with self._event_fetch_lock:
+ self._event_fetch_list.append(
+ (events, events_d)
+ )
+
+ self._event_fetch_lock.notify_all()
+
+ # if self._event_fetch_ongoing < 5:
+ self._event_fetch_ongoing += 1
+ self.runWithConnection(
+ do_fetch
+ )
+
+ rows = yield events_d
+
+ res = yield defer.gatherResults(
+ [
self._get_event_from_row(
None,
row["internal_metadata"], row["json"], row["redacts"],
@@ -552,23 +577,9 @@ class EventsStore(SQLBaseStore):
rejected_reason=row["rejects"],
)
for row in rows
- ])
-
- d = defer.Deferred()
- d.addCallback(cb)
- with self._event_fetch_lock:
- self._event_fetch_list.append(
- (events, d)
- )
-
- if self._event_fetch_ongoing < 3:
- self._event_fetch_ongoing += 1
- self.runInteraction(
- "do_fetch",
- do_fetch
- )
-
- res = yield d
+ ],
+ consumeErrors=True
+ )
defer.returnValue({
e.event_id: e
|