From 9789b1fba541a5ae01b946770416729e5b7e5b7e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 9 Oct 2020 17:22:25 +0100 Subject: Fix threadsafety in ThreadedMemoryReactorClock (#8497) This could, very occasionally, cause: ``` tests.test_visibility.FilterEventsForServerTestCase.test_large_room =============================================================================== [ERROR] Traceback (most recent call last): File "/src/tests/rest/media/v1/test_media_storage.py", line 86, in test_ensure_media_is_in_local_cache self.wait_on_thread(x) File "/src/tests/unittest.py", line 296, in wait_on_thread self.reactor.advance(0.01) File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 826, in advance self._sortCalls() File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 787, in _sortCalls self.calls.sort(key=lambda a: a.getTime()) builtins.ValueError: list modified during sort tests.rest.media.v1.test_media_storage.MediaStorageTests.test_ensure_media_is_in_local_cache ``` --- tests/server.py | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) (limited to 'tests') diff --git a/tests/server.py b/tests/server.py index f7f5276b21..422c8b42ca 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,8 +1,11 @@ import json import logging +from collections import deque from io import SEEK_END, BytesIO +from typing import Callable import attr +from typing_extensions import Deque from zope.interface import implementer from twisted.internet import address, threads, udp @@ -251,6 +254,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock): self._tcp_callbacks = {} self._udp = [] lookups = self.lookups = {} + self._thread_callbacks = deque() # type: Deque[Callable[[], None]]() @implementer(IResolverSimple) class FakeResolver: @@ -272,10 +276,10 @@ class ThreadedMemoryReactorClock(MemoryReactorClock): """ Make the callback fire in the next reactor iteration. """ - d = Deferred() - d.addCallback(lambda x: callback(*args, **kwargs)) - self.callLater(0, d.callback, True) - return d + cb = lambda: callback(*args, **kwargs) + # it's not safe to call callLater() here, so we append the callback to a + # separate queue. + self._thread_callbacks.append(cb) def getThreadPool(self): return self.threadpool @@ -303,6 +307,30 @@ class ThreadedMemoryReactorClock(MemoryReactorClock): return conn + def advance(self, amount): + # first advance our reactor's time, and run any "callLater" callbacks that + # makes ready + super().advance(amount) + + # now run any "callFromThread" callbacks + while True: + try: + callback = self._thread_callbacks.popleft() + except IndexError: + break + callback() + + # check for more "callLater" callbacks added by the thread callback + # This isn't required in a regular reactor, but it ends up meaning that + # our database queries can complete in a single call to `advance` [1] which + # simplifies tests. + # + # [1]: we replace the threadpool backing the db connection pool with a + # mock ThreadPool which doesn't really use threads; but we still use + # reactor.callFromThread to feed results back from the db functions to the + # main thread. + super().advance(0) + class ThreadPool: """ -- cgit 1.4.1