summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-05-15 10:54:04 +0100
committerErik Johnston <erik@matrix.org>2015-05-15 10:54:04 +0100
commita2c4f3f150f63c720370f6882da804c8ac20fd69 (patch)
tree3b1465eeecbc337c70a3313441380f4a126df7d3 /synapse/storage/events.py
parentRemove race condition (diff)
downloadsynapse-a2c4f3f150f63c720370f6882da804c8ac20fd69.tar.xz
Fix daedlock
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py125
1 files changed, 73 insertions, 52 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b4abd83260..260bdf0ec4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -420,12 +420,14 @@ class EventsStore(SQLBaseStore):
             ])
 
         if not txn:
+            logger.debug("enqueue before")
             missing_events = yield self._enqueue_events(
                 missing_events_ids,
                 check_redacted=check_redacted,
                 get_prev_content=get_prev_content,
                 allow_rejected=allow_rejected,
             )
+            logger.debug("enqueue after")
         else:
             missing_events = self._fetch_events_txn(
                 txn,
@@ -498,41 +500,39 @@ class EventsStore(SQLBaseStore):
             allow_rejected=allow_rejected,
         ))
 
-    @defer.inlineCallbacks
-    def _enqueue_events(self, events, check_redacted=True,
-                        get_prev_content=False, allow_rejected=False):
-        if not events:
-            defer.returnValue({})
-
-        def do_fetch(conn):
-            event_list = []
+    def _do_fetch(self, conn):
+        event_list = []
+        try:
             while True:
-                try:
-                    with self._event_fetch_lock:
-                        i = 0
-                        while not self._event_fetch_list:
-                            self._event_fetch_ongoing -= 1
-                            return
-
-                        event_list = self._event_fetch_list
-                        self._event_fetch_list = []
-
-                    event_id_lists = zip(*event_list)[0]
-                    event_ids = [
-                        item for sublist in event_id_lists for item in sublist
-                    ]
-
-                    with self._new_transaction(conn, "do_fetch", []) as txn:
-                        rows = self._fetch_event_rows(txn, event_ids)
-
-                    row_dict = {
-                        r["event_id"]: r
-                        for r in rows
-                    }
+                logger.debug("do_fetch getting lock")
+                with self._event_fetch_lock:
+                    logger.debug("do_fetch go lock: %r", self._event_fetch_list)
+                    event_list = self._event_fetch_list
+                    self._event_fetch_list = []
+                    if not event_list:
+                        self._event_fetch_ongoing -= 1
+                        return
+
+                event_id_lists = zip(*event_list)[0]
+                event_ids = [
+                    item for sublist in event_id_lists for item in sublist
+                ]
+
+                rows = self._new_transaction(
+                    conn, "do_fetch", [], self._fetch_event_rows, event_ids
+                )
 
-                    for ids, d in event_list:
-                        def fire():
-                            if not d.called:
+                row_dict = {
+                    r["event_id"]: r
+                    for r in rows
+                }
+
+                logger.debug("do_fetch got events: %r", row_dict.keys())
+
+                def fire(evs):
+                    for ids, d in evs:
+                        if not d.called:
+                            try:
                                 d.callback(
                                     [
                                         row_dict[i]
@@ -540,32 +540,51 @@ class EventsStore(SQLBaseStore):
                                         if i in row_dict
                                     ]
                                 )
-                        reactor.callFromThread(fire)
-                except Exception as e:
-                    logger.exception("do_fetch")
-                    for _, d in event_list:
-                        if not d.called:
-                            reactor.callFromThread(d.errback, e)
+                            except:
+                                logger.exception("Failed to callback")
+                reactor.callFromThread(fire, event_list)
+        except Exception as e:
+            logger.exception("do_fetch")
 
-                    with self._event_fetch_lock:
-                        self._event_fetch_ongoing -= 1
-                        return
+            def fire(evs):
+                for _, d in evs:
+                    if not d.called:
+                        d.errback(e)
+
+            if event_list:
+                reactor.callFromThread(fire, event_list)
+
+    @defer.inlineCallbacks
+    def _enqueue_events(self, events, check_redacted=True,
+                        get_prev_content=False, allow_rejected=False):
+        if not events:
+            defer.returnValue({})
 
         events_d = defer.Deferred()
-        with self._event_fetch_lock:
-            self._event_fetch_list.append(
-                (events, events_d)
-            )
+        try:
+            logger.debug("enqueueueueue getting lock")
+            with self._event_fetch_lock:
+                logger.debug("enqueue go lock")
+                self._event_fetch_list.append(
+                    (events, events_d)
+                )
 
-            self._event_fetch_lock.notify_all()
+                self._event_fetch_ongoing += 1
 
-            # if self._event_fetch_ongoing < 5:
-            self._event_fetch_ongoing += 1
             self.runWithConnection(
-                do_fetch
+                self._do_fetch
             )
 
-        rows = yield events_d
+        except Exception as e:
+            if not events_d.called:
+                events_d.errback(e)
+
+        logger.debug("events_d before")
+        try:
+            rows = yield events_d
+        except:
+            logger.exception("events_d")
+        logger.debug("events_d after")
 
         res = yield defer.gatherResults(
             [
@@ -580,6 +599,7 @@ class EventsStore(SQLBaseStore):
             ],
             consumeErrors=True
         )
+        logger.debug("gatherResults after")
 
         defer.returnValue({
             e.event_id: e
@@ -639,7 +659,8 @@ class EventsStore(SQLBaseStore):
                     rejected_reason=row["rejects"],
                 )
                 for row in rows
-            ]
+            ],
+            consumeErrors=True,
         )
 
         defer.returnValue({