From 6f68e32bfbe439435410e81ac70fdca10f28fbf7 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 30 Mar 2023 19:41:14 +0200 Subject: to_device updates could be dropped when consuming the replication stream (#15349) Co-authored-by: reivilibre --- synapse/storage/databases/main/deviceinbox.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'synapse/storage/databases') diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 0d75d9739a..b471fcb064 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -617,14 +617,14 @@ class DeviceInboxWorkerStore(SQLBaseStore): # We limit like this as we might have multiple rows per stream_id, and # we want to make sure we always get all entries for any stream_id # we return. - upper_pos = min(current_id, last_id + limit) + upto_token = min(current_id, last_id + limit) sql = ( "SELECT max(stream_id), user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY user_id" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates = [(row[0], row[1:]) for row in txn] sql = ( @@ -633,19 +633,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY destination" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates.extend((row[0], row[1:]) for row in txn) # Order by ascending stream ordering updates.sort() - limited = False - upto_token = current_id - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited + return updates, upto_token, upto_token < current_id return await self.db_pool.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn -- cgit 1.4.1