summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/devices.py45
1 files changed, 45 insertions, 0 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 77b02c8a28..83b1d2eeba 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -37,6 +37,10 @@ class DeviceStore(SQLBaseStore):
             max_entries=10000,
         )
 
+        self._clock.looping_call(
+            self._prune_old_outbound_device_pokes, 60 * 60 * 1000
+        )
+
         self.register_background_index_update(
             "device_lists_stream_idx",
             index_name="device_lists_stream_user_id",
@@ -641,3 +645,44 @@ class DeviceStore(SQLBaseStore):
 
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
+
+    def _prune_old_outbound_device_pokes(self):
+        """Delete old entries out of the device_lists_outbound_pokes to ensure
+        that we don't fill up due to dead servers. We keep one entry per
+        (destination, user_id) tuple to ensure that the prev_ids remain correct
+        if the server does come back.
+        """
+        yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000
+
+        def _prune_txn(txn):
+            select_sql = """
+                SELECT destination, user_id, max(stream_id) as stream_id
+                FROM device_lists_outbound_pokes
+                GROUP BY destination, user_id
+                HAVING min(ts) < ? AND count(*) > 1
+            """
+
+            txn.execute(select_sql, (yesterday,))
+            rows = txn.fetchall()
+
+            if not rows:
+                return
+
+            delete_sql = """
+                DELETE FROM device_lists_outbound_pokes
+                WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
+            """
+
+            txn.executemany(
+                delete_sql,
+                (
+                    (yesterday, row[0], row[1], row[2])
+                    for row in rows
+                )
+            )
+
+            logger.info("Pruned %d device list outbound pokes", txn.rowcount)
+
+        return self.runInteraction(
+            "_prune_old_outbound_device_pokes", _prune_txn
+        )