summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/5853.feature1
-rw-r--r--synapse/handlers/device.py65
-rw-r--r--synapse/handlers/devicemessage.py6
-rw-r--r--synapse/logging/opentracing.py70
-rw-r--r--synapse/rest/client/v2_alpha/keys.py4
-rw-r--r--synapse/rest/client/v2_alpha/sendtodevice.py4
-rw-r--r--synapse/storage/deviceinbox.py21
-rw-r--r--synapse/storage/devices.py5
8 files changed, 118 insertions, 58 deletions
diff --git a/changelog.d/5853.feature b/changelog.d/5853.feature
new file mode 100644
index 0000000000..80a04ae2ee
--- /dev/null
+++ b/changelog.d/5853.feature
@@ -0,0 +1 @@
+Opentracing for device list updates.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5c1cf83c9d..71a8f33da3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -25,6 +25,7 @@ from synapse.api.errors import (
     HttpResponseException,
     RequestSendFailed,
 )
+from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
 
+    @trace
     @defer.inlineCallbacks
     def get_devices_by_user(self, user_id):
         """
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
             defer.Deferred: list[dict[str, X]]: info on each device
         """
 
+        set_tag("user_id", user_id)
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
         for device in devices:
             _update_device_from_client_ips(device, ips)
 
+        log_kv(device_map)
         return devices
 
+    @trace
     @defer.inlineCallbacks
     def get_device(self, user_id, device_id):
         """ Retrieve the given device
@@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
             raise errors.NotFoundError
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
         _update_device_from_client_ips(device, ips)
+
+        set_tag("device", device)
+        set_tag("ips", ips)
+
         return device
 
     @measure_func("device.get_user_ids_changed")
+    @trace
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
+
+        set_tag("user_id", user_id)
+        set_tag("from_token", from_token)
         now_room_key = yield self.store.get_room_events_max_id()
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
+                log_kv(
+                    {"event": "encountered empty previous state", "room_id": room_id}
+                )
                 for key, event_id in iteritems(current_state_ids):
                     etype, state_key = key
                     if etype != EventTypes.Member:
@@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
             possibly_joined = []
             possibly_left = []
 
-        return {"changed": list(possibly_joined), "left": list(possibly_left)}
+        result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+
+        log_kv(result)
+
+        return result
 
 
 class DeviceHandler(DeviceWorkerHandler):
@@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         raise errors.StoreError(500, "Couldn't generate a device ID.")
 
+    @trace
     @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """ Delete the given device
@@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                set_tag("error", True)
+                log_kv(
+                    {"reason": "User doesn't have device id.", "device_id": device_id}
+                )
                 pass
             else:
                 raise
@@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         yield self.notify_device_update(user_id, [device_id])
 
+    @trace
     @defer.inlineCallbacks
     def delete_all_devices_for_user(self, user_id, except_device_id=None):
         """Delete all of the user's devices
@@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                set_tag("error", True)
+                set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
             else:
                 raise
 
+    @trace
     @measure_func("notify_device_update")
     @defer.inlineCallbacks
     def notify_device_update(self, user_id, device_ids):
@@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
             hosts.update(get_domain_from_id(u) for u in users_who_share_room)
             hosts.discard(self.server_name)
 
+        set_tag("target_hosts", hosts)
+
         position = yield self.store.add_device_change_to_streams(
             user_id, device_ids, list(hosts)
         )
@@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler):
             )
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
+                log_kv({"message": "sent device update to host", "host": host})
 
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
@@ -451,12 +483,15 @@ class DeviceListUpdater(object):
             iterable=True,
         )
 
+    @trace
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
         """Called on incoming device list update from federation. Responsible
         for parsing the EDU and adding to pending updates list.
         """
 
+        set_tag("origin", origin)
+        set_tag("edu_content", edu_content)
         user_id = edu_content.pop("user_id")
         device_id = edu_content.pop("device_id")
         stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
@@ -471,12 +506,30 @@ class DeviceListUpdater(object):
                 device_id,
                 origin,
             )
+
+            set_tag("error", True)
+            log_kv(
+                {
+                    "message": "Got a device list update edu from a user and "
+                    "device which does not match the origin of the request.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             return
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
+            set_tag("error", True)
+            log_kv(
+                {
+                    "message": "Got an update from a user for which "
+                    "we don't share any rooms",
+                    "other user_id": user_id,
+                }
+            )
             logger.warning(
                 "Got device list update edu for %r/%r, but don't share a room",
                 user_id,
@@ -578,6 +631,7 @@ class DeviceListUpdater(object):
             request:
             https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
         """
+        log_kv({"message": "Doing resync to update device list."})
         # Fetch all devices for the user.
         origin = get_domain_from_id(user_id)
         try:
@@ -594,13 +648,20 @@ class DeviceListUpdater(object):
             # eventually become consistent.
             return
         except FederationDeniedError as e:
+            set_tag("error", True)
+            log_kv({"reason": "FederationDeniedError"})
             logger.info(e)
             return
-        except Exception:
+        except Exception as e:
             # TODO: Remember that we are now out of sync and try again
             # later
+            set_tag("error", True)
+            log_kv(
+                {"message": "Exception raised by federation request", "exception": e}
+            )
             logger.exception("Failed to handle device list update for %s", user_id)
             return
+        log_kv({"result": result})
         stream_id = result["stream_id"]
         devices = result["devices"]
 
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c7d56779b8..01731cb2d0 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.logging.opentracing import (
     get_active_span_text_map,
+    log_kv,
     set_tag,
     start_active_span,
     whitelisted_homeserver,
@@ -86,7 +87,8 @@ class DeviceMessageHandler(object):
 
     @defer.inlineCallbacks
     def send_device_message(self, sender_user_id, message_type, messages):
-
+        set_tag("number_of_messages", len(messages))
+        set_tag("sender", sender_user_id)
         local_messages = {}
         remote_messages = {}
         for user_id, by_device in messages.items():
@@ -124,6 +126,7 @@ class DeviceMessageHandler(object):
                     else None,
                 }
 
+        log_kv({"local_messages": local_messages})
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
@@ -132,6 +135,7 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
+        log_kv({"remote_messages": remote_messages})
         for destination in remote_messages.keys():
             # Enqueue a new federation transaction to send the new
             # device messages to each remote destination.
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index dd296027a1..256b972aaa 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -85,14 +85,14 @@ the function becomes the operation name for the span.
        return something_usual_and_useful
 
 
-Operation names can be explicitly set for functions by using
-``trace_using_operation_name``
+Operation names can be explicitly set for a function by passing the
+operation name to ``trace``
 
 .. code-block:: python
 
-   from synapse.logging.opentracing import trace_using_operation_name
+   from synapse.logging.opentracing import trace
 
-   @trace_using_operation_name("A *much* better operation name")
+   @trace(opname="a_better_operation_name")
    def interesting_badly_named_function(*args, **kwargs):
        # Does all kinds of cool and expected things
        return something_usual_and_useful
@@ -641,66 +641,26 @@ def extract_text_map(carrier):
 # Tracing decorators
 
 
-def trace(func):
+def trace(func=None, opname=None):
     """
     Decorator to trace a function.
-    Sets the operation name to that of the function's.
+    Sets the operation name to that of the function's or that given
+    as operation_name. See the module's doc string for usage
+    examples.
     """
-    if opentracing is None:
-        return func
 
-    @wraps(func)
-    def _trace_inner(self, *args, **kwargs):
-        if opentracing is None:
-            return func(self, *args, **kwargs)
-
-        scope = start_active_span(func.__name__)
-        scope.__enter__()
-
-        try:
-            result = func(self, *args, **kwargs)
-            if isinstance(result, defer.Deferred):
-
-                def call_back(result):
-                    scope.__exit__(None, None, None)
-                    return result
-
-                def err_back(result):
-                    scope.span.set_tag(tags.ERROR, True)
-                    scope.__exit__(None, None, None)
-                    return result
-
-                result.addCallbacks(call_back, err_back)
-
-            else:
-                scope.__exit__(None, None, None)
-
-            return result
-
-        except Exception as e:
-            scope.__exit__(type(e), None, e.__traceback__)
-            raise
-
-    return _trace_inner
-
-
-def trace_using_operation_name(operation_name):
-    """Decorator to trace a function. Explicitely sets the operation_name."""
-
-    def trace(func):
-        """
-        Decorator to trace a function.
-        Sets the operation name to that of the function's.
-        """
+    def decorator(func):
         if opentracing is None:
             return func
 
+        _opname = opname if opname else func.__name__
+
         @wraps(func)
         def _trace_inner(self, *args, **kwargs):
             if opentracing is None:
                 return func(self, *args, **kwargs)
 
-            scope = start_active_span(operation_name)
+            scope = start_active_span(_opname)
             scope.__enter__()
 
             try:
@@ -717,6 +677,7 @@ def trace_using_operation_name(operation_name):
                         return result
 
                     result.addCallbacks(call_back, err_back)
+
                 else:
                     scope.__exit__(None, None, None)
 
@@ -728,7 +689,10 @@ def trace_using_operation_name(operation_name):
 
         return _trace_inner
 
-    return trace
+    if func:
+        return decorator(func)
+    else:
+        return decorator
 
 
 def tag_args(func):
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 64b6898eb8..2e680134a0 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -24,7 +24,7 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
-from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
+from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.types import StreamToken
 
 from ._base import client_patterns
@@ -69,7 +69,7 @@ class KeyUploadServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
-    @trace_using_operation_name("upload_keys")
+    @trace(opname="upload_keys")
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py
index 2613648d82..d90e52ed1a 100644
--- a/synapse/rest/client/v2_alpha/sendtodevice.py
+++ b/synapse/rest/client/v2_alpha/sendtodevice.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 
 from synapse.http import servlet
 from synapse.http.servlet import parse_json_object_from_request
+from synapse.logging.opentracing import set_tag, trace
 from synapse.rest.client.transactions import HttpTransactionCache
 
 from ._base import client_patterns
@@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
         self.txns = HttpTransactionCache(hs)
         self.device_message_handler = hs.get_device_message_handler()
 
+    @trace(opname="sendToDevice")
     def on_PUT(self, request, message_type, txn_id):
+        set_tag("message_type", message_type)
+        set_tag("txn_id", txn_id)
         return self.txns.fetch_or_execute_request(
             request, self._put, request, message_type, txn_id
         )
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 4dca9de617..6b7458304e 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -19,6 +19,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -72,6 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )
 
+    @trace
     @defer.inlineCallbacks
     def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
         """
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), None
         )
+
+        set_tag("last_deleted_stream_id", last_deleted_stream_id)
+
         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
                 user_id, last_deleted_stream_id
             )
             if not has_changed:
+                log_kv({"message": "No changes in cache since last check"})
                 return 0
 
         def delete_messages_for_device_txn(txn):
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_messages_for_device", delete_messages_for_device_txn
         )
 
+        log_kv(
+            {"message": "deleted {} messages for device".format(count), "count": count}
+        )
+
         # Update the cache, ensuring that we only ever increase the value
         last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
         return count
 
+    @trace
     def get_new_device_msgs_for_remote(
         self, destination, last_stream_id, current_stream_id, limit
     ):
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 in the stream the messages got to.
         """
 
+        set_tag("destination", destination)
+        set_tag("last_stream_id", last_stream_id)
+        set_tag("current_stream_id", current_stream_id)
+        set_tag("limit", limit)
+
         has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
             destination, last_stream_id
         )
         if not has_changed or last_stream_id == current_stream_id:
+            log_kv({"message": "No new messages in stream"})
             return defer.succeed(([], current_stream_id))
 
         if limit <= 0:
             # This can happen if we run out of room for EDUs in the transaction.
             return defer.succeed(([], last_stream_id))
 
+        @trace
         def get_new_messages_for_remote_destination_txn(txn):
             sql = (
                 "SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -156,6 +174,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 stream_pos = row[0]
                 messages.append(json.loads(row[1]))
             if len(messages) < limit:
+                log_kv({"message": "Set stream position to current position"})
                 stream_pos = current_stream_id
             return messages, stream_pos
 
@@ -164,6 +183,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             get_new_messages_for_remote_destination_txn,
         )
 
+    @trace
     def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
         """Used to delete messages when the remote destination acknowledges
         their receipt.
@@ -214,6 +234,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
             expiry_ms=30 * 60 * 1000,
         )
 
+    @trace
     @defer.inlineCallbacks
     def add_messages_to_device_inbox(
         self, local_messages_by_user_then_device, remote_messages_by_destination
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 76542c512d..41f62828bd 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.logging.opentracing import (
     get_active_span_text_map,
+    set_tag,
     trace,
     whitelisted_homeserver,
 )
@@ -321,6 +322,7 @@ class DeviceWorkerStore(SQLBaseStore):
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
 
+    @trace
     @defer.inlineCallbacks
     def get_user_devices_from_cache(self, query_list):
         """Get the devices (and keys if any) for remote users from the cache.
@@ -352,6 +354,9 @@ class DeviceWorkerStore(SQLBaseStore):
             else:
                 results[user_id] = yield self._get_cached_devices_for_user(user_id)
 
+        set_tag("in_cache", results)
+        set_tag("not_in_cache", user_ids_not_in_cache)
+
         return user_ids_not_in_cache, results
 
     @cachedInlineCallbacks(num_args=2, tree=True)