Fix threadsafety in ThreadedMemoryReactorClock (#8497)
This could, very occasionally, cause:
```
tests.test_visibility.FilterEventsForServerTestCase.test_large_room
===============================================================================
[ERROR]
Traceback (most recent call last):
File "/src/tests/rest/media/v1/test_media_storage.py", line 86, in test_ensure_media_is_in_local_cache
self.wait_on_thread(x)
File "/src/tests/unittest.py", line 296, in wait_on_thread
self.reactor.advance(0.01)
File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 826, in advance
self._sortCalls()
File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 787, in _sortCalls
self.calls.sort(key=lambda a: a.getTime())
builtins.ValueError: list modified during sort
tests.rest.media.v1.test_media_storage.MediaStorageTests.test_ensure_media_is_in_local_cache
```
2 files changed, 33 insertions, 4 deletions
diff --git a/changelog.d/8497.misc b/changelog.d/8497.misc
new file mode 100644
index 0000000000..8bc05e8df6
--- /dev/null
+++ b/changelog.d/8497.misc
@@ -0,0 +1 @@
+Fix a threadsafety bug in unit tests.
diff --git a/tests/server.py b/tests/server.py
index f7f5276b21..422c8b42ca 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -1,8 +1,11 @@
import json
import logging
+from collections import deque
from io import SEEK_END, BytesIO
+from typing import Callable
import attr
+from typing_extensions import Deque
from zope.interface import implementer
from twisted.internet import address, threads, udp
@@ -251,6 +254,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
self._tcp_callbacks = {}
self._udp = []
lookups = self.lookups = {}
+ self._thread_callbacks = deque() # type: Deque[Callable[[], None]]()
@implementer(IResolverSimple)
class FakeResolver:
@@ -272,10 +276,10 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
"""
Make the callback fire in the next reactor iteration.
"""
- d = Deferred()
- d.addCallback(lambda x: callback(*args, **kwargs))
- self.callLater(0, d.callback, True)
- return d
+ cb = lambda: callback(*args, **kwargs)
+ # it's not safe to call callLater() here, so we append the callback to a
+ # separate queue.
+ self._thread_callbacks.append(cb)
def getThreadPool(self):
return self.threadpool
@@ -303,6 +307,30 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
return conn
+ def advance(self, amount):
+ # first advance our reactor's time, and run any "callLater" callbacks that
+ # makes ready
+ super().advance(amount)
+
+ # now run any "callFromThread" callbacks
+ while True:
+ try:
+ callback = self._thread_callbacks.popleft()
+ except IndexError:
+ break
+ callback()
+
+ # check for more "callLater" callbacks added by the thread callback
+ # This isn't required in a regular reactor, but it ends up meaning that
+ # our database queries can complete in a single call to `advance` [1] which
+ # simplifies tests.
+ #
+ # [1]: we replace the threadpool backing the db connection pool with a
+ # mock ThreadPool which doesn't really use threads; but we still use
+ # reactor.callFromThread to feed results back from the db functions to the
+ # main thread.
+ super().advance(0)
+
class ThreadPool:
"""
|