summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/util/async.py24
-rw-r--r--tests/util/test_linearizer.py28
2 files changed, 46 insertions, 6 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index bb252f75d7..0fd5b42523 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -203,7 +203,26 @@ class Linearizer(object):
             except:
                 logger.exception("Unexpected exception in Linearizer")
 
-        logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            logger.info("Acquired linearizer lock %r for key %r", self.name,
+                        key)
+
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (There's no particular need for it to happen before we return
+            # the context manager, but it needs to happen while we hold the
+            # lock, and the context manager's exit code must be synchronous,
+            # so actually this is the only sensible place.
+            yield run_on_reactor()
+
+        else:
+            logger.info("Acquired uncontended linearizer lock %r for key %r",
+                        self.name, key)
 
         @contextmanager
         def _ctx_manager():
@@ -211,7 +230,8 @@ class Linearizer(object):
                 yield
             finally:
                 logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                new_defer.callback(None)
+                with PreserveLoggingContext():
+                    new_defer.callback(None)
                 current_d = self.key_to_defer.get(key)
                 if current_d is new_defer:
                     self.key_to_defer.pop(key, None)
diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py
index afcba482f9..793a88e462 100644
--- a/tests/util/test_linearizer.py
+++ b/tests/util/test_linearizer.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+from synapse.util import async, logcontext
 from tests import unittest
 
 from twisted.internet import defer
@@ -38,7 +37,28 @@ class LinearizerTestCase(unittest.TestCase):
         with cm1:
             self.assertFalse(d2.called)
 
-        self.assertTrue(d2.called)
-
         with (yield d2):
             pass
+
+    def test_lots_of_queued_things(self):
+        # we have one slow thing, and lots of fast things queued up behind it.
+        # it should *not* explode the stack.
+        linearizer = Linearizer()
+
+        @defer.inlineCallbacks
+        def func(i, sleep=False):
+            with logcontext.LoggingContext("func(%s)" % i) as lc:
+                with (yield linearizer.queue("")):
+                    self.assertEqual(
+                        logcontext.LoggingContext.current_context(), lc)
+                    if sleep:
+                        yield async.sleep(0)
+
+                self.assertEqual(
+                    logcontext.LoggingContext.current_context(), lc)
+
+        func(0, sleep=True)
+        for i in xrange(1, 100):
+            func(i)
+
+        return func(1000)