diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 79bb0ea46d..7bd8896458 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -51,7 +51,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def get_new_messages_for_device_txn(txn):
sql = (
- "SELECT stream_id, message_json FROM device_inbox"
+ "SELECT stream_id, message_json, context FROM device_inbox"
" WHERE user_id = ? AND device_id = ?"
" AND ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
@@ -61,11 +61,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
)
messages = []
+ references = []
for row in txn:
stream_pos = row[0]
messages.append(json.loads(row[1]))
+ references.append(
+ opentracing.extract_text_map(
+ json.loads(json.loads(row[2])["opentracing"])
+ )
+ )
if len(messages) < limit:
stream_pos = current_stream_id
+ with opentracing.start_active_span(
+ "do we have send??" # , child_of=references[0]
+ ):
+ opentracing.set_tag("ref", references)
+ pass
return (messages, stream_pos)
return self.runInteraction(
@@ -281,6 +292,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
allow_none=True,
)
if already_inserted is not None:
+ opentracing.log_kv({"message": "message already received"})
return
# Add an entry for this message_id so that we know we've processed
@@ -294,6 +306,9 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
"received_ts": now_ms,
},
)
+ opentracing.log_kv(
+ {"message": "device message added to device_federation_inbox"}
+ )
# Add the messages to the approriate local device inboxes so that
# they'll be sent to the devices when they next sync.
@@ -336,6 +351,13 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
messages_json_for_user[device] = message_json
else:
if not devices:
+ opentracing.log_kv(
+ {
+ "message": "No devices for user.",
+ "user_id": user_id,
+ "messages": messages_by_device,
+ }
+ )
continue
sql = (
"SELECT device_id FROM devices"
@@ -361,13 +383,17 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
sql = (
"INSERT INTO device_inbox"
- " (user_id, device_id, stream_id, message_json)"
- " VALUES (?,?,?,?)"
+ " (user_id, device_id, stream_id, message_json, context)"
+ " VALUES (?,?,?,?,?)"
)
rows = []
+ # TODO: User whitelisting?
+ context = json.dumps(
+ {"opentracing": opentracing.active_span_context_as_string()}
+ )
for user_id, messages_by_device in local_by_user_then_device.items():
for device_id, message_json in messages_by_device.items():
- rows.append((user_id, device_id, stream_id, message_json))
+ rows.append((user_id, device_id, stream_id, message_json, context))
txn.executemany(sql, rows)
diff --git a/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql b/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql
new file mode 100644
index 0000000000..769c17e6ef
--- /dev/null
+++ b/synapse/storage/schema/delta/55/add_span_context_to_device_inbox.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE device_inbox ADD context TEXT;
\ No newline at end of file
|