summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py92
1 files changed, 75 insertions, 17 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..48a54d9cb8 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -26,8 +26,15 @@ from typing import (
     cast,
 )
 
+from synapse.api.constants import EventContentFields
 from synapse.logging import issue9533_logger
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    trace,
+)
 from synapse.replication.tcp.streams import ToDeviceStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
@@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     (recipient_user_id, recipient_device_id), []
                 ).append(message_dict)
 
+                # start a new span for each message, so that we can tag each separately
+                with start_active_span("get_to_device_message"):
+                    set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
+                    set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
+                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
+                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
+                    set_tag(
+                        SynapseTags.TO_DEVICE_MSGID,
+                        message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
+                    )
+
             if limit is not None and rowcount == limit:
                 # We ended up bumping up against the message limit. There may be more messages
                 # to retrieve. Return what we have, as well as the last stream position that
@@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 ],
             )
 
-            if remote_messages_by_destination:
-                issue9533_logger.debug(
-                    "Queued outgoing to-device messages with stream_id %i for %s",
-                    stream_id,
-                    list(remote_messages_by_destination.keys()),
-                )
+            for destination, edu in remote_messages_by_destination.items():
+                if issue9533_logger.isEnabledFor(logging.DEBUG):
+                    issue9533_logger.debug(
+                        "Queued outgoing to-device messages with "
+                        "stream_id %i, EDU message_id %s, type %s for %s: %s",
+                        stream_id,
+                        edu["message_id"],
+                        edu["type"],
+                        destination,
+                        [
+                            f"{user_id}/{device_id} (msgid "
+                            f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
+                            for (user_id, messages_by_device) in edu["messages"].items()
+                            for (device_id, msg) in messages_by_device.items()
+                        ],
+                    )
+
+                for (user_id, messages_by_device) in edu["messages"].items():
+                    for (device_id, msg) in messages_by_device.items():
+                        with start_active_span("store_outgoing_to_device_message"):
+                            set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
+                            set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
+                            set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
+                            set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+                            set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+                            set_tag(
+                                SynapseTags.TO_DEVICE_MSGID,
+                                msg.get(EventContentFields.TO_DEVICE_MSGID),
+                            )
 
         async with self._device_inbox_id_gen.get_next() as stream_id:
             now_ms = self._clock.time_msec()
@@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device_id = row["device_id"]
-                    message_json = json_encoder.encode(messages_by_device[device_id])
+
+                    with start_active_span("serialise_to_device_message"):
+                        msg = messages_by_device[device_id]
+                        set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
+                        set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
+                        set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+                        set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+                        set_tag(
+                            SynapseTags.TO_DEVICE_MSGID,
+                            msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
+                        )
+                        message_json = json_encoder.encode(msg)
+
                     messages_json_for_user[device_id] = message_json
 
             if messages_json_for_user:
@@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             ],
         )
 
-        issue9533_logger.debug(
-            "Stored to-device messages with stream_id %i for %s",
-            stream_id,
-            [
-                (user_id, device_id)
-                for (user_id, messages_by_device) in local_by_user_then_device.items()
-                for device_id in messages_by_device.keys()
-            ],
-        )
+        if issue9533_logger.isEnabledFor(logging.DEBUG):
+            issue9533_logger.debug(
+                "Stored to-device messages with stream_id %i: %s",
+                stream_id,
+                [
+                    f"{user_id}/{device_id} (msgid "
+                    f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
+                    for (
+                        user_id,
+                        messages_by_device,
+                    ) in messages_by_user_then_device.items()
+                    for (device_id, msg) in messages_by_device.items()
+                ],
+            )
 
 
 class DeviceInboxBackgroundUpdateStore(SQLBaseStore):