diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d72f60d94b..c05ca7c5e0 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -24,6 +24,13 @@ logger = logging.getLogger(__name__)
class DeviceStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(DeviceStore, self).__init__(hs)
+
+ self._clock.looping_call(
+ self._prune_old_outbound_device_pokes, 60 * 60 * 1000
+ )
+
@defer.inlineCallbacks
def store_device(self, user_id, device_id,
initial_device_display_name):
@@ -530,3 +537,38 @@ class DeviceStore(SQLBaseStore):
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()
+
+ def _prune_old_outbound_device_pokes(self):
+ """Delete old entries out of the device_lists_outbound_pokes to ensure
+ that we don't fill up due to dead servers. We keep one entry per
+ (destination, user_id) tuple to ensure that the prev_ids remain correct
+ if the server does come back.
+ """
+ now = self._clock.time_msec()
+
+ def _prune_txn(txn):
+ select_sql = """
+ SELECT destination, user_id, max(stream_id) as stream_id
+ FROM device_lists_outbound_pokes
+ GROUP BY destination, user_id
+ """
+
+ txn.execute(select_sql)
+ rows = txn.fetchall()
+
+ delete_sql = """
+ DELETE FROM device_lists_outbound_pokes
+ WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
+ """
+
+ txn.executemany(
+ delete_sql,
+ (
+ (now, row["destination"], row["user_id"], row["stream_id"])
+ for row in rows
+ )
+ )
+
+ return self.runInteraction(
+ "_prune_old_outbound_device_pokes", _prune_txn
+ )
|