diff options
author | Erik Johnston <erik@matrix.org> | 2016-11-17 15:46:44 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-11-17 15:48:04 +0000 |
commit | f8ee66250a16cb9dd3af01fb1150ff18cfebbc39 (patch) | |
tree | 9920bd4e8164f705b4e27c714d6c053082dcf7a5 /synapse/storage/deviceinbox.py | |
parent | Hook up the send queue and create a federation sender worker (diff) | |
download | synapse-f8ee66250a16cb9dd3af01fb1150ff18cfebbc39.tar.xz |
Handle sending events and device messages over federation
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r-- | synapse/storage/deviceinbox.py | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index f640e73714..87398d60bc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore): return defer.succeed([]) def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) sql = ( - "SELECT stream_id FROM device_inbox" + "SELECT stream_id, user_id" + " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY stream_id" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_pos, current_pos, limit)) - stream_ids = txn.fetchall() - if not stream_ids: - return [] - max_stream_id_in_limit = stream_ids[-1] + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() sql = ( - "SELECT stream_id, user_id, device_id, message_json" - " FROM device_inbox" + "SELECT stream_id, destination" + " FROM device_federation_outbox" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" ) - txn.execute(sql, (last_pos, max_stream_id_in_limit)) - return txn.fetchall() + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn.fetchall()) + + return rows return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn |