summary refs log tree commit diff
path: root/synapse/media
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2023-07-10 17:23:11 -0500
committerGitHub <noreply@github.com>2023-07-10 17:23:11 -0500
commit2328e90fbb65216ff84a08834d3cd99573bccdff (patch)
treed8c969d3edaac110b571ff8c17f3768f37674f95 /synapse/media
parentDrop debian buster (#15893) (diff)
downloadsynapse-2328e90fbb65216ff84a08834d3cd99573bccdff.tar.xz
Make the media `/upload` tracing less ambiguous (#15888)
A lot of the functions have the same name in this space like `store_file`,
and we also do it multiple times for different reasons (main media repo,
other storage providers, thumbnails, etc) so it's good to differentiate
them so your head doesn't explode.

Follow-up to https://github.com/matrix-org/synapse/pull/15850

Tracing instrumentation to media `/upload` code paths to investigate https://github.com/matrix-org/synapse/issues/15841
Diffstat (limited to 'synapse/media')
-rw-r--r--synapse/media/media_storage.py78
-rw-r--r--synapse/media/storage_provider.py25
2 files changed, 61 insertions, 42 deletions
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py
index eebcbc48e8..a17ccb3d80 100644
--- a/synapse/media/media_storage.py
+++ b/synapse/media/media_storage.py
@@ -38,7 +38,7 @@ from twisted.protocols.basic import FileSender
 
 from synapse.api.errors import NotFoundError
 from synapse.logging.context import defer_to_thread, make_deferred_yieldable
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
 from synapse.util import Clock
 from synapse.util.file_consumer import BackgroundFileConsumer
 
@@ -77,7 +77,7 @@ class MediaStorage:
         self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
         self.clock = hs.get_clock()
 
-    @trace
+    @trace_with_opname("MediaStorage.store_file")
     async def store_file(self, source: IO, file_info: FileInfo) -> str:
         """Write `source` to the on disk media store, and also any other
         configured storage providers
@@ -91,18 +91,19 @@ class MediaStorage:
         """
 
         with self.store_into_file(file_info) as (f, fname, finish_cb):
-            # Write to the main repository
+            # Write to the main media repository
             await self.write_to_file(source, f)
+            # Write to the other storage providers
             await finish_cb()
 
         return fname
 
-    @trace
+    @trace_with_opname("MediaStorage.write_to_file")
     async def write_to_file(self, source: IO, output: IO) -> None:
         """Asynchronously write the `source` to `output`."""
         await defer_to_thread(self.reactor, _write_file_synchronously, source, output)
 
-    @trace
+    @trace_with_opname("MediaStorage.store_into_file")
     @contextlib.contextmanager
     def store_into_file(
         self, file_info: FileInfo
@@ -117,9 +118,9 @@ class MediaStorage:
         fname can be used to read the contents from after upload, e.g. to
         generate thumbnails.
 
-        finish_cb must be called and waited on after the file has been
-        successfully been written to. Should not be called if there was an
-        error.
+        finish_cb must be called and waited on after the file has been successfully been
+        written to. Should not be called if there was an error. Checks for spam and
+        stores the file into the configured storage providers.
 
         Args:
             file_info: Info about the file to store
@@ -139,35 +140,48 @@ class MediaStorage:
 
         finished_called = [False]
 
+        main_media_repo_write_trace_scope = start_active_span(
+            "writing to main media repo"
+        )
+        main_media_repo_write_trace_scope.__enter__()
+
         try:
             with open(fname, "wb") as f:
 
                 async def finish() -> None:
-                    # Ensure that all writes have been flushed and close the
-                    # file.
-                    f.flush()
-                    f.close()
-
-                    spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
-                        ReadableFileWrapper(self.clock, fname), file_info
-                    )
-                    if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
-                        logger.info("Blocking media due to spam checker")
-                        # Note that we'll delete the stored media, due to the
-                        # try/except below. The media also won't be stored in
-                        # the DB.
-                        # We currently ignore any additional field returned by
-                        # the spam-check API.
-                        raise SpamMediaException(errcode=spam_check[0])
-
-                    for provider in self.storage_providers:
-                        await provider.store_file(path, file_info)
-
-                    finished_called[0] = True
+                    # When someone calls finish, we assume they are done writing to the main media repo
+                    main_media_repo_write_trace_scope.__exit__(None, None, None)
+
+                    with start_active_span("writing to other storage providers"):
+                        # Ensure that all writes have been flushed and close the
+                        # file.
+                        f.flush()
+                        f.close()
+
+                        spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
+                            ReadableFileWrapper(self.clock, fname), file_info
+                        )
+                        if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
+                            logger.info("Blocking media due to spam checker")
+                            # Note that we'll delete the stored media, due to the
+                            # try/except below. The media also won't be stored in
+                            # the DB.
+                            # We currently ignore any additional field returned by
+                            # the spam-check API.
+                            raise SpamMediaException(errcode=spam_check[0])
+
+                        for provider in self.storage_providers:
+                            with start_active_span(str(provider)):
+                                await provider.store_file(path, file_info)
+
+                        finished_called[0] = True
 
                 yield f, fname, finish
         except Exception as e:
             try:
+                main_media_repo_write_trace_scope.__exit__(
+                    type(e), None, e.__traceback__
+                )
                 os.remove(fname)
             except Exception:
                 pass
@@ -175,7 +189,11 @@ class MediaStorage:
             raise e from None
 
         if not finished_called:
-            raise Exception("Finished callback not called")
+            exc = Exception("Finished callback not called")
+            main_media_repo_write_trace_scope.__exit__(
+                type(exc), None, exc.__traceback__
+            )
+            raise exc
 
     async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
         """Attempts to fetch media described by file_info from the local cache
diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py
index 0aea3a7a0d..70a45cfd5b 100644
--- a/synapse/media/storage_provider.py
+++ b/synapse/media/storage_provider.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Callable, Optional
 
 from synapse.config._base import Config
 from synapse.logging.context import defer_to_thread, run_in_background
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import start_active_span, trace_with_opname
 from synapse.util.async_helpers import maybe_awaitable
 
 from ._base import FileInfo, Responder
@@ -87,7 +87,7 @@ class StorageProviderWrapper(StorageProvider):
     def __str__(self) -> str:
         return "StorageProviderWrapper[%s]" % (self.backend,)
 
-    @trace
+    @trace_with_opname("StorageProviderWrapper.store_file")
     async def store_file(self, path: str, file_info: FileInfo) -> None:
         if not file_info.server_name and not self.store_local:
             return None
@@ -116,7 +116,7 @@ class StorageProviderWrapper(StorageProvider):
 
             run_in_background(store)
 
-    @trace
+    @trace_with_opname("StorageProviderWrapper.fetch")
     async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
         if file_info.url_cache:
             # Files in the URL preview cache definitely aren't stored here,
@@ -144,7 +144,7 @@ class FileStorageProviderBackend(StorageProvider):
     def __str__(self) -> str:
         return "FileStorageProviderBackend[%s]" % (self.base_directory,)
 
-    @trace
+    @trace_with_opname("FileStorageProviderBackend.store_file")
     async def store_file(self, path: str, file_info: FileInfo) -> None:
         """See StorageProvider.store_file"""
 
@@ -156,14 +156,15 @@ class FileStorageProviderBackend(StorageProvider):
 
         # mypy needs help inferring the type of the second parameter, which is generic
         shutil_copyfile: Callable[[str, str], str] = shutil.copyfile
-        await defer_to_thread(
-            self.hs.get_reactor(),
-            shutil_copyfile,
-            primary_fname,
-            backup_fname,
-        )
-
-    @trace
+        with start_active_span("shutil_copyfile"):
+            await defer_to_thread(
+                self.hs.get_reactor(),
+                shutil_copyfile,
+                primary_fname,
+                backup_fname,
+            )
+
+    @trace_with_opname("FileStorageProviderBackend.fetch")
     async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
         """See StorageProvider.fetch"""