diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/media/media_storage.py | 78 | ||||
-rw-r--r-- | synapse/media/storage_provider.py | 25 |
2 files changed, 61 insertions, 42 deletions
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index eebcbc48e8..a17ccb3d80 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -38,7 +38,7 @@ from twisted.protocols.basic import FileSender from synapse.api.errors import NotFoundError from synapse.logging.context import defer_to_thread, make_deferred_yieldable -from synapse.logging.opentracing import trace +from synapse.logging.opentracing import start_active_span, trace, trace_with_opname from synapse.util import Clock from synapse.util.file_consumer import BackgroundFileConsumer @@ -77,7 +77,7 @@ class MediaStorage: self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker self.clock = hs.get_clock() - @trace + @trace_with_opname("MediaStorage.store_file") async def store_file(self, source: IO, file_info: FileInfo) -> str: """Write `source` to the on disk media store, and also any other configured storage providers @@ -91,18 +91,19 @@ class MediaStorage: """ with self.store_into_file(file_info) as (f, fname, finish_cb): - # Write to the main repository + # Write to the main media repository await self.write_to_file(source, f) + # Write to the other storage providers await finish_cb() return fname - @trace + @trace_with_opname("MediaStorage.write_to_file") async def write_to_file(self, source: IO, output: IO) -> None: """Asynchronously write the `source` to `output`.""" await defer_to_thread(self.reactor, _write_file_synchronously, source, output) - @trace + @trace_with_opname("MediaStorage.store_into_file") @contextlib.contextmanager def store_into_file( self, file_info: FileInfo @@ -117,9 +118,9 @@ class MediaStorage: fname can be used to read the contents from after upload, e.g. to generate thumbnails. - finish_cb must be called and waited on after the file has been - successfully been written to. Should not be called if there was an - error. + finish_cb must be called and waited on after the file has been successfully been + written to. Should not be called if there was an error. Checks for spam and + stores the file into the configured storage providers. Args: file_info: Info about the file to store @@ -139,35 +140,48 @@ class MediaStorage: finished_called = [False] + main_media_repo_write_trace_scope = start_active_span( + "writing to main media repo" + ) + main_media_repo_write_trace_scope.__enter__() + try: with open(fname, "wb") as f: async def finish() -> None: - # Ensure that all writes have been flushed and close the - # file. - f.flush() - f.close() - - spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam( - ReadableFileWrapper(self.clock, fname), file_info - ) - if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: - logger.info("Blocking media due to spam checker") - # Note that we'll delete the stored media, due to the - # try/except below. The media also won't be stored in - # the DB. - # We currently ignore any additional field returned by - # the spam-check API. - raise SpamMediaException(errcode=spam_check[0]) - - for provider in self.storage_providers: - await provider.store_file(path, file_info) - - finished_called[0] = True + # When someone calls finish, we assume they are done writing to the main media repo + main_media_repo_write_trace_scope.__exit__(None, None, None) + + with start_active_span("writing to other storage providers"): + # Ensure that all writes have been flushed and close the + # file. + f.flush() + f.close() + + spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam( + ReadableFileWrapper(self.clock, fname), file_info + ) + if spam_check != self._spam_checker_module_callbacks.NOT_SPAM: + logger.info("Blocking media due to spam checker") + # Note that we'll delete the stored media, due to the + # try/except below. The media also won't be stored in + # the DB. + # We currently ignore any additional field returned by + # the spam-check API. + raise SpamMediaException(errcode=spam_check[0]) + + for provider in self.storage_providers: + with start_active_span(str(provider)): + await provider.store_file(path, file_info) + + finished_called[0] = True yield f, fname, finish except Exception as e: try: + main_media_repo_write_trace_scope.__exit__( + type(e), None, e.__traceback__ + ) os.remove(fname) except Exception: pass @@ -175,7 +189,11 @@ class MediaStorage: raise e from None if not finished_called: - raise Exception("Finished callback not called") + exc = Exception("Finished callback not called") + main_media_repo_write_trace_scope.__exit__( + type(exc), None, exc.__traceback__ + ) + raise exc async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]: """Attempts to fetch media described by file_info from the local cache diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py index 0aea3a7a0d..70a45cfd5b 100644 --- a/synapse/media/storage_provider.py +++ b/synapse/media/storage_provider.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Callable, Optional from synapse.config._base import Config from synapse.logging.context import defer_to_thread, run_in_background -from synapse.logging.opentracing import trace +from synapse.logging.opentracing import start_active_span, trace_with_opname from synapse.util.async_helpers import maybe_awaitable from ._base import FileInfo, Responder @@ -87,7 +87,7 @@ class StorageProviderWrapper(StorageProvider): def __str__(self) -> str: return "StorageProviderWrapper[%s]" % (self.backend,) - @trace + @trace_with_opname("StorageProviderWrapper.store_file") async def store_file(self, path: str, file_info: FileInfo) -> None: if not file_info.server_name and not self.store_local: return None @@ -116,7 +116,7 @@ class StorageProviderWrapper(StorageProvider): run_in_background(store) - @trace + @trace_with_opname("StorageProviderWrapper.fetch") async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: if file_info.url_cache: # Files in the URL preview cache definitely aren't stored here, @@ -144,7 +144,7 @@ class FileStorageProviderBackend(StorageProvider): def __str__(self) -> str: return "FileStorageProviderBackend[%s]" % (self.base_directory,) - @trace + @trace_with_opname("FileStorageProviderBackend.store_file") async def store_file(self, path: str, file_info: FileInfo) -> None: """See StorageProvider.store_file""" @@ -156,14 +156,15 @@ class FileStorageProviderBackend(StorageProvider): # mypy needs help inferring the type of the second parameter, which is generic shutil_copyfile: Callable[[str, str], str] = shutil.copyfile - await defer_to_thread( - self.hs.get_reactor(), - shutil_copyfile, - primary_fname, - backup_fname, - ) - - @trace + with start_active_span("shutil_copyfile"): + await defer_to_thread( + self.hs.get_reactor(), + shutil_copyfile, + primary_fname, + backup_fname, + ) + + @trace_with_opname("FileStorageProviderBackend.fetch") async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]: """See StorageProvider.fetch""" |