diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 81 |
1 files changed, 46 insertions, 35 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 59af21a2ca..b4abd83260 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -504,23 +504,26 @@ class EventsStore(SQLBaseStore): if not events: defer.returnValue({}) - def do_fetch(txn): + def do_fetch(conn): event_list = [] while True: try: with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: + i = 0 + while not self._event_fetch_list: self._event_fetch_ongoing -= 1 return + event_list = self._event_fetch_list + self._event_fetch_list = [] + event_id_lists = zip(*event_list)[0] event_ids = [ item for sublist in event_id_lists for item in sublist ] - rows = self._fetch_event_rows(txn, event_ids) + + with self._new_transaction(conn, "do_fetch", []) as txn: + rows = self._fetch_event_rows(txn, event_ids) row_dict = { r["event_id"]: r @@ -528,22 +531,44 @@ class EventsStore(SQLBaseStore): } for ids, d in event_list: - reactor.callFromThread( - d.callback, - [ - row_dict[i] for i in ids - if i in row_dict - ] - ) + def fire(): + if not d.called: + d.callback( + [ + row_dict[i] + for i in ids + if i in row_dict + ] + ) + reactor.callFromThread(fire) except Exception as e: + logger.exception("do_fetch") for _, d in event_list: - try: + if not d.called: reactor.callFromThread(d.errback, e) - except: - pass - def cb(rows): - return defer.gatherResults([ + with self._event_fetch_lock: + self._event_fetch_ongoing -= 1 + return + + events_d = defer.Deferred() + with self._event_fetch_lock: + self._event_fetch_list.append( + (events, events_d) + ) + + self._event_fetch_lock.notify_all() + + # if self._event_fetch_ongoing < 5: + self._event_fetch_ongoing += 1 + self.runWithConnection( + do_fetch + ) + + rows = yield events_d + + res = yield defer.gatherResults( + [ self._get_event_from_row( None, row["internal_metadata"], row["json"], row["redacts"], @@ -552,23 +577,9 @@ class EventsStore(SQLBaseStore): rejected_reason=row["rejects"], ) for row in rows - ]) - - d = defer.Deferred() - d.addCallback(cb) - with self._event_fetch_lock: - self._event_fetch_list.append( - (events, d) - ) - - if self._event_fetch_ongoing < 3: - self._event_fetch_ongoing += 1 - self.runInteraction( - "do_fetch", - do_fetch - ) - - res = yield d + ], + consumeErrors=True + ) defer.returnValue({ e.event_id: e |