diff options
author | Amber Brown <hawkowl@atleastfornow.net> | 2018-06-22 09:37:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-22 09:37:10 +0100 |
commit | 77ac14b960cb8daef76062ce85fc0427749b48af (patch) | |
tree | 4a2b3af0e0404640a2ef251030cd3c2591a1d6ba /synapse/util/file_consumer.py | |
parent | Populate synapse_federation_client_sent_pdu_destinations:count again (#3386) (diff) | |
download | synapse-77ac14b960cb8daef76062ce85fc0427749b48af.tar.xz |
Pass around the reactor explicitly (#3385)
Diffstat (limited to 'synapse/util/file_consumer.py')
-rw-r--r-- | synapse/util/file_consumer.py | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3380970e4e..c78801015b 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import threads, reactor +from twisted.internet import threads from synapse.util.logcontext import make_deferred_yieldable, run_in_background @@ -27,6 +27,7 @@ class BackgroundFileConsumer(object): Args: file_obj (file): The file like object to write to. Closed when finished. + reactor (twisted.internet.reactor): the Twisted reactor to use """ # For PushProducers pause if we have this many unwritten slices @@ -34,9 +35,11 @@ class BackgroundFileConsumer(object): # And resume once the size of the queue is less than this _RESUME_ON_QUEUE_SIZE = 2 - def __init__(self, file_obj): + def __init__(self, file_obj, reactor): self._file_obj = file_obj + self._reactor = reactor + # Producer we're registered with self._producer = None @@ -71,7 +74,10 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming self._finished_deferred = run_in_background( - threads.deferToThread, self._writer + threads.deferToThreadPool, + self._reactor, + self._reactor.getThreadPool(), + self._writer, ) if not streaming: self._producer.resumeProducing() @@ -109,7 +115,7 @@ class BackgroundFileConsumer(object): # producer. if self._producer and self._paused_producer: if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: - reactor.callFromThread(self._resume_paused_producer) + self._reactor.callFromThread(self._resume_paused_producer) bytes = self._bytes_queue.get() @@ -121,7 +127,7 @@ class BackgroundFileConsumer(object): # If its a pull producer then we need to explicitly ask for # more stuff. if not self.streaming and self._producer: - reactor.callFromThread(self._producer.resumeProducing) + self._reactor.callFromThread(self._producer.resumeProducing) except Exception as e: self._write_exception = e raise |