diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/events.py | 70 |
2 files changed, 35 insertions, 37 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c20ff3a572..97bf42469a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -301,7 +301,7 @@ class SQLBaseStore(object): self._event_fetch_lock = threading.Lock() self._event_fetch_list = [] - self._event_fetch_ongoing = False + self._event_fetch_ongoing = 0 self.database_engine = hs.database_engine diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0859518b1b..a6b2e7677f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -506,41 +506,39 @@ class EventsStore(SQLBaseStore): def do_fetch(txn): event_list = [] - try: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - return - - event_id_lists = zip(*event_list)[0] - event_ids = [ - item for sublist in event_id_lists for item in sublist - ] - rows = self._fetch_event_rows(txn, event_ids) - - row_dict = { - r["event_id"]: r - for r in rows - } + while True: + try: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + return + + event_id_lists = zip(*event_list)[0] + event_ids = [ + item for sublist in event_id_lists for item in sublist + ] + rows = self._fetch_event_rows(txn, event_ids) + + row_dict = { + r["event_id"]: r + for r in rows + } - for ids, d in event_list: - d.callback( - [ - row_dict[i] for i in ids - if i in row_dict - ] - ) - except Exception as e: - for _, d in event_list: - try: - reactor.callFromThread(d.errback, e) - except: - pass - finally: - with self._event_fetch_lock: - self._event_fetch_ongoing = False + for ids, d in event_list: + d.callback( + [ + row_dict[i] for i in ids + if i in row_dict + ] + ) + except Exception as e: + for _, d in event_list: + try: + reactor.callFromThread(d.errback, e) + except: + pass def cb(rows): return defer.gatherResults([ @@ -561,12 +559,12 @@ class EventsStore(SQLBaseStore): (events, d) ) - if not self._event_fetch_ongoing: + if self._event_fetch_ongoing < 3: + self._event_fetch_ongoing += 1 self.runInteraction( "do_fetch", do_fetch ) - self._event_fetch_ongoing = True res = yield d |