diff options
author | Jorik Schellekens <joriksch@gmail.com> | 2019-08-22 18:21:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-22 18:21:10 +0100 |
commit | 8767b63a821eb8612e2ab830534fd6f40eb1aaaa (patch) | |
tree | 43eb1604f1251cd36a1e394a3421cf8f850d0312 /synapse/storage | |
parent | Merge pull request #5877 from Awesome-Technologies/remove_shared_secret_regis... (diff) | |
download | synapse-8767b63a821eb8612e2ab830534fd6f40eb1aaaa.tar.xz |
Propagate opentracing contexts through EDUs (#5852)
Propagate opentracing contexts through EDUs Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/devices.py | 39 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/add_spans_to_device_lists.sql | 20 |
2 files changed, 53 insertions, 6 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8f72d92895..e11881161d 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,11 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.logging.opentracing import ( + get_active_span_text_map, + trace, + whitelisted_homeserver, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore @@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore): return {d["device_id"]: d for d in devices} + @trace @defer.inlineCallbacks def get_devices_by_remote(self, destination, from_stream_id, limit): """Get stream of updates to send to remote servers @@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore): # (user_id, device_id) entries into a map, with the value being # the max stream_id across each set of duplicate entries # - # maps (user_id, device_id) -> stream_id + # maps (user_id, device_id) -> (stream_id, opentracing_context) # as long as their stream_id does not match that of the last row + # + # opentracing_context contains the opentracing metadata for the request + # that created the poke + # + # The most recent request's opentracing_context is used as the + # context which created the Edu. + query_map = {} for update in updates: if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: @@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore): break key = (update[0], update[1]) - query_map[key] = max(query_map.get(key, 0), update[2]) + + update_context = update[3] + update_stream_id = update[2] + + previous_update_stream_id, _ = query_map.get(key, (0, None)) + + if update_stream_id > previous_update_stream_id: + query_map[key] = (update_stream_id, update_context) # If we didn't find any updates with a stream_id lower than the cutoff, it # means that there are more than limit updates all of which have the same @@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore): List: List of device updates """ sql = """ - SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes + SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? ORDER BY stream_id LIMIT ? @@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore): Args: destination (str): The host the device updates are intended for from_stream_id (int): The minimum stream_id to filter updates by, exclusive - query_map (Dict[(str, str): int]): Dictionary mapping - user_id/device_id to update stream_id + query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping + user_id/device_id to update stream_id and the relevent json-encoded + opentracing context Returns: List[Dict]: List of objects representing an device update EDU @@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore): destination, user_id, from_stream_id ) for device_id, device in iteritems(user_devices): - stream_id = query_map[(user_id, device_id)] + stream_id, opentracing_context = query_map[(user_id, device_id)] result = { "user_id": user_id, "device_id": device_id, "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, + "org.matrix.opentracing_context": opentracing_context, } prev_id = stream_id @@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): ], ) + context = get_active_span_text_map() + self._simple_insert_many_txn( txn, table="device_lists_outbound_pokes", @@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): "device_id": device_id, "sent": False, "ts": now, + "opentracing_context": json.dumps(context) + if whitelisted_homeserver(destination) + else None, } for destination in hosts for device_id in device_ids diff --git a/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql new file mode 100644 index 0000000000..41807eb1e7 --- /dev/null +++ b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql @@ -0,0 +1,20 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Opentracing context data for inclusion in the device_list_update EDUs, as a + * json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination). + */ +ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT; |