summary refs log tree commit diff
path: root/synapse/util/file_consumer.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-01-18 11:10:12 +0000
committerErik Johnston <erik@matrix.org>2018-01-18 11:57:54 +0000
commit2f18a2647b6b9cc07c3cc5f2bec3e1bab67d0eea (patch)
tree6374060a7fe82866c613b34c44e2f8f5448b78a8 /synapse/util/file_consumer.py
parentEnsure we registerProducer isn't called twice (diff)
downloadsynapse-2f18a2647b6b9cc07c3cc5f2bec3e1bab67d0eea.tar.xz
Make all fields private
Diffstat (limited to '')
-rw-r--r--synapse/util/file_consumer.py62
1 files changed, 31 insertions, 31 deletions
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index d7bbb0aeb8..d19d48665c 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -35,24 +35,24 @@ class BackgroundFileConsumer(object):
     _RESUME_ON_QUEUE_SIZE = 2
 
     def __init__(self, file_obj):
-        self.file_obj = file_obj
+        self._file_obj = file_obj
 
         # Producer we're registered with
-        self.producer = None
+        self._producer = None
 
         # True if PushProducer, false if PullProducer
         self.streaming = False
 
         # For PushProducers, indicates whether we've paused the producer and
         # need to call resumeProducing before we get more data.
-        self.paused_producer = False
+        self._paused_producer = False
 
         # Queue of slices of bytes to be written. When producer calls
         # unregister a final None is sent.
-        self.bytes_queue = Queue.Queue()
+        self._bytes_queue = Queue.Queue()
 
         # Deferred that is resolved when finished writing
-        self.finished_deferred = None
+        self._finished_deferred = None
 
         # If the _writer thread throws an exception it gets stored here.
         self._write_exception = None
@@ -69,21 +69,21 @@ class BackgroundFileConsumer(object):
             streaming (bool): True if push based producer, False if pull
                 based.
         """
-        if self.producer:
+        if self._producer:
             raise Exception("registerProducer called twice")
 
-        self.producer = producer
+        self._producer = producer
         self.streaming = streaming
-        self.finished_deferred = threads.deferToThread(self._writer)
+        self._finished_deferred = threads.deferToThread(self._writer)
         if not streaming:
-            self.producer.resumeProducing()
+            self._producer.resumeProducing()
 
     def unregisterProducer(self):
         """Part of IProducer interface
         """
-        self.producer = None
-        if not self.finished_deferred.called:
-            self.bytes_queue.put_nowait(None)
+        self._producer = None
+        if not self._finished_deferred.called:
+            self._bytes_queue.put_nowait(None)
 
     def write(self, bytes):
         """Part of IProducer interface
@@ -91,65 +91,65 @@ class BackgroundFileConsumer(object):
         if self._write_exception:
             raise self._write_exception
 
-        if self.finished_deferred.called:
+        if self._finished_deferred.called:
             raise Exception("consumer has closed")
 
-        self.bytes_queue.put_nowait(bytes)
+        self._bytes_queue.put_nowait(bytes)
 
         # If this is a PushProducer and the queue is getting behind
         # then we pause the producer.
-        if self.streaming and self.bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
-            self.paused_producer = True
-            self.producer.pauseProducing()
+        if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
+            self._paused_producer = True
+            self._producer.pauseProducing()
 
     def _writer(self):
         """This is run in a background thread to write to the file.
         """
         try:
-            while self.producer or not self.bytes_queue.empty():
+            while self._producer or not self._bytes_queue.empty():
                 # If we've paused the producer check if we should resume the
                 # producer.
-                if self.producer and self.paused_producer:
-                    if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
+                if self._producer and self._paused_producer:
+                    if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
                         reactor.callFromThread(self._resume_paused_producer)
 
-                if self._notify_empty_deferred and self.bytes_queue.empty():
+                if self._notify_empty_deferred and self._bytes_queue.empty():
                     reactor.callFromThread(self._notify_empty)
 
-                bytes = self.bytes_queue.get()
+                bytes = self._bytes_queue.get()
 
                 # If we get a None (or empty list) then that's a signal used
                 # to indicate we should check if we should stop.
                 if bytes:
-                    self.file_obj.write(bytes)
+                    self._file_obj.write(bytes)
 
                 # If its a pull producer then we need to explicitly ask for
                 # more stuff.
-                if not self.streaming and self.producer:
-                    reactor.callFromThread(self.producer.resumeProducing)
+                if not self.streaming and self._producer:
+                    reactor.callFromThread(self._producer.resumeProducing)
         except Exception as e:
             self._write_exception = e
             raise
         finally:
-            self.file_obj.close()
+            self._file_obj.close()
 
     def wait(self):
         """Returns a deferred that resolves when finished writing to file
         """
-        return make_deferred_yieldable(self.finished_deferred)
+        return make_deferred_yieldable(self._finished_deferred)
 
     def _resume_paused_producer(self):
         """Gets called if we should resume producing after being paused
         """
-        if self.paused_producer and self.producer:
-            self.paused_producer = False
-            self.producer.resumeProducing()
+        if self._paused_producer and self._producer:
+            self._paused_producer = False
+            self._producer.resumeProducing()
 
     def _notify_empty(self):
         """Called when the _writer thread thinks the queue may be empty and
         we should notify anything waiting on `wait_for_writes`
         """
-        if self._notify_empty_deferred and self.bytes_queue.empty():
+        if self._notify_empty_deferred and self._bytes_queue.empty():
             d = self._notify_empty_deferred
             self._notify_empty_deferred = None
             d.callback(None)