diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5c1cf83c9d..71a8f33da3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -25,6 +25,7 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
+from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
+ @trace
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
defer.Deferred: list[dict[str, X]]: info on each device
"""
+ set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
for device in devices:
_update_device_from_client_ips(device, ips)
+ log_kv(device_map)
return devices
+ @trace
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
@@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
+
+ set_tag("device", device)
+ set_tag("ips", ips)
+
return device
@measure_func("device.get_user_ids_changed")
+ @trace
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
@@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
user_id (str)
from_token (StreamToken)
"""
+
+ set_tag("user_id", user_id)
+ set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id()
room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
+ log_kv(
+ {"event": "encountered empty previous state", "room_id": room_id}
+ )
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
@@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
possibly_joined = []
possibly_left = []
- return {"changed": list(possibly_joined), "left": list(possibly_left)}
+ result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+
+ log_kv(result)
+
+ return result
class DeviceHandler(DeviceWorkerHandler):
@@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
+ @trace
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
+ set_tag("error", True)
+ log_kv(
+ {"reason": "User doesn't have device id.", "device_id": device_id}
+ )
pass
else:
raise
@@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
yield self.notify_device_update(user_id, [device_id])
+ @trace
@defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices
@@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
+ set_tag("error", True)
+ set_tag("reason", "User doesn't have that device id.")
pass
else:
raise
@@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
+ @trace
@measure_func("notify_device_update")
@defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids):
@@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)
+ set_tag("target_hosts", hosts)
+
position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
)
@@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler):
)
for host in hosts:
self.federation_sender.send_device_messages(host)
+ log_kv({"message": "sent device update to host", "host": host})
@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
@@ -451,12 +483,15 @@ class DeviceListUpdater(object):
iterable=True,
)
+ @trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""
+ set_tag("origin", origin)
+ set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@@ -471,12 +506,30 @@ class DeviceListUpdater(object):
device_id,
origin,
)
+
+ set_tag("error", True)
+ log_kv(
+ {
+ "message": "Got a device list update edu from a user and "
+ "device which does not match the origin of the request.",
+ "user_id": user_id,
+ "device_id": device_id,
+ }
+ )
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
+ set_tag("error", True)
+ log_kv(
+ {
+ "message": "Got an update from a user for which "
+ "we don't share any rooms",
+ "other user_id": user_id,
+ }
+ )
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id,
@@ -578,6 +631,7 @@ class DeviceListUpdater(object):
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
+ log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
@@ -594,13 +648,20 @@ class DeviceListUpdater(object):
# eventually become consistent.
return
except FederationDeniedError as e:
+ set_tag("error", True)
+ log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
- except Exception:
+ except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
+ set_tag("error", True)
+ log_kv(
+ {"message": "Exception raised by federation request", "exception": e}
+ )
logger.exception("Failed to handle device list update for %s", user_id)
return
+ log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]
|