summary refs log tree commit diff
diff options
context:
space:
mode:
author Jorik Schellekens <joriks@matrix.org> 2019-06-28 10:34:53 +0100
committer Jorik Schellekens <joriks@matrix.org> 2019-07-23 15:59:23 +0100
commit 565544b6039c619cc4e9af8ad7f04793c4525b70 (patch)
tree cdf1cc92e150ceffc2d8f85692c8e633e9ddc311
parent One tracing decorator to rule them all. (diff)
download synapse-565544b6039c619cc4e9af8ad7f04793c4525b70.tar.xz
Trace e2e
-rw-r--r-- synapse/handlers/e2e_keys.py 50
-rw-r--r-- synapse/handlers/e2e_room_keys.py 10
-rw-r--r-- synapse/rest/client/v2_alpha/keys.py 13
-rw-r--r-- synapse/storage/e2e_room_keys.py 9
-rw-r--r-- synapse/storage/end_to_end_keys.py 35
5 files changed, 115 insertions, 2 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1300b540e3..8b2249fc5b 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from synapse.util.tracerutils import TracerUtil, trace_defered_function
 
 from six import iteritems
 
@@ -45,6 +46,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -65,6 +67,7 @@ class E2eKeysHandler(object):
             }
         }
         """
+
         device_keys_query = query_body.get("device_keys", {})
 
         # separate users by domain.
@@ -79,6 +82,9 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
+        TracerUtil.set_tag("local_key_query", local_query)
+        TracerUtil.set_tag("remote_key_query", remote_queries)
+
         # First get local devices.
         failures = {}
         results = {}
@@ -119,9 +125,12 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
+        @trace_defered_function
         @defer.inlineCallbacks
         def do_remote_query(destination):
             destination_query = remote_queries_not_in_cache[destination]
+
+            TracerUtil.set_tag("key_query", destination_query)
             try:
                 remote_result = yield self.federation.query_client_keys(
                     destination, {"device_keys": destination_query}, timeout=timeout
@@ -132,7 +141,10 @@ class E2eKeysHandler(object):
                         results[user_id] = keys
 
             except Exception as e:
-                failures[destination] = _exception_to_failure(e)
+                failure = _exception_to_failure(e)
+                failures[destination] = failure
+                TracerUtil.set_tag("error", True)
+                TracerUtil.set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -146,6 +158,7 @@ class E2eKeysHandler(object):
 
         return {"device_keys": results, "failures": failures}
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -158,6 +171,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                  map from user_id -> device_id -> device details
         """
+        TracerUtil.set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -165,6 +179,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
+                TracerUtil.log_kv(
+                    {
+                        "message": "Requested a local key for a user which"
+                        + " was not local to the homeserver",
+                        "user_id": user_id,
+                    }
+                )
+                TracerUtil.set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -189,6 +211,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
+        TracerUtil.log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -199,6 +222,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -213,6 +237,9 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
+        TracerUtil.set_tag("local_key_query", local_query)
+        TracerUtil.set_tag("remote_key_query", remote_queries)
+
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -224,8 +251,10 @@ class E2eKeysHandler(object):
                         key_id: json.loads(json_bytes)
                     }
 
+        @trace_defered_function
         @defer.inlineCallbacks
         def claim_client_keys(destination):
+            TracerUtil.set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -234,8 +263,12 @@ class E2eKeysHandler(object):
                 for user_id, keys in remote_result["one_time_keys"].items():
                     if user_id in device_keys:
                         json_result[user_id] = keys
+
             except Exception as e:
-                failures[destination] = _exception_to_failure(e)
+                failure = _exception_to_failure(e)
+                failures[destination] = failure
+                TracerUtil.set_tag("error", True)
+                TracerUtil.set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -259,14 +292,21 @@ class E2eKeysHandler(object):
             ),
         )
 
+        TracerUtil.log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def upload_keys_for_user(self, user_id, device_id, keys):
+        TracerUtil.set_tag("user_id", user_id)
+        TracerUtil.set_tag("device_id", device_id)
+        TracerUtil.set_tag("keys", keys)
+
         time_now = self.clock.time_msec()
 
         # TODO: Validate the JSON to make sure it has the right keys.
         device_keys = keys.get("device_keys", None)
+        TracerUtil.set_tag("device_keys", device_keys)
         if device_keys:
             logger.info(
                 "Updating device_keys for device %r for user %s at %d",
@@ -287,6 +327,10 @@ class E2eKeysHandler(object):
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
+        else:
+            TracerUtil.log_kv(
+                {"event": "did not upload one_time_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -297,8 +341,10 @@ class E2eKeysHandler(object):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
+        TracerUtil.set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def _upload_one_time_keys_for_user(
         self, user_id, device_id, time_now, one_time_keys
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..06837bb126 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -27,6 +27,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.util.async_helpers import Linearizer
+from synapse.util.tracerutils import TracerUtil, trace_defered_function
 
 logger = logging.getLogger(__name__)
 
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )
 
+            TracerUtil.log_kv(results)
             return results
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -174,6 +179,7 @@ class E2eRoomKeysHandler(object):
                         user_id, version, room_id, session_id, session
                     )
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
         """Upload a given room_key for a given room and session into a given
@@ -236,6 +242,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -264,6 +271,7 @@ class E2eRoomKeysHandler(object):
             )
             return new_version
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def get_version_info(self, user_id, version=None):
         """Get the info about a given version of the user's backup
@@ -294,6 +302,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +323,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6008adec7c..8de9e12df4 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -25,6 +25,10 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.types import StreamToken
+from synapse.util.tracerutils import (
+    TracerUtil,
+    trace_defered_function_using_operation_name,
+)
 
 from ._base import client_patterns
 
@@ -68,6 +72,7 @@ class KeyUploadServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
+    @trace_defered_function_using_operation_name("upload_keys")
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +83,14 @@ class KeyUploadServlet(RestServlet):
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
             if requester.device_id is not None and device_id != requester.device_id:
+                TracerUtil.set_tag("error", True)
+                TracerUtil.log_kv(
+                    {
+                        "message": "Client uploading keys for a different device",
+                        "logged_in_id": requester.device_id,
+                        "key_being_uploaded": device_id,
+                    }
+                )
                 logger.warning(
                     "Client uploading keys for a different device "
                     "(logged in as %s, uploading for %s)",
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 99128f2df7..b0e426d30e 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,11 +18,13 @@ import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.util.tracerutils import trace_defered_function, trace_function
 
 from ._base import SQLBaseStore
 
 
 class EndToEndRoomKeyStore(SQLBaseStore):
+    @trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_room_key(self, user_id, version, room_id, session_id):
         """Get the encrypted E2E room key for a given session from a given
@@ -63,6 +65,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return row
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
         """Replaces or inserts the encrypted E2E room key for a given session in
@@ -95,6 +98,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             lock=False,
         )
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +157,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return sessions
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -194,6 +199,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             raise StoreError(404, "No current backup version")
         return row[0]
 
+    @trace_function
     def get_e2e_room_keys_version_info(self, user_id, version=None):
         """Get info metadata about a version of our room_keys backup.
 
@@ -236,6 +242,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
         )
 
+    @trace_function
     def create_e2e_room_keys_version(self, user_id, info):
         """Atomically creates a new version of this user's e2e_room_keys store
         with the given version info.
@@ -276,6 +283,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
         )
 
+    @trace_function
     def update_e2e_room_keys_version(self, user_id, version, info):
         """Update a given backup version
 
@@ -292,6 +300,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             desc="update_e2e_room_keys_version",
         )
 
+    @trace_function
     def delete_e2e_room_keys_version(self, user_id, version=None):
         """Delete a given backup version of the user's room keys.
         Doesn't delete their actual key data.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1e07474e70..9779cb70c9 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -22,8 +22,11 @@ from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
+from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function
+
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
+    @trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +43,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
         """
+        TracerUtil.set_tag("query_list", query_list)
         if not query_list:
             return {}
 
@@ -57,9 +61,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return results
 
+    @trace_function
     def _get_e2e_device_keys_txn(
         self, txn, query_list, include_all_devices=False, include_deleted_devices=False
     ):
+        TracerUtil.set_tag("include_all_devices", include_all_devices)
+        TracerUtil.set_tag("include_deleted_devices", include_deleted_devices)
+
         query_clauses = []
         query_params = []
 
@@ -104,8 +112,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             for user_id, device_id in deleted_devices:
                 result.setdefault(user_id, {})[device_id] = None
 
+        TracerUtil.log_kv(result)
         return result
 
+    @trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
         """Retrieve a number of one-time keys for a user
@@ -121,6 +131,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             key_id) to json string for key
         """
 
+        TracerUtil.set_tag("user_id", user_id)
+        TracerUtil.set_tag("device_id", device_id)
+        TracerUtil.set_tag("key_ids", key_ids)
+
         rows = yield self._simple_select_many_batch(
             table="e2e_one_time_keys_json",
             column="key_id",
@@ -145,7 +159,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 (algorithm, key_id, key json)
         """
 
+        @trace_function
         def _add_e2e_one_time_keys(txn):
+            TracerUtil.set_tag("user_id", user_id)
+            TracerUtil.set_tag("device_id", device_id)
+            TracerUtil.set_tag("new_keys", new_keys)
             # We are protected from race between lookup and insertion due to
             # a unique constraint. If there is a race of two calls to
             # `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -201,7 +219,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         or the keys were already in the database.
         """
 
+        @trace_function
         def _set_e2e_device_keys_txn(txn):
+            TracerUtil.set_tag("user_id", user_id)
+            TracerUtil.set_tag("device_id", device_id)
+            TracerUtil.set_tag("time_now", time_now)
+            TracerUtil.set_tag("device_keys", device_keys)
+
             old_key_json = self._simple_select_one_onecol_txn(
                 txn,
                 table="e2e_device_keys_json",
@@ -215,6 +239,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             new_key_json = encode_canonical_json(device_keys).decode("utf-8")
 
             if old_key_json == new_key_json:
+                TracerUtil.set_tag("error", True)
                 return False
 
             self._simple_upsert_txn(
@@ -231,6 +256,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def claim_e2e_one_time_keys(self, query_list):
         """Take a list of one time keys out of the database"""
 
+        @trace_function
         def _claim_e2e_one_time_keys(txn):
             sql = (
                 "SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +278,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 " AND key_id = ?"
             )
             for user_id, device_id, algorithm, key_id in delete:
+                TracerUtil.log_kv(
+                    {"message": "executing claim transaction on database"}
+                )
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
+                TracerUtil.log_kv(
+                    {"message": "finished executing and invalidating cache"}
+                )
                 self._invalidate_cache_and_stream(
                     txn, self.count_e2e_one_time_keys, (user_id, device_id)
                 )
@@ -261,7 +293,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys)
 
     def delete_e2e_keys_by_device(self, user_id, device_id):
+        @trace_function
         def delete_e2e_keys_by_device_txn(txn):
+            TracerUtil.set_tag("user_id", user_id)
+            TracerUtil.set_tag("device_id", device_id)
             self._simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",