summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--poetry.lock28
-rw-r--r--pyproject.toml2
-rw-r--r--synapse/handlers/federation_event.py28
-rw-r--r--synapse/logging/tracing.py85
-rw-r--r--synapse/storage/controllers/persist_events.py17
-rw-r--r--synapse/storage/databases/main/event_federation.py1
-rw-r--r--synapse/storage/databases/main/events.py2
-rw-r--r--synapse/storage/databases/main/events_worker.py5
8 files changed, 116 insertions, 52 deletions
diff --git a/poetry.lock b/poetry.lock

index a78ceb0ae5..5fa6db6bad 100644 --- a/poetry.lock +++ b/poetry.lock
@@ -1267,17 +1267,22 @@ telegram = ["requests"] [[package]] name = "treq" -version = "15.1.0" -description = "A requests-like API built on top of twisted.web's Agent" +version = "22.2.0" +description = "High-level Twisted HTTP Client API" category = "main" optional = false -python-versions = "*" +python-versions = ">=3.6" [package.dependencies] -pyOpenSSL = {version = ">=0.15.1", markers = "python_version > \"3.0\""} +attrs = "*" +hyperlink = ">=21.0.0" +incremental = "*" requests = ">=2.1.0" -service_identity = ">=14.0.0" -Twisted = {version = ">=15.5.0", markers = "python_version > \"3.0\""} +Twisted = {version = ">=18.7.0", extras = ["tls"]} + +[package.extras] +dev = ["pep8", "pyflakes", "httpbin (==0.5.0)"] +docs = ["sphinx (>=1.4.8)"] [[package]] name = "twine" @@ -1313,7 +1318,10 @@ attrs = ">=19.2.0" Automat = ">=0.8.0" constantly = ">=15.1" hyperlink = ">=17.1.1" +idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""} incremental = ">=21.3.0" +pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""} +service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""} twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""} typing-extensions = ">=3.6.5" "zope.interface" = ">=4.4.2" @@ -1339,7 +1347,7 @@ windows_platform = ["pywin32 (!=226)", "cython-test-exception-raiser (>=1.0.2,<2 type = "git" url = "https://github.com/twisted/twisted.git" reference = "trunk" -resolved_reference = "ff2ea6181f7ca4887adbaf4158b2fe0891e17ef9" +resolved_reference = "b249a121afffefa3d9d9ab5b7a1315c5a1bb454d" [[package]] name = "twisted-iocpsupport" @@ -1615,7 +1623,7 @@ url_preview = ["lxml"] [metadata] lock-version = "1.1" python-versions = "^3.7.1" -content-hash = "c2cfbb348a49e088c404148c1b682fc5af5abb6278cf4479c6a51fff1656328c" +content-hash = "94116a568c9ab41174ec66c60cb0cb783e349bf586352b1fab08c714e5191665" [metadata.files] attrs = [ @@ -2642,8 +2650,8 @@ tqdm = [ {file = "tqdm-4.63.0.tar.gz", hash = "sha256:1d9835ede8e394bb8c9dcbffbca02d717217113adc679236873eeaac5bc0b3cd"}, ] treq = [ - {file = "treq-15.1.0-py2.py3-none-any.whl", hash = "sha256:1ad1ba89ddc62ae877084b290bd327755b13f6e7bc7076dc4d8e2efb701bfd63"}, - {file = "treq-15.1.0.tar.gz", hash = "sha256:425a47d5d52a993d51211028fb6ade252e5fbea094e878bb4b644096a7322de8"}, + {file = "treq-22.2.0-py3-none-any.whl", hash = "sha256:27d95b07c5c14be3e7b280416139b036087617ad5595be913b1f9b3ce981b9b2"}, + {file = "treq-22.2.0.tar.gz", hash = "sha256:df757e3f141fc782ede076a604521194ffcb40fa2645cf48e5a37060307f52ec"}, ] twine = [ {file = "twine-3.8.0-py3-none-any.whl", hash = "sha256:d0550fca9dc19f3d5e8eadfce0c227294df0a2a951251a4385797c8a6198b7c8"}, diff --git a/pyproject.toml b/pyproject.toml
index 6c81ab8efc..a6138957ca 100644 --- a/pyproject.toml +++ b/pyproject.toml
@@ -119,7 +119,7 @@ signedjson = "^1.1.0" service-identity = ">=18.1.0" # Twisted 18.9 introduces some logger improvements that the structured # logger utilises -twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk"} +twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk", extras = ["tls"]} treq = ">=15.1" # Twisted has required pyopenssl 16.0 since about Twisted 16.6. pyOpenSSL = ">=16.0.0" diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 740a04aad6..60436fe497 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -59,7 +59,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError from synapse.logging.context import nested_logging_context -from synapse.logging.tracing import trace +from synapse.logging.tracing import trace, tag_args, set_attribute, SynapseTags from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( @@ -410,6 +410,7 @@ class FederationEventHandler: prev_member_event, ) + @trace async def process_remote_join( self, origin: str, @@ -753,6 +754,7 @@ class FederationEventHandler: await self._process_pulled_event(origin, ev, backfilled=backfilled) @trace + @tag_args async def _process_pulled_event( self, origin: str, event: EventBase, backfilled: bool ) -> None: @@ -854,6 +856,7 @@ class FederationEventHandler: else: raise + @trace async def _compute_event_context_with_maybe_missing_prevs( self, dest: str, event: EventBase ) -> EventContext: @@ -970,6 +973,8 @@ class FederationEventHandler: event, state_ids_before_event=state_map, partial_state=partial_state ) + @trace + @tag_args async def _get_state_ids_after_missing_prev_event( self, destination: str, @@ -1112,6 +1117,8 @@ class FederationEventHandler: return state_map + @trace + @tag_args async def _get_state_and_persist( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1133,6 +1140,7 @@ class FederationEventHandler: destination=destination, room_id=room_id, event_ids=(event_id,) ) + @trace async def _process_received_pdu( self, origin: str, @@ -1283,6 +1291,7 @@ class FederationEventHandler: except Exception: logger.exception("Failed to resync device for %s", sender) + @trace async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None: """Handles backfilling the insertion event when we receive a marker event that points to one. @@ -1414,6 +1423,8 @@ class FederationEventHandler: return event_from_response + @trace + @tag_args async def _get_events_and_persist( self, destination: str, room_id: str, event_ids: Collection[str] ) -> None: @@ -1459,6 +1470,7 @@ class FederationEventHandler: logger.info("Fetched %i events of %i requested", len(events), len(event_ids)) await self._auth_and_persist_outliers(room_id, events) + @trace async def _auth_and_persist_outliers( self, room_id: str, events: Iterable[EventBase] ) -> None: @@ -1477,6 +1489,11 @@ class FederationEventHandler: """ event_map = {event.event_id: event for event in events} + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + event_map.keys(), + ) + # filter out any events we have already seen. This might happen because # the events were eagerly pushed to us (eg, during a room join), or because # another thread has raced against us since we decided to request the event. @@ -1593,6 +1610,7 @@ class FederationEventHandler: backfilled=True, ) + @trace async def _check_event_auth( self, origin: Optional[str], event: EventBase, context: EventContext ) -> None: @@ -1631,6 +1649,9 @@ class FederationEventHandler: claimed_auth_events = await self._load_or_fetch_auth_events_for_event( origin, event ) + set_attribute( + "claimed_auth_events", [ev.event_id for ev in claimed_auth_events] + ) # ... and check that the event passes auth at those auth events. # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu: @@ -1728,6 +1749,7 @@ class FederationEventHandler: ) context.rejected = RejectedReason.AUTH_ERROR + @trace async def _maybe_kick_guest_users(self, event: EventBase) -> None: if event.type != EventTypes.GuestAccess: return @@ -1935,6 +1957,8 @@ class FederationEventHandler: # instead we raise an AuthError, which will make the caller ignore it. raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found") + @trace + @tag_args async def _get_remote_auth_chain_for_event( self, destination: str, room_id: str, event_id: str ) -> None: @@ -1963,6 +1987,7 @@ class FederationEventHandler: await self._auth_and_persist_outliers(room_id, remote_auth_events) + @trace async def _run_push_actions_and_persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> None: @@ -2008,6 +2033,7 @@ class FederationEventHandler: await self._store.remove_push_actions_from_staging(event.event_id) raise + @trace async def persist_events_and_notify( self, room_id: str, diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py
index 2f4555cc2e..cb557a147d 100644 --- a/synapse/logging/tracing.py +++ b/synapse/logging/tracing.py
@@ -281,6 +281,16 @@ class SynapseTags: # The name of the external cache CACHE_NAME = "cache.name" + # Used to tag function arguments + # + # Tag a named arg. The name of the argument should be appended to this + # prefix + FUNC_ARG_PREFIX = "ARG." + # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`) + FUNC_ARGS = "args" + # Tag keyword args + FUNC_KWARGS = "kwargs" + class SynapseBaggage: FORCE_TRACING = "synapse-force-tracing" @@ -790,30 +800,28 @@ def extract_text_map( # Tracing decorators -def create_decorator( +def _custom_sync_async_decorator( func: Callable[P, R], - # TODO: What is the correct type for these `Any`? `P.args, P.kwargs` isn't allowed here wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]], ) -> Callable[P, R]: """ - Creates a decorator that is able to handle sync functions, async functions - (coroutines), and inlineDeferred from Twisted. - + Decorates a function that is sync or async (coroutines), or that returns a Twisted + `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`. Example usage: ```py - # Decorator to time the functiona and log it out + # Decorator to time the function and log it out def duration(func: Callable[P, R]) -> Callable[P, R]: @contextlib.contextmanager - def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): + def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]: start_ts = time.time() - yield - end_ts = time.time() - duration = end_ts - start_ts - logger.info("%s took %s seconds", func.__name__, duration) - - return create_decorator(func, _wrapping_logic) + try: + yield + finally: + end_ts = time.time() + duration = end_ts - start_ts + logger.info("%s took %s seconds", func.__name__, duration) + return _custom_sync_async_decorator(func, _wrapping_logic) ``` - Args: func: The function to be decorated wrapping_logic: The business logic of your custom decorator. @@ -821,14 +829,18 @@ def create_decorator( before/after the function as desired. """ - @wraps(func) - async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: - if inspect.iscoroutinefunction(func): + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: with wrapping_logic(func, *args, **kwargs): - return await func(*args, **kwargs) - else: - # The other case here handles both sync functions and those - # decorated with inlineDeferred. + return await func(*args, **kwargs) # type: ignore[misc] + + else: + # The other case here handles both sync functions and those + # decorated with inlineDeferred. + @wraps(func) + def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: scope = wrapping_logic(func, *args, **kwargs) scope.__enter__() @@ -866,7 +878,11 @@ def create_decorator( return _wrapper # type: ignore[return-value] -def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]: +def trace_with_opname( + opname: str, + *, + tracer: Optional["opentelemetry.trace.Tracer"] = None, +) -> Callable[[Callable[P, R]], Callable[P, R]]: """ Decorator to trace a function with a custom opname. @@ -875,21 +891,14 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]] @contextlib.contextmanager def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): - if opentelemetry is None: - return None - - scope = start_active_span(opname) - scope.__enter__() - try: + with start_active_span(opname, tracer=tracer): yield - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - finally: - scope.__exit__(None, None, None) def _decorator(func: Callable[P, R]): - return create_decorator(func, _wrapping_logic) + if not opentelemetry: + return func + + return _custom_sync_async_decorator(func, _wrapping_logic) return _decorator @@ -918,12 +927,12 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs): argspec = inspect.getfullargspec(func) for i, arg in enumerate(args[1:]): - set_attribute("ARG_" + argspec.args[i + 1], str(arg)) # type: ignore[index] - set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index] - set_attribute("kwargs", str(kwargs)) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i + 1], str(arg)) # type: ignore[index] + set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) # type: ignore[index] + set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs)) yield - return create_decorator(func, _wrapping_logic) + return _custom_sync_async_decorator(func, _wrapping_logic) @contextlib.contextmanager diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f87f5098a5..0bdb213286 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py
@@ -46,7 +46,14 @@ from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable -from synapse.logging.tracing import Link, get_active_span, start_active_span, trace +from synapse.logging.tracing import ( + Link, + get_active_span, + set_attribute, + start_active_span, + trace, + SynapseTags, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases @@ -383,6 +390,12 @@ class EventsPersistenceStorageController: PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ + set_attribute( + SynapseTags.FUNC_ARG_PREFIX + "event_ids", + [e.event_id for e, _ in events_and_contexts], + ) + set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled)) + partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: partitioned.setdefault(event.room_id, []).append((event, ctx)) @@ -781,7 +794,7 @@ class EventsPersistenceStorageController: stale_forward_extremities_counter.observe(len(stale)) return result - + async def _get_new_state_after_events( self, room_id: str, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 3fcb2069df..178536b10f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -1381,6 +1381,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) + @trace async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: await self.db_pool.simple_upsert( table="insertion_event_extremities", diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f600f1190..21ba7a540e 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext +from synapse.logging.tracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -145,6 +146,7 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + @trace async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 29c99c6357..449cb03276 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -54,6 +54,7 @@ from synapse.logging.context import ( current_context, make_deferred_yieldable, ) +from synapse.logging.tracing import trace, tag_args from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, @@ -394,6 +395,8 @@ class EventsWorkerStore(SQLBaseStore): return event + @trace + @tag_args async def get_events( self, event_ids: Collection[str], @@ -1363,6 +1366,8 @@ class EventsWorkerStore(SQLBaseStore): return {r["event_id"] for r in rows} + @trace + @tag_args async def have_seen_events( self, room_id: str, event_ids: Iterable[str] ) -> Set[str]: