summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-06-17 18:07:39 +0100
committerErik Johnston <erik@matrix.org>2024-06-17 18:07:39 +0100
commitcdbe676bc4eb89deb70e1e95d840bf60b6c91e8a (patch)
tree632aeda06fe329f1a7a0f36e52595ddc599a5a57 /synapse
parentWe need to support 3rd party storage providers (diff)
downloadsynapse-cdbe676bc4eb89deb70e1e95d840bf60b6c91e8a.tar.xz
Comments
Diffstat (limited to 'synapse')
-rw-r--r--synapse/media/media_storage.py20
1 files changed, 16 insertions, 4 deletions
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py
index baf947b873..731feedbe2 100644
--- a/synapse/media/media_storage.py
+++ b/synapse/media/media_storage.py
@@ -400,6 +400,10 @@ class ReadableFileWrapper:
 @implementer(interfaces.IConsumer)
 @implementer(interfaces.IPushProducer)
 class MultipartFileConsumer:
+    """Wraps a given consumer so that any data that gets written to it gets
+    converted to a multipart format.
+    """
+
     def __init__(
         self,
         clock: Clock,
@@ -415,11 +419,16 @@ class MultipartFileConsumer:
         self.file_content_type = file_content_type
         self.boundary = uuid4().hex.encode("ascii")
 
+        # The producer that registered with us, and if its a push or pull
+        # producer.
         self.producer: Optional["interfaces.IProducer"] = None
         self.streaming = Optional[None]
 
+        # Whether the wrapped consumer has asked us to pause.
         self.paused = False
 
+    ### IConsumer APIs ###
+
     def registerProducer(
         self, producer: "interfaces.IProducer", streaming: bool
     ) -> None:
@@ -450,8 +459,6 @@ class MultipartFileConsumer:
 
         self.wrapped_consumer.registerProducer(self, True)
 
-        run_in_background(self._resumeProducingRepeatedly)
-
     def unregisterProducer(self) -> None:
         """
         Stop consuming data from a producer, without disconnecting.
@@ -491,6 +498,8 @@ class MultipartFileConsumer:
 
         self.wrapped_consumer.write(data)
 
+    ### IPushProducer APIs ###
+
     def stopProducing(self) -> None:
         """
         Stop producing data.
@@ -530,9 +539,12 @@ class MultipartFileConsumer:
 
         if self.streaming:
             cast("interfaces.IPushProducer", self.producer).resumeProducing()
-            return
+        else:
+            # If the producer is not a streaming producer we need to start
+            # repeatedly calling  `resumeProducing` in a loop.
+            run_in_background(self._resumeProducingRepeatedly)
 
-        run_in_background(self._resumeProducingRepeatedly)
+    ### Internal APIs. ###
 
     async def _resumeProducingRepeatedly(self) -> None:
         assert self.producer is not None