summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-03-30 19:41:14 +0200
committerGitHub <noreply@github.com>2023-03-30 19:41:14 +0200
commit6f68e32bfbe439435410e81ac70fdca10f28fbf7 (patch)
tree6871ba0e2d83b6057dbadec5bd2ad523e746f369 /synapse/storage
parentSpeed up SQLite unit test CI (#15334) (diff)
downloadsynapse-6f68e32bfbe439435410e81ac70fdca10f28fbf7.tar.xz
to_device updates could be dropped when consuming the replication stream (#15349)
Co-authored-by: reivilibre <oliverw@matrix.org>
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py14
1 files changed, 4 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py

index 0d75d9739a..b471fcb064 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py
@@ -617,14 +617,14 @@ class DeviceInboxWorkerStore(SQLBaseStore): # We limit like this as we might have multiple rows per stream_id, and # we want to make sure we always get all entries for any stream_id # we return. - upper_pos = min(current_id, last_id + limit) + upto_token = min(current_id, last_id + limit) sql = ( "SELECT max(stream_id), user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY user_id" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates = [(row[0], row[1:]) for row in txn] sql = ( @@ -633,19 +633,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY destination" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates.extend((row[0], row[1:]) for row in txn) # Order by ascending stream ordering updates.sort() - limited = False - upto_token = current_id - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited + return updates, upto_token, upto_token < current_id return await self.db_pool.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn