diff options
author | Erik Johnston <erik@matrix.org> | 2024-06-17 18:07:39 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2024-06-17 18:07:39 +0100 |
commit | cdbe676bc4eb89deb70e1e95d840bf60b6c91e8a (patch) | |
tree | 632aeda06fe329f1a7a0f36e52595ddc599a5a57 /synapse | |
parent | We need to support 3rd party storage providers (diff) | |
download | synapse-cdbe676bc4eb89deb70e1e95d840bf60b6c91e8a.tar.xz |
Comments
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/media/media_storage.py | 20 |
1 files changed, 16 insertions, 4 deletions
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index baf947b873..731feedbe2 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -400,6 +400,10 @@ class ReadableFileWrapper: @implementer(interfaces.IConsumer) @implementer(interfaces.IPushProducer) class MultipartFileConsumer: + """Wraps a given consumer so that any data that gets written to it gets + converted to a multipart format. + """ + def __init__( self, clock: Clock, @@ -415,11 +419,16 @@ class MultipartFileConsumer: self.file_content_type = file_content_type self.boundary = uuid4().hex.encode("ascii") + # The producer that registered with us, and if its a push or pull + # producer. self.producer: Optional["interfaces.IProducer"] = None self.streaming = Optional[None] + # Whether the wrapped consumer has asked us to pause. self.paused = False + ### IConsumer APIs ### + def registerProducer( self, producer: "interfaces.IProducer", streaming: bool ) -> None: @@ -450,8 +459,6 @@ class MultipartFileConsumer: self.wrapped_consumer.registerProducer(self, True) - run_in_background(self._resumeProducingRepeatedly) - def unregisterProducer(self) -> None: """ Stop consuming data from a producer, without disconnecting. @@ -491,6 +498,8 @@ class MultipartFileConsumer: self.wrapped_consumer.write(data) + ### IPushProducer APIs ### + def stopProducing(self) -> None: """ Stop producing data. @@ -530,9 +539,12 @@ class MultipartFileConsumer: if self.streaming: cast("interfaces.IPushProducer", self.producer).resumeProducing() - return + else: + # If the producer is not a streaming producer we need to start + # repeatedly calling `resumeProducing` in a loop. + run_in_background(self._resumeProducingRepeatedly) - run_in_background(self._resumeProducingRepeatedly) + ### Internal APIs. ### async def _resumeProducingRepeatedly(self) -> None: assert self.producer is not None |