diff options
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r-- | synapse/storage/deviceinbox.py | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index f640e73714..87398d60bc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore): return defer.succeed([]) def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) sql = ( - "SELECT stream_id FROM device_inbox" + "SELECT stream_id, user_id" + " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY stream_id" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_pos, current_pos, limit)) - stream_ids = txn.fetchall() - if not stream_ids: - return [] - max_stream_id_in_limit = stream_ids[-1] + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() sql = ( - "SELECT stream_id, user_id, device_id, message_json" - " FROM device_inbox" + "SELECT stream_id, destination" + " FROM device_federation_outbox" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" ) - txn.execute(sql, (last_pos, max_stream_id_in_limit)) - return txn.fetchall() + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn.fetchall()) + + return rows return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn |