diff options
author | Jorik Schellekens <joriks@matrix.org> | 2019-07-02 17:34:48 +0100 |
---|---|---|
committer | Jorik Schellekens <joriks@matrix.org> | 2019-07-23 16:04:02 +0100 |
commit | 21940cadf05d6c1fa55c30b7bda07bc6643b7bf2 (patch) | |
tree | dfaccd4fdb9e7e77365b54d9d106a9fb46ed4974 | |
parent | Trace device messages. (diff) | |
download | synapse-21940cadf05d6c1fa55c30b7bda07bc6643b7bf2.tar.xz |
Update to new access pattern
-rw-r--r-- | synapse/handlers/device.py | 46 | ||||
-rw-r--r-- | synapse/handlers/devicemessage.py | 8 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 58 | ||||
-rw-r--r-- | synapse/handlers/e2e_room_keys.py | 18 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/keys.py | 12 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/room_keys.py | 7 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/sendtodevice.py | 12 | ||||
-rw-r--r-- | synapse/storage/deviceinbox.py | 30 | ||||
-rw-r--r-- | synapse/storage/devices.py | 8 | ||||
-rw-r--r-- | synapse/storage/end_to_end_keys.py | 54 |
10 files changed, 121 insertions, 132 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2383ac670c..fa9669c47b 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -31,7 +31,7 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.util.retryutils import NotRetryingDestination -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils from ._base import BaseHandler @@ -46,7 +46,7 @@ class DeviceWorkerHandler(BaseHandler): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -58,7 +58,7 @@ class DeviceWorkerHandler(BaseHandler): defer.Deferred: list[dict[str, X]]: info on each device """ - TracerUtil.set_tag("user_id", user_id) + tracerutils.set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -67,10 +67,10 @@ class DeviceWorkerHandler(BaseHandler): for device in devices: _update_device_from_client_ips(device, ips) - TracerUtil.log_kv(device_map) + tracerutils.log_kv(device_map) return devices - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -91,13 +91,13 @@ class DeviceWorkerHandler(BaseHandler): ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) - TracerUtil.set_tag("device", device) - TracerUtil.set_tag("ips", ips) + tracerutils.set_tag("device", device) + tracerutils.set_tag("ips", ips) return device @measure_func("device.get_user_ids_changed") - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -108,8 +108,8 @@ class DeviceWorkerHandler(BaseHandler): from_token (StreamToken) """ - TracerUtil("user_id", user_id) - TracerUtil.set_tag("from_token", from_token) + tracerutils("user_id", user_id) + tracerutils.set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -161,7 +161,7 @@ class DeviceWorkerHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: - TracerUtil.log_kv( + tracerutils.log_kv( {"event": "encountered empty previous state", "room_id": room_id} ) for key, event_id in iteritems(current_state_ids): @@ -216,7 +216,7 @@ class DeviceWorkerHandler(BaseHandler): possibly_joined = [] possibly_left = [] - TracerUtil.log_kv( + tracerutils.log_kv( {"changed": list(possibly_joined), "left": list(possibly_left)} ) @@ -287,7 +287,7 @@ class DeviceHandler(DeviceWorkerHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -305,8 +305,8 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", "User doesn't have that device id.") + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -319,7 +319,7 @@ class DeviceHandler(DeviceWorkerHandler): yield self.notify_device_update(user_id, [device_id]) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -355,8 +355,8 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", "User doesn't have that device id.") + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -477,15 +477,15 @@ class DeviceListEduUpdater(object): iterable=True, ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ - TracerUtil.set_tag("origin", origin) - TracerUtil.set_tag("edu_content", edu_content) + tracerutils.set_tag("origin", origin) + tracerutils.set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -506,8 +506,8 @@ class DeviceListEduUpdater(object): if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. - TracerUtil.set_tag("error", True) - TracerUtil.log_kv( + tracerutils.set_tag("error", True) + tracerutils.log_kv( { "message": "Got an update from a user which " + "doesn't share a room with the current user." diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c999805184..de22b37b9e 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -20,7 +20,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils logger = logging.getLogger(__name__) @@ -77,7 +77,7 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): @@ -111,7 +111,7 @@ class DeviceMessageHandler(object): "message_id": message_id, } - TracerUtil.log_kv(local_messages) + tracerutils.log_kv(local_messages) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -120,7 +120,7 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) - TracerUtil.log_kv(remote_messages) + tracerutils.log_kv(remote_messages) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 8b2249fc5b..6bf9d955bc 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils from six import iteritems @@ -46,7 +46,7 @@ class E2eKeysHandler(object): "client_keys", self.on_federation_query_client_keys ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def query_devices(self, query_body, timeout): """ Handle a device key query from a client @@ -82,8 +82,8 @@ class E2eKeysHandler(object): else: remote_queries[user_id] = device_ids - TracerUtil.set_tag("local_key_query", local_query) - TracerUtil.set_tag("remote_key_query", remote_queries) + tracerutils.set_tag("local_key_query", local_query) + tracerutils.set_tag("remote_key_query", remote_queries) # First get local devices. failures = {} @@ -125,12 +125,12 @@ class E2eKeysHandler(object): r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def do_remote_query(destination): destination_query = remote_queries_not_in_cache[destination] - TracerUtil.set_tag("key_query", destination_query) + tracerutils.set_tag("key_query", destination_query) try: remote_result = yield self.federation.query_client_keys( destination, {"device_keys": destination_query}, timeout=timeout @@ -143,8 +143,8 @@ class E2eKeysHandler(object): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", failure) + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -158,7 +158,7 @@ class E2eKeysHandler(object): return {"device_keys": results, "failures": failures} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def query_local_devices(self, query): """Get E2E device keys for local users @@ -171,7 +171,7 @@ class E2eKeysHandler(object): defer.Deferred: (resolves to dict[string, dict[string, dict]]): map from user_id -> device_id -> device details """ - TracerUtil.set_tag("local_query", query) + tracerutils.set_tag("local_query", query) local_query = [] result_dict = {} @@ -179,14 +179,14 @@ class E2eKeysHandler(object): # we use UserID.from_string to catch invalid user ids if not self.is_mine(UserID.from_string(user_id)): logger.warning("Request for keys for non-local user %s", user_id) - TracerUtil.log_kv( + tracerutils.log_kv( { "message": "Requested a local key for a user which" + " was not local to the homeserver", "user_id": user_id, } ) - TracerUtil.set_tag("error", True) + tracerutils.set_tag("error", True) raise SynapseError(400, "Not a user here") if not device_ids: @@ -211,7 +211,7 @@ class E2eKeysHandler(object): r["unsigned"]["device_display_name"] = display_name result_dict[user_id][device_id] = r - TracerUtil.log_kv(results) + tracerutils.log_kv(results) return result_dict @defer.inlineCallbacks @@ -222,7 +222,7 @@ class E2eKeysHandler(object): res = yield self.query_local_devices(device_keys_query) return {"device_keys": res} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def claim_one_time_keys(self, query, timeout): local_query = [] @@ -237,8 +237,8 @@ class E2eKeysHandler(object): domain = get_domain_from_id(user_id) remote_queries.setdefault(domain, {})[user_id] = device_keys - TracerUtil.set_tag("local_key_query", local_query) - TracerUtil.set_tag("remote_key_query", remote_queries) + tracerutils.set_tag("local_key_query", local_query) + tracerutils.set_tag("remote_key_query", remote_queries) results = yield self.store.claim_e2e_one_time_keys(local_query) @@ -251,10 +251,10 @@ class E2eKeysHandler(object): key_id: json.loads(json_bytes) } - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def claim_client_keys(destination): - TracerUtil.set_tag("destination", destination) + tracerutils.set_tag("destination", destination) device_keys = remote_queries[destination] try: remote_result = yield self.federation.claim_client_keys( @@ -267,8 +267,8 @@ class E2eKeysHandler(object): except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure - TracerUtil.set_tag("error", True) - TracerUtil.set_tag("reason", failure) + tracerutils.set_tag("error", True) + tracerutils.set_tag("reason", failure) yield make_deferred_yieldable( defer.gatherResults( @@ -292,21 +292,21 @@ class E2eKeysHandler(object): ), ) - TracerUtil.log_kv({"one_time_keys": json_result, "failures": failures}) + tracerutils.log_kv({"one_time_keys": json_result, "failures": failures}) return {"one_time_keys": json_result, "failures": failures} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def upload_keys_for_user(self, user_id, device_id, keys): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("keys", keys) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("keys", keys) time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. device_keys = keys.get("device_keys", None) - TracerUtil.set_tag("device_keys", device_keys) + tracerutils.set_tag("device_keys", device_keys) if device_keys: logger.info( "Updating device_keys for device %r for user %s at %d", @@ -328,7 +328,7 @@ class E2eKeysHandler(object): user_id, device_id, time_now, one_time_keys ) else: - TracerUtil.log_kv( + tracerutils.log_kv( {"event": "did not upload one_time_keys", "reason": "no keys given"} ) @@ -341,10 +341,10 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - TracerUtil.set_tag("one_time_key_counts", result) + tracerutils.set_tag("one_time_key_counts", result) return {"one_time_key_counts": result} - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def _upload_one_time_keys_for_user( self, user_id, device_id, time_now, one_time_keys diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 07724d1c4a..a90ec2f9fc 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -27,7 +27,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.util.async_helpers import Linearizer -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils logger = logging.getLogger(__name__) @@ -50,7 +50,7 @@ class E2eRoomKeysHandler(object): # changed. self._upload_linearizer = Linearizer("upload_room_keys_lock") - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk get the E2E room keys for a given backup, optionally filtered to a given @@ -86,10 +86,10 @@ class E2eRoomKeysHandler(object): user_id, version, room_id, session_id ) - TracerUtil.log_kv(results) + tracerutils.log_kv(results) return results - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_room_keys(self, user_id, version, room_id=None, session_id=None): """Bulk delete the E2E room keys for a given backup, optionally filtered to a given @@ -111,7 +111,7 @@ class E2eRoomKeysHandler(object): with (yield self._upload_linearizer.queue(user_id)): yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def upload_room_keys(self, user_id, version, room_keys): """Bulk upload a list of room keys into a given backup version, asserting @@ -179,7 +179,7 @@ class E2eRoomKeysHandler(object): user_id, version, room_id, session_id, session ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def _upload_room_key(self, user_id, version, room_id, session_id, room_key): """Upload a given room_key for a given room and session into a given @@ -242,7 +242,7 @@ class E2eRoomKeysHandler(object): return False return True - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def create_version(self, user_id, version_info): """Create a new backup version. This automatically becomes the new @@ -301,7 +301,7 @@ class E2eRoomKeysHandler(object): raise return res - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_version(self, user_id, version=None): """Deletes a given version of the user's e2e_room_keys backup @@ -322,7 +322,7 @@ class E2eRoomKeysHandler(object): else: raise - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def update_version(self, user_id, version, version_info): """Update the info about a given version of the user's backup diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 8de9e12df4..40052e7a05 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -25,11 +25,7 @@ from synapse.http.servlet import ( parse_string, ) from synapse.types import StreamToken -from synapse.util.tracerutils import ( - TracerUtil, - trace_defered_function_using_operation_name, -) - +import synapse.util.tracerutils as tracerutils from ._base import client_patterns logger = logging.getLogger(__name__) @@ -72,7 +68,7 @@ class KeyUploadServlet(RestServlet): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @trace_defered_function_using_operation_name("upload_keys") + @tracerutils.trace_defered_function_using_operation_name("upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) @@ -83,8 +79,8 @@ class KeyUploadServlet(RestServlet): # passing the device_id here is deprecated; however, we allow it # for now for compatibility with older clients. if requester.device_id is not None and device_id != requester.device_id: - TracerUtil.set_tag("error", True) - TracerUtil.log_kv( + tracerutils.set_tag("error", True) + tracerutils.log_kv( { "message": "Client uploading keys for a different device", "logged_in_id": requester.device_id, diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py index 34ab74d3a4..ac03d58899 100644 --- a/synapse/rest/client/v2_alpha/room_keys.py +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -23,10 +23,7 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, ) -from synapse.util.tracerutils import ( - TracerUtil, - trace_defered_function_using_operation_name, -) +import synapse.util.tracerutils as tracerutils from ._base import client_patterns @@ -315,7 +312,7 @@ class RoomKeysVersionServlet(RestServlet): self.auth = hs.get_auth() self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() - @trace_defered_function_using_operation_name("get_room_keys_version") + @tracerutils.trace_defered_function_using_operation_name("get_room_keys_version") @defer.inlineCallbacks def on_GET(self, request, version): """ diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index a7782cbd7c..9df38a8f5e 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -20,11 +20,7 @@ from twisted.internet import defer from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request from synapse.rest.client.transactions import HttpTransactionCache -from synapse.util.tracerutils import ( - TracerUtil, - trace_defered_function_using_operation_name, - tag_args, -) +import synapse.util.tracerutils as tracerutils from ._base import client_patterns @@ -47,10 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() - @trace_defered_function_using_operation_name("sendToDevice") + @tracerutils.trace_defered_function_using_operation_name("sendToDevice") def on_PUT(self, request, message_type, txn_id): - TracerUtil.set_tag("message_type", message_type) - TracerUtil.set_tag("txn_id", txn_id) + tracerutils.set_tag("message_type", message_type) + tracerutils.set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 4a45926c45..8687b5a046 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function +import synapse.util.tracerutils as tracerutils logger = logging.getLogger(__name__) @@ -73,7 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): "get_new_messages_for_device", get_new_messages_for_device_txn ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -90,14 +90,14 @@ class DeviceInboxWorkerStore(SQLBaseStore): (user_id, device_id), None ) - TracerUtil.set_tag("last_deleted_stream_id", last_deleted_stream_id) + tracerutils.set_tag("last_deleted_stream_id", last_deleted_stream_id) if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_deleted_stream_id ) if not has_changed: - TracerUtil.log_kv({"message": "No changes in cache since last check"}) + tracerutils.log_kv({"message": "No changes in cache since last check"}) return 0 def delete_messages_for_device_txn(txn): @@ -113,7 +113,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): "delete_messages_for_device", delete_messages_for_device_txn ) - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "deleted {} messages for device".format(count), "count": count} ) @@ -127,7 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): return count - @trace_defered_function + @tracerutils.trace_defered_function def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -143,23 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore): in the stream the messages got to. """ - TracerUtil.set_tag("destination", destination) - TracerUtil.set_tag("last_stream_id", last_stream_id) - TracerUtil.set_tag("current_stream_id", current_stream_id) - TracerUtil.set_tag("limit", limit) + tracerutils.set_tag("destination", destination) + tracerutils.set_tag("last_stream_id", last_stream_id) + tracerutils.set_tag("current_stream_id", current_stream_id) + tracerutils.set_tag("limit", limit) has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) if not has_changed or last_stream_id == current_stream_id: - TracerUtil.log_kv({"message": "No new messages in stream"}) + tracerutils.log_kv({"message": "No new messages in stream"}) return defer.succeed(([], current_stream_id)) if limit <= 0: # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) - @trace_function + @tracerutils.trace_function def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -174,7 +174,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): stream_pos = row[0] messages.append(json.loads(row[1])) if len(messages) < limit: - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "Set stream position to current position"} ) stream_pos = current_stream_id @@ -185,7 +185,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): get_new_messages_for_remote_destination_txn, ) - @trace_defered_function + @tracerutils.trace_defered_function def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -236,7 +236,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): expiry_ms=30 * 60 * 1000, ) - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 4627bc049a..12fa70d416 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from synapse.util.tracerutils import TracerUtil, trace_defered_function +import synapse.util.tracerutils as tracerutils from six import iteritems @@ -300,7 +300,7 @@ class DeviceWorkerStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -332,8 +332,8 @@ class DeviceWorkerStore(SQLBaseStore): else: results[user_id] = yield self._get_cached_devices_for_user(user_id) - TracerUtil.set_tag("in_cache", results) - TracerUtil.set_tag("not_in_cache", user_ids_not_in_cache) + tracerutils.set_tag("in_cache", results) + tracerutils.set_tag("not_in_cache", user_ids_not_in_cache) return (user_ids_not_in_cache, results) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index acfe3da0ad..98246c347d 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -22,11 +22,11 @@ from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore, db_to_json -from synapse.util.tracerutils import TracerUtil, trace_defered_function, trace_function +import synapse.util.tracerutils as tracerutils class EndToEndKeyWorkerStore(SQLBaseStore): - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, include_deleted_devices=False @@ -43,7 +43,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". """ - TracerUtil.set_tag("query_list", query_list) + tracerutils.set_tag("query_list", query_list) if not query_list: return {} @@ -61,12 +61,12 @@ class EndToEndKeyWorkerStore(SQLBaseStore): return results - @trace_function + @tracerutils.trace_function def _get_e2e_device_keys_txn( self, txn, query_list, include_all_devices=False, include_deleted_devices=False ): - TracerUtil.set_tag("include_all_devices", include_all_devices) - TracerUtil.set_tag("include_deleted_devices", include_deleted_devices) + tracerutils.set_tag("include_all_devices", include_all_devices) + tracerutils.set_tag("include_deleted_devices", include_deleted_devices) query_clauses = [] query_params = [] @@ -112,10 +112,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore): for user_id, device_id in deleted_devices: result.setdefault(user_id, {})[device_id] = None - TracerUtil.log_kv(result) + tracerutils.log_kv(result) return result - @trace_defered_function + @tracerutils.trace_defered_function @defer.inlineCallbacks def get_e2e_one_time_keys(self, user_id, device_id, key_ids): """Retrieve a number of one-time keys for a user @@ -131,9 +131,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore): key_id) to json string for key """ - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("key_ids", key_ids) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("key_ids", key_ids) rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", @@ -159,11 +159,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore): (algorithm, key_id, key json) """ - @trace_function + @tracerutils.strace_function def _add_e2e_one_time_keys(txn): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("new_keys", new_keys) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("new_keys", new_keys) # We are protected from race between lookup and insertion due to # a unique constraint. If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only @@ -219,12 +219,12 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): or the keys were already in the database. """ - @trace_function + @tracerutils.trace_function def _set_e2e_device_keys_txn(txn): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) - TracerUtil.set_tag("time_now", time_now) - TracerUtil.set_tag("device_keys", device_keys) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) + tracerutils.set_tag("time_now", time_now) + tracerutils.set_tag("device_keys", device_keys) old_key_json = self._simple_select_one_onecol_txn( txn, @@ -239,7 +239,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): new_key_json = encode_canonical_json(device_keys).decode("utf-8") if old_key_json == new_key_json: - TracerUtil.log_kv({"event", "key already stored"}) + tracerutils.log_kv({"event", "key already stored"}) return False self._simple_upsert_txn( @@ -256,7 +256,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): def claim_e2e_one_time_keys(self, query_list): """Take a list of one time keys out of the database""" - @trace_function + @tracerutils.trace_function def _claim_e2e_one_time_keys(txn): sql = ( "SELECT key_id, key_json FROM e2e_one_time_keys_json" @@ -278,11 +278,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): " AND key_id = ?" ) for user_id, device_id, algorithm, key_id in delete: - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "executing claim transaction on database"} ) txn.execute(sql, (user_id, device_id, algorithm, key_id)) - TracerUtil.log_kv( + tracerutils.log_kv( {"message": "finished executing and invalidating cache"} ) self._invalidate_cache_and_stream( @@ -293,10 +293,10 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): return self.runInteraction("claim_e2e_one_time_keys", _claim_e2e_one_time_keys) def delete_e2e_keys_by_device(self, user_id, device_id): - @trace_function + @tracerutils.trace_function def delete_e2e_keys_by_device_txn(txn): - TracerUtil.set_tag("user_id", user_id) - TracerUtil.set_tag("device_id", device_id) + tracerutils.set_tag("user_id", user_id) + tracerutils.set_tag("device_id", device_id) self._simple_delete_txn( txn, table="e2e_device_keys_json", |