summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/events.py47
1 files changed, 22 insertions, 25 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 260bdf0ec4..f2c181dde7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -502,8 +502,8 @@ class EventsStore(SQLBaseStore):
 
     def _do_fetch(self, conn):
         event_list = []
-        try:
-            while True:
+        while True:
+            try:
                 logger.debug("do_fetch getting lock")
                 with self._event_fetch_lock:
                     logger.debug("do_fetch go lock: %r", self._event_fetch_list)
@@ -543,16 +543,16 @@ class EventsStore(SQLBaseStore):
                             except:
                                 logger.exception("Failed to callback")
                 reactor.callFromThread(fire, event_list)
-        except Exception as e:
-            logger.exception("do_fetch")
+            except Exception as e:
+                logger.exception("do_fetch")
 
-            def fire(evs):
-                for _, d in evs:
-                    if not d.called:
-                        d.errback(e)
+                def fire(evs):
+                    for _, d in evs:
+                        if not d.called:
+                            d.errback(e)
 
-            if event_list:
-                reactor.callFromThread(fire, event_list)
+                if event_list:
+                    reactor.callFromThread(fire, event_list)
 
     @defer.inlineCallbacks
     def _enqueue_events(self, events, check_redacted=True,
@@ -561,29 +561,26 @@ class EventsStore(SQLBaseStore):
             defer.returnValue({})
 
         events_d = defer.Deferred()
-        try:
-            logger.debug("enqueueueueue getting lock")
-            with self._event_fetch_lock:
-                logger.debug("enqueue go lock")
-                self._event_fetch_list.append(
-                    (events, events_d)
-                )
+        logger.debug("enqueueueueue getting lock")
+        with self._event_fetch_lock:
+            logger.debug("enqueue go lock")
+            self._event_fetch_list.append(
+                (events, events_d)
+            )
 
+            if self._event_fetch_ongoing < 1:
                 self._event_fetch_ongoing += 1
+                should_start = True
+            else:
+                should_start = False
 
+        if should_start:
             self.runWithConnection(
                 self._do_fetch
             )
 
-        except Exception as e:
-            if not events_d.called:
-                events_d.errback(e)
-
         logger.debug("events_d before")
-        try:
-            rows = yield events_d
-        except:
-            logger.exception("events_d")
+        rows = yield events_d
         logger.debug("events_d after")
 
         res = yield defer.gatherResults(