summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-04 16:21:21 +0100
committerErik Johnston <erik@matrix.org>2017-04-04 16:21:21 +0100
commit9f26d3b75b6ce369663ee9db7b1241e376772af1 (patch)
treeff9cbbcbfb9f73e65a820fe1ec8411a03a1c3b24
parentMerge pull request #2098 from matrix-org/erikj/repl_tcp_fix (diff)
downloadsynapse-9f26d3b75b6ce369663ee9db7b1241e376772af1.tar.xz
Deduplicate new deviceinbox rows for replication
Diffstat (limited to '')
-rw-r--r--synapse/storage/deviceinbox.py11
1 files changed, 7 insertions, 4 deletions
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 2714519d21..0b62b493d5 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -325,23 +325,26 @@ class DeviceInboxStore(BackgroundUpdateStore):
             # we return.
             upper_pos = min(current_pos, last_pos + limit)
             sql = (
-                "SELECT stream_id, user_id"
+                "SELECT max(stream_id), user_id"
                 " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
+                " GROUP BY user_id"
             )
             txn.execute(sql, (last_pos, upper_pos))
             rows = txn.fetchall()
 
             sql = (
-                "SELECT stream_id, destination"
+                "SELECT max(stream_id), destination"
                 " FROM device_federation_outbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
+                " GROUP BY destination"
             )
             txn.execute(sql, (last_pos, upper_pos))
             rows.extend(txn)
 
+            # Order by ascending stream ordering
+            rows.sort()
+
             return rows
 
         return self.runInteraction(