summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-02-11 13:38:05 +0000
committerGitHub <noreply@github.com>2022-02-11 13:38:05 +0000
commit79fb64e417686c91eeb64016e91dffeac9115a80 (patch)
tree47da409576556e9d8683ad55a8bd962da7e069ab
parentfix import cycle (#11965) (diff)
downloadsynapse-79fb64e417686c91eeb64016e91dffeac9115a80.tar.xz
Fix to-device being dropped in limited sync in SQLite. (#11966)
If ther are more than 100 to-device messages pending for a device
`/sync` will only return the first 100, however the next batch token was
incorrectly calculated and so all other pending messages would be
dropped.

This is due to `txn.rowcount` only returning the number of rows that
*changed*, rather than the number *selected* in SQLite.
-rw-r--r--changelog.d/11966.bugfix1
-rw-r--r--synapse/storage/databases/main/deviceinbox.py5
-rw-r--r--tests/rest/client/test_sendtodevice.py40
3 files changed, 45 insertions, 1 deletions
diff --git a/changelog.d/11966.bugfix b/changelog.d/11966.bugfix
new file mode 100644
index 0000000000..af8e096667
--- /dev/null
+++ b/changelog.d/11966.bugfix
@@ -0,0 +1 @@
+Fix to-device messages being dropped during limited sync when using SQLite.
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 8801b7b2dd..1392363de1 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -362,7 +362,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             # intended for each device.
             last_processed_stream_pos = to_stream_id
             recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {}
+            rowcount = 0
             for row in txn:
+                rowcount += 1
+
                 last_processed_stream_pos = row[0]
                 recipient_user_id = row[1]
                 recipient_device_id = row[2]
@@ -373,7 +376,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     (recipient_user_id, recipient_device_id), []
                 ).append(message_dict)
 
-            if limit is not None and txn.rowcount == limit:
+            if limit is not None and rowcount == limit:
                 # We ended up bumping up against the message limit. There may be more messages
                 # to retrieve. Return what we have, as well as the last stream position that
                 # was processed.
diff --git a/tests/rest/client/test_sendtodevice.py b/tests/rest/client/test_sendtodevice.py
index 6db7062a8e..e2ed14457f 100644
--- a/tests/rest/client/test_sendtodevice.py
+++ b/tests/rest/client/test_sendtodevice.py
@@ -198,3 +198,43 @@ class SendToDeviceTestCase(HomeserverTestCase):
                 "content": {"idx": 3},
             },
         )
+
+    def test_limited_sync(self):
+        """If a limited sync for to-devices happens the next /sync should respond immediately."""
+
+        self.register_user("u1", "pass")
+        user1_tok = self.login("u1", "pass", "d1")
+
+        user2 = self.register_user("u2", "pass")
+        user2_tok = self.login("u2", "pass", "d2")
+
+        # Do an initial sync
+        channel = self.make_request("GET", "/sync", access_token=user2_tok)
+        self.assertEqual(channel.code, 200, channel.result)
+        sync_token = channel.json_body["next_batch"]
+
+        # Send 150 to-device messages. We limit to 100 in `/sync`
+        for i in range(150):
+            test_msg = {"foo": "bar"}
+            chan = self.make_request(
+                "PUT",
+                f"/_matrix/client/r0/sendToDevice/m.test/1234-{i}",
+                content={"messages": {user2: {"d2": test_msg}}},
+                access_token=user1_tok,
+            )
+            self.assertEqual(chan.code, 200, chan.result)
+
+        channel = self.make_request(
+            "GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+        messages = channel.json_body.get("to_device", {}).get("events", [])
+        self.assertEqual(len(messages), 100)
+        sync_token = channel.json_body["next_batch"]
+
+        channel = self.make_request(
+            "GET", f"/sync?since={sync_token}&timeout=300000", access_token=user2_tok
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+        messages = channel.json_body.get("to_device", {}).get("events", [])
+        self.assertEqual(len(messages), 50)