summary refs log tree commit diff
path: root/synapse/storage/devices.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/devices.py')
-rw-r--r--synapse/storage/devices.py39
1 files changed, 33 insertions, 6 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 6a5572e001..51eac0f5e7 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -23,6 +23,11 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import Codes, StoreError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    trace,
+    whitelisted_homeserver,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import Cache, SQLBaseStore, db_to_json
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -77,6 +82,7 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return {d["device_id"]: d for d in devices}
 
+    @trace
     @defer.inlineCallbacks
     def get_devices_by_remote(self, destination, from_stream_id, limit):
         """Get stream of updates to send to remote servers
@@ -131,8 +137,15 @@ class DeviceWorkerStore(SQLBaseStore):
         # (user_id, device_id) entries into a map, with the value being
         # the max stream_id across each set of duplicate entries
         #
-        # maps (user_id, device_id) -> stream_id
+        # maps (user_id, device_id) -> (stream_id, opentracing_context)
         # as long as their stream_id does not match that of the last row
+        #
+        # opentracing_context contains the opentracing metadata for the request
+        # that created the poke
+        #
+        # The most recent request's opentracing_context is used as the
+        # context which created the Edu.
+
         query_map = {}
         for update in updates:
             if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -140,7 +153,14 @@ class DeviceWorkerStore(SQLBaseStore):
                 break
 
             key = (update[0], update[1])
-            query_map[key] = max(query_map.get(key, 0), update[2])
+
+            update_context = update[3]
+            update_stream_id = update[2]
+
+            previous_update_stream_id, _ = query_map.get(key, (0, None))
+
+            if update_stream_id > previous_update_stream_id:
+                query_map[key] = (update_stream_id, update_context)
 
         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -175,7 +195,7 @@ class DeviceWorkerStore(SQLBaseStore):
             List: List of device updates
         """
         sql = """
-            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+            SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             ORDER BY stream_id
             LIMIT ?
@@ -191,8 +211,9 @@ class DeviceWorkerStore(SQLBaseStore):
         Args:
             destination (str): The host the device updates are intended for
             from_stream_id (int): The minimum stream_id to filter updates by, exclusive
-            query_map (Dict[(str, str): int]): Dictionary mapping
-                user_id/device_id to update stream_id
+            query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
+                user_id/device_id to update stream_id and the relevent json-encoded
+                opentracing context
 
         Returns:
             List[Dict]: List of objects representing an device update EDU
@@ -214,12 +235,13 @@ class DeviceWorkerStore(SQLBaseStore):
                 destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
-                stream_id = query_map[(user_id, device_id)]
+                stream_id, opentracing_context = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
                     "device_id": device_id,
                     "prev_id": [prev_id] if prev_id else [],
                     "stream_id": stream_id,
+                    "org.matrix.opentracing_context": opentracing_context,
                 }
 
                 prev_id = stream_id
@@ -891,6 +913,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             ],
         )
 
+        context = get_active_span_text_map()
+
         self._simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
@@ -902,6 +926,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
+                    "opentracing_context": json.dumps(context)
+                    if whitelisted_homeserver(destination)
+                    else None,
                 }
                 for destination in hosts
                 for device_id in device_ids