diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 3380970e4e..c78801015b 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import threads, reactor
+from twisted.internet import threads
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -27,6 +27,7 @@ class BackgroundFileConsumer(object):
Args:
file_obj (file): The file like object to write to. Closed when
finished.
+ reactor (twisted.internet.reactor): the Twisted reactor to use
"""
# For PushProducers pause if we have this many unwritten slices
@@ -34,9 +35,11 @@ class BackgroundFileConsumer(object):
# And resume once the size of the queue is less than this
_RESUME_ON_QUEUE_SIZE = 2
- def __init__(self, file_obj):
+ def __init__(self, file_obj, reactor):
self._file_obj = file_obj
+ self._reactor = reactor
+
# Producer we're registered with
self._producer = None
@@ -71,7 +74,10 @@ class BackgroundFileConsumer(object):
self._producer = producer
self.streaming = streaming
self._finished_deferred = run_in_background(
- threads.deferToThread, self._writer
+ threads.deferToThreadPool,
+ self._reactor,
+ self._reactor.getThreadPool(),
+ self._writer,
)
if not streaming:
self._producer.resumeProducing()
@@ -109,7 +115,7 @@ class BackgroundFileConsumer(object):
# producer.
if self._producer and self._paused_producer:
if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
- reactor.callFromThread(self._resume_paused_producer)
+ self._reactor.callFromThread(self._resume_paused_producer)
bytes = self._bytes_queue.get()
@@ -121,7 +127,7 @@ class BackgroundFileConsumer(object):
# If its a pull producer then we need to explicitly ask for
# more stuff.
if not self.streaming and self._producer:
- reactor.callFromThread(self._producer.resumeProducing)
+ self._reactor.callFromThread(self._producer.resumeProducing)
except Exception as e:
self._write_exception = e
raise
|