diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b4abd83260..260bdf0ec4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore):
])
if not txn:
+ logger.debug("enqueue before")
missing_events = yield self._enqueue_events(
missing_events_ids,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
+ logger.debug("enqueue after")
else:
missing_events = self._fetch_events_txn(
txn,
@@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore):
allow_rejected=allow_rejected,
))
- @defer.inlineCallbacks
- def _enqueue_events(self, events, check_redacted=True,
- get_prev_content=False, allow_rejected=False):
- if not events:
- defer.returnValue({})
-
- def do_fetch(conn):
- event_list = []
+ def _do_fetch(self, conn):
+ event_list = []
+ try:
while True:
- try:
- with self._event_fetch_lock:
- i = 0
- while not self._event_fetch_list:
- self._event_fetch_ongoing -= 1
- return
-
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- event_id_lists = zip(*event_list)[0]
- event_ids = [
- item for sublist in event_id_lists for item in sublist
- ]
-
- with self._new_transaction(conn, "do_fetch", []) as txn:
- rows = self._fetch_event_rows(txn, event_ids)
-
- row_dict = {
- r["event_id"]: r
- for r in rows
- }
+ logger.debug("do_fetch getting lock")
+ with self._event_fetch_lock:
+ logger.debug("do_fetch go lock: %r", self._event_fetch_list)
+ event_list = self._event_fetch_list
+ self._event_fetch_list = []
+ if not event_list:
+ self._event_fetch_ongoing -= 1
+ return
+
+ event_id_lists = zip(*event_list)[0]
+ event_ids = [
+ item for sublist in event_id_lists for item in sublist
+ ]
+
+ rows = self._new_transaction(
+ conn, "do_fetch", [], self._fetch_event_rows, event_ids
+ )
- for ids, d in event_list:
- def fire():
- if not d.called:
+ row_dict = {
+ r["event_id"]: r
+ for r in rows
+ }
+
+ logger.debug("do_fetch got events: %r", row_dict.keys())
+
+ def fire(evs):
+ for ids, d in evs:
+ if not d.called:
+ try:
d.callback(
[
row_dict[i]
@@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore):
if i in row_dict
]
)
- reactor.callFromThread(fire)
- except Exception as e:
- logger.exception("do_fetch")
- for _, d in event_list:
- if not d.called:
- reactor.callFromThread(d.errback, e)
+ except:
+ logger.exception("Failed to callback")
+ reactor.callFromThread(fire, event_list)
+ except Exception as e:
+ logger.exception("do_fetch")
- with self._event_fetch_lock:
- self._event_fetch_ongoing -= 1
- return
+ def fire(evs):
+ for _, d in evs:
+ if not d.called:
+ d.errback(e)
+
+ if event_list:
+ reactor.callFromThread(fire, event_list)
+
+ @defer.inlineCallbacks
+ def _enqueue_events(self, events, check_redacted=True,
+ get_prev_content=False, allow_rejected=False):
+ if not events:
+ defer.returnValue({})
events_d = defer.Deferred()
- with self._event_fetch_lock:
- self._event_fetch_list.append(
- (events, events_d)
- )
+ try:
+ logger.debug("enqueueueueue getting lock")
+ with self._event_fetch_lock:
+ logger.debug("enqueue go lock")
+ self._event_fetch_list.append(
+ (events, events_d)
+ )
- self._event_fetch_lock.notify_all()
+ self._event_fetch_ongoing += 1
- # if self._event_fetch_ongoing < 5:
- self._event_fetch_ongoing += 1
self.runWithConnection(
- do_fetch
+ self._do_fetch
)
- rows = yield events_d
+ except Exception as e:
+ if not events_d.called:
+ events_d.errback(e)
+
+ logger.debug("events_d before")
+ try:
+ rows = yield events_d
+ except:
+ logger.exception("events_d")
+ logger.debug("events_d after")
res = yield defer.gatherResults(
[
@@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore):
],
consumeErrors=True
)
+ logger.debug("gatherResults after")
defer.returnValue({
e.event_id: e
@@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore):
rejected_reason=row["rejects"],
)
for row in rows
- ]
+ ],
+ consumeErrors=True,
)
defer.returnValue({
|