summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-09-05 09:34:24 +0100
committerErik Johnston <erik@matrix.org>2016-09-05 10:05:36 +0100
commita99e9335502df3389ff6f16ef52c43ce391b6955 (patch)
tree0a28d8144db46a4676aeb14efda2d51db89e205f /synapse/storage
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj/state_storage (diff)
downloadsynapse-a99e9335502df3389ff6f16ef52c43ce391b6955.tar.xz
Add upgrade script that will slowly prune state_groups_state entries
Diffstat (limited to '')
-rw-r--r--synapse/storage/schema/delta/35/state_dedupe.sql17
-rw-r--r--synapse/storage/state.py278
2 files changed, 220 insertions, 75 deletions
diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql
new file mode 100644
index 0000000000..97e5067ef4
--- /dev/null
+++ b/synapse/storage/schema/delta/35/state_dedupe.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('state_group_state_deduplication', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 7f45c0cd99..968b68f462 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -47,6 +47,15 @@ class StateStore(SQLBaseStore):
       * `state_groups_state`: Maps state group to state events.
     """
 
+    STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
+
+    def __init__(self, hs):
+        super(StateStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
+            self._background_deduplicate_state,
+        )
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
@@ -288,92 +297,92 @@ class StateStore(SQLBaseStore):
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
         """
-        def f(txn, groups):
-            if types is not None:
-                where_clause = "AND (%s)" % (
-                    " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
-                )
-            else:
-                where_clause = ""
-
-            results = {group: {} for group in groups}
-            if isinstance(self.database_engine, PostgresEngine):
-                sql = ("""
-                    WITH RECURSIVE state(state_group) AS (
-                        VALUES(?::bigint)
-                        UNION ALL
-                        SELECT prev_state_group FROM state_group_edges e, state s
-                        WHERE s.state_group = e.state_group
-                    )
-                    SELECT type, state_key, event_id FROM state_groups_state
-                    WHERE ROW(type, state_key, state_group) IN (
-                        SELECT type, state_key, max(state_group) FROM state
-                        INNER JOIN state_groups_state USING (state_group)
-                        GROUP BY type, state_key
-                    )
-                    %s;
-                """) % (where_clause,)
-
-                for group in groups:
-                    args = [group]
-                    if types is not None:
-                        args.extend([i for typ in types for i in typ])
-
-                    txn.execute(sql, args)
-                    rows = self.cursor_to_dict(txn)
-                    for row in rows:
-                        key = (row["type"], row["state_key"])
-                        results[group][key] = row["event_id"]
-            else:
-                for group in groups:
-                    group_tree = [group]
-                    next_group = group
-
-                    while next_group:
-                        next_group = self._simple_select_one_onecol_txn(
-                            txn,
-                            table="state_group_edges",
-                            keyvalues={"state_group": next_group},
-                            retcol="prev_state_group",
-                            allow_none=True,
-                        )
-                        if next_group:
-                            group_tree.append(next_group)
-
-                    sql = ("""
-                        SELECT type, state_key, event_id FROM state_groups_state
-                        INNER JOIN (
-                            SELECT type, state_key, max(state_group) as state_group
-                            FROM state_groups_state
-                            WHERE state_group IN (%s) %s
-                            GROUP BY type, state_key
-                        ) USING (type, state_key, state_group);
-                    """) % (",".join("?" for _ in group_tree), where_clause,)
-
-                    args = list(group_tree)
-                    if types is not None:
-                        args.extend([i for typ in types for i in typ])
-
-                    txn.execute(sql, args)
-                    rows = self.cursor_to_dict(txn)
-                    for row in rows:
-                        key = (row["type"], row["state_key"])
-                        results[group][key] = row["event_id"]
-
-            return results
-
         results = {}
 
         chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
         for chunk in chunks:
             res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
-                f, chunk
+                self._get_state_groups_from_groups_txn, chunk, types,
             )
             results.update(res)
 
         defer.returnValue(results)
 
+    def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+        if types is not None:
+            where_clause = "AND (%s)" % (
+                " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
+            )
+        else:
+            where_clause = ""
+
+        results = {group: {} for group in groups}
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = ("""
+                WITH RECURSIVE state(state_group) AS (
+                    VALUES(?::bigint)
+                    UNION ALL
+                    SELECT prev_state_group FROM state_group_edges e, state s
+                    WHERE s.state_group = e.state_group
+                )
+                SELECT type, state_key, event_id FROM state_groups_state
+                WHERE ROW(type, state_key, state_group) IN (
+                    SELECT type, state_key, max(state_group) FROM state
+                    INNER JOIN state_groups_state USING (state_group)
+                    GROUP BY type, state_key
+                )
+                %s;
+            """) % (where_clause,)
+
+            for group in groups:
+                args = [group]
+                if types is not None:
+                    args.extend([i for typ in types for i in typ])
+
+                txn.execute(sql, args)
+                rows = self.cursor_to_dict(txn)
+                for row in rows:
+                    key = (row["type"], row["state_key"])
+                    results[group][key] = row["event_id"]
+        else:
+            for group in groups:
+                group_tree = [group]
+                next_group = group
+
+                while next_group:
+                    next_group = self._simple_select_one_onecol_txn(
+                        txn,
+                        table="state_group_edges",
+                        keyvalues={"state_group": next_group},
+                        retcol="prev_state_group",
+                        allow_none=True,
+                    )
+                    if next_group:
+                        group_tree.append(next_group)
+
+                sql = ("""
+                    SELECT type, state_key, event_id FROM state_groups_state
+                    INNER JOIN (
+                        SELECT type, state_key, max(state_group) as state_group
+                        FROM state_groups_state
+                        WHERE state_group IN (%s) %s
+                        GROUP BY type, state_key
+                    ) USING (type, state_key, state_group);
+                """) % (",".join("?" for _ in group_tree), where_clause,)
+
+                args = list(group_tree)
+                if types is not None:
+                    args.extend([i for typ in types for i in typ])
+
+                txn.execute(sql, args)
+                rows = self.cursor_to_dict(txn)
+                for row in rows:
+                    key = (row["type"], row["state_key"])
+                    results[group][key] = row["event_id"]
+
+        return results
+
     @defer.inlineCallbacks
     def get_state_for_events(self, event_ids, types):
         """Given a list of event_ids and type tuples, return a list of state
@@ -632,3 +641,122 @@ class StateStore(SQLBaseStore):
 
     def get_next_state_group(self):
         return self._state_groups_id_gen.get_next()
+
+    @defer.inlineCallbacks
+    def _background_deduplicate_state(self, progress, batch_size):
+        last_state_group = progress.get("last_state_group", 0)
+        rows_inserted = progress.get("rows_inserted", 0)
+        max_group = progress.get("max_group", None)
+
+        if max_group is None:
+            rows = yield self._execute(
+                "_background_deduplicate_state", None,
+                "SELECT coalesce(max(id), 0) FROM state_groups",
+            )
+            max_group = rows[0][0]
+
+        def reindex_txn(txn):
+            new_last_state_group = last_state_group
+            for count in xrange(batch_size):
+                txn.execute(
+                    "SELECT id, room_id FROM state_groups"
+                    " WHERE ? < id AND id <= ?"
+                    " ORDER BY id ASC"
+                    " LIMIT 1",
+                    (new_last_state_group, max_group,)
+                )
+                row = txn.fetchone()
+                if row:
+                    state_group, room_id = row
+
+                if not row or not state_group:
+                    return True, count
+
+                txn.execute(
+                    "SELECT coalesce(max(id), 0) FROM state_groups"
+                    " WHERE id < ? AND room_id = ?",
+                    (state_group, room_id,)
+                )
+                prev_group, = txn.fetchone()
+                new_last_state_group = state_group
+
+                if prev_group:
+                    potential_hops = self._count_state_group_hops_txn(
+                        txn, prev_group
+                    )
+                    if potential_hops >= MAX_STATE_DELTA_HOPS:
+                        # We want to ensure chains are at most this long,#
+                        # otherwise read performance degrades.
+                        continue
+
+                    prev_state = self._get_state_groups_from_groups_txn(
+                        txn, [prev_group], types=None
+                    )
+                    prev_state = prev_state.values()[0]
+
+                    curr_state = self._get_state_groups_from_groups_txn(
+                        txn, [state_group], types=None
+                    )
+                    curr_state = curr_state.values()[0]
+
+                    if not set(prev_state.keys()) - set(curr_state.keys()):
+                        # We can only do a delta if the current has a strict super set
+                        # of keys
+
+                        delta_state = {
+                            key: value for key, value in curr_state.items()
+                            if prev_state.get(key, None) != value
+                        }
+
+                        self._simple_insert_txn(
+                            txn,
+                            table="state_group_edges",
+                            values={
+                                "state_group": state_group,
+                                "prev_state_group": prev_group,
+                            }
+                        )
+
+                        self._simple_delete_txn(
+                            txn,
+                            table="state_groups_state",
+                            keyvalues={
+                                "state_group": state_group,
+                            }
+                        )
+
+                        self._simple_insert_many_txn(
+                            txn,
+                            table="state_groups_state",
+                            values=[
+                                {
+                                    "state_group": state_group,
+                                    "room_id": room_id,
+                                    "type": key[0],
+                                    "state_key": key[1],
+                                    "event_id": state_id,
+                                }
+                                for key, state_id in delta_state.items()
+                            ],
+                        )
+
+            progress = {
+                "last_state_group": state_group,
+                "rows_inserted": rows_inserted + batch_size,
+                "max_group": max_group,
+            }
+
+            self._background_update_progress_txn(
+                txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress
+            )
+
+            return False, batch_size
+
+        finished, result = yield self.runInteraction(
+            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn
+        )
+
+        if finished:
+            yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME)
+
+        defer.returnValue(result)