summary refs log tree commit diff
path: root/synapse/util/file_consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/file_consumer.py')
-rw-r--r--synapse/util/file_consumer.py26
1 files changed, 1 insertions, 25 deletions
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index d19d48665c..3241035247 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer, threads, reactor
+from twisted.internet import threads, reactor
 
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -57,10 +57,6 @@ class BackgroundFileConsumer(object):
         # If the _writer thread throws an exception it gets stored here.
         self._write_exception = None
 
-        # A deferred that gets resolved when the bytes_queue gets empty.
-        # Mainly used for tests.
-        self._notify_empty_deferred = None
-
     def registerProducer(self, producer, streaming):
         """Part of IConsumer interface
 
@@ -113,9 +109,6 @@ class BackgroundFileConsumer(object):
                     if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
                         reactor.callFromThread(self._resume_paused_producer)
 
-                if self._notify_empty_deferred and self._bytes_queue.empty():
-                    reactor.callFromThread(self._notify_empty)
-
                 bytes = self._bytes_queue.get()
 
                 # If we get a None (or empty list) then that's a signal used
@@ -144,20 +137,3 @@ class BackgroundFileConsumer(object):
         if self._paused_producer and self._producer:
             self._paused_producer = False
             self._producer.resumeProducing()
-
-    def _notify_empty(self):
-        """Called when the _writer thread thinks the queue may be empty and
-        we should notify anything waiting on `wait_for_writes`
-        """
-        if self._notify_empty_deferred and self._bytes_queue.empty():
-            d = self._notify_empty_deferred
-            self._notify_empty_deferred = None
-            d.callback(None)
-
-    def wait_for_writes(self):
-        """Wait for the write queue to be empty and for writes to have
-        finished. This is mainly useful for tests.
-        """
-        if not self._notify_empty_deferred:
-            self._notify_empty_deferred = defer.Deferred()
-        return self._notify_empty_deferred