diff options
author | Matthew Hodgson <matthew@matrix.org> | 2018-07-19 11:26:04 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2018-07-19 11:26:04 +0100 |
commit | be3adfc331ef7f19b2e44b17cd06e463bff09f3a (patch) | |
tree | 2e565f68ef2154e3882704916eae3b1e8f0c9f3c /synapse/util/file_consumer.py | |
parent | Merge branch 'develop' into matthew/filter_members (diff) | |
parent | revert 00bc979 (diff) | |
download | synapse-be3adfc331ef7f19b2e44b17cd06e463bff09f3a.tar.xz |
merge develop pydoc for _get_state_for_groups
Diffstat (limited to 'synapse/util/file_consumer.py')
-rw-r--r-- | synapse/util/file_consumer.py | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3380970e4e..629ed44149 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import threads, reactor +from six.moves import queue -from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from twisted.internet import threads -from six.moves import queue +from synapse.util.logcontext import make_deferred_yieldable, run_in_background class BackgroundFileConsumer(object): @@ -27,6 +27,7 @@ class BackgroundFileConsumer(object): Args: file_obj (file): The file like object to write to. Closed when finished. + reactor (twisted.internet.reactor): the Twisted reactor to use """ # For PushProducers pause if we have this many unwritten slices @@ -34,9 +35,11 @@ class BackgroundFileConsumer(object): # And resume once the size of the queue is less than this _RESUME_ON_QUEUE_SIZE = 2 - def __init__(self, file_obj): + def __init__(self, file_obj, reactor): self._file_obj = file_obj + self._reactor = reactor + # Producer we're registered with self._producer = None @@ -71,7 +74,10 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming self._finished_deferred = run_in_background( - threads.deferToThread, self._writer + threads.deferToThreadPool, + self._reactor, + self._reactor.getThreadPool(), + self._writer, ) if not streaming: self._producer.resumeProducing() @@ -109,7 +115,7 @@ class BackgroundFileConsumer(object): # producer. if self._producer and self._paused_producer: if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: - reactor.callFromThread(self._resume_paused_producer) + self._reactor.callFromThread(self._resume_paused_producer) bytes = self._bytes_queue.get() @@ -121,7 +127,7 @@ class BackgroundFileConsumer(object): # If its a pull producer then we need to explicitly ask for # more stuff. if not self.streaming and self._producer: - reactor.callFromThread(self._producer.resumeProducing) + self._reactor.callFromThread(self._producer.resumeProducing) except Exception as e: self._write_exception = e raise |