From 8767b63a821eb8612e2ab830534fd6f40eb1aaaa Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 22 Aug 2019 18:21:10 +0100 Subject: Propagate opentracing contexts through EDUs (#5852) Propagate opentracing contexts through EDUs Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/devicemessage.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) (limited to 'synapse/handlers/devicemessage.py') diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index e1ebb6346c..c7d56779b8 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -15,9 +15,17 @@ import logging +from canonicaljson import json + from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.logging.opentracing import ( + get_active_span_text_map, + set_tag, + start_active_span, + whitelisted_homeserver, +) from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string @@ -100,14 +108,21 @@ class DeviceMessageHandler(object): message_id = random_string(16) + context = get_active_span_text_map() + remote_edu_contents = {} for destination, messages in remote_messages.items(): - remote_edu_contents[destination] = { - "messages": messages, - "sender": sender_user_id, - "type": message_type, - "message_id": message_id, - } + with start_active_span("to_device_for_user"): + set_tag("destination", destination) + remote_edu_contents[destination] = { + "messages": messages, + "sender": sender_user_id, + "type": message_type, + "message_id": message_id, + "org.matrix.opentracing_context": json.dumps(context) + if whitelisted_homeserver(destination) + else None, + } stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents -- cgit 1.4.1 From a90d16dabc6f498136a098568b1d37858d4af5b6 Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Tue, 3 Sep 2019 10:21:30 +0100 Subject: Opentrace device lists (#5853) Trace device list changes. --- changelog.d/5853.feature | 1 + synapse/handlers/device.py | 65 +++++++++++++++++++++++++- synapse/handlers/devicemessage.py | 6 ++- synapse/logging/opentracing.py | 70 +++++++--------------------- synapse/rest/client/v2_alpha/keys.py | 4 +- synapse/rest/client/v2_alpha/sendtodevice.py | 4 ++ synapse/storage/deviceinbox.py | 21 +++++++++ synapse/storage/devices.py | 5 ++ 8 files changed, 118 insertions(+), 58 deletions(-) create mode 100644 changelog.d/5853.feature (limited to 'synapse/handlers/devicemessage.py') diff --git a/changelog.d/5853.feature b/changelog.d/5853.feature new file mode 100644 index 0000000000..80a04ae2ee --- /dev/null +++ b/changelog.d/5853.feature @@ -0,0 +1 @@ +Opentracing for device list updates. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5c1cf83c9d..71a8f33da3 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -25,6 +25,7 @@ from synapse.api.errors import ( HttpResponseException, RequestSendFailed, ) +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util import stringutils from synapse.util.async_helpers import Linearizer @@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler): self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + @trace @defer.inlineCallbacks def get_devices_by_user(self, user_id): """ @@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler): defer.Deferred: list[dict[str, X]]: info on each device """ + set_tag("user_id", user_id) device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) @@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler): for device in devices: _update_device_from_client_ips(device, ips) + log_kv(device_map) return devices + @trace @defer.inlineCallbacks def get_device(self, user_id, device_id): """ Retrieve the given device @@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler): raise errors.NotFoundError ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) + + set_tag("device", device) + set_tag("ips", ips) + return device @measure_func("device.get_user_ids_changed") + @trace @defer.inlineCallbacks def get_user_ids_changed(self, user_id, from_token): """Get list of users that have had the devices updated, or have newly @@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler): user_id (str) from_token (StreamToken) """ + + set_tag("user_id", user_id) + set_tag("from_token", from_token) now_room_key = yield self.store.get_room_events_max_id() room_ids = yield self.store.get_rooms_for_user(user_id) @@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler): # special-case for an empty prev state: include all members # in the changed list if not event_ids: + log_kv( + {"event": "encountered empty previous state", "room_id": room_id} + ) for key, event_id in iteritems(current_state_ids): etype, state_key = key if etype != EventTypes.Member: @@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler): possibly_joined = [] possibly_left = [] - return {"changed": list(possibly_joined), "left": list(possibly_left)} + result = {"changed": list(possibly_joined), "left": list(possibly_left)} + + log_kv(result) + + return result class DeviceHandler(DeviceWorkerHandler): @@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") + @trace @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match + set_tag("error", True) + log_kv( + {"reason": "User doesn't have device id.", "device_id": device_id} + ) pass else: raise @@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler): yield self.notify_device_update(user_id, [device_id]) + @trace @defer.inlineCallbacks def delete_all_devices_for_user(self, user_id, except_device_id=None): """Delete all of the user's devices @@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler): except errors.StoreError as e: if e.code == 404: # no match + set_tag("error", True) + set_tag("reason", "User doesn't have that device id.") pass else: raise @@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler): else: raise + @trace @measure_func("notify_device_update") @defer.inlineCallbacks def notify_device_update(self, user_id, device_ids): @@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler): hosts.update(get_domain_from_id(u) for u in users_who_share_room) hosts.discard(self.server_name) + set_tag("target_hosts", hosts) + position = yield self.store.add_device_change_to_streams( user_id, device_ids, list(hosts) ) @@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler): ) for host in hosts: self.federation_sender.send_device_messages(host) + log_kv({"message": "sent device update to host", "host": host}) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): @@ -451,12 +483,15 @@ class DeviceListUpdater(object): iterable=True, ) + @trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): """Called on incoming device list update from federation. Responsible for parsing the EDU and adding to pending updates list. """ + set_tag("origin", origin) + set_tag("edu_content", edu_content) user_id = edu_content.pop("user_id") device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints @@ -471,12 +506,30 @@ class DeviceListUpdater(object): device_id, origin, ) + + set_tag("error", True) + log_kv( + { + "message": "Got a device list update edu from a user and " + "device which does not match the origin of the request.", + "user_id": user_id, + "device_id": device_id, + } + ) return room_ids = yield self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + set_tag("error", True) + log_kv( + { + "message": "Got an update from a user for which " + "we don't share any rooms", + "other user_id": user_id, + } + ) logger.warning( "Got device list update edu for %r/%r, but don't share a room", user_id, @@ -578,6 +631,7 @@ class DeviceListUpdater(object): request: https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid """ + log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. origin = get_domain_from_id(user_id) try: @@ -594,13 +648,20 @@ class DeviceListUpdater(object): # eventually become consistent. return except FederationDeniedError as e: + set_tag("error", True) + log_kv({"reason": "FederationDeniedError"}) logger.info(e) return - except Exception: + except Exception as e: # TODO: Remember that we are now out of sync and try again # later + set_tag("error", True) + log_kv( + {"message": "Exception raised by federation request", "exception": e} + ) logger.exception("Failed to handle device list update for %s", user_id) return + log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index c7d56779b8..01731cb2d0 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -22,6 +22,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.logging.opentracing import ( get_active_span_text_map, + log_kv, set_tag, start_active_span, whitelisted_homeserver, @@ -86,7 +87,8 @@ class DeviceMessageHandler(object): @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): - + set_tag("number_of_messages", len(messages)) + set_tag("sender", sender_user_id) local_messages = {} remote_messages = {} for user_id, by_device in messages.items(): @@ -124,6 +126,7 @@ class DeviceMessageHandler(object): else None, } + log_kv({"local_messages": local_messages}) stream_id = yield self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) @@ -132,6 +135,7 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) + log_kv({"remote_messages": remote_messages}) for destination in remote_messages.keys(): # Enqueue a new federation transaction to send the new # device messages to each remote destination. diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index dd296027a1..256b972aaa 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -85,14 +85,14 @@ the function becomes the operation name for the span. return something_usual_and_useful -Operation names can be explicitly set for functions by using -``trace_using_operation_name`` +Operation names can be explicitly set for a function by passing the +operation name to ``trace`` .. code-block:: python - from synapse.logging.opentracing import trace_using_operation_name + from synapse.logging.opentracing import trace - @trace_using_operation_name("A *much* better operation name") + @trace(opname="a_better_operation_name") def interesting_badly_named_function(*args, **kwargs): # Does all kinds of cool and expected things return something_usual_and_useful @@ -641,66 +641,26 @@ def extract_text_map(carrier): # Tracing decorators -def trace(func): +def trace(func=None, opname=None): """ Decorator to trace a function. - Sets the operation name to that of the function's. + Sets the operation name to that of the function's or that given + as operation_name. See the module's doc string for usage + examples. """ - if opentracing is None: - return func - @wraps(func) - def _trace_inner(self, *args, **kwargs): - if opentracing is None: - return func(self, *args, **kwargs) - - scope = start_active_span(func.__name__) - scope.__enter__() - - try: - result = func(self, *args, **kwargs) - if isinstance(result, defer.Deferred): - - def call_back(result): - scope.__exit__(None, None, None) - return result - - def err_back(result): - scope.span.set_tag(tags.ERROR, True) - scope.__exit__(None, None, None) - return result - - result.addCallbacks(call_back, err_back) - - else: - scope.__exit__(None, None, None) - - return result - - except Exception as e: - scope.__exit__(type(e), None, e.__traceback__) - raise - - return _trace_inner - - -def trace_using_operation_name(operation_name): - """Decorator to trace a function. Explicitely sets the operation_name.""" - - def trace(func): - """ - Decorator to trace a function. - Sets the operation name to that of the function's. - """ + def decorator(func): if opentracing is None: return func + _opname = opname if opname else func.__name__ + @wraps(func) def _trace_inner(self, *args, **kwargs): if opentracing is None: return func(self, *args, **kwargs) - scope = start_active_span(operation_name) + scope = start_active_span(_opname) scope.__enter__() try: @@ -717,6 +677,7 @@ def trace_using_operation_name(operation_name): return result result.addCallbacks(call_back, err_back) + else: scope.__exit__(None, None, None) @@ -728,7 +689,10 @@ def trace_using_operation_name(operation_name): return _trace_inner - return trace + if func: + return decorator(func) + else: + return decorator def tag_args(func): diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 64b6898eb8..2e680134a0 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -24,7 +24,7 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, ) -from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import StreamToken from ._base import client_patterns @@ -69,7 +69,7 @@ class KeyUploadServlet(RestServlet): self.auth = hs.get_auth() self.e2e_keys_handler = hs.get_e2e_keys_handler() - @trace_using_operation_name("upload_keys") + @trace(opname="upload_keys") @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request, allow_guest=True) diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index 2613648d82..d90e52ed1a 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.http import servlet from synapse.http.servlet import parse_json_object_from_request +from synapse.logging.opentracing import set_tag, trace from synapse.rest.client.transactions import HttpTransactionCache from ._base import client_patterns @@ -42,7 +43,10 @@ class SendToDeviceRestServlet(servlet.RestServlet): self.txns = HttpTransactionCache(hs) self.device_message_handler = hs.get_device_message_handler() + @trace(opname="sendToDevice") def on_PUT(self, request, message_type, txn_id): + set_tag("message_type", message_type) + set_tag("txn_id", txn_id) return self.txns.fetch_or_execute_request( request, self._put, request, message_type, txn_id ) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 4dca9de617..6b7458304e 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -19,6 +19,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache @@ -72,6 +73,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): "get_new_messages_for_device", get_new_messages_for_device_txn ) + @trace @defer.inlineCallbacks def delete_messages_for_device(self, user_id, device_id, up_to_stream_id): """ @@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore): last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), None ) + + set_tag("last_deleted_stream_id", last_deleted_stream_id) + if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_deleted_stream_id ) if not has_changed: + log_kv({"message": "No changes in cache since last check"}) return 0 def delete_messages_for_device_txn(txn): @@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): "delete_messages_for_device", delete_messages_for_device_txn ) + log_kv( + {"message": "deleted {} messages for device".format(count), "count": count} + ) + # Update the cache, ensuring that we only ever increase the value last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 @@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): return count + @trace def get_new_device_msgs_for_remote( self, destination, last_stream_id, current_stream_id, limit ): @@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore): in the stream the messages got to. """ + set_tag("destination", destination) + set_tag("last_stream_id", last_stream_id) + set_tag("current_stream_id", current_stream_id) + set_tag("limit", limit) + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( destination, last_stream_id ) if not has_changed or last_stream_id == current_stream_id: + log_kv({"message": "No new messages in stream"}) return defer.succeed(([], current_stream_id)) if limit <= 0: # This can happen if we run out of room for EDUs in the transaction. return defer.succeed(([], last_stream_id)) + @trace def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" @@ -156,6 +174,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): stream_pos = row[0] messages.append(json.loads(row[1])) if len(messages) < limit: + log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id return messages, stream_pos @@ -164,6 +183,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): get_new_messages_for_remote_destination_txn, ) + @trace def delete_device_msgs_for_remote(self, destination, up_to_stream_id): """Used to delete messages when the remote destination acknowledges their receipt. @@ -214,6 +234,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): expiry_ms=30 * 60 * 1000, ) + @trace @defer.inlineCallbacks def add_messages_to_device_inbox( self, local_messages_by_user_then_device, remote_messages_by_destination diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 76542c512d..41f62828bd 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -23,6 +23,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.logging.opentracing import ( get_active_span_text_map, + set_tag, trace, whitelisted_homeserver, ) @@ -321,6 +322,7 @@ class DeviceWorkerStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() + @trace @defer.inlineCallbacks def get_user_devices_from_cache(self, query_list): """Get the devices (and keys if any) for remote users from the cache. @@ -352,6 +354,9 @@ class DeviceWorkerStore(SQLBaseStore): else: results[user_id] = yield self._get_cached_devices_for_user(user_id) + set_tag("in_cache", results) + set_tag("not_in_cache", user_ids_not_in_cache) + return user_ids_not_in_cache, results @cachedInlineCallbacks(num_args=2, tree=True) -- cgit 1.4.1 From 1d65292e94077390af0ad9c5ee8cd8b0db9b357c Mon Sep 17 00:00:00 2001 From: Jorik Schellekens Date: Thu, 5 Sep 2019 14:41:04 +0100 Subject: Link the send loop with the edus contexts The contexts were being filtered too early so the send loop wasn't being linked to them unless the destination was whitelisted. --- synapse/federation/sender/transaction_manager.py | 11 ++++++++--- synapse/federation/units.py | 3 +++ synapse/handlers/devicemessage.py | 5 +---- 3 files changed, 12 insertions(+), 7 deletions(-) (limited to 'synapse/handlers/devicemessage.py') diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 62ca6a3e87..42f46394bc 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -26,6 +26,7 @@ from synapse.logging.opentracing import ( set_tag, start_active_span_follows_from, tags, + whitelisted_homeserver, ) from synapse.util.metrics import measure_func @@ -59,9 +60,13 @@ class TransactionManager(object): # The span_contexts is a generator so that it won't be evaluated if # opentracing is disabled. (Yay speed!) - span_contexts = ( - extract_text_map(json.loads(edu.get_context())) for edu in pending_edus - ) + span_contexts = [] + keep_destination = whitelisted_homeserver(destination) + + for edu in pending_edus: + span_contexts.append(extract_text_map(json.loads(edu.get_context()))) + if keep_destination: + edu.strip_context() with start_active_span_follows_from("send_transaction", span_contexts): diff --git a/synapse/federation/units.py b/synapse/federation/units.py index aa84621206..b4d743cde7 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -41,6 +41,9 @@ class Edu(JsonEncodedObject): def get_context(self): return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") + def strip_context(self): + getattr(self, "content", {})["org.matrix.opentracing_context"] = "{}" + class Transaction(JsonEncodedObject): """ A transaction is a list of Pdus and Edus to be sent to a remote home diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 01731cb2d0..0043cbea17 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -25,7 +25,6 @@ from synapse.logging.opentracing import ( log_kv, set_tag, start_active_span, - whitelisted_homeserver, ) from synapse.types import UserID, get_domain_from_id from synapse.util.stringutils import random_string @@ -121,9 +120,7 @@ class DeviceMessageHandler(object): "sender": sender_user_id, "type": message_type, "message_id": message_id, - "org.matrix.opentracing_context": json.dumps(context) - if whitelisted_homeserver(destination) - else None, + "org.matrix.opentracing_context": json.dumps(context), } log_kv({"local_messages": local_messages}) -- cgit 1.4.1