diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1300b540e3..8b2249fc5b 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -15,6 +15,7 @@
# limitations under the License.
import logging
+from synapse.util.tracerutils import TracerUtil, trace_defered_function
from six import iteritems
@@ -45,6 +46,7 @@ class E2eKeysHandler(object):
"client_keys", self.on_federation_query_client_keys
)
+ @trace_defered_function
@defer.inlineCallbacks
def query_devices(self, query_body, timeout):
""" Handle a device key query from a client
@@ -65,6 +67,7 @@ class E2eKeysHandler(object):
}
}
"""
+
device_keys_query = query_body.get("device_keys", {})
# separate users by domain.
@@ -79,6 +82,9 @@ class E2eKeysHandler(object):
else:
remote_queries[user_id] = device_ids
+ TracerUtil.set_tag("local_key_query", local_query)
+ TracerUtil.set_tag("remote_key_query", remote_queries)
+
# First get local devices.
failures = {}
results = {}
@@ -119,9 +125,12 @@ class E2eKeysHandler(object):
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
+ @trace_defered_function
@defer.inlineCallbacks
def do_remote_query(destination):
destination_query = remote_queries_not_in_cache[destination]
+
+ TracerUtil.set_tag("key_query", destination_query)
try:
remote_result = yield self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout
@@ -132,7 +141,10 @@ class E2eKeysHandler(object):
results[user_id] = keys
except Exception as e:
- failures[destination] = _exception_to_failure(e)
+ failure = _exception_to_failure(e)
+ failures[destination] = failure
+ TracerUtil.set_tag("error", True)
+ TracerUtil.set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -146,6 +158,7 @@ class E2eKeysHandler(object):
return {"device_keys": results, "failures": failures}
+ @trace_defered_function
@defer.inlineCallbacks
def query_local_devices(self, query):
"""Get E2E device keys for local users
@@ -158,6 +171,7 @@ class E2eKeysHandler(object):
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
map from user_id -> device_id -> device details
"""
+ TracerUtil.set_tag("local_query", query)
local_query = []
result_dict = {}
@@ -165,6 +179,14 @@ class E2eKeysHandler(object):
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
+ TracerUtil.log_kv(
+ {
+ "message": "Requested a local key for a user which"
+ + " was not local to the homeserver",
+ "user_id": user_id,
+ }
+ )
+ TracerUtil.set_tag("error", True)
raise SynapseError(400, "Not a user here")
if not device_ids:
@@ -189,6 +211,7 @@ class E2eKeysHandler(object):
r["unsigned"]["device_display_name"] = display_name
result_dict[user_id][device_id] = r
+ TracerUtil.log_kv(results)
return result_dict
@defer.inlineCallbacks
@@ -199,6 +222,7 @@ class E2eKeysHandler(object):
res = yield self.query_local_devices(device_keys_query)
return {"device_keys": res}
+ @trace_defered_function
@defer.inlineCallbacks
def claim_one_time_keys(self, query, timeout):
local_query = []
@@ -213,6 +237,9 @@ class E2eKeysHandler(object):
domain = get_domain_from_id(user_id)
remote_queries.setdefault(domain, {})[user_id] = device_keys
+ TracerUtil.set_tag("local_key_query", local_query)
+ TracerUtil.set_tag("remote_key_query", remote_queries)
+
results = yield self.store.claim_e2e_one_time_keys(local_query)
json_result = {}
@@ -224,8 +251,10 @@ class E2eKeysHandler(object):
key_id: json.loads(json_bytes)
}
+ @trace_defered_function
@defer.inlineCallbacks
def claim_client_keys(destination):
+ TracerUtil.set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
remote_result = yield self.federation.claim_client_keys(
@@ -234,8 +263,12 @@ class E2eKeysHandler(object):
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
+
except Exception as e:
- failures[destination] = _exception_to_failure(e)
+ failure = _exception_to_failure(e)
+ failures[destination] = failure
+ TracerUtil.set_tag("error", True)
+ TracerUtil.set_tag("reason", failure)
yield make_deferred_yieldable(
defer.gatherResults(
@@ -259,14 +292,21 @@ class E2eKeysHandler(object):
),
)
+ TracerUtil.log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}
+ @trace_defered_function
@defer.inlineCallbacks
def upload_keys_for_user(self, user_id, device_id, keys):
+ TracerUtil.set_tag("user_id", user_id)
+ TracerUtil.set_tag("device_id", device_id)
+ TracerUtil.set_tag("keys", keys)
+
time_now = self.clock.time_msec()
# TODO: Validate the JSON to make sure it has the right keys.
device_keys = keys.get("device_keys", None)
+ TracerUtil.set_tag("device_keys", device_keys)
if device_keys:
logger.info(
"Updating device_keys for device %r for user %s at %d",
@@ -287,6 +327,10 @@ class E2eKeysHandler(object):
yield self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
+ else:
+ TracerUtil.log_kv(
+ {"event": "did not upload one_time_keys", "reason": "no keys given"}
+ )
# the device should have been registered already, but it may have been
# deleted due to a race with a DELETE request. Or we may be using an
@@ -297,8 +341,10 @@ class E2eKeysHandler(object):
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+ TracerUtil.set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}
+ @trace_defered_function
@defer.inlineCallbacks
def _upload_one_time_keys_for_user(
self, user_id, device_id, time_now, one_time_keys
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..06837bb126 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -27,6 +27,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.util.async_helpers import Linearizer
+from synapse.util.tracerutils import TracerUtil, trace_defered_function
logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
# changed.
self._upload_linearizer = Linearizer("upload_room_keys_lock")
+ @trace_defered_function
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
user_id, version, room_id, session_id
)
+ TracerUtil.log_kv(results)
return results
+ @trace_defered_function
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
with (yield self._upload_linearizer.queue(user_id)):
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+ @trace_defered_function
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
@@ -174,6 +179,7 @@ class E2eRoomKeysHandler(object):
user_id, version, room_id, session_id, session
)
+ @trace_defered_function
@defer.inlineCallbacks
def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
"""Upload a given room_key for a given room and session into a given
@@ -236,6 +242,7 @@ class E2eRoomKeysHandler(object):
return False
return True
+ @trace_defered_function
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
@@ -264,6 +271,7 @@ class E2eRoomKeysHandler(object):
)
return new_version
+ @trace_defered_function
@defer.inlineCallbacks
def get_version_info(self, user_id, version=None):
"""Get the info about a given version of the user's backup
@@ -294,6 +302,7 @@ class E2eRoomKeysHandler(object):
raise
return res
+ @trace_defered_function
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +323,7 @@ class E2eRoomKeysHandler(object):
else:
raise
+ @trace_defered_function
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6008adec7c..8de9e12df4 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -25,6 +25,10 @@ from synapse.http.servlet import (
parse_string,
)
from synapse.types import StreamToken
+from synapse.util.tracerutils import (
+ TracerUtil,
+ trace_defered_function_using_operation_name,
+)
from ._base import client_patterns
@@ -68,6 +72,7 @@ class KeyUploadServlet(RestServlet):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()
+ @trace_defered_function_using_operation_name("upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -78,6 +83,14 @@ class KeyUploadServlet(RestServlet):
# passing the device_id here is deprecated; however, we allow it
# for now for compatibility with older clients.
if requester.device_id is not None and device_id != requester.device_id:
+ TracerUtil.set_tag("error", True)
+ TracerUtil.log_kv(
+ {
+ "message": "Client uploading keys for a different device",
+ "logged_in_id": requester.device_id,
+ "key_being_uploaded": device_id,
+ }
+ )
logger.warning(
"Client uploading keys for a different device "
"(logged in as %s, uploading for %s)",
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index 99128f2df7..b0e426d30e 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,11 +18,13 @@ import json
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.util.tracerutils import trace_defered_function, trace_function
from ._base import SQLBaseStore
class EndToEndRoomKeyStore(SQLBaseStore):
+ @trace_defered_function
@defer.inlineCallbacks
def get_e2e_room_key(self, user_id, version, room_id, session_id):
"""Get the encrypted E2E room key for a given session from a given
@@ -63,6 +65,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return row
+ @trace_defered_function
@defer.inlineCallbacks
def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
"""Replaces or inserts the encrypted E2E room key for a given session in
@@ -95,6 +98,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
lock=False,
)
+ @trace_defered_function
@defer.inlineCallbacks
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -153,6 +157,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
+ @trace_defered_function
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -194,6 +199,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
raise StoreError(404, "No current backup version")
return row[0]
+ @trace_function
def get_e2e_room_keys_version_info(self, user_id, version=None):
"""Get info metadata about a version of our room_keys backup.
@@ -236,6 +242,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
)
+ @trace_function
def create_e2e_room_keys_version(self, user_id, info):
"""Atomically creates a new version of this user's e2e_room_keys store
with the given version info.
@@ -276,6 +283,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
)
+ @trace_function
def update_e2e_room_keys_version(self, user_id, version, info):
"""Update a given backup version
@@ -292,6 +300,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="update_e2e_room_keys_version",
)
+ @trace_function
def delete_e2e_room_keys_version(self, user_id, version=None):
"""Delete a given backup version of the user's room keys.
Doesn't delete their actual key data.
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1e07474e70..9779cb70c9 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -22,8 +22,11 @@ from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
+from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function
+
class EndToEndKeyWorkerStore(SQLBaseStore):
+ @trace_defered_function
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -40,6 +43,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
"""
+ TracerUtil.set_tag("query_list", query_list)
if not query_list:
return {}
@@ -57,9 +61,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return results
+ @trace_function
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
):
+ TracerUtil.set_tag("include_all_devices", include_all_devices)
+ TracerUtil.set_tag("include_deleted_devices", include_deleted_devices)
+
query_clauses = []
query_params = []
@@ -104,8 +112,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
+ TracerUtil.log_kv(result)
return result
+ @trace_defered_function
@defer.inlineCallbacks
def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
"""Retrieve a number of one-time keys for a user
@@ -121,6 +131,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
key_id) to json string for key
"""
+ TracerUtil.set_tag("user_id", user_id)
+ TracerUtil.set_tag("device_id", device_id)
+ TracerUtil.set_tag("key_ids", key_ids)
+
rows = yield self._simple_select_many_batch(
table="e2e_one_time_keys_json",
column="key_id",
@@ -145,7 +159,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
(algorithm, key_id, key json)
"""
+ @trace_function
def _add_e2e_one_time_keys(txn):
+ TracerUtil.set_tag("user_id", user_id)
+ TracerUtil.set_tag("device_id", device_id)
+ TracerUtil.set_tag("new_keys", new_keys)
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -201,7 +219,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
or the keys were already in the database.
"""
+ @trace_function
def _set_e2e_device_keys_txn(txn):
+ TracerUtil.set_tag("user_id", user_id)
+ TracerUtil.set_tag("device_id", device_id)
+ TracerUtil.set_tag("time_now", time_now)
+ TracerUtil.set_tag("device_keys", device_keys)
+
old_key_json = self._simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
@@ -215,6 +239,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
+ TracerUtil.set_tag("error", True)
return False
self._simple_upsert_txn(
@@ -231,6 +256,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def claim_e2e_one_time_keys(self, query_list):
"""Take a list of one time keys out of the database"""
+ @trace_function
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -252,7 +278,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" AND key_id = ?"
)
for user_id, device_id, algorithm, key_id in delete:
+ TracerUtil.log_kv(
+ {"message": "executing claim transaction on database"}
+ )
txn.execute(sql, (user_id, device_id, algorithm, key_id))
+ TracerUtil.log_kv(
+ {"message": "finished executing and invalidating cache"}
+ )
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
@@ -261,7 +293,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys)
def delete_e2e_keys_by_device(self, user_id, device_id):
+ @trace_function
def delete_e2e_keys_by_device_txn(txn):
+ TracerUtil.set_tag("user_id", user_id)
+ TracerUtil.set_tag("device_id", device_id)
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",
|