summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py39
1 files changed, 37 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index c55508867d..3ece4e0f66 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -135,6 +135,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         has_changed = self._device_inbox_stream_cache.has_entity_changed(
             user_id, last_stream_id
         )
+        issue9533_logger.debug(
+            "get_new_messages_for_device"
+            " user %s device %s last_stream_id %s current_stream_id %s has_changed %s",
+            user_id,
+            device_id,
+            last_stream_id,
+            current_stream_id,
+            has_changed,
+        )
         if not has_changed:
             return ([], current_stream_id)
 
@@ -157,9 +166,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 stream_pos = current_stream_id
             return messages, stream_pos
 
-        return await self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )
+        issue9533_logger.debug(
+            "get_all_new_device_messages: after sorting, "
+            "last_stream_id %s, current_stream_id %s result %s",
+            last_stream_id,
+            current_stream_id,
+            result,
+        )
+        # TODO check that the stream_pos here gets read by the consumer---we may need
+        # to adjust the next_batch token if we did use teh limit above
+        return result
 
     @trace
     async def delete_messages_for_device(
@@ -181,6 +200,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
         set_tag("last_deleted_stream_id", last_deleted_stream_id)
+        issue9533_logger.debug(
+            "delete_messages_for_device: user %s device %s; cache says last_deleted_stream_id=%s",
+            user_id,
+            device_id,
+            last_deleted_stream_id,
+        )
 
         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -188,6 +213,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             )
             if not has_changed:
                 log_kv({"message": "No changes in cache since last check"})
+                issue9533_logger.debug("No changes in cache since last check")
                 return 0
 
         def delete_messages_for_device_txn(txn):
@@ -204,6 +230,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
+        issue9533_logger.debug("deleted %s messages for device", count)
 
         # Update the cache, ensuring that we only ever increase the value
         last_deleted_stream_id = self._last_device_delete_cache.get(
@@ -212,6 +239,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         self._last_device_delete_cache[(user_id, device_id)] = max(
             last_deleted_stream_id, up_to_stream_id
         )
+        issue9533_logger.debug(
+            "cache updated for (user, device) to %s",
+            max(last_deleted_stream_id, up_to_stream_id),
+        )
 
         return count
 
@@ -355,9 +386,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
             return updates, upto_token, limited
 
-        return await self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
+        issue9533_logger.debug(
+            "get_all_new_device_messages: after sorting, got %s", result
+        )
+        return result
 
     @trace
     async def add_messages_to_device_inbox(