diff --git a/changelog.d/8640.misc b/changelog.d/8640.misc
new file mode 100644
index 0000000000..cf6023f783
--- /dev/null
+++ b/changelog.d/8640.misc
@@ -0,0 +1 @@
+Reduce number of OpenTracing spans started.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 07240d3a14..7826387e53 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
from prometheus_client import Counter
@@ -30,7 +30,10 @@ from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
from synapse.util.metrics import Measure
@@ -53,7 +56,7 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
- async def notify_interested_services(self, max_token: RoomStreamToken):
+ def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
@@ -72,6 +75,12 @@ class ApplicationServicesHandler:
if self.is_processing:
return
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._notify_interested_services(max_token)
+
+ @wrap_as_background_process("notify_interested_services")
+ async def _notify_interested_services(self, max_token: RoomStreamToken):
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
@@ -166,8 +175,11 @@ class ApplicationServicesHandler:
finally:
self.is_processing = False
- async def notify_interested_services_ephemeral(
- self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+ def notify_interested_services_ephemeral(
+ self,
+ stream_key: str,
+ new_token: Optional[int],
+ users: Collection[Union[str, UserID]] = [],
):
"""This is called by the notifier in the background
when an ephemeral event is handled by the homeserver.
@@ -183,13 +195,34 @@ class ApplicationServicesHandler:
new_token: The latest stream token
users: The user(s) involved with the event.
"""
+ if not self.notify_appservices:
+ return
+
+ if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+ return
+
services = [
service
for service in self.store.get_app_services()
if service.supports_ephemeral
]
- if not services or not self.notify_appservices:
+ if not services:
return
+
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._notify_interested_services_ephemeral(
+ services, stream_key, new_token, users
+ )
+
+ @wrap_as_background_process("notify_interested_services_ephemeral")
+ async def _notify_interested_services_ephemeral(
+ self,
+ services: List[ApplicationService],
+ stream_key: str,
+ new_token: Optional[int],
+ users: Collection[Union[str, UserID]],
+ ):
logger.info("Checking interested services for %s", stream_key)
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
@@ -237,7 +270,7 @@ class ApplicationServicesHandler:
return receipts
async def _handle_presence(
- self, service: ApplicationService, users: Collection[UserID]
+ self, service: ApplicationService, users: Collection[Union[str, UserID]]
):
events = [] # type: List[JsonDict]
presence_source = self.event_sources.sources["presence"]
@@ -245,6 +278,9 @@ class ApplicationServicesHandler:
service, "presence"
)
for user in users:
+ if isinstance(user, str):
+ user = UserID.from_string(user)
+
interested = await service.is_interested_in_presence(user, self.store)
if not interested:
continue
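The appservice handler changes above all follow one pattern: run the cheap guard checks synchronously, and only start a (span-carrying) background process once there is real work to do. A minimal sketch of that pattern, using a hypothetical `Handler` class with an invented `do_expensive_work` coroutine for illustration:

```python
from synapse.metrics.background_process_metrics import wrap_as_background_process


class Handler:
    def __init__(self):
        self.is_processing = False

    def notify(self, token):
        # Cheap guards run inline: if nothing is interested we never pay
        # for a logcontext, process metrics, or an OpenTracing span.
        if self.is_processing:
            return

        # Fire and forget: the decorated coroutine is scheduled as a
        # background process, so this call returns immediately.
        self._notify(token)

    @wrap_as_background_process("notify")
    async def _notify(self, token):
        # Only reached once the guards above have passed.
        await self.do_expensive_work(token)

    async def do_expensive_work(self, token):
        ...  # placeholder for the real work
```

Note that `notify_interested_services` is now a plain `def`: callers must not `await` it, which is why the notifier is updated in the same commit.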
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index e58850faff..ab586c318c 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -317,7 +317,7 @@ def ensure_active_span(message, ret=None):
@contextlib.contextmanager
-def _noop_context_manager(*args, **kwargs):
+def noop_context_manager(*args, **kwargs):
"""Does exactly what it says on the tin"""
yield
@@ -413,7 +413,7 @@ def start_active_span(
"""
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
return opentracing.tracer.start_active_span(
operation_name,
@@ -428,7 +428,7 @@ def start_active_span(
def start_active_span_follows_from(operation_name, contexts):
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
references = [opentracing.follows_from(context) for context in contexts]
scope = start_active_span(operation_name, references=references)
@@ -459,7 +459,7 @@ def start_active_span_from_request(
# Also, twisted uses byte arrays while opentracing expects strings.
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
@@ -497,7 +497,7 @@ def start_active_span_from_edu(
"""
if opentracing is None:
- return _noop_context_manager()
+ return noop_context_manager()
carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
"opentracing", {}
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 08fbf78eee..658f6ecd72 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -24,7 +24,7 @@ from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
from synapse.logging.context import LoggingContext, PreserveLoggingContext
-from synapse.logging.opentracing import start_active_span
+from synapse.logging.opentracing import noop_context_manager, start_active_span
if TYPE_CHECKING:
import resource
@@ -167,7 +167,7 @@ class _BackgroundProcess:
)
-def run_as_background_process(desc: str, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
Args:
desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
+ bg_start_span: Whether to start an opentracing span. Defaults to True.
+ Should only be disabled for processes that will not log to or tag
+ a span.
args: positional args for func
kwargs: keyword args for func
@@ -199,7 +202,10 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
try:
- with start_active_span(desc, tags={"request_id": context.request}):
+ ctx = noop_context_manager()
+ if bg_start_span:
+ ctx = start_active_span(desc, tags={"request_id": context.request})
+ with ctx:
result = func(*args, **kwargs)
if inspect.isawaitable(result):
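`wrap_as_background_process` is imported and used throughout this change, but its definition is not part of the diff. A sketch of what the decorator plausibly does, inferred from how it is called here (the real implementation may differ in detail):

```python
from functools import wraps

from synapse.metrics.background_process_metrics import run_as_background_process


def wrap_as_background_process(desc: str):
    """Decorator equivalent of `run_as_background_process` (sketch).

    Calling the decorated function schedules it as a background process
    under `desc` and returns immediately rather than awaiting the result.
    """

    def decorate(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            return run_as_background_process(desc, func, *args, **kwargs)

        return wrapped

    return decorate
```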
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 858b487bec..eb56b26f21 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -40,7 +40,6 @@ from synapse.handlers.presence import format_user_presence_state
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.streams.config import PaginationConfig
from synapse.types import (
Collection,
@@ -310,44 +309,37 @@ class Notifier:
"""
# poke any interested application service.
- run_as_background_process(
- "_notify_app_services", self._notify_app_services, max_room_stream_token
- )
-
- run_as_background_process(
- "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
- )
+ self._notify_app_services(max_room_stream_token)
+ self._notify_pusher_pool(max_room_stream_token)
if self.federation_sender:
self.federation_sender.notify_new_events(max_room_stream_token)
- async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
+ def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
- await self.appservice_handler.notify_interested_services(
- max_room_stream_token
- )
+ self.appservice_handler.notify_interested_services(max_room_stream_token)
except Exception:
logger.exception("Error notifying application services of event")
- async def _notify_app_services_ephemeral(
+ def _notify_app_services_ephemeral(
self,
stream_key: str,
new_token: Union[int, RoomStreamToken],
- users: Collection[UserID] = [],
+ users: Collection[Union[str, UserID]] = [],
):
try:
stream_token = None
if isinstance(new_token, int):
stream_token = new_token
- await self.appservice_handler.notify_interested_services_ephemeral(
+ self.appservice_handler.notify_interested_services_ephemeral(
stream_key, stream_token, users
)
except Exception:
logger.exception("Error notifying application services of event")
- async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
+ def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
- await self._pusher_pool.on_new_notifications(max_room_stream_token)
+ self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error notifying pusher pool of event")
@@ -384,12 +376,8 @@ class Notifier:
self.notify_replication()
# Notify appservices
- run_as_background_process(
- "_notify_app_services_ephemeral",
- self._notify_app_services_ephemeral,
- stream_key,
- new_token,
- users,
+ self._notify_app_services_ephemeral(
+ stream_key, new_token, users,
)
def on_new_replication_data(self) -> None:
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0080c68ce2..f325964983 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Union
from prometheus_client import Gauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
@@ -187,7 +190,7 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
- async def on_new_notifications(self, max_token: RoomStreamToken):
+ def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
@@ -201,6 +204,17 @@ class PusherPool:
# Nothing to do
return
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._on_new_notifications(max_token)
+
+ @wrap_as_background_process("on_new_notifications")
+ async def _on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_id = max_token.stream
+
prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id
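`max_token.stream` reads the stream-ordering component of the token; the vector-clock (per-instance) component is deliberately dropped, which the comment notes is only safe if every consumer drops it consistently. For illustration, using the same constructor shape as the tests below (`RoomStreamToken(None, 0)`), where the first positional field is the topological part and the second the stream ordering:

```python
from synapse.types import RoomStreamToken

# A "live" token has no topological part; the second positional field is
# the monotonically increasing stream ordering.
token = RoomStreamToken(None, 42)

max_stream_id = token.stream  # -> 42
```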
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index de19705c1f..bc6ba709a7 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -166,7 +166,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
Args:
cmd (Command)
"""
- run_as_background_process("send-cmd", self._async_send_command, cmd)
+ run_as_background_process(
+ "send-cmd", self._async_send_command, cmd, bg_start_span=False
+ )
async def _async_send_command(self, cmd: Command):
"""Encode a replication command and send it over our outbound connection"""
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index ee4f3da31c..53763cd0f9 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -42,7 +42,6 @@ class AppServiceHandlerTestCase(unittest.TestCase):
hs.get_clock.return_value = MockClock()
self.handler = ApplicationServicesHandler(hs)
- @defer.inlineCallbacks
def test_notify_interested_services(self):
interested_service = self._mkservice(is_interested=True)
services = [
@@ -62,14 +61,12 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
- yield defer.ensureDeferred(
- self.handler.notify_interested_services(RoomStreamToken(None, 0))
- )
+ self.handler.notify_interested_services(RoomStreamToken(None, 0))
+
self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event
)
- @defer.inlineCallbacks
def test_query_user_exists_unknown_user(self):
user_id = "@someone:anywhere"
services = [self._mkservice(is_interested=True)]
@@ -83,12 +80,11 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
- yield defer.ensureDeferred(
- self.handler.notify_interested_services(RoomStreamToken(None, 0))
- )
+
+ self.handler.notify_interested_services(RoomStreamToken(None, 0))
+
self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
- @defer.inlineCallbacks
def test_query_user_exists_known_user(self):
user_id = "@someone:anywhere"
services = [self._mkservice(is_interested=True)]
@@ -102,9 +98,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
- yield defer.ensureDeferred(
- self.handler.notify_interested_services(RoomStreamToken(None, 0))
- )
+
+ self.handler.notify_interested_services(RoomStreamToken(None, 0))
+
self.assertFalse(
self.mock_as_api.query_user.called,
"query_user called when it shouldn't have been.",