diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 260bdf0ec4..f2c181dde7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -502,8 +502,8 @@ class EventsStore(SQLBaseStore):
def _do_fetch(self, conn):
event_list = []
- try:
- while True:
+ while True:
+ try:
logger.debug("do_fetch getting lock")
with self._event_fetch_lock:
logger.debug("do_fetch go lock: %r", self._event_fetch_list)
@@ -543,16 +543,16 @@ class EventsStore(SQLBaseStore):
except:
logger.exception("Failed to callback")
reactor.callFromThread(fire, event_list)
- except Exception as e:
- logger.exception("do_fetch")
+ except Exception as e:
+ logger.exception("do_fetch")
- def fire(evs):
- for _, d in evs:
- if not d.called:
- d.errback(e)
+ def fire(evs):
+ for _, d in evs:
+ if not d.called:
+ d.errback(e)
- if event_list:
- reactor.callFromThread(fire, event_list)
+ if event_list:
+ reactor.callFromThread(fire, event_list)
@defer.inlineCallbacks
def _enqueue_events(self, events, check_redacted=True,
@@ -561,29 +561,26 @@ class EventsStore(SQLBaseStore):
defer.returnValue({})
events_d = defer.Deferred()
- try:
- logger.debug("enqueueueueue getting lock")
- with self._event_fetch_lock:
- logger.debug("enqueue go lock")
- self._event_fetch_list.append(
- (events, events_d)
- )
+ logger.debug("enqueueueueue getting lock")
+ with self._event_fetch_lock:
+ logger.debug("enqueue go lock")
+ self._event_fetch_list.append(
+ (events, events_d)
+ )
+ if self._event_fetch_ongoing < 1:
self._event_fetch_ongoing += 1
+ should_start = True
+ else:
+ should_start = False
+ if should_start:
self.runWithConnection(
self._do_fetch
)
- except Exception as e:
- if not events_d.called:
- events_d.errback(e)
-
logger.debug("events_d before")
- try:
- rows = yield events_d
- except:
- logger.exception("events_d")
+ rows = yield events_d
logger.debug("events_d after")
res = yield defer.gatherResults(
|