summary refs log tree commit diff
diff options
context:
space:
mode:
authorJorik Schellekens <joriks@matrix.org>2019-07-29 11:37:10 +0100
committerJorik Schellekens <joriks@matrix.org>2019-07-29 11:37:10 +0100
commite126bf862a1f539e2d3082b12136cf3936447340 (patch)
treebc53183264245c3ace6ecd69cd5d0affa18e7bb3
parentMerge pull request #5780 from matrix-org/baboliver/loopingcall-args (diff)
downloadsynapse-github/joriks/opentracing_to_device_messages.tar.xz
-rw-r--r--synapse/storage/deviceinbox.py34
-rw-r--r--synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql16
2 files changed, 46 insertions, 4 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 79bb0ea46d..7bd8896458 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -51,7 +51,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
         def get_new_messages_for_device_txn(txn):
             sql = (
-                "SELECT stream_id, message_json FROM device_inbox"
+                "SELECT stream_id, message_json, context FROM device_inbox"
                 " WHERE user_id = ? AND device_id = ?"
                 " AND ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
@@ -61,11 +61,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
             )
             messages = []
+            references = []
             for row in txn:
                 stream_pos = row[0]
                 messages.append(json.loads(row[1]))
+                references.append(
+                    opentracing.extract_text_map(
+                        json.loads(json.loads(row[2])["opentracing"])
+                    )
+                )
             if len(messages) < limit:
                 stream_pos = current_stream_id
+            with opentracing.start_active_span(
+                "do we have send??"  # , child_of=references[0]
+            ):
+                opentracing.set_tag("ref", references)
+                pass
             return (messages, stream_pos)
 
         return self.runInteraction(
@@ -281,6 +292,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
                 allow_none=True,
             )
             if already_inserted is not None:
+                opentracing.log_kv({"message": "message already received"})
                 return
 
             # Add an entry for this message_id so that we know we've processed
@@ -294,6 +306,9 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
                     "received_ts": now_ms,
                 },
             )
+            opentracing.log_kv(
+                {"message": "device message added to device_federation_inbox"}
+            )
 
             # Add the messages to the approriate local device inboxes so that
             # they'll be sent to the devices when they next sync.
@@ -336,6 +351,13 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
                     messages_json_for_user[device] = message_json
             else:
                 if not devices:
+                    opentracing.log_kv(
+                        {
+                            "message": "No devices for user.",
+                            "user_id": user_id,
+                            "messages": messages_by_device,
+                        }
+                    )
                     continue
                 sql = (
                     "SELECT device_id FROM devices"
@@ -361,13 +383,17 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
 
         sql = (
             "INSERT INTO device_inbox"
-            " (user_id, device_id, stream_id, message_json)"
-            " VALUES (?,?,?,?)"
+            " (user_id, device_id, stream_id, message_json, context)"
+            " VALUES (?,?,?,?,?)"
         )
         rows = []
+        # TODO: User whitelisting?
+        context = json.dumps(
+            {"opentracing": opentracing.active_span_context_as_string()}
+        )
         for user_id, messages_by_device in local_by_user_then_device.items():
             for device_id, message_json in messages_by_device.items():
-                rows.append((user_id, device_id, stream_id, message_json))
+                rows.append((user_id, device_id, stream_id, message_json, context))
 
         txn.executemany(sql, rows)
 
diff --git a/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql b/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql
new file mode 100644
index 0000000000..769c17e6ef
--- /dev/null
+++ b/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE device_inbox ADD context TEXT;
\ No newline at end of file