summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/events.py70
2 files changed, 35 insertions, 37 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c20ff3a572..97bf42469a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -301,7 +301,7 @@ class SQLBaseStore(object):
 
         self._event_fetch_lock = threading.Lock()
         self._event_fetch_list = []
-        self._event_fetch_ongoing = False
+        self._event_fetch_ongoing = 0
 
         self.database_engine = hs.database_engine
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 0859518b1b..a6b2e7677f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -506,41 +506,39 @@ class EventsStore(SQLBaseStore):
 
         def do_fetch(txn):
             event_list = []
-            try:
-                with self._event_fetch_lock:
-                    event_list = self._event_fetch_list
-                    self._event_fetch_list = []
-
-                    if not event_list:
-                        return
-
-                event_id_lists = zip(*event_list)[0]
-                event_ids = [
-                    item for sublist in event_id_lists for item in sublist
-                ]
-                rows = self._fetch_event_rows(txn, event_ids)
-
-                row_dict = {
-                    r["event_id"]: r
-                    for r in rows
-                }
+            while True:
+                try:
+                    with self._event_fetch_lock:
+                        event_list = self._event_fetch_list
+                        self._event_fetch_list = []
+
+                        if not event_list:
+                            return
+
+                    event_id_lists = zip(*event_list)[0]
+                    event_ids = [
+                        item for sublist in event_id_lists for item in sublist
+                    ]
+                    rows = self._fetch_event_rows(txn, event_ids)
+
+                    row_dict = {
+                        r["event_id"]: r
+                        for r in rows
+                    }
 
-                for ids, d in event_list:
-                    d.callback(
-                        [
-                            row_dict[i] for i in ids
-                            if i in row_dict
-                        ]
-                    )
-            except Exception as e:
-                for _, d in event_list:
-                    try:
-                        reactor.callFromThread(d.errback, e)
-                    except:
-                        pass
-            finally:
-                with self._event_fetch_lock:
-                    self._event_fetch_ongoing = False
+                    for ids, d in event_list:
+                        d.callback(
+                            [
+                                row_dict[i] for i in ids
+                                if i in row_dict
+                            ]
+                        )
+                except Exception as e:
+                    for _, d in event_list:
+                        try:
+                            reactor.callFromThread(d.errback, e)
+                        except:
+                            pass
 
         def cb(rows):
             return defer.gatherResults([
@@ -561,12 +559,12 @@ class EventsStore(SQLBaseStore):
                 (events, d)
             )
 
-            if not self._event_fetch_ongoing:
+            if self._event_fetch_ongoing < 3:
+                self._event_fetch_ongoing += 1
                 self.runInteraction(
                     "do_fetch",
                     do_fetch
                 )
-                self._event_fetch_ongoing = True
 
         res = yield d