diff --git a/changelog.d/5771.feature b/changelog.d/5771.feature
new file mode 100644
index 0000000000..f2f4de1fdd
--- /dev/null
+++ b/changelog.d/5771.feature
@@ -0,0 +1 @@
+Make Opentracing work in worker mode.
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a17148fc3c..dc53b4b170 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -38,7 +38,12 @@ from synapse.http.servlet import (
parse_string_from_args,
)
from synapse.logging.context import run_in_background
-from synapse.logging.opentracing import start_active_span_from_context, tags
+from synapse.logging.opentracing import (
+ start_active_span,
+ start_active_span_from_request,
+ tags,
+ whitelisted_homeserver,
+)
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -288,20 +293,28 @@ class BaseFederationServlet(object):
logger.warn("authenticate_request failed: %s", e)
raise
- # Start an opentracing span
- with start_active_span_from_context(
- request.requestHeaders,
- "incoming-federation-request",
- tags={
- "request_id": request.get_request_id(),
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
- tags.HTTP_METHOD: request.get_method(),
- tags.HTTP_URL: request.get_redacted_uri(),
- tags.PEER_HOST_IPV6: request.getClientIP(),
- "authenticated_entity": origin,
- "servlet_name": request.request_metrics.name,
- },
- ):
+ request_tags = {
+ "request_id": request.get_request_id(),
+ tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
+ tags.HTTP_METHOD: request.get_method(),
+ tags.HTTP_URL: request.get_redacted_uri(),
+ tags.PEER_HOST_IPV6: request.getClientIP(),
+ "authenticated_entity": origin,
+ "servlet_name": request.request_metrics.name,
+ }
+
+ # Only accept the span context if the origin is authenticated
+ # and whitelisted
+ if origin and whitelisted_homeserver(origin):
+ scope = start_active_span_from_request(
+ request, "incoming-federation-request", tags=request_tags
+ )
+ else:
+ scope = start_active_span(
+ "incoming-federation-request", tags=request_tags
+ )
+
+ with scope:
if origin:
with ratelimiter.ratelimit(origin) as d:
await d
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index fd07bf7b8e..c186b31f59 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -300,7 +300,7 @@ class RestServlet(object):
http_server.register_paths(
method,
patterns,
- trace_servlet(servlet_classname, method_handler),
+ trace_servlet(servlet_classname)(method_handler),
servlet_classname,
)
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 6b706e1892..4abea4474b 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -174,10 +174,48 @@ from twisted.internet import defer
from synapse.config import ConfigError
+# Helper class
+
+
+class _DummyTagNames(object):
+ """wrapper of opentracings tags. We need to have them if we
+ want to reference them without opentracing around. Clearly they
+ should never actually show up in a trace. `set_tags` overwrites
+ these with the correct ones."""
+
+ INVALID_TAG = "invalid-tag"
+ COMPONENT = INVALID_TAG
+ DATABASE_INSTANCE = INVALID_TAG
+ DATABASE_STATEMENT = INVALID_TAG
+ DATABASE_TYPE = INVALID_TAG
+ DATABASE_USER = INVALID_TAG
+ ERROR = INVALID_TAG
+ HTTP_METHOD = INVALID_TAG
+ HTTP_STATUS_CODE = INVALID_TAG
+ HTTP_URL = INVALID_TAG
+ MESSAGE_BUS_DESTINATION = INVALID_TAG
+ PEER_ADDRESS = INVALID_TAG
+ PEER_HOSTNAME = INVALID_TAG
+ PEER_HOST_IPV4 = INVALID_TAG
+ PEER_HOST_IPV6 = INVALID_TAG
+ PEER_PORT = INVALID_TAG
+ PEER_SERVICE = INVALID_TAG
+ SAMPLING_PRIORITY = INVALID_TAG
+ SERVICE = INVALID_TAG
+ SPAN_KIND = INVALID_TAG
+ SPAN_KIND_CONSUMER = INVALID_TAG
+ SPAN_KIND_PRODUCER = INVALID_TAG
+ SPAN_KIND_RPC_CLIENT = INVALID_TAG
+ SPAN_KIND_RPC_SERVER = INVALID_TAG
+
+
try:
import opentracing
+
+ tags = opentracing.tags
except ImportError:
opentracing = None
+ tags = _DummyTagNames
try:
from jaeger_client import Config as JaegerConfig
from synapse.logging.scopecontextmanager import LogContextScopeManager
@@ -252,10 +290,6 @@ def init_tracer(config):
scope_manager=LogContextScopeManager(config),
).initialize_tracer()
- # Set up tags to be opentracing's tags
- global tags
- tags = opentracing.tags
-
# Whitelisting
@@ -334,8 +368,8 @@ def start_active_span_follows_from(operation_name, contexts):
return scope
-def start_active_span_from_context(
- headers,
+def start_active_span_from_request(
+ request,
operation_name,
references=None,
tags=None,
@@ -344,9 +378,9 @@ def start_active_span_from_context(
finish_on_close=True,
):
"""
- Extracts a span context from Twisted Headers.
+ Extracts a span context from a Twisted Request.
args:
- headers (twisted.web.http_headers.Headers)
+ headers (twisted.web.http.Request)
For the other args see opentracing.tracer
@@ -360,7 +394,9 @@ def start_active_span_from_context(
if opentracing is None:
return _noop_context_manager()
- header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
+ header_dict = {
+ k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
+ }
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
return opentracing.tracer.start_active_span(
@@ -448,7 +484,7 @@ def set_operation_name(operation_name):
@only_if_tracing
-def inject_active_span_twisted_headers(headers, destination):
+def inject_active_span_twisted_headers(headers, destination, check_destination=True):
"""
Injects a span context into twisted headers in-place
@@ -467,7 +503,7 @@ def inject_active_span_twisted_headers(headers, destination):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
- if not whitelisted_homeserver(destination):
+ if check_destination and not whitelisted_homeserver(destination):
return
span = opentracing.tracer.active_span
@@ -479,7 +515,7 @@ def inject_active_span_twisted_headers(headers, destination):
@only_if_tracing
-def inject_active_span_byte_dict(headers, destination):
+def inject_active_span_byte_dict(headers, destination, check_destination=True):
"""
Injects a span context into a dict where the headers are encoded as byte
strings
@@ -511,7 +547,7 @@ def inject_active_span_byte_dict(headers, destination):
@only_if_tracing
-def inject_active_span_text_map(carrier, destination=None):
+def inject_active_span_text_map(carrier, destination, check_destination=True):
"""
Injects a span context into a dict
@@ -532,7 +568,7 @@ def inject_active_span_text_map(carrier, destination=None):
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
"""
- if destination and not whitelisted_homeserver(destination):
+ if check_destination and not whitelisted_homeserver(destination):
return
opentracing.tracer.inject(
@@ -689,65 +725,43 @@ def tag_args(func):
return _tag_args_inner
-def trace_servlet(servlet_name, func):
+def trace_servlet(servlet_name, extract_context=False):
"""Decorator which traces a serlet. It starts a span with some servlet specific
- tags such as the servlet_name and request information"""
- if not opentracing:
- return func
+ tags such as the servlet_name and request information
- @wraps(func)
- @defer.inlineCallbacks
- def _trace_servlet_inner(request, *args, **kwargs):
- with start_active_span(
- "incoming-client-request",
- tags={
+ Args:
+ servlet_name (str): The name to be used for the span's operation_name
+ extract_context (bool): Whether to attempt to extract the opentracing
+ context from the request the servlet is handling.
+
+ """
+
+ def _trace_servlet_inner_1(func):
+ if not opentracing:
+ return func
+
+ @wraps(func)
+ @defer.inlineCallbacks
+ def _trace_servlet_inner(request, *args, **kwargs):
+ request_tags = {
"request_id": request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
- "servlet_name": servlet_name,
- },
- ):
- result = yield defer.maybeDeferred(func, request, *args, **kwargs)
- return result
-
- return _trace_servlet_inner
-
-
-# Helper class
-
+ }
-class _DummyTagNames(object):
- """wrapper of opentracings tags. We need to have them if we
- want to reference them without opentracing around. Clearly they
- should never actually show up in a trace. `set_tags` overwrites
- these with the correct ones."""
+ if extract_context:
+ scope = start_active_span_from_request(
+ request, servlet_name, tags=request_tags
+ )
+ else:
+ scope = start_active_span(servlet_name, tags=request_tags)
- INVALID_TAG = "invalid-tag"
- COMPONENT = INVALID_TAG
- DATABASE_INSTANCE = INVALID_TAG
- DATABASE_STATEMENT = INVALID_TAG
- DATABASE_TYPE = INVALID_TAG
- DATABASE_USER = INVALID_TAG
- ERROR = INVALID_TAG
- HTTP_METHOD = INVALID_TAG
- HTTP_STATUS_CODE = INVALID_TAG
- HTTP_URL = INVALID_TAG
- MESSAGE_BUS_DESTINATION = INVALID_TAG
- PEER_ADDRESS = INVALID_TAG
- PEER_HOSTNAME = INVALID_TAG
- PEER_HOST_IPV4 = INVALID_TAG
- PEER_HOST_IPV6 = INVALID_TAG
- PEER_PORT = INVALID_TAG
- PEER_SERVICE = INVALID_TAG
- SAMPLING_PRIORITY = INVALID_TAG
- SERVICE = INVALID_TAG
- SPAN_KIND = INVALID_TAG
- SPAN_KIND_CONSUMER = INVALID_TAG
- SPAN_KIND_PRODUCER = INVALID_TAG
- SPAN_KIND_RPC_CLIENT = INVALID_TAG
- SPAN_KIND_RPC_SERVER = INVALID_TAG
+ with scope:
+ result = yield defer.maybeDeferred(func, request, *args, **kwargs)
+ return result
+ return _trace_servlet_inner
-tags = _DummyTagNames
+ return _trace_servlet_inner_1
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e0594e581..c4be9273f6 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -22,6 +22,7 @@ from six.moves import urllib
from twisted.internet import defer
+import synapse.logging.opentracing as opentracing
from synapse.api.errors import (
CodeMessageException,
HttpResponseException,
@@ -165,8 +166,12 @@ class ReplicationEndpoint(object):
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
+ headers = {}
+ opentracing.inject_active_span_byte_dict(
+ headers, None, check_destination=False
+ )
try:
- result = yield request_func(uri, data)
+ result = yield request_func(uri, data, headers=headers)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
@@ -205,7 +210,14 @@ class ReplicationEndpoint(object):
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
- http_server.register_paths(method, [pattern], handler, self.__class__.__name__)
+ http_server.register_paths(
+ method,
+ [pattern],
+ opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
+ handler
+ ),
+ self.__class__.__name__,
+ )
def _cached_handler(self, request, txn_id, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks
|