summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/deviceinbox.py30
-rw-r--r--synapse/storage/devices.py8
-rw-r--r--synapse/storage/e2e_room_keys.py2
-rw-r--r--synapse/storage/end_to_end_keys.py54
4 files changed, 47 insertions, 47 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 6011312af9..91c6cd4cf6 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 logger = logging.getLogger(__name__)
 
@@ -73,7 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
         """
@@ -90,14 +90,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             (user_id, device_id), None
         )
 
-        tracerutils.set_tag("last_deleted_stream_id", last_deleted_stream_id)
+        opentracing.set_tag("last_deleted_stream_id", last_deleted_stream_id)
 
         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
                 user_id, last_deleted_stream_id
             )
             if not has_changed:
-                tracerutils.log_kv({"message": "No changes in cache since last check"})
+                opentracing.log_kv({"message": "No changes in cache since last check"})
                 return 0
 
         def delete_messages_for_device_txn(txn):
@@ -113,7 +113,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_messages_for_device", delete_messages_for_device_txn
         )
 
-        tracerutils.log_kv(
+        opentracing.log_kv(
             {"message": "deleted {} messages for device".format(count), "count": count}
         )
 
@@ -127,7 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
         return count
 
-    @tracerutils.trace_function
+    @opentracing.trace_function
     def get_new_device_msgs_for_remote(
         self, destination, last_stream_id, current_stream_id, limit
     ):
@@ -143,23 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 in the stream the messages got to.
         """
 
-        tracerutils.set_tag("destination", destination)
-        tracerutils.set_tag("last_stream_id", last_stream_id)
-        tracerutils.set_tag("current_stream_id", current_stream_id)
-        tracerutils.set_tag("limit", limit)
+        opentracing.set_tag("destination", destination)
+        opentracing.set_tag("last_stream_id", last_stream_id)
+        opentracing.set_tag("current_stream_id", current_stream_id)
+        opentracing.set_tag("limit", limit)
 
         has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
             destination, last_stream_id
         )
         if not has_changed or last_stream_id == current_stream_id:
-            tracerutils.log_kv({"message": "No new messages in stream"})
+            opentracing.log_kv({"message": "No new messages in stream"})
             return defer.succeed(([], current_stream_id))
 
         if limit <= 0:
             # This can happen if we run out of room for EDUs in the transaction.
             return defer.succeed(([], last_stream_id))
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def get_new_messages_for_remote_destination_txn(txn):
             sql = (
                 "SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -174,7 +174,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 stream_pos = row[0]
                 messages.append(json.loads(row[1]))
             if len(messages) < limit:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "Set stream position to current position"}
                 )
                 stream_pos = current_stream_id
@@ -185,7 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             get_new_messages_for_remote_destination_txn,
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
         """Used to delete messages when the remote destination acknowledges
         their receipt.
@@ -236,7 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
             expiry_ms=30 * 60 * 1000,
         )
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def add_messages_to_device_inbox(
         self, local_messages_by_user_then_device, remote_messages_by_destination
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 12fa70d416..e88e3f69ba 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 from six import iteritems
 
@@ -300,7 +300,7 @@ class DeviceWorkerStore(SQLBaseStore):
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_user_devices_from_cache(self, query_list):
         """Get the devices (and keys if any) for remote users from the cache.
@@ -332,8 +332,8 @@ class DeviceWorkerStore(SQLBaseStore):
             else:
                 results[user_id] = yield self._get_cached_devices_for_user(user_id)
 
-        tracerutils.set_tag("in_cache", results)
-        tracerutils.set_tag("not_in_cache", user_ids_not_in_cache)
+        opentracing.set_tag("in_cache", results)
+        opentracing.set_tag("not_in_cache", user_ids_not_in_cache)
 
         return (user_ids_not_in_cache, results)
 
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index ced6dd04de..f18605dfc4 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -18,7 +18,7 @@ import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
-from synapse.util.tracerutils import trace_defered_function
+from synapse.logging.opentracing import trace_defered_function
 
 from ._base import SQLBaseStore
 
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 9fbff3ebf4..4fa694e06a 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -22,11 +22,11 @@ from synapse.util.caches.descriptors import cached
 
 from ._base import SQLBaseStore, db_to_json
 
-import synapse.util.tracerutils as tracerutils
+import synapse.logging.opentracing as opentracing
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
@@ -43,7 +43,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             Dict mapping from user-id to dict mapping from device_id to
             dict containing "key_json", "device_display_name".
         """
-        tracerutils.set_tag("query_list", query_list)
+        opentracing.set_tag("query_list", query_list)
         if not query_list:
             return {}
 
@@ -61,12 +61,12 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return results
 
-    @tracerutils.trace_function
+    @opentracing.trace_function
     def _get_e2e_device_keys_txn(
         self, txn, query_list, include_all_devices=False, include_deleted_devices=False
     ):
-        tracerutils.set_tag("include_all_devices", include_all_devices)
-        tracerutils.set_tag("include_deleted_devices", include_deleted_devices)
+        opentracing.set_tag("include_all_devices", include_all_devices)
+        opentracing.set_tag("include_deleted_devices", include_deleted_devices)
 
         query_clauses = []
         query_params = []
@@ -112,10 +112,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             for user_id, device_id in deleted_devices:
                 result.setdefault(user_id, {})[device_id] = None
 
-        tracerutils.log_kv(result)
+        opentracing.log_kv(result)
         return result
 
-    @tracerutils.trace_defered_function
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
         """Retrieve a number of one-time keys for a user
@@ -131,9 +131,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             key_id) to json string for key
         """
 
-        tracerutils.set_tag("user_id", user_id)
-        tracerutils.set_tag("device_id", device_id)
-        tracerutils.set_tag("key_ids", key_ids)
+        opentracing.set_tag("user_id", user_id)
+        opentracing.set_tag("device_id", device_id)
+        opentracing.set_tag("key_ids", key_ids)
 
         rows = yield self._simple_select_many_batch(
             table="e2e_one_time_keys_json",
@@ -159,11 +159,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 (algorithm, key_id, key json)
         """
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _add_e2e_one_time_keys(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
-            tracerutils.set_tag("new_keys", new_keys)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("new_keys", new_keys)
             # We are protected from race between lookup and insertion due to
             # a unique constraint. If there is a race of two calls to
             # `add_e2e_one_time_keys` then they'll conflict and we will only
@@ -219,12 +219,12 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         or the keys were already in the database.
         """
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _set_e2e_device_keys_txn(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
-            tracerutils.set_tag("time_now", time_now)
-            tracerutils.set_tag("device_keys", device_keys)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
+            opentracing.set_tag("time_now", time_now)
+            opentracing.set_tag("device_keys", device_keys)
 
             old_key_json = self._simple_select_one_onecol_txn(
                 txn,
@@ -239,7 +239,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             new_key_json = encode_canonical_json(device_keys).decode("utf-8")
 
             if old_key_json == new_key_json:
-                tracerutils.log_kv({"event", "key already stored"})
+                opentracing.log_kv({"event", "key already stored"})
                 return False
 
             self._simple_upsert_txn(
@@ -256,7 +256,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def claim_e2e_one_time_keys(self, query_list):
         """Take a list of one time keys out of the database"""
 
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def _claim_e2e_one_time_keys(txn):
             sql = (
                 "SELECT key_id, key_json FROM e2e_one_time_keys_json"
@@ -278,11 +278,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 " AND key_id = ?"
             )
             for user_id, device_id, algorithm, key_id in delete:
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "executing claim transaction on database"}
                 )
                 txn.execute(sql, (user_id, device_id, algorithm, key_id))
-                tracerutils.log_kv(
+                opentracing.log_kv(
                     {"message": "finished executing and invalidating cache"}
                 )
                 self._invalidate_cache_and_stream(
@@ -293,10 +293,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
         return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys)
 
     def delete_e2e_keys_by_device(self, user_id, device_id):
-        @tracerutils.trace_function
+        @opentracing.trace_function
         def delete_e2e_keys_by_device_txn(txn):
-            tracerutils.set_tag("user_id", user_id)
-            tracerutils.set_tag("device_id", device_id)
+            opentracing.set_tag("user_id", user_id)
+            opentracing.set_tag("device_id", device_id)
             self._simple_delete_txn(
                 txn,
                 table="e2e_device_keys_json",