diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d9936c88bb..bb27fd1f70 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -368,7 +368,7 @@ class DeviceStore(SQLBaseStore):
prev_sent_id_sql = """
SELECT coalesce(max(stream_id), 0) as stream_id
- FROM device_lists_outbound_pokes
+ FROM device_lists_outbound_last_success
WHERE destination = ? AND user_id = ? AND stream_id <= ?
"""
@@ -510,32 +510,43 @@ class DeviceStore(SQLBaseStore):
)
def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
- # First we DELETE all rows such that only the latest row for each
- # (destination, user_id is left. We do this by selecting first and
- # deleting.
+ # We update the device_lists_outbound_last_success with the successfully
+ # poked users. We do the join to see which users need to be inserted and
+ # which updated.
sql = """
- SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
- WHERE destination = ? AND stream_id <= ?
+ SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL)
+ FROM device_lists_outbound_pokes as o
+ LEFT JOIN device_lists_outbound_last_success as s
+ USING (destination, user_id)
+ WHERE destination = ? AND o.stream_id <= ?
GROUP BY user_id
- HAVING count(*) > 1
"""
txn.execute(sql, (destination, stream_id,))
rows = txn.fetchall()
sql = """
- DELETE FROM device_lists_outbound_pokes
- WHERE destination = ? AND user_id = ? AND stream_id < ?
+ UPDATE device_lists_outbound_last_success
+ SET stream_id = ?
+ WHERE destination = ? AND user_id = ?
"""
txn.executemany(
- sql, ((destination, row[0], row[1],) for row in rows)
+ sql, ((row[1], destination, row[0],) for row in rows if row[2])
)
- # Mark everything that is left as sent
sql = """
- UPDATE device_lists_outbound_pokes SET sent = ?
+ INSERT INTO device_lists_outbound_last_success
+ (destination, user_id, stream_id) VALUES (?, ?, ?)
+ """
+ txn.executemany(
+ sql, ((destination, row[0], row[1],) for row in rows if not row[2])
+ )
+
+ # Delete all sent outbound pokes
+ sql = """
+ DELETE FROM device_lists_outbound_pokes
WHERE destination = ? AND stream_id <= ?
"""
- txn.execute(sql, (True, destination, stream_id,))
+ txn.execute(sql, (destination, stream_id,))
@defer.inlineCallbacks
def get_user_whose_devices_changed(self, from_key):
@@ -670,6 +681,14 @@ class DeviceStore(SQLBaseStore):
)
)
+ # Since we've deleted unsent deltas, we need to remove the entry
+ # of last successful sent so that the prev_ids are correctly set.
+ sql = """
+ DELETE FROM device_lists_outbound_last_success
+ WHERE destination = ? AND user_id = ?
+ """
+ txn.executemany(sql, ((row[0], row[1]) for row in rows))
+
logger.info("Pruned %d device list outbound pokes", txn.rowcount)
return self.runInteraction(
|