diff options
Diffstat (limited to 'synapse/storage/devices.py')
-rw-r--r-- | synapse/storage/devices.py | 39 |
1 files changed, 33 insertions, 6 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 8f72d92895..e11881161d 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -21,6 +21,11 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.logging.opentracing import ( + get_active_span_text_map, + trace, + whitelisted_homeserver, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage.background_updates import BackgroundUpdateStore @@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore): return {d["device_id"]: d for d in devices} + @trace @defer.inlineCallbacks def get_devices_by_remote(self, destination, from_stream_id, limit): """Get stream of updates to send to remote servers @@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore): # (user_id, device_id) entries into a map, with the value being # the max stream_id across each set of duplicate entries # - # maps (user_id, device_id) -> stream_id + # maps (user_id, device_id) -> (stream_id, opentracing_context) # as long as their stream_id does not match that of the last row + # + # opentracing_context contains the opentracing metadata for the request + # that created the poke + # + # The most recent request's opentracing_context is used as the + # context which created the Edu. + query_map = {} for update in updates: if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: @@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore): break key = (update[0], update[1]) - query_map[key] = max(query_map.get(key, 0), update[2]) + + update_context = update[3] + update_stream_id = update[2] + + previous_update_stream_id, _ = query_map.get(key, (0, None)) + + if update_stream_id > previous_update_stream_id: + query_map[key] = (update_stream_id, update_context) # If we didn't find any updates with a stream_id lower than the cutoff, it # means that there are more than limit updates all of which have the same @@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore): List: List of device updates """ sql = """ - SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes + SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? ORDER BY stream_id LIMIT ? @@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore): Args: destination (str): The host the device updates are intended for from_stream_id (int): The minimum stream_id to filter updates by, exclusive - query_map (Dict[(str, str): int]): Dictionary mapping - user_id/device_id to update stream_id + query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping + user_id/device_id to update stream_id and the relevent json-encoded + opentracing context Returns: List[Dict]: List of objects representing an device update EDU @@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore): destination, user_id, from_stream_id ) for device_id, device in iteritems(user_devices): - stream_id = query_map[(user_id, device_id)] + stream_id, opentracing_context = query_map[(user_id, device_id)] result = { "user_id": user_id, "device_id": device_id, "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, + "org.matrix.opentracing_context": opentracing_context, } prev_id = stream_id @@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): ], ) + context = get_active_span_text_map() + self._simple_insert_many_txn( txn, table="device_lists_outbound_pokes", @@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): "device_id": device_id, "sent": False, "ts": now, + "opentracing_context": json.dumps(context) + if whitelisted_homeserver(destination) + else None, } for destination in hosts for device_id in device_ids |