summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-01-30 10:12:00 +0000
committerErik Johnston <erik@matrix.org>2017-01-30 10:14:37 +0000
commitd360c97ae1e79789e97ab6a12e005d22334e416f (patch)
treee31cb9a4f842a66e64157f9964fdc7dd90074708 /synapse/storage
parentAlways use the latest stream_id, sent or unsent (diff)
downloadsynapse-d360c97ae1e79789e97ab6a12e005d22334e416f.tar.xz
Clear out old destination pokes.
Diffstat (limited to '')
-rw-r--r--synapse/storage/devices.py42
1 files changed, 42 insertions, 0 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d72f60d94b..c05ca7c5e0 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -24,6 +24,13 @@ logger = logging.getLogger(__name__)
 
 
 class DeviceStore(SQLBaseStore):
+    def __init__(self, hs):
+        super(DeviceStore, self).__init__(hs)
+
+        self._clock.looping_call(
+            self._prune_old_outbound_device_pokes, 60 * 60 * 1000
+        )
+
     @defer.inlineCallbacks
     def store_device(self, user_id, device_id,
                      initial_device_display_name):
@@ -530,3 +537,38 @@ class DeviceStore(SQLBaseStore):
 
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
+
+    def _prune_old_outbound_device_pokes(self):
+        """Delete old entries out of the device_lists_outbound_pokes to ensure
+        that we don't fill up due to dead servers. We keep one entry per
+        (destination, user_id) tuple to ensure that the prev_ids remain correct
+        if the server does come back.
+        """
+        now = self._clock.time_msec()
+
+        def _prune_txn(txn):
+            select_sql = """
+                SELECT destination, user_id, max(stream_id) as stream_id
+                FROM device_lists_outbound_pokes
+                GROUP BY destination, user_id
+            """
+
+            txn.execute(select_sql)
+            rows = txn.fetchall()
+
+            delete_sql = """
+                DELETE FROM device_lists_outbound_pokes
+                WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
+            """
+
+            txn.executemany(
+                delete_sql,
+                (
+                    (now, row["destination"], row["user_id"], row["stream_id"])
+                    for row in rows
+                )
+            )
+
+        return self.runInteraction(
+            "_prune_old_outbound_device_pokes", _prune_txn
+        )