summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-03-30 19:41:14 +0200
committerGitHub <noreply@github.com>2023-03-30 19:41:14 +0200
commit6f68e32bfbe439435410e81ac70fdca10f28fbf7 (patch)
tree6871ba0e2d83b6057dbadec5bd2ad523e746f369 /synapse/storage
parentSpeed up SQLite unit test CI (#15334) (diff)
downloadsynapse-6f68e32bfbe439435410e81ac70fdca10f28fbf7.tar.xz
to_device updates could be dropped when consuming the replication stream (#15349)
Co-authored-by: reivilibre <oliverw@matrix.org>
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py14
1 files changed, 4 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 0d75d9739a..b471fcb064 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -617,14 +617,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             # We limit like this as we might have multiple rows per stream_id, and
             # we want to make sure we always get all entries for any stream_id
             # we return.
-            upper_pos = min(current_id, last_id + limit)
+            upto_token = min(current_id, last_id + limit)
             sql = (
                 "SELECT max(stream_id), user_id"
                 " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " GROUP BY user_id"
             )
-            txn.execute(sql, (last_id, upper_pos))
+            txn.execute(sql, (last_id, upto_token))
             updates = [(row[0], row[1:]) for row in txn]
 
             sql = (
@@ -633,19 +633,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " GROUP BY destination"
             )
-            txn.execute(sql, (last_id, upper_pos))
+            txn.execute(sql, (last_id, upto_token))
             updates.extend((row[0], row[1:]) for row in txn)
 
             # Order by ascending stream ordering
             updates.sort()
 
-            limited = False
-            upto_token = current_id
-            if len(updates) >= limit:
-                upto_token = updates[-1][0]
-                limited = True
-
-            return updates, upto_token, limited
+            return updates, upto_token, upto_token < current_id
 
         return await self.db_pool.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn