diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index f640e73714..87398d60bc 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore):
return defer.succeed([])
def get_all_new_device_messages_txn(txn):
+ # We limit like this as we might have multiple rows per stream_id, and
+ # we want to make sure we always get all entries for any stream_id
+ # we return.
+ upper_pos = min(current_pos, last_pos + limit)
sql = (
- "SELECT stream_id FROM device_inbox"
+ "SELECT stream_id, user_id"
+ " FROM device_inbox"
" WHERE ? < stream_id AND stream_id <= ?"
- " GROUP BY stream_id"
" ORDER BY stream_id ASC"
- " LIMIT ?"
)
- txn.execute(sql, (last_pos, current_pos, limit))
- stream_ids = txn.fetchall()
- if not stream_ids:
- return []
- max_stream_id_in_limit = stream_ids[-1]
+ txn.execute(sql, (last_pos, upper_pos))
+ rows = txn.fetchall()
sql = (
- "SELECT stream_id, user_id, device_id, message_json"
- " FROM device_inbox"
+ "SELECT stream_id, destination"
+ " FROM device_federation_outbox"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
)
- txn.execute(sql, (last_pos, max_stream_id_in_limit))
- return txn.fetchall()
+ txn.execute(sql, (last_pos, upper_pos))
+ rows.extend(txn.fetchall())
+
+ return rows
return self.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
|