diff --git a/poetry.lock b/poetry.lock
index a78ceb0ae5..5fa6db6bad 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1267,17 +1267,22 @@ telegram = ["requests"]
[[package]]
name = "treq"
-version = "15.1.0"
-description = "A requests-like API built on top of twisted.web's Agent"
+version = "22.2.0"
+description = "High-level Twisted HTTP Client API"
category = "main"
optional = false
-python-versions = "*"
+python-versions = ">=3.6"
[package.dependencies]
-pyOpenSSL = {version = ">=0.15.1", markers = "python_version > \"3.0\""}
+attrs = "*"
+hyperlink = ">=21.0.0"
+incremental = "*"
requests = ">=2.1.0"
-service_identity = ">=14.0.0"
-Twisted = {version = ">=15.5.0", markers = "python_version > \"3.0\""}
+Twisted = {version = ">=18.7.0", extras = ["tls"]}
+
+[package.extras]
+dev = ["pep8", "pyflakes", "httpbin (==0.5.0)"]
+docs = ["sphinx (>=1.4.8)"]
[[package]]
name = "twine"
@@ -1313,7 +1318,10 @@ attrs = ">=19.2.0"
Automat = ">=0.8.0"
constantly = ">=15.1"
hyperlink = ">=17.1.1"
+idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""}
incremental = ">=21.3.0"
+pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""}
+service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""}
twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""}
typing-extensions = ">=3.6.5"
"zope.interface" = ">=4.4.2"
@@ -1339,7 +1347,7 @@ windows_platform = ["pywin32 (!=226)", "cython-test-exception-raiser (>=1.0.2,<2
type = "git"
url = "https://github.com/twisted/twisted.git"
reference = "trunk"
-resolved_reference = "ff2ea6181f7ca4887adbaf4158b2fe0891e17ef9"
+resolved_reference = "b249a121afffefa3d9d9ab5b7a1315c5a1bb454d"
[[package]]
name = "twisted-iocpsupport"
@@ -1615,7 +1623,7 @@ url_preview = ["lxml"]
[metadata]
lock-version = "1.1"
python-versions = "^3.7.1"
-content-hash = "c2cfbb348a49e088c404148c1b682fc5af5abb6278cf4479c6a51fff1656328c"
+content-hash = "94116a568c9ab41174ec66c60cb0cb783e349bf586352b1fab08c714e5191665"
[metadata.files]
attrs = [
@@ -2642,8 +2650,8 @@ tqdm = [
{file = "tqdm-4.63.0.tar.gz", hash = "sha256:1d9835ede8e394bb8c9dcbffbca02d717217113adc679236873eeaac5bc0b3cd"},
]
treq = [
- {file = "treq-15.1.0-py2.py3-none-any.whl", hash = "sha256:1ad1ba89ddc62ae877084b290bd327755b13f6e7bc7076dc4d8e2efb701bfd63"},
- {file = "treq-15.1.0.tar.gz", hash = "sha256:425a47d5d52a993d51211028fb6ade252e5fbea094e878bb4b644096a7322de8"},
+ {file = "treq-22.2.0-py3-none-any.whl", hash = "sha256:27d95b07c5c14be3e7b280416139b036087617ad5595be913b1f9b3ce981b9b2"},
+ {file = "treq-22.2.0.tar.gz", hash = "sha256:df757e3f141fc782ede076a604521194ffcb40fa2645cf48e5a37060307f52ec"},
]
twine = [
{file = "twine-3.8.0-py3-none-any.whl", hash = "sha256:d0550fca9dc19f3d5e8eadfce0c227294df0a2a951251a4385797c8a6198b7c8"},
diff --git a/pyproject.toml b/pyproject.toml
index 6c81ab8efc..a6138957ca 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -119,7 +119,7 @@ signedjson = "^1.1.0"
service-identity = ">=18.1.0"
# Twisted 18.9 introduces some logger improvements that the structured
# logger utilises
-twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk"}
+twisted = {git = "https://github.com/twisted/twisted.git", rev = "trunk", extras = ["tls"]}
treq = ">=15.1"
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
pyOpenSSL = ">=16.0.0"
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 740a04aad6..60436fe497 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -59,7 +59,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
-from synapse.logging.tracing import trace
+from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
@@ -410,6 +410,7 @@ class FederationEventHandler:
prev_member_event,
)
+ @trace
async def process_remote_join(
self,
origin: str,
@@ -753,6 +754,7 @@ class FederationEventHandler:
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@trace
+ @tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
@@ -854,6 +856,7 @@ class FederationEventHandler:
else:
raise
+ @trace
async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
) -> EventContext:
@@ -970,6 +973,8 @@ class FederationEventHandler:
event, state_ids_before_event=state_map, partial_state=partial_state
)
+ @trace
+ @tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
@@ -1112,6 +1117,8 @@ class FederationEventHandler:
return state_map
+ @trace
+ @tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1133,6 +1140,7 @@ class FederationEventHandler:
destination=destination, room_id=room_id, event_ids=(event_id,)
)
+ @trace
async def _process_received_pdu(
self,
origin: str,
@@ -1283,6 +1291,7 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
+ @trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
@@ -1414,6 +1423,8 @@ class FederationEventHandler:
return event_from_response
+ @trace
+ @tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
@@ -1459,6 +1470,7 @@ class FederationEventHandler:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)
+ @trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
@@ -1477,6 +1489,11 @@ class FederationEventHandler:
"""
event_map = {event.event_id: event for event in events}
+ set_attribute(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+        str(event_map.keys()),
+ )
+
# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
@@ -1593,6 +1610,7 @@ class FederationEventHandler:
backfilled=True,
)
+ @trace
async def _check_event_auth(
self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
@@ -1631,6 +1649,9 @@ class FederationEventHandler:
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
+ set_attribute(
+ "claimed_auth_events", [ev.event_id for ev in claimed_auth_events]
+ )
# ... and check that the event passes auth at those auth events.
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1749,7 @@ class FederationEventHandler:
)
context.rejected = RejectedReason.AUTH_ERROR
+ @trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
@@ -1935,6 +1957,8 @@ class FederationEventHandler:
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
+ @trace
+ @tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1963,6 +1987,7 @@ class FederationEventHandler:
await self._auth_and_persist_outliers(room_id, remote_auth_events)
+ @trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
@@ -2008,6 +2033,7 @@ class FederationEventHandler:
await self._store.remove_push_actions_from_staging(event.event_id)
raise
+ @trace
async def persist_events_and_notify(
self,
room_id: str,
diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py
index 2f4555cc2e..cb557a147d 100644
--- a/synapse/logging/tracing.py
+++ b/synapse/logging/tracing.py
@@ -281,6 +281,16 @@ class SynapseTags:
# The name of the external cache
CACHE_NAME = "cache.name"
+ # Used to tag function arguments
+ #
+ # Tag a named arg. The name of the argument should be appended to this
+ # prefix
+ FUNC_ARG_PREFIX = "ARG."
+ # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`)
+ FUNC_ARGS = "args"
+ # Tag keyword args
+ FUNC_KWARGS = "kwargs"
+
class SynapseBaggage:
FORCE_TRACING = "synapse-force-tracing"
@@ -790,30 +800,28 @@ def extract_text_map(
# Tracing decorators
-def create_decorator(
+def _custom_sync_async_decorator(
func: Callable[P, R],
- # TODO: What is the correct type for these `Any`? `P.args, P.kwargs` isn't allowed here
wrapping_logic: Callable[[Callable[P, R], Any, Any], ContextManager[None]],
) -> Callable[P, R]:
"""
- Creates a decorator that is able to handle sync functions, async functions
- (coroutines), and inlineDeferred from Twisted.
-
+ Decorates a function that is sync or async (coroutines), or that returns a Twisted
+ `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`.
Example usage:
```py
- # Decorator to time the functiona and log it out
+ # Decorator to time the function and log it out
def duration(func: Callable[P, R]) -> Callable[P, R]:
@contextlib.contextmanager
- def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
+ def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Generator[None, None, None]:
start_ts = time.time()
- yield
- end_ts = time.time()
- duration = end_ts - start_ts
- logger.info("%s took %s seconds", func.__name__, duration)
-
- return create_decorator(func, _wrapping_logic)
+ try:
+ yield
+ finally:
+ end_ts = time.time()
+ duration = end_ts - start_ts
+ logger.info("%s took %s seconds", func.__name__, duration)
+ return _custom_sync_async_decorator(func, _wrapping_logic)
```
-
Args:
func: The function to be decorated
wrapping_logic: The business logic of your custom decorator.
@@ -821,14 +829,18 @@ def create_decorator(
before/after the function as desired.
"""
- @wraps(func)
- async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
- if inspect.iscoroutinefunction(func):
+ if inspect.iscoroutinefunction(func):
+
+ @wraps(func)
+ async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
with wrapping_logic(func, *args, **kwargs):
- return await func(*args, **kwargs)
- else:
- # The other case here handles both sync functions and those
- # decorated with inlineDeferred.
+ return await func(*args, **kwargs) # type: ignore[misc]
+
+ else:
+ # The other case here handles both sync functions and those
+        # decorated with Twisted's `@inlineCallbacks` (which return Deferreds).
+ @wraps(func)
+ def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
scope = wrapping_logic(func, *args, **kwargs)
scope.__enter__()
@@ -866,7 +878,11 @@ def create_decorator(
return _wrapper # type: ignore[return-value]
-def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]:
+def trace_with_opname(
+ opname: str,
+ *,
+ tracer: Optional["opentelemetry.trace.Tracer"] = None,
+) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
Decorator to trace a function with a custom opname.
@@ -875,21 +891,14 @@ def trace_with_opname(opname: str) -> Callable[[Callable[P, R]], Callable[P, R]]
@contextlib.contextmanager
def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
- if opentelemetry is None:
- return None
-
- scope = start_active_span(opname)
- scope.__enter__()
- try:
+ with start_active_span(opname, tracer=tracer):
yield
- except Exception as e:
- scope.__exit__(type(e), None, e.__traceback__)
- raise
- finally:
- scope.__exit__(None, None, None)
def _decorator(func: Callable[P, R]):
- return create_decorator(func, _wrapping_logic)
+ if not opentelemetry:
+ return func
+
+ return _custom_sync_async_decorator(func, _wrapping_logic)
return _decorator
@@ -918,12 +927,12 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]:
def _wrapping_logic(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs):
argspec = inspect.getfullargspec(func)
for i, arg in enumerate(args[1:]):
- set_attribute("ARG_" + argspec.args[i + 1], str(arg)) # type: ignore[index]
- set_attribute("args", str(args[len(argspec.args) :])) # type: ignore[index]
- set_attribute("kwargs", str(kwargs))
+ set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i + 1], str(arg)) # type: ignore[index]
+ set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :])) # type: ignore[index]
+ set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs))
yield
- return create_decorator(func, _wrapping_logic)
+ return _custom_sync_async_decorator(func, _wrapping_logic)
@contextlib.contextmanager
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f87f5098a5..0bdb213286 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -46,7 +46,14 @@ from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.logging.tracing import Link, get_active_span, start_active_span, trace
+from synapse.logging.tracing import (
+    Link,
+    SynapseTags,
+    get_active_span,
+    set_attribute,
+    start_active_span,
+    trace,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
@@ -383,6 +390,12 @@ class EventsPersistenceStorageController:
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
+ set_attribute(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ [e.event_id for e, _ in events_and_contexts],
+ )
+ set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
+
partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))
@@ -781,7 +794,7 @@ class EventsPersistenceStorageController:
stale_forward_extremities_counter.observe(len(stale))
return result
 
async def _get_new_state_after_events(
self,
room_id: str,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 3fcb2069df..178536b10f 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1381,6 +1381,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
+ @trace
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
await self.db_pool.simple_upsert(
table="insertion_event_extremities",
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f600f1190..21ba7a540e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
+from synapse.logging.tracing import trace
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
@@ -145,6 +146,7 @@ class PersistEventsStore:
self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
+ @trace
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 29c99c6357..449cb03276 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,6 +54,7 @@ from synapse.logging.context import (
current_context,
make_deferred_yieldable,
)
+from synapse.logging.tracing import tag_args, trace
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@@ -394,6 +395,8 @@ class EventsWorkerStore(SQLBaseStore):
return event
+ @trace
+ @tag_args
async def get_events(
self,
event_ids: Collection[str],
@@ -1363,6 +1366,8 @@ class EventsWorkerStore(SQLBaseStore):
return {r["event_id"] for r in rows}
+ @trace
+ @tag_args
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
) -> Set[str]:
|