diff options
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 39 |
1 files changed, 37 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index c55508867d..3ece4e0f66 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -135,6 +135,15 @@ class DeviceInboxWorkerStore(SQLBaseStore): has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_stream_id ) + issue9533_logger.debug( + "get_new_messages_for_device" + " user %s device %s last_stream_id %s current_stream_id %s has_changed %s", + user_id, + device_id, + last_stream_id, + current_stream_id, + has_changed, + ) if not has_changed: return ([], current_stream_id) @@ -157,9 +166,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): stream_pos = current_stream_id return messages, stream_pos - return await self.db_pool.runInteraction( + result = await self.db_pool.runInteraction( "get_new_messages_for_device", get_new_messages_for_device_txn ) + issue9533_logger.debug( + "get_all_new_device_messages: after sorting, " + "last_stream_id %s, current_stream_id %s result %s", + last_stream_id, + current_stream_id, + result, + ) + # TODO check that the stream_pos here gets read by the consumer---we may need + # to adjust the next_batch token if we did use teh limit above + return result @trace async def delete_messages_for_device( @@ -181,6 +200,12 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) set_tag("last_deleted_stream_id", last_deleted_stream_id) + issue9533_logger.debug( + "delete_messages_for_device: user %s device %s; cache says last_deleted_stream_id=%s", + user_id, + device_id, + last_deleted_stream_id, + ) if last_deleted_stream_id: has_changed = self._device_inbox_stream_cache.has_entity_changed( @@ -188,6 +213,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) if not has_changed: log_kv({"message": "No changes in cache since last check"}) + issue9533_logger.debug("No changes in cache since last check") return 0 def delete_messages_for_device_txn(txn): @@ -204,6 +230,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): ) log_kv({"message": f"deleted {count} messages for device", "count": count}) + issue9533_logger.debug("deleted %s messages for device", count) # Update the cache, ensuring that we only ever increase the value last_deleted_stream_id = self._last_device_delete_cache.get( @@ -212,6 +239,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._last_device_delete_cache[(user_id, device_id)] = max( last_deleted_stream_id, up_to_stream_id ) + issue9533_logger.debug( + "cache updated for (user, device) to %s", + max(last_deleted_stream_id, up_to_stream_id), + ) return count @@ -355,9 +386,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): return updates, upto_token, limited - return await self.db_pool.runInteraction( + result = await self.db_pool.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn ) + issue9533_logger.debug( + "get_all_new_device_messages: after sorting, got %s", result + ) + return result @trace async def add_messages_to_device_inbox( |