diff --git a/changelog.d/15888.misc b/changelog.d/15888.misc
new file mode 100644
index 0000000000..0e49ab23fe
--- /dev/null
+++ b/changelog.d/15888.misc
@@ -0,0 +1 @@
+Add tracing to media `/upload` code paths.
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py
index eebcbc48e8..a17ccb3d80 100644
--- a/synapse/media/media_storage.py
+++ b/synapse/media/media_storage.py
@@ -38,7 +38,7 @@ from twisted.protocols.basic import FileSender
from synapse.api.errors import NotFoundError
from synapse.logging.context import defer_to_thread, make_deferred_yieldable
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer
@@ -77,7 +77,7 @@ class MediaStorage:
self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
self.clock = hs.get_clock()
- @trace
+ @trace_with_opname("MediaStorage.store_file")
async def store_file(self, source: IO, file_info: FileInfo) -> str:
"""Write `source` to the on disk media store, and also any other
configured storage providers
@@ -91,18 +91,19 @@ class MediaStorage:
"""
with self.store_into_file(file_info) as (f, fname, finish_cb):
- # Write to the main repository
+ # Write to the main media repository
await self.write_to_file(source, f)
+ # Write to the other storage providers
await finish_cb()
return fname
- @trace
+ @trace_with_opname("MediaStorage.write_to_file")
async def write_to_file(self, source: IO, output: IO) -> None:
"""Asynchronously write the `source` to `output`."""
await defer_to_thread(self.reactor, _write_file_synchronously, source, output)
- @trace
+ @trace_with_opname("MediaStorage.store_into_file")
@contextlib.contextmanager
def store_into_file(
self, file_info: FileInfo
@@ -117,9 +118,9 @@ class MediaStorage:
fname can be used to read the contents from after upload, e.g. to
generate thumbnails.
- finish_cb must be called and waited on after the file has been
- successfully been written to. Should not be called if there was an
- error.
+ finish_cb must be called and waited on after the file has been successfully been
+ written to. Should not be called if there was an error. Checks for spam and
+ stores the file into the configured storage providers.
Args:
file_info: Info about the file to store
@@ -139,35 +140,48 @@ class MediaStorage:
finished_called = [False]
+ main_media_repo_write_trace_scope = start_active_span(
+ "writing to main media repo"
+ )
+ main_media_repo_write_trace_scope.__enter__()
+
try:
with open(fname, "wb") as f:
async def finish() -> None:
- # Ensure that all writes have been flushed and close the
- # file.
- f.flush()
- f.close()
-
- spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
- ReadableFileWrapper(self.clock, fname), file_info
- )
- if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
- logger.info("Blocking media due to spam checker")
- # Note that we'll delete the stored media, due to the
- # try/except below. The media also won't be stored in
- # the DB.
- # We currently ignore any additional field returned by
- # the spam-check API.
- raise SpamMediaException(errcode=spam_check[0])
-
- for provider in self.storage_providers:
- await provider.store_file(path, file_info)
-
- finished_called[0] = True
+ # When someone calls finish, we assume they are done writing to the main media repo
+ main_media_repo_write_trace_scope.__exit__(None, None, None)
+
+ with start_active_span("writing to other storage providers"):
+ # Ensure that all writes have been flushed and close the
+ # file.
+ f.flush()
+ f.close()
+
+ spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
+ ReadableFileWrapper(self.clock, fname), file_info
+ )
+ if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
+ logger.info("Blocking media due to spam checker")
+ # Note that we'll delete the stored media, due to the
+ # try/except below. The media also won't be stored in
+ # the DB.
+ # We currently ignore any additional field returned by
+ # the spam-check API.
+ raise SpamMediaException(errcode=spam_check[0])
+
+ for provider in self.storage_providers:
+ with start_active_span(str(provider)):
+ await provider.store_file(path, file_info)
+
+ finished_called[0] = True
yield f, fname, finish
except Exception as e:
try:
+ main_media_repo_write_trace_scope.__exit__(
+ type(e), None, e.__traceback__
+ )
os.remove(fname)
except Exception:
pass
@@ -175,7 +189,11 @@ class MediaStorage:
raise e from None
if not finished_called:
- raise Exception("Finished callback not called")
+ exc = Exception("Finished callback not called")
+ main_media_repo_write_trace_scope.__exit__(
+ type(exc), None, exc.__traceback__
+ )
+ raise exc
async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
"""Attempts to fetch media described by file_info from the local cache
diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py
index 0aea3a7a0d..70a45cfd5b 100644
--- a/synapse/media/storage_provider.py
+++ b/synapse/media/storage_provider.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Callable, Optional
from synapse.config._base import Config
from synapse.logging.context import defer_to_thread, run_in_background
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import start_active_span, trace_with_opname
from synapse.util.async_helpers import maybe_awaitable
from ._base import FileInfo, Responder
@@ -87,7 +87,7 @@ class StorageProviderWrapper(StorageProvider):
def __str__(self) -> str:
return "StorageProviderWrapper[%s]" % (self.backend,)
- @trace
+ @trace_with_opname("StorageProviderWrapper.store_file")
async def store_file(self, path: str, file_info: FileInfo) -> None:
if not file_info.server_name and not self.store_local:
return None
@@ -116,7 +116,7 @@ class StorageProviderWrapper(StorageProvider):
run_in_background(store)
- @trace
+ @trace_with_opname("StorageProviderWrapper.fetch")
async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
if file_info.url_cache:
# Files in the URL preview cache definitely aren't stored here,
@@ -144,7 +144,7 @@ class FileStorageProviderBackend(StorageProvider):
def __str__(self) -> str:
return "FileStorageProviderBackend[%s]" % (self.base_directory,)
- @trace
+ @trace_with_opname("FileStorageProviderBackend.store_file")
async def store_file(self, path: str, file_info: FileInfo) -> None:
"""See StorageProvider.store_file"""
@@ -156,14 +156,15 @@ class FileStorageProviderBackend(StorageProvider):
# mypy needs help inferring the type of the second parameter, which is generic
shutil_copyfile: Callable[[str, str], str] = shutil.copyfile
- await defer_to_thread(
- self.hs.get_reactor(),
- shutil_copyfile,
- primary_fname,
- backup_fname,
- )
-
- @trace
+ with start_active_span("shutil_copyfile"):
+ await defer_to_thread(
+ self.hs.get_reactor(),
+ shutil_copyfile,
+ primary_fname,
+ backup_fname,
+ )
+
+ @trace_with_opname("FileStorageProviderBackend.fetch")
async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
"""See StorageProvider.fetch"""
|