diff options
author | Erik Johnston <erik@matrix.org> | 2021-04-01 17:08:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-01 17:08:21 +0100 |
commit | 33548f37aa7858c4d9ce01bf3ec931cc3f08833a (patch) | |
tree | a301076b73976909451029fa1fdb53ef3d03a6ae /synapse/handlers | |
parent | Add `order_by` to list user admin API (#9691) (diff) | |
download | synapse-33548f37aa7858c4d9ce01bf3ec931cc3f08833a.tar.xz |
Improve tracing for to device messages (#9686)
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/devicemessage.py | 35 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 18 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 6 |
3 files changed, 42 insertions, 17 deletions
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 5ee48be6ff..c971eeb4d2 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -21,10 +21,10 @@ from synapse.api.errors import SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( + SynapseTags, get_active_span_text_map, log_kv, set_tag, - start_active_span, ) from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.types import JsonDict, Requester, UserID, get_domain_from_id @@ -183,7 +183,10 @@ class DeviceMessageHandler: ) -> None: sender_user_id = requester.user.to_string() - set_tag("number_of_messages", len(messages)) + message_id = random_string(16) + set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + + log_kv({"number_of_to_device_messages": len(messages)}) set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]] @@ -205,32 +208,35 @@ class DeviceMessageHandler: "content": message_content, "type": message_type, "sender": sender_user_id, + "message_id": message_id, } for device_id, message_content in by_device.items() } if messages_by_device: local_messages[user_id] = messages_by_device + log_kv( + { + "user_id": user_id, + "device_id": list(messages_by_device), + } + ) else: destination = get_domain_from_id(user_id) remote_messages.setdefault(destination, {})[user_id] = by_device - message_id = random_string(16) - context = get_active_span_text_map() remote_edu_contents = {} for destination, messages in remote_messages.items(): - with start_active_span("to_device_for_user"): - set_tag("destination", destination) - remote_edu_contents[destination] = { - "messages": messages, - "sender": sender_user_id, - "type": message_type, - "message_id": message_id, - "org.matrix.opentracing_context": json_encoder.encode(context), - } + log_kv({"destination": destination}) + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + "org.matrix.opentracing_context": json_encoder.encode(context), + } - log_kv({"local_messages": local_messages}) stream_id = await self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -239,7 +245,6 @@ class DeviceMessageHandler: "to_device_key", stream_id, users=local_messages.keys() ) - log_kv({"remote_messages": remote_messages}) if self.federation_sender: for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ee607e6e65..7b356ba7e5 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -24,6 +24,7 @@ from synapse.api.constants import AccountDataTypes, EventTypes, Membership from synapse.api.filtering import FilterCollection from synapse.events import EventBase from synapse.logging.context import current_context +from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter @@ -340,7 +341,14 @@ class SyncHandler: full_state: bool = False, ) -> SyncResult: """Get the sync for client needed to match what the server has now.""" - return await self.generate_sync_result(sync_config, since_token, full_state) + with start_active_span("current_sync_for_user"): + log_kv({"since_token": since_token}) + sync_result = await self.generate_sync_result( + sync_config, since_token, full_state + ) + + set_tag(SynapseTags.SYNC_RESULT, bool(sync_result)) + return sync_result async def push_rules_for_user(self, user: UserID) -> JsonDict: user_id = user.to_string() @@ -964,6 +972,7 @@ class SyncHandler: # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` now_token = self.event_sources.get_current_token() + log_kv({"now_token": now_token}) logger.debug( "Calculating sync response for %r between %s and %s", @@ -1225,6 +1234,13 @@ class SyncHandler: user_id, device_id, since_stream_id, now_token.to_device_key ) + for message in messages: + # We pop here as we shouldn't be sending the message ID down + # `/sync` + message_id = message.pop("message_id", None) + if message_id: + set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", len(messages), diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 096d199f4c..bb35af099d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import AuthError, ShadowBanError, SynapseError from synapse.appservice import ApplicationService -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.replication.tcp.streams import TypingStream from synapse.types import JsonDict, Requester, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -86,6 +89,7 @@ class FollowerTypingHandler: self._member_last_federation_poke = {} self.wheel_timer = WheelTimer(bucket_size=5000) + @wrap_as_background_process("typing._handle_timeouts") def _handle_timeouts(self) -> None: logger.debug("Checking for typing timeouts") |