diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index c55508867d..3ece4e0f66 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -135,6 +135,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_stream_id
)
+ issue9533_logger.debug(
+ "get_new_messages_for_device"
+ " user %s device %s last_stream_id %s current_stream_id %s has_changed %s",
+ user_id,
+ device_id,
+ last_stream_id,
+ current_stream_id,
+ has_changed,
+ )
if not has_changed:
return ([], current_stream_id)
@@ -157,9 +166,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = current_stream_id
return messages, stream_pos
- return await self.db_pool.runInteraction(
+ result = await self.db_pool.runInteraction(
"get_new_messages_for_device", get_new_messages_for_device_txn
)
+ issue9533_logger.debug(
+ "get_all_new_device_messages: after sorting, "
+ "last_stream_id %s, current_stream_id %s result %s",
+ last_stream_id,
+ current_stream_id,
+ result,
+ )
+ # TODO check that the stream_pos here gets read by the consumer---we may need
+ # to adjust the next_batch token if we did use teh limit above
+ return result
@trace
async def delete_messages_for_device(
@@ -181,6 +200,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
set_tag("last_deleted_stream_id", last_deleted_stream_id)
+ issue9533_logger.debug(
+ "delete_messages_for_device: user %s device %s; cache says last_deleted_stream_id=%s",
+ user_id,
+ device_id,
+ last_deleted_stream_id,
+ )
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -188,6 +213,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
if not has_changed:
log_kv({"message": "No changes in cache since last check"})
+ issue9533_logger.debug("No changes in cache since last check")
return 0
def delete_messages_for_device_txn(txn):
@@ -204,6 +230,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
log_kv({"message": f"deleted {count} messages for device", "count": count})
+ issue9533_logger.debug("deleted %s messages for device", count)
# Update the cache, ensuring that we only ever increase the value
last_deleted_stream_id = self._last_device_delete_cache.get(
@@ -212,6 +239,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
self._last_device_delete_cache[(user_id, device_id)] = max(
last_deleted_stream_id, up_to_stream_id
)
+ issue9533_logger.debug(
+ "cache updated for (user, device) to %s",
+ max(last_deleted_stream_id, up_to_stream_id),
+ )
return count
@@ -355,9 +386,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return updates, upto_token, limited
- return await self.db_pool.runInteraction(
+ result = await self.db_pool.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
)
+ issue9533_logger.debug(
+ "get_all_new_device_messages: after sorting, got %s", result
+ )
+ return result
@trace
async def add_messages_to_device_inbox(
|