summary refs log tree commit diff
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2021-08-19 18:11:02 +0100
committerDavid Robertson <davidr@element.io>2021-08-19 18:21:16 +0100
commitceb29d4e0f58d21396eedbf9213d4c754b274931 (patch)
treeed1939d2e5e85a846e5aa83e4f9d9050b82a6ead
parentConvert room member storage tuples to attrs. (#10629) (diff)
downloadsynapse-dmr/missing_to_device.tar.xz
If in doubt, print it out. Thanks Erik for advising.
-rw-r--r--synapse/rest/client/sync.py7
-rw-r--r--synapse/storage/databases/main/deviceinbox.py39
2 files changed, 44 insertions, 2 deletions
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index e18f4d01b3..eb68a672c7 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -30,6 +30,7 @@ from synapse.http.site import SynapseRequest
 from synapse.types import JsonDict, StreamToken
 from synapse.util import json_decoder
 
+from ...logging import issue9533_logger
 from ._base import client_patterns, set_timeline_upper_limit
 
 if TYPE_CHECKING:
@@ -185,6 +186,9 @@ class SyncRestServlet(RestServlet):
                 full_state=full_state,
             )
 
+        issue9533_logger.debug(
+            "Sync body generated, next batch: %s", sync_result.next_batch
+        )
         # the client may have disconnected by now; don't bother to serialize the
         # response if so.
         if request._disconnected:
@@ -246,6 +250,9 @@ class SyncRestServlet(RestServlet):
 
         if sync_result.to_device:
             response["to_device"] = {"events": sync_result.to_device}
+            issue9533_logger.debug(
+                "to_device sent down in sync %s", response["to_device"]
+            )
 
         if sync_result.device_lists.changed:
             response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index c55508867d..3ece4e0f66 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -135,6 +135,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         has_changed = self._device_inbox_stream_cache.has_entity_changed(
             user_id, last_stream_id
         )
+        issue9533_logger.debug(
+            "get_new_messages_for_device"
+            " user %s device %s last_stream_id %s current_stream_id %s has_changed %s",
+            user_id,
+            device_id,
+            last_stream_id,
+            current_stream_id,
+            has_changed,
+        )
         if not has_changed:
             return ([], current_stream_id)
 
@@ -157,9 +166,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 stream_pos = current_stream_id
             return messages, stream_pos
 
-        return await self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             "get_new_messages_for_device", get_new_messages_for_device_txn
         )
+        issue9533_logger.debug(
+            "get_all_new_device_messages: after sorting, "
+            "last_stream_id %s, current_stream_id %s result %s",
+            last_stream_id,
+            current_stream_id,
+            result,
+        )
+        # TODO check that the stream_pos here gets read by the consumer---we may need
+        # to adjust the next_batch token if we did use teh limit above
+        return result
 
     @trace
     async def delete_messages_for_device(
@@ -181,6 +200,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
         set_tag("last_deleted_stream_id", last_deleted_stream_id)
+        issue9533_logger.debug(
+            "delete_messages_for_device: user %s device %s; cache says last_deleted_stream_id=%s",
+            user_id,
+            device_id,
+            last_deleted_stream_id,
+        )
 
         if last_deleted_stream_id:
             has_changed = self._device_inbox_stream_cache.has_entity_changed(
@@ -188,6 +213,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             )
             if not has_changed:
                 log_kv({"message": "No changes in cache since last check"})
+                issue9533_logger.debug("No changes in cache since last check")
                 return 0
 
         def delete_messages_for_device_txn(txn):
@@ -204,6 +230,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
+        issue9533_logger.debug("deleted %s messages for device", count)
 
         # Update the cache, ensuring that we only ever increase the value
         last_deleted_stream_id = self._last_device_delete_cache.get(
@@ -212,6 +239,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         self._last_device_delete_cache[(user_id, device_id)] = max(
             last_deleted_stream_id, up_to_stream_id
         )
+        issue9533_logger.debug(
+            "cache updated for (user, device) to %s",
+            max(last_deleted_stream_id, up_to_stream_id),
+        )
 
         return count
 
@@ -355,9 +386,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
             return updates, upto_token, limited
 
-        return await self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
+        issue9533_logger.debug(
+            "get_all_new_device_messages: after sorting, got %s", result
+        )
+        return result
 
     @trace
     async def add_messages_to_device_inbox(