summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-10-09 17:22:25 +0100
committerGitHub <noreply@github.com>2020-10-09 17:22:25 +0100
commit9789b1fba541a5ae01b946770416729e5b7e5b7e (patch)
tree6f09495a7e7c180444fb1f9d9e186c3f384c56ba /tests
parentIncrease default max_upload_size from 10M to 50M (#8502) (diff)
downloadsynapse-9789b1fba541a5ae01b946770416729e5b7e5b7e.tar.xz
Fix threadsafety in ThreadedMemoryReactorClock (#8497)
This could, very occasionally, cause:

```
tests.test_visibility.FilterEventsForServerTestCase.test_large_room
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/src/tests/rest/media/v1/test_media_storage.py", line 86, in test_ensure_media_is_in_local_cache
    self.wait_on_thread(x)
  File "/src/tests/unittest.py", line 296, in wait_on_thread
    self.reactor.advance(0.01)
  File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 826, in advance
    self._sortCalls()
  File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 787, in _sortCalls
    self.calls.sort(key=lambda a: a.getTime())
builtins.ValueError: list modified during sort

tests.rest.media.v1.test_media_storage.MediaStorageTests.test_ensure_media_is_in_local_cache
```
Diffstat (limited to '')
-rw-r--r--tests/server.py36
1 files changed, 32 insertions, 4 deletions
diff --git a/tests/server.py b/tests/server.py
index f7f5276b21..422c8b42ca 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -1,8 +1,11 @@
 import json
 import logging
+from collections import deque
 from io import SEEK_END, BytesIO
+from typing import Callable
 
 import attr
+from typing_extensions import Deque
 from zope.interface import implementer
 
 from twisted.internet import address, threads, udp
@@ -251,6 +254,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
         self._tcp_callbacks = {}
         self._udp = []
         lookups = self.lookups = {}
+        self._thread_callbacks = deque()  # type: Deque[Callable[[], None]]()
 
         @implementer(IResolverSimple)
         class FakeResolver:
@@ -272,10 +276,10 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
         """
         Make the callback fire in the next reactor iteration.
         """
-        d = Deferred()
-        d.addCallback(lambda x: callback(*args, **kwargs))
-        self.callLater(0, d.callback, True)
-        return d
+        cb = lambda: callback(*args, **kwargs)
+        # it's not safe to call callLater() here, so we append the callback to a
+        # separate queue.
+        self._thread_callbacks.append(cb)
 
     def getThreadPool(self):
         return self.threadpool
@@ -303,6 +307,30 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
 
         return conn
 
+    def advance(self, amount):
+        # first advance our reactor's time, and run any "callLater" callbacks that
+        # makes ready
+        super().advance(amount)
+
+        # now run any "callFromThread" callbacks
+        while True:
+            try:
+                callback = self._thread_callbacks.popleft()
+            except IndexError:
+                break
+            callback()
+
+            # check for more "callLater" callbacks added by the thread callback
+            # This isn't required in a regular reactor, but it ends up meaning that
+            # our database queries can complete in a single call to `advance` [1] which
+            # simplifies tests.
+            #
+            # [1]: we replace the threadpool backing the db connection pool with a
+            # mock ThreadPool which doesn't really use threads; but we still use
+            # reactor.callFromThread to feed results back from the db functions to the
+            # main thread.
+            super().advance(0)
+
 
 class ThreadPool:
     """