diff options
author | Amber Brown <hawkowl@atleastfornow.net> | 2018-09-03 21:08:35 +1000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-03 21:08:35 +1000 |
commit | 4fc4b881c58fd638db5f4dac0863721111b67af0 (patch) | |
tree | cc1604f5e3b4e0a263e0e11a55b62ef4006a64a1 /synapse/util/file_consumer.py | |
parent | The project `matrix-synapse-auto-deploy` does not seem to be maintained anymore. (diff) | |
parent | Merge pull request #3777 from matrix-org/neilj/fix_register_user_registration (diff) | |
download | synapse-4fc4b881c58fd638db5f4dac0863721111b67af0.tar.xz |
Merge branch 'develop' into develop
Diffstat (limited to 'synapse/util/file_consumer.py')
-rw-r--r-- | synapse/util/file_consumer.py | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3380970e4e..629ed44149 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import threads, reactor +from six.moves import queue -from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from twisted.internet import threads -from six.moves import queue +from synapse.util.logcontext import make_deferred_yieldable, run_in_background class BackgroundFileConsumer(object): @@ -27,6 +27,7 @@ class BackgroundFileConsumer(object): Args: file_obj (file): The file like object to write to. Closed when finished. + reactor (twisted.internet.reactor): the Twisted reactor to use """ # For PushProducers pause if we have this many unwritten slices @@ -34,9 +35,11 @@ class BackgroundFileConsumer(object): # And resume once the size of the queue is less than this _RESUME_ON_QUEUE_SIZE = 2 - def __init__(self, file_obj): + def __init__(self, file_obj, reactor): self._file_obj = file_obj + self._reactor = reactor + # Producer we're registered with self._producer = None @@ -71,7 +74,10 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming self._finished_deferred = run_in_background( - threads.deferToThread, self._writer + threads.deferToThreadPool, + self._reactor, + self._reactor.getThreadPool(), + self._writer, ) if not streaming: self._producer.resumeProducing() @@ -109,7 +115,7 @@ class BackgroundFileConsumer(object): # producer. if self._producer and self._paused_producer: if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: - reactor.callFromThread(self._resume_paused_producer) + self._reactor.callFromThread(self._resume_paused_producer) bytes = self._bytes_queue.get() @@ -121,7 +127,7 @@ class BackgroundFileConsumer(object): # If its a pull producer then we need to explicitly ask for # more stuff. if not self.streaming and self._producer: - reactor.callFromThread(self._producer.resumeProducing) + self._reactor.callFromThread(self._producer.resumeProducing) except Exception as e: self._write_exception = e raise |