summary refs log tree commit diff
path: root/synapse/storage/deviceinbox.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-11-23 15:39:12 +0000
committerGitHub <noreply@github.com>2016-11-23 15:39:12 +0000
commit302fbd218ddb00a1d38e778b29cdf043800941e1 (patch)
tree89e462d2fc8d2470bb66d744e2289050df7edef3 /synapse/storage/deviceinbox.py
parentMerge pull request #1641 from matrix-org/erikj/as_pushers (diff)
parentShuffle receipt handler around so that worker apps don't need to load it (diff)
downloadsynapse-302fbd218ddb00a1d38e778b29cdf043800941e1.tar.xz
Merge pull request #1635 from matrix-org/erikj/split_out_fed_txn
Split out federation transaction sending to a worker
Diffstat (limited to 'synapse/storage/deviceinbox.py')
-rw-r--r--synapse/storage/deviceinbox.py26
1 files changed, 14 insertions, 12 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index f640e73714..87398d60bc 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore):
             return defer.succeed([])
 
         def get_all_new_device_messages_txn(txn):
+            # We limit like this as we might have multiple rows per stream_id, and
+            # we want to make sure we always get all entries for any stream_id
+            # we return.
+            upper_pos = min(current_pos, last_pos + limit)
             sql = (
-                "SELECT stream_id FROM device_inbox"
+                "SELECT stream_id, user_id"
+                " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " GROUP BY stream_id"
                 " ORDER BY stream_id ASC"
-                " LIMIT ?"
             )
-            txn.execute(sql, (last_pos, current_pos, limit))
-            stream_ids = txn.fetchall()
-            if not stream_ids:
-                return []
-            max_stream_id_in_limit = stream_ids[-1]
+            txn.execute(sql, (last_pos, upper_pos))
+            rows = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, user_id, device_id, message_json"
-                " FROM device_inbox"
+                "SELECT stream_id, destination"
+                " FROM device_federation_outbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " ORDER BY stream_id ASC"
             )
-            txn.execute(sql, (last_pos, max_stream_id_in_limit))
-            return txn.fetchall()
+            txn.execute(sql, (last_pos, upper_pos))
+            rows.extend(txn.fetchall())
+
+            return rows
 
         return self.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn