From a90d16dabc6f498136a098568b1d37858d4af5b6 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 3 Sep 2019 10:21:30 +0100 Subject: Opentrace device lists (#5853) Trace device list changes. --- synapse/handlers/device.py | 65 +++++++++++++++++++++++++++++++++++++-- synapse/handlers/devicemessage.py | 6 +++- 2 files changed, 68 insertions(+), 3 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5c1cf83c9d..71a8f33da3 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -25,6 +25,7 @@ from synapse.api.errors import ( HttpResponseException, RequestSendFailed, ) +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils from synapse.util.async_helpers import Linearizer @@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + @trace @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler): defer.Deferred: list[dict[str, X]]: info on each device """ + set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler): for device in devices: _update_device_from_client_ips(device, ips) + log_kv(device_map) return devices + @trace @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler): raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) + + set_tag("device", device) + set_tag("ips", ips) + return device @measure_func("device.get_user_ids_changed") + @trace @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler): user_id (str) from_token (StreamToken) """ + + set_tag("user_id", user_id) + set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: + log_kv( + {"event": "encountered empty previous state", "room_id": room_id} + ) for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: @@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler): possibly_joined = [] possibly_left = [] - return {"changed": list(possibly_joined), "left": list(possibly_left)} + result = {"changed": list(possibly_joined), "left": list(possibly_left)} + + log_kv(result) + + return result class DeviceHandler(DeviceWorkerHandler): @@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") + @trace @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match + set_tag("error", True) + log_kv( + {"reason": "User doesn't have device id.", "device_id": device_id} + ) pass else: raise @@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler): yield self.notify_device_update(user_id, [device_id]) + @trace @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match + set_tag("error", True) + set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler): else: raise + @trace @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): @@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler): hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) + set_tag("target_hosts", hosts) + position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) ) @@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler): ) for host in hosts: self.federation_sender.send_device_messages(host) + log_kv({"message": "sent device update to host", "host": host}) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): @@ -451,12 +483,15 @@ class DeviceListUpdater(object): iterable=True, ) + @trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ + set_tag("origin", origin) + set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -471,12 +506,30 @@ class DeviceListUpdater(object): device_id, origin, ) + + set_tag("error", True) + log_kv( + { + "message": "Got a device list update edu from a user and " + "device which does not match the origin of the request.", + "user_id": user_id, + "device_id": device_id, + } + ) return room_ids = yield self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + set_tag("error", True) + log_kv( + { + "message": "Got an update from a user for which " + "we don't share any rooms", + "other user_id": user_id, + } + ) logger.warning( "Got device list update edu for %r/%r, but don't share a room", user_id, @@ -578,6 +631,7 @@ class DeviceListUpdater(object): request: https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid """ + log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. origin = get_domain_from_id(user_id) try: @@ -594,13 +648,20 @@ class DeviceListUpdater(object): # eventually become consistent. return except FederationDeniedError as e: + set_tag("error", True) + log_kv({"reason": "FederationDeniedError"}) logger.info(e) return - except Exception: + except Exception as e: # TODO: Remember that we are now out of sync and try again # later + set_tag("error", True) + log_kv( + {"message": "Exception raised by federation request", "exception": e} + ) logger.exception("Failed to handle device list update for %s", user_id) return + log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c7d56779b8..01731cb2d0 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -22,6 +22,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.logging.opentracing import ( get_active_span_text_map, + log_kv, set_tag, start_active_span, whitelisted_homeserver, @@ -86,7 +87,8 @@ class DeviceMessageHandler(object): @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): - + set_tag("number_of_messages", len(messages)) + set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} for user_id, by_device in messages.items(): @@ -124,6 +126,7 @@ class DeviceMessageHandler(object): else None, } + log_kv({"local_messages": local_messages}) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -132,6 +135,7 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) + log_kv({"remote_messages": remote_messages}) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. -- cgit 1.4.1