diff options
author | Mathieu Velten <mathieuv@matrix.org> | 2023-03-30 19:41:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-30 19:41:14 +0200 |
commit | 6f68e32bfbe439435410e81ac70fdca10f28fbf7 (patch) | |
tree | 6871ba0e2d83b6057dbadec5bd2ad523e746f369 /synapse/storage | |
parent | Speed up SQLite unit test CI (#15334) (diff) | |
download | synapse-6f68e32bfbe439435410e81ac70fdca10f28fbf7.tar.xz |
to_device updates could be dropped when consuming the replication stream (#15349)
Co-authored-by: reivilibre <oliverw@matrix.org>
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 14 |
1 files changed, 4 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 0d75d9739a..b471fcb064 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -617,14 +617,14 @@ class DeviceInboxWorkerStore(SQLBaseStore): # We limit like this as we might have multiple rows per stream_id, and # we want to make sure we always get all entries for any stream_id # we return. - upper_pos = min(current_id, last_id + limit) + upto_token = min(current_id, last_id + limit) sql = ( "SELECT max(stream_id), user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY user_id" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates = [(row[0], row[1:]) for row in txn] sql = ( @@ -633,19 +633,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY destination" ) - txn.execute(sql, (last_id, upper_pos)) + txn.execute(sql, (last_id, upto_token)) updates.extend((row[0], row[1:]) for row in txn) # Order by ascending stream ordering updates.sort() - limited = False - upto_token = current_id - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited + return updates, upto_token, upto_token < current_id return await self.db_pool.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn |