diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 77b02c8a28..83b1d2eeba 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -37,6 +37,10 @@ class DeviceStore(SQLBaseStore):
max_entries=10000,
)
+ self._clock.looping_call(
+ self._prune_old_outbound_device_pokes, 60 * 60 * 1000
+ )
+
self.register_background_index_update(
"device_lists_stream_idx",
index_name="device_lists_stream_user_id",
@@ -641,3 +645,44 @@ class DeviceStore(SQLBaseStore):
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()
+
+ def _prune_old_outbound_device_pokes(self):
+ """Delete old entries out of the device_lists_outbound_pokes to ensure
+ that we don't fill up due to dead servers. We keep one entry per
+ (destination, user_id) tuple to ensure that the prev_ids remain correct
+ if the server does come back.
+ """
+ yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000
+
+ def _prune_txn(txn):
+ select_sql = """
+ SELECT destination, user_id, max(stream_id) as stream_id
+ FROM device_lists_outbound_pokes
+ GROUP BY destination, user_id
+ HAVING min(ts) < ? AND count(*) > 1
+ """
+
+ txn.execute(select_sql, (yesterday,))
+ rows = txn.fetchall()
+
+ if not rows:
+ return
+
+ delete_sql = """
+ DELETE FROM device_lists_outbound_pokes
+ WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
+ """
+
+ txn.executemany(
+ delete_sql,
+ (
+ (yesterday, row[0], row[1], row[2])
+ for row in rows
+ )
+ )
+
+ logger.info("Pruned %d device list outbound pokes", txn.rowcount)
+
+ return self.runInteraction(
+ "_prune_old_outbound_device_pokes", _prune_txn
+ )
|