summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-06-07 11:02:38 +0100
committerErik Johnston <erik@matrix.org>2017-06-07 11:02:38 +0100
commit65f0513a3306d21aa5e6959b21642ba15dbdcad5 (patch)
treec570b7fd4e68ccf30bcb0d3e55b6e9dd8af6a416 /synapse
parentIncrease size of IP cache (diff)
downloadsynapse-65f0513a3306d21aa5e6959b21642ba15dbdcad5.tar.xz
Split up device_lists_outbound_pokes table for faster updates.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/devices.py82
-rw-r--r--synapse/storage/schema/delta/42/device_list_last_id.sql33
2 files changed, 57 insertions, 58 deletions
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d9936c88bb..77b02c8a28 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -37,10 +37,6 @@ class DeviceStore(SQLBaseStore):
             max_entries=10000,
         )
 
-        self._clock.looping_call(
-            self._prune_old_outbound_device_pokes, 60 * 60 * 1000
-        )
-
         self.register_background_index_update(
             "device_lists_stream_idx",
             index_name="device_lists_stream_user_id",
@@ -368,7 +364,7 @@ class DeviceStore(SQLBaseStore):
 
         prev_sent_id_sql = """
             SELECT coalesce(max(stream_id), 0) as stream_id
-            FROM device_lists_outbound_pokes
+            FROM device_lists_outbound_last_success
             WHERE destination = ? AND user_id = ? AND stream_id <= ?
         """
 
@@ -510,32 +506,43 @@ class DeviceStore(SQLBaseStore):
         )
 
     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
-        # First we DELETE all rows such that only the latest row for each
-        # (destination, user_id is left. We do this by selecting first and
-        # deleting.
+        # We update the device_lists_outbound_last_success with the successfully
+        # poked users. We do the join to see which users need to be inserted and
+        # which updated.
         sql = """
-            SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
-            WHERE destination = ? AND stream_id <= ?
+            SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL)
+            FROM device_lists_outbound_pokes as o
+            LEFT JOIN device_lists_outbound_last_success as s
+                USING (destination, user_id)
+            WHERE destination = ? AND o.stream_id <= ?
             GROUP BY user_id
-            HAVING count(*) > 1
         """
         txn.execute(sql, (destination, stream_id,))
         rows = txn.fetchall()
 
         sql = """
-            DELETE FROM device_lists_outbound_pokes
-            WHERE destination = ? AND user_id = ? AND stream_id < ?
+            UPDATE device_lists_outbound_last_success
+            SET stream_id = ?
+            WHERE destination = ? AND user_id = ?
         """
         txn.executemany(
-            sql, ((destination, row[0], row[1],) for row in rows)
+            sql, ((row[1], destination, row[0],) for row in rows if row[2])
         )
 
-        # Mark everything that is left as sent
         sql = """
-            UPDATE device_lists_outbound_pokes SET sent = ?
+            INSERT INTO device_lists_outbound_last_success
+            (destination, user_id, stream_id) VALUES (?, ?, ?)
+        """
+        txn.executemany(
+            sql, ((destination, row[0], row[1],) for row in rows if not row[2])
+        )
+
+        # Delete all sent outbound pokes
+        sql = """
+            DELETE FROM device_lists_outbound_pokes
             WHERE destination = ? AND stream_id <= ?
         """
-        txn.execute(sql, (True, destination, stream_id,))
+        txn.execute(sql, (destination, stream_id,))
 
     @defer.inlineCallbacks
     def get_user_whose_devices_changed(self, from_key):
@@ -634,44 +641,3 @@ class DeviceStore(SQLBaseStore):
 
     def get_device_stream_token(self):
         return self._device_list_id_gen.get_current_token()
-
-    def _prune_old_outbound_device_pokes(self):
-        """Delete old entries out of the device_lists_outbound_pokes to ensure
-        that we don't fill up due to dead servers. We keep one entry per
-        (destination, user_id) tuple to ensure that the prev_ids remain correct
-        if the server does come back.
-        """
-        yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000
-
-        def _prune_txn(txn):
-            select_sql = """
-                SELECT destination, user_id, max(stream_id) as stream_id
-                FROM device_lists_outbound_pokes
-                GROUP BY destination, user_id
-                HAVING min(ts) < ? AND count(*) > 1
-            """
-
-            txn.execute(select_sql, (yesterday,))
-            rows = txn.fetchall()
-
-            if not rows:
-                return
-
-            delete_sql = """
-                DELETE FROM device_lists_outbound_pokes
-                WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ?
-            """
-
-            txn.executemany(
-                delete_sql,
-                (
-                    (yesterday, row[0], row[1], row[2])
-                    for row in rows
-                )
-            )
-
-            logger.info("Pruned %d device list outbound pokes", txn.rowcount)
-
-        return self.runInteraction(
-            "_prune_old_outbound_device_pokes", _prune_txn
-        )
diff --git a/synapse/storage/schema/delta/42/device_list_last_id.sql b/synapse/storage/schema/delta/42/device_list_last_id.sql
new file mode 100644
index 0000000000..9ab8c14fa3
--- /dev/null
+++ b/synapse/storage/schema/delta/42/device_list_last_id.sql
@@ -0,0 +1,33 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Table of last stream_id that we sent to destination for user_id. This is
+-- used to fill out the `prev_id` fields of outbound device list updates.
+CREATE TABLE device_lists_outbound_last_success (
+    destination TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL
+);
+
+INSERT INTO device_lists_outbound_last_success
+    SELECT destination, user_id, coalesce(max(stream_id), 0) as stream_id
+        FROM device_lists_outbound_pokes
+        WHERE sent = (1 = 1)  -- sqlite doesn't have inbuilt boolean values
+        GROUP BY destination, user_id;
+
+CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success(
+    destination, user_id, stream_id
+);