diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py
index baf947b873..731feedbe2 100644
--- a/synapse/media/media_storage.py
+++ b/synapse/media/media_storage.py
@@ -400,6 +400,10 @@ class ReadableFileWrapper:
@implementer(interfaces.IConsumer)
@implementer(interfaces.IPushProducer)
class MultipartFileConsumer:
+ """Wraps a given consumer so that any data that gets written to it gets
+ converted to a multipart format.
+ """
+
def __init__(
self,
clock: Clock,
@@ -415,11 +419,16 @@ class MultipartFileConsumer:
self.file_content_type = file_content_type
self.boundary = uuid4().hex.encode("ascii")
+ # The producer that registered with us, and if its a push or pull
+ # producer.
self.producer: Optional["interfaces.IProducer"] = None
self.streaming = Optional[None]
+ # Whether the wrapped consumer has asked us to pause.
self.paused = False
+ ### IConsumer APIs ###
+
def registerProducer(
self, producer: "interfaces.IProducer", streaming: bool
) -> None:
@@ -450,8 +459,6 @@ class MultipartFileConsumer:
self.wrapped_consumer.registerProducer(self, True)
- run_in_background(self._resumeProducingRepeatedly)
-
def unregisterProducer(self) -> None:
"""
Stop consuming data from a producer, without disconnecting.
@@ -491,6 +498,8 @@ class MultipartFileConsumer:
self.wrapped_consumer.write(data)
+ ### IPushProducer APIs ###
+
def stopProducing(self) -> None:
"""
Stop producing data.
@@ -530,9 +539,12 @@ class MultipartFileConsumer:
if self.streaming:
cast("interfaces.IPushProducer", self.producer).resumeProducing()
- return
+ else:
+ # If the producer is not a streaming producer we need to start
+ # repeatedly calling `resumeProducing` in a loop.
+ run_in_background(self._resumeProducingRepeatedly)
- run_in_background(self._resumeProducingRepeatedly)
+ ### Internal APIs. ###
async def _resumeProducingRepeatedly(self) -> None:
assert self.producer is not None
|