diff options
author | Erik Johnston <erik@matrix.org> | 2015-05-15 10:54:04 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-05-15 10:54:04 +0100 |
commit | a2c4f3f150f63c720370f6882da804c8ac20fd69 (patch) | |
tree | 3b1465eeecbc337c70a3313441380f4a126df7d3 /synapse/storage/events.py | |
parent | Remove race condition (diff) | |
download | synapse-a2c4f3f150f63c720370f6882da804c8ac20fd69.tar.xz |
Fix daedlock
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 125 |
1 files changed, 73 insertions, 52 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b4abd83260..260bdf0ec4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore): ]) if not txn: + logger.debug("enqueue before") missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) + logger.debug("enqueue after") else: missing_events = self._fetch_events_txn( txn, @@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore): allow_rejected=allow_rejected, )) - @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - defer.returnValue({}) - - def do_fetch(conn): - event_list = [] + def _do_fetch(self, conn): + event_list = [] + try: while True: - try: - with self._event_fetch_lock: - i = 0 - while not self._event_fetch_list: - self._event_fetch_ongoing -= 1 - return - - event_list = self._event_fetch_list - self._event_fetch_list = [] - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - - with self._new_transaction(conn, "do_fetch", []) as txn: - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + logger.debug("do_fetch getting lock") + with self._event_fetch_lock: + logger.debug("do_fetch go lock: %r", self._event_fetch_list) + event_list = self._event_fetch_list + self._event_fetch_list = [] + if not event_list: + self._event_fetch_ongoing -= 1 + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + + rows = self._new_transaction( + conn, "do_fetch", [], self._fetch_event_rows, event_ids + ) - for ids, d in event_list: - def fire(): - if not d.called: + row_dict = { + r["event_id"]: r + for r in rows + } + + logger.debug("do_fetch got events: %r", row_dict.keys()) + + def fire(evs): + for ids, d in evs: + if not d.called: + try: d.callback( [ row_dict[i] @@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore): if i in row_dict ] ) - reactor.callFromThread(fire) - except Exception as e: - logger.exception("do_fetch") - for _, d in event_list: - if not d.called: - reactor.callFromThread(d.errback, e) + except: + logger.exception("Failed to callback") + reactor.callFromThread(fire, event_list) + except Exception as e: + logger.exception("do_fetch") - with self._event_fetch_lock: - self._event_fetch_ongoing -= 1 - return + def fire(evs): + for _, d in evs: + if not d.called: + d.errback(e) + + if event_list: + reactor.callFromThread(fire, event_list) + + @defer.inlineCallbacks + def _enqueue_events(self, events, check_redacted=True, + get_prev_content=False, allow_rejected=False): + if not events: + defer.returnValue({}) events_d = defer.Deferred() - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, events_d) - ) + try: + logger.debug("enqueueueueue getting lock") + with self._event_fetch_lock: + logger.debug("enqueue go lock") + self._event_fetch_list.append( + (events, events_d) + ) - self._event_fetch_lock.notify_all() + self._event_fetch_ongoing += 1 - # if self._event_fetch_ongoing < 5: - self._event_fetch_ongoing += 1 self.runWithConnection( - do_fetch + self._do_fetch ) - rows = yield events_d + except Exception as e: + if not events_d.called: + events_d.errback(e) + + logger.debug("events_d before") + try: + rows = yield events_d + except: + logger.exception("events_d") + logger.debug("events_d after") res = yield defer.gatherResults( [ @@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore): ], consumeErrors=True ) + logger.debug("gatherResults after") defer.returnValue({ e.event_id: e @@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ] + ], + consumeErrors=True, ) defer.returnValue({ |