summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/9686.misc1
-rw-r--r--synapse/federation/sender/per_destination_queue.py8
-rw-r--r--synapse/handlers/devicemessage.py35
-rw-r--r--synapse/handlers/sync.py18
-rw-r--r--synapse/handlers/typing.py6
-rw-r--r--synapse/logging/opentracing.py8
-rw-r--r--synapse/notifier.py45
7 files changed, 102 insertions, 19 deletions
diff --git a/changelog.d/9686.misc b/changelog.d/9686.misc
new file mode 100644
index 0000000000..bb2335acf9
--- /dev/null
+++ b/changelog.d/9686.misc
@@ -0,0 +1 @@
+Improve Jaeger tracing for `to_device` messages.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 89df9a619b..e9c8a9f20a 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -29,6 +29,7 @@ from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.units import Edu
 from synapse.handlers.presence import format_user_presence_state
+from synapse.logging.opentracing import SynapseTags, set_tag
 from synapse.metrics import sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import ReadReceipt
@@ -557,6 +558,13 @@ class PerDestinationQueue:
         contents, stream_id = await self._store.get_new_device_msgs_for_remote(
             self._destination, last_device_stream_id, to_device_stream_id, limit
         )
+        for content in contents:
+            message_id = content.get("message_id")
+            if not message_id:
+                continue
+
+            set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+
         edus = [
             Edu(
                 origin=self._server_name,
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 5ee48be6ff..c971eeb4d2 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -21,10 +21,10 @@ from synapse.api.errors import SynapseError
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
+    SynapseTags,
     get_active_span_text_map,
     log_kv,
     set_tag,
-    start_active_span,
 )
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
@@ -183,7 +183,10 @@ class DeviceMessageHandler:
     ) -> None:
         sender_user_id = requester.user.to_string()
 
-        set_tag("number_of_messages", len(messages))
+        message_id = random_string(16)
+        set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+
+        log_kv({"number_of_to_device_messages": len(messages)})
         set_tag("sender", sender_user_id)
         local_messages = {}
         remote_messages = {}  # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
@@ -205,32 +208,35 @@ class DeviceMessageHandler:
                         "content": message_content,
                         "type": message_type,
                         "sender": sender_user_id,
+                        "message_id": message_id,
                     }
                     for device_id, message_content in by_device.items()
                 }
                 if messages_by_device:
                     local_messages[user_id] = messages_by_device
+                    log_kv(
+                        {
+                            "user_id": user_id,
+                            "device_id": list(messages_by_device),
+                        }
+                    )
             else:
                 destination = get_domain_from_id(user_id)
                 remote_messages.setdefault(destination, {})[user_id] = by_device
 
-        message_id = random_string(16)
-
         context = get_active_span_text_map()
 
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            with start_active_span("to_device_for_user"):
-                set_tag("destination", destination)
-                remote_edu_contents[destination] = {
-                    "messages": messages,
-                    "sender": sender_user_id,
-                    "type": message_type,
-                    "message_id": message_id,
-                    "org.matrix.opentracing_context": json_encoder.encode(context),
-                }
+            log_kv({"destination": destination})
+            remote_edu_contents[destination] = {
+                "messages": messages,
+                "sender": sender_user_id,
+                "type": message_type,
+                "message_id": message_id,
+                "org.matrix.opentracing_context": json_encoder.encode(context),
+            }
 
-        log_kv({"local_messages": local_messages})
         stream_id = await self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
@@ -239,7 +245,6 @@ class DeviceMessageHandler:
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
-        log_kv({"remote_messages": remote_messages})
         if self.federation_sender:
             for destination in remote_messages.keys():
                 # Enqueue a new federation transaction to send the new
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ee607e6e65..7b356ba7e5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -24,6 +24,7 @@ from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.api.filtering import FilterCollection
 from synapse.events import EventBase
 from synapse.logging.context import current_context
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
@@ -340,7 +341,14 @@ class SyncHandler:
         full_state: bool = False,
     ) -> SyncResult:
         """Get the sync for client needed to match what the server has now."""
-        return await self.generate_sync_result(sync_config, since_token, full_state)
+        with start_active_span("current_sync_for_user"):
+            log_kv({"since_token": since_token})
+            sync_result = await self.generate_sync_result(
+                sync_config, since_token, full_state
+            )
+
+            set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
+            return sync_result
 
     async def push_rules_for_user(self, user: UserID) -> JsonDict:
         user_id = user.to_string()
@@ -964,6 +972,7 @@ class SyncHandler:
         # to query up to a given point.
         # Always use the `now_token` in `SyncResultBuilder`
         now_token = self.event_sources.get_current_token()
+        log_kv({"now_token": now_token})
 
         logger.debug(
             "Calculating sync response for %r between %s and %s",
@@ -1225,6 +1234,13 @@ class SyncHandler:
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
 
+            for message in messages:
+                # We pop here as we shouldn't be sending the message ID down
+                # `/sync`
+                message_id = message.pop("message_id", None)
+                if message_id:
+                    set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+
             logger.debug(
                 "Returning %d to-device messages between %d and %d (current token: %d)",
                 len(messages),
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 096d199f4c..bb35af099d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse.api.errors import AuthError, ShadowBanError, SynapseError
 from synapse.appservice import ApplicationService
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.replication.tcp.streams import TypingStream
 from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -86,6 +89,7 @@ class FollowerTypingHandler:
         self._member_last_federation_poke = {}
         self.wheel_timer = WheelTimer(bucket_size=5000)
 
+    @wrap_as_background_process("typing._handle_timeouts")
     def _handle_timeouts(self) -> None:
         logger.debug("Checking for typing timeouts")
 
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index aa146e8bb8..b8081f197e 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -259,6 +259,14 @@ except ImportError:
 logger = logging.getLogger(__name__)
 
 
+class SynapseTags:
+    # The message ID of any to_device message processed
+    TO_DEVICE_MESSAGE_ID = "to_device.message_id"
+
+    # Whether the sync response has new data to be returned to the client.
+    SYNC_RESULT = "sync.new_data"
+
+
 # Block everything by default
 # A regex which matches the server_names to expose traces for.
 # None means 'block everything'.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1374aae490..d35c1f3f02 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -39,6 +39,7 @@ from synapse.api.errors import AuthError
 from synapse.events import EventBase
 from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import PreserveLoggingContext
+from synapse.logging.opentracing import log_kv, start_active_span
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.streams.config import PaginationConfig
@@ -136,6 +137,15 @@ class _NotifierUserStream:
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
 
+        log_kv(
+            {
+                "notify": self.user_id,
+                "stream": stream_key,
+                "stream_id": stream_id,
+                "listeners": self.count_listeners(),
+            }
+        )
+
         users_woken_by_stream_counter.labels(stream_key).inc()
 
         with PreserveLoggingContext():
@@ -404,6 +414,13 @@ class Notifier:
         with Measure(self.clock, "on_new_event"):
             user_streams = set()
 
+            log_kv(
+                {
+                    "waking_up_explicit_users": len(users),
+                    "waking_up_explicit_rooms": len(rooms),
+                }
+            )
+
             for user in users:
                 user_stream = self.user_to_user_stream.get(str(user))
                 if user_stream is not None:
@@ -476,12 +493,34 @@ class Notifier:
                         (end_time - now) / 1000.0,
                         self.hs.get_reactor(),
                     )
-                    with PreserveLoggingContext():
-                        await listener.deferred
+
+                    with start_active_span("wait_for_events.deferred"):
+                        log_kv(
+                            {
+                                "wait_for_events": "sleep",
+                                "token": prev_token,
+                            }
+                        )
+
+                        with PreserveLoggingContext():
+                            await listener.deferred
+
+                        log_kv(
+                            {
+                                "wait_for_events": "woken",
+                                "token": user_stream.current_token,
+                            }
+                        )
 
                     current_token = user_stream.current_token
 
                     result = await callback(prev_token, current_token)
+                    log_kv(
+                        {
+                            "wait_for_events": "result",
+                            "result": bool(result),
+                        }
+                    )
                     if result:
                         break
 
@@ -489,8 +528,10 @@ class Notifier:
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
                 except defer.TimeoutError:
+                    log_kv({"wait_for_events": "timeout"})
                     break
                 except defer.CancelledError:
+                    log_kv({"wait_for_events": "cancelled"})
                     break
 
         if result is None: