summary refs log tree commit diff
diff options
context:
space:
mode:
authorJorik Schellekens <joriks@matrix.org>2019-07-04 17:11:46 +0100
committerJorik Schellekens <joriks@matrix.org>2019-07-23 16:05:20 +0100
commitbfc50050fd314897c1f04dcc489893a6be2466ce (patch)
tree3907ed04413fe00458c66985753f31e016ac2d92
parentThese functions were not deferreds! (diff)
downloadsynapse-bfc50050fd314897c1f04dcc489893a6be2466ce.tar.xz
The great logging/ migration
-rw-r--r--synapse/handlers/device.py46
-rw-r--r--synapse/handlers/devicemessage.py8
-rw-r--r--synapse/handlers/e2e_keys.py58
-rw-r--r--synapse/handlers/e2e_room_keys.py18
-rw-r--r--synapse/rest/client/v2_alpha/keys.py8
-rw-r--r--synapse/rest/client/v2_alpha/room_keys.py4
-rw-r--r--synapse/rest/client/v2_alpha/sendtodevice.py8
-rw-r--r--synapse/storage/deviceinbox.py30
-rw-r--r--synapse/storage/devices.py8
-rw-r--r--synapse/storage/e2e_room_keys.py2
-rw-r--r--synapse/storage/end_to_end_keys.py54
11 files changed, 122 insertions, 122 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 81f9795315..746e2179e1 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -31,7 +31,7 @@ from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 from ._base import BaseHandler
 
@@ -46,7 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_devices_by_user(self, user_id):
         """
@@ -58,7 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
             defer.Deferred: list[dict[str, X]]: info on each device
         """
 
-        tracerutils.set_tag("user_id", user_id)
+        opentracing.set_tag("user_id", user_id)
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -67,10 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
         for device in devices:
             _update_device_from_client_ips(device, ips)
 
-        tracerutils.log_kv(device_map)
+        opentracing.log_kv(device_map)
         return devices
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_device(self, user_id, device_id):
         """ Retrieve the given device
@@ -91,13 +91,13 @@ class DeviceWorkerHandler(BaseHandler):
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
         _update_device_from_client_ips(device, ips)
 
-        tracerutils.set_tag("device", device)
-        tracerutils.set_tag("ips", ips)
+        opentracing.set_tag("device", device)
+        opentracing.set_tag("ips", ips)
 
         return device
 
     @measure_func("device.get_user_ids_changed")
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -108,8 +108,8 @@ class DeviceWorkerHandler(BaseHandler):
             from_token (StreamToken)
         """
 
-        tracerutils.set_tag("user_id", user_id)
-        tracerutils.set_tag("from_token", from_token)
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("from_token", from_token)
         now_room_key = yield self.store.get_room_events_max_id()
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -161,7 +161,7 @@ class DeviceWorkerHandler(BaseHandler):
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"event": "encountered empty previous state", "room_id": room_id}
                 )
                 for key, event_id in iteritems(current_state_ids):
@@ -216,7 +216,7 @@ class DeviceWorkerHandler(BaseHandler):
             possibly_joined = []
             possibly_left = []
 
-        tracerutils.log_kv(
+        opentracing.log_kv(
             {"changed": list(possibly_joined), "left": list(possibly_left)}
         )
 
@@ -287,7 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         raise errors.StoreError(500, "Couldn't generate a device ID.")
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """ Delete the given device
@@ -305,8 +305,8 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", "User doesn't have that device id.")
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -319,7 +319,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         yield self.notify_device_update(user_id, [device_id])
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_all_devices_for_user(self, user_id, except_device_id=None):
         """Delete all of the user's devices
@@ -355,8 +355,8 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", "User doesn't have that device id.")
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -477,15 +477,15 @@ class DeviceListEduUpdater(object):
             iterable=True,
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
         """Called on incoming device list update from federation. Responsible
         for parsing the EDU and adding to pending updates list.
         """
 
-        tracerutils.set_tag("origin", origin)
-        tracerutils.set_tag("edu_content", edu_content)
+        opentracing.set_tag("origin", origin)
+        opentracing.set_tag("edu_content", edu_content)
         user_id = edu_content.pop("user_id")
         device_id = edu_content.pop("device_id")
         stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
@@ -506,8 +506,8 @@ class DeviceListEduUpdater(object):
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
-            tracerutils.set_tag("error", True)
-            tracerutils.log_kv(
+            opentracing.set_tag("error", True)
+            opentracing.log_kv(
                 {
                     "message": "Got an update from a user which "
                     + "doesn't share a room with the current user."
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index de22b37b9e..6463d900cd 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -20,7 +20,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 logger = logging.getLogger(__name__)
 
@@ -77,7 +77,7 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def send_device_message(self, sender_user_id, message_type, messages):
 
@@ -111,7 +111,7 @@ class DeviceMessageHandler(object):
                 "message_id": message_id,
             }
 
-        tracerutils.log_kv(local_messages)
+        opentracing.log_kv(local_messages)
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
@@ -120,7 +120,7 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
-        tracerutils.log_kv(remote_messages)
+        opentracing.log_kv(remote_messages)
         for destination in remote_messages.keys():
             # Enqueue a new federation transaction to send the new
             # device messages to each remote destination.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 6bf9d955bc..01d5d383bf 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 from six import iteritems
 
@@ -46,7 +46,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -82,8 +82,8 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
-        tracerutils.set_tag("local_key_query", local_query)
-        tracerutils.set_tag("remote_key_query", remote_queries)
+        opentracing.set_tag("local_key_query", local_query)
+        opentracing.set_tag("remote_key_query", remote_queries)
 
         # First get local devices.
         failures = {}
@@ -125,12 +125,12 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
-        @tracerutils.trace_defered_function
+        @opentracing.trace_defered_function
         @defer.inlineCallbacks
         def do_remote_query(destination):
             destination_query = remote_queries_not_in_cache[destination]
 
-            tracerutils.set_tag("key_query", destination_query)
+            opentracing.set_tag("key_query", destination_query)
             try:
                 remote_result = yield self.federation.query_client_keys(
                     destination, {"device_keys": destination_query}, timeout=timeout
@@ -143,8 +143,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", failure)
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -158,7 +158,7 @@ class E2eKeysHandler(object):
 
         return {"device_keys": results, "failures": failures}
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -171,7 +171,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                  map from user_id -> device_id -> device details
         """
-        tracerutils.set_tag("local_query", query)
+        opentracing.set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -179,14 +179,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {
                         "message": "Requested a local key for a user which"
                         + " was not local to the homeserver",
                         "user_id": user_id,
                     }
                 )
-                tracerutils.set_tag("error", True)
+                opentracing.set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -211,7 +211,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
-        tracerutils.log_kv(results)
+        opentracing.log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -222,7 +222,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -237,8 +237,8 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
-        tracerutils.set_tag("local_key_query", local_query)
-        tracerutils.set_tag("remote_key_query", remote_queries)
+        opentracing.set_tag("local_key_query", local_query)
+        opentracing.set_tag("remote_key_query", remote_queries)
 
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
@@ -251,10 +251,10 @@ class E2eKeysHandler(object):
                         key_id: json.loads(json_bytes)
                     }
 
-        @tracerutils.trace_defered_function
+        @opentracing.trace_defered_function
         @defer.inlineCallbacks
         def claim_client_keys(destination):
-            tracerutils.set_tag("destination", destination)
+            opentracing.set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -267,8 +267,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
-                tracerutils.set_tag("error", True)
-                tracerutils.set_tag("reason", failure)
+                opentracing.set_tag("error", True)
+                opentracing.set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -292,21 +292,21 @@ class E2eKeysHandler(object):
             ),
         )
 
-        tracerutils.log_kv({"one_time_keys": json_result, "failures": failures})
+        opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def upload_keys_for_user(self, user_id, device_id, keys):
-        tracerutils.set_tag("user_id", user_id)
-        tracerutils.set_tag("device_id", device_id)
-        tracerutils.set_tag("keys", keys)
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("device_id", device_id)
+        opentracing.set_tag("keys", keys)
 
         time_now = self.clock.time_msec()
 
         # TODO: Validate the JSON to make sure it has the right keys.
         device_keys = keys.get("device_keys", None)
-        tracerutils.set_tag("device_keys", device_keys)
+        opentracing.set_tag("device_keys", device_keys)
         if device_keys:
             logger.info(
                 "Updating device_keys for device %r for user %s at %d",
@@ -328,7 +328,7 @@ class E2eKeysHandler(object):
                 user_id, device_id, time_now, one_time_keys
             )
         else:
-            tracerutils.log_kv(
+            opentracing.log_kv(
                 {"event": "did not upload one_time_keys", "reason": "no keys given"}
             )
 
@@ -341,10 +341,10 @@ class E2eKeysHandler(object):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
-        tracerutils.set_tag("one_time_key_counts", result)
+        opentracing.set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def _upload_one_time_keys_for_user(
         self, user_id, device_id, time_now, one_time_keys
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index a90ec2f9fc..0459d52a3c 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -27,7 +27,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.util.async_helpers import Linearizer
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 logger = logging.getLogger(__name__)
 
@@ -50,7 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -86,10 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )
 
-            tracerutils.log_kv(results)
+            opentracing.log_kv(results)
             return results
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -111,7 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -179,7 +179,7 @@ class E2eRoomKeysHandler(object):
                         user_id, version, room_id, session_id, session
                     )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
         """Upload a given room_key for a given room and session into a given
@@ -242,7 +242,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -301,7 +301,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -322,7 +322,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 40052e7a05..00260cde73 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -25,7 +25,7 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.types import StreamToken
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 from ._base import client_patterns
 
 logger = logging.getLogger(__name__)
@@ -68,7 +68,7 @@ class KeyUploadServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
-    @tracerutils.trace_defered_function_using_operation_name("upload_keys")
+    @opentracing.trace_defered_function_using_operation_name("upload_keys")
     @defer.inlineCallbacks
     def on_POST(self, request, device_id):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
@@ -79,8 +79,8 @@ class KeyUploadServlet(RestServlet):
             # passing the device_id here is deprecated; however, we allow it
             # for now for compatibility with older clients.
             if requester.device_id is not None and device_id != requester.device_id:
-                tracerutils.set_tag("error", True)
-                tracerutils.log_kv(
+                opentracing.set_tag("error", True)
+                opentracing.log_kv(
                     {
                         "message": "Client uploading keys for a different device",
                         "logged_in_id": requester.device_id,
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
index ac03d58899..51bd4f12ff 100644
--- a/synapse/rest/client/v2_alpha/room_keys.py
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -23,7 +23,7 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 from ._base import client_patterns
 
@@ -312,7 +312,7 @@ class RoomKeysVersionServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
 
-    @tracerutils.trace_defered_function_using_operation_name("get_room_keys_version")
+    @opentracing.trace_defered_function_using_operation_name("get_room_keys_version")
     @defer.inlineCallbacks
     def on_GET(self, request, version):
         """
diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py
index d58291fc85..68bbcf4a48 100644
--- a/synapse/rest/client/v2_alpha/sendtodevice.py
+++ b/synapse/rest/client/v2_alpha/sendtodevice.py
@@ -20,7 +20,7 @@ from twisted.internet import defer
 from synapse.http import servlet
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.rest.client.transactions import HttpTransactionCache
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 from ._base import client_patterns
 
@@ -43,10 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet):
         self.txns = HttpTransactionCache(hs)
         self.device_message_handler = hs.get_device_message_handler()
 
-    @tracerutils.trace_function_using_operation_name("sendToDevice")
+    @opentracing.trace_function_using_operation_name("sendToDevice")
     def on_PUT(self, request, message_type, txn_id):
-        tracerutils.set_tag("message_type", message_type)
-        tracerutils.set_tag("txn_id", txn_id)
+        opentracing.set_tag("message_type", message_type)
+        opentracing.set_tag("txn_id", txn_id)
         return self.txns.fetch_or_execute_request(
             request, self._put, request, message_type, txn_id
         )
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 6011312af9..91c6cd4cf6 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 logger = logging.getLogger(__name__)
 
@@ -73,7 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
         """
@@ -90,14 +90,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             (user_id, device_id), None
         )
 
-        tracerutils.set_tag("last_deleted_stream_id", last_deleted_stream_id)
+        opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)
 
         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
                 user_id, last_deleted_stream_id
             )
             if not has_changed:
-                tracerutils.log_kv({"message": "No changes in cache since last check"})
+                opentracing.log_kv({"message": "No changes in cache since last check"})
                 return 0
 
         def delete_messages_for_device_txn(txn):
@@ -113,7 +113,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_messages_for_device", delete_messages_for_device_txn
         )
 
-        tracerutils.log_kv(
+        opentracing.log_kv(
             {"message": "deleted {} messages for device".format(count), "count": count}
         )
 
@@ -127,7 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
         return count
 
-    @tracerutils.trace_function
+    @opentracing.trace_function
     def get_new_device_msgs_for_remote(
         self, destination, last_stream_id, current_stream_id, limit
     ):
@@ -143,23 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 in the stream the messages got to.
         """
 
-        tracerutils.set_tag("destination", destination)
-        tracerutils.set_tag("last_stream_id", last_stream_id)
-        tracerutils.set_tag("current_stream_id", current_stream_id)
-        tracerutils.set_tag("limit", limit)
+        opentracing.set_tag("destination", destination)
+        opentracing.set_tag("last_stream_id", last_stream_id)
+        opentracing.set_tag("current_stream_id", current_stream_id)
+        opentracing.set_tag("limit", limit)
 
         has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
             destination, last_stream_id
         )
         if not has_changed or last_stream_id == current_stream_id:
-            tracerutils.log_kv({"message": "No new messages in stream"})
+            opentracing.log_kv({"message": "No new messages in stream"})
             return defer.succeed(([], current_stream_id))
 
         if limit <= 0:
             # This can happen if we run out of room for EDUs in the transaction.
             return defer.succeed(([], last_stream_id))
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def get_new_messages_for_remote_destination_txn(txn):
             sql = (
                 "SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -174,7 +174,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 stream_pos = row[0]
                 messages.append(json.loads(row[1]))
             if len(messages) < limit:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "Set stream position to current position"}
                 )
                 stream_pos = current_stream_id
@@ -185,7 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             get_new_messages_for_remote_destination_txn,
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
         """Used to delete messages when the remote destination acknowledges
         their receipt.
@@ -236,7 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
             expiry_ms=30 * 60 * 1000,
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def add_messages_to_device_inbox(
         self, local_messages_by_user_then_device, remote_messages_by_destination
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 12fa70d416..e88e3f69ba 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 from six import iteritems
 
@@ -300,7 +300,7 @@ class DeviceWorkerStore(SQLBaseStore):
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_user_devices_from_cache(self, query_list):
         """Get the devices (and keys if any) for remote users from the cache.
@@ -332,8 +332,8 @@ class DeviceWorkerStore(SQLBaseStore):
             else:
                 results[user_id] = yield self._get_cached_devices_for_user(user_id)
 
-        tracerutils.set_tag("in_cache", results)
-        tracerutils.set_tag("not_in_cache", user_ids_not_in_cache)
+        opentracing.set_tag("in_cache", results)
+        opentracing.set_tag("not_in_cache", user_ids_not_in_cache)
 
         return (user_ids_not_in_cache, results)
 
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index ced6dd04de..f18605dfc4 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,7 +18,7 @@ import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
-from synapse.util.tracerutils import trace_defered_function
+from synapse.logging.opentracing import trace_defered_function
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 9fbff3ebf4..4fa694e06a 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -22,11 +22,11 @@ from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -43,7 +43,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
         """
-        tracerutils.set_tag("query_list", query_list)
+        opentracing.set_tag("query_list", query_list)
         if not query_list:
             return {}
 
@@ -61,12 +61,12 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return results
 
-    @tracerutils.trace_function
+    @opentracing.trace_function
     def _get_e2e_device_keys_txn(
         self, txn, query_list, include_all_devices=False, include_deleted_devices=False
     ):
-        tracerutils.set_tag("include_all_devices", include_all_devices)
-        tracerutils.set_tag("include_deleted_devices", include_deleted_devices)
+        opentracing.set_tag("include_all_devices", include_all_devices)
+        opentracing.set_tag("include_deleted_devices", include_deleted_devices)
 
         query_clauses = []
         query_params = []
@@ -112,10 +112,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             for user_id, device_id in deleted_devices:
                 result.setdefault(user_id, {})[device_id] = None
 
-        tracerutils.log_kv(result)
+        opentracing.log_kv(result)
         return result
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
         """Retrieve a number of one-time keys for a user
@@ -131,9 +131,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             key_id) to json string for key
         """
 
-        tracerutils.set_tag("user_id", user_id)
-        tracerutils.set_tag("device_id", device_id)
-        tracerutils.set_tag("key_ids", key_ids)
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("device_id", device_id)
+        opentracing.set_tag("key_ids", key_ids)
 
         rows = yield self._simple_select_many_batch(
             table="e2e_one_time_keys_json",
@@ -159,11 +159,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 (algorithm, key_id, key json)
         """
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _add_e2e_one_time_keys(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
-            tracerutils.set_tag("new_keys", new_keys)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("new_keys", new_keys)
             # We are protected from race between lookup and insertion due to
             # a unique constraint. If there is a race of two calls to
             # `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -219,12 +219,12 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         or the keys were already in the database.
         """
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _set_e2e_device_keys_txn(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
-            tracerutils.set_tag("time_now", time_now)
-            tracerutils.set_tag("device_keys", device_keys)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("time_now", time_now)
+            opentracing.set_tag("device_keys", device_keys)
 
             old_key_json = self._simple_select_one_onecol_txn(
                 txn,
@@ -239,7 +239,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             new_key_json = encode_canonical_json(device_keys).decode("utf-8")
 
             if old_key_json == new_key_json:
-                tracerutils.log_kv({"event", "key already stored"})
+                opentracing.log_kv({"event", "key already stored"})
                 return False
 
             self._simple_upsert_txn(
@@ -256,7 +256,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def claim_e2e_one_time_keys(self, query_list):
         """Take a list of one time keys out of the database"""
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _claim_e2e_one_time_keys(txn):
             sql = (
                 "SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -278,11 +278,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 " AND key_id = ?"
             )
             for user_id, device_id, algorithm, key_id in delete:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "executing claim transaction on database"}
                 )
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "finished executing and invalidating cache"}
                 )
                 self._invalidate_cache_and_stream(
@@ -293,10 +293,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys)
 
     def delete_e2e_keys_by_device(self, user_id, device_id):
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def delete_e2e_keys_by_device_txn(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
             self._simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",