Count and loop
2 files changed, 35 insertions, 37 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c20ff3a572..97bf42469a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -301,7 +301,7 @@ class SQLBaseStore(object):
self._event_fetch_lock = threading.Lock()
self._event_fetch_list = []
- self._event_fetch_ongoing = False
+ self._event_fetch_ongoing = 0
self.database_engine = hs.database_engine
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0859518b1b..a6b2e7677f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -506,41 +506,39 @@ class EventsStore(SQLBaseStore):
def do_fetch(txn):
event_list = []
- try:
- with self._event_fetch_lock:
- event_list = self._event_fetch_list
- self._event_fetch_list = []
-
- if not event_list:
- return
-
- event_id_lists = zip(*event_list)[0]
- event_ids = [
- item for sublist in event_id_lists for item in sublist
- ]
- rows = self._fetch_event_rows(txn, event_ids)
-
- row_dict = {
- r["event_id"]: r
- for r in rows
- }
+ while True:
+ try:
+ with self._event_fetch_lock:
+ event_list = self._event_fetch_list
+ self._event_fetch_list = []
+
+ if not event_list:
+ return
+
+ event_id_lists = zip(*event_list)[0]
+ event_ids = [
+ item for sublist in event_id_lists for item in sublist
+ ]
+ rows = self._fetch_event_rows(txn, event_ids)
+
+ row_dict = {
+ r["event_id"]: r
+ for r in rows
+ }
- for ids, d in event_list:
- d.callback(
- [
- row_dict[i] for i in ids
- if i in row_dict
- ]
- )
- except Exception as e:
- for _, d in event_list:
- try:
- reactor.callFromThread(d.errback, e)
- except:
- pass
- finally:
- with self._event_fetch_lock:
- self._event_fetch_ongoing = False
+ for ids, d in event_list:
+ d.callback(
+ [
+ row_dict[i] for i in ids
+ if i in row_dict
+ ]
+ )
+ except Exception as e:
+ for _, d in event_list:
+ try:
+ reactor.callFromThread(d.errback, e)
+ except:
+ pass
def cb(rows):
return defer.gatherResults([
@@ -561,12 +559,12 @@ class EventsStore(SQLBaseStore):
(events, d)
)
- if not self._event_fetch_ongoing:
+ if self._event_fetch_ongoing < 3:
+ self._event_fetch_ongoing += 1
self.runInteraction(
"do_fetch",
do_fetch
)
- self._event_fetch_ongoing = True
res = yield d
|