summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/events/snapshot.py21
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/state.py33
-rw-r--r--synapse/storage/events.py49
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/schema/delta/35/state.sql21
-rw-r--r--synapse/storage/schema/delta/35/state_dedupe.sql17
-rw-r--r--synapse/storage/state.py362
8 files changed, 415 insertions, 93 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e895b1c450..11605b34a3 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -15,9 +15,30 @@
 
 
 class EventContext(object):
+    __slots__ = [
+        "current_state_ids",
+        "prev_state_ids",
+        "state_group",
+        "rejected",
+        "push_actions",
+        "prev_group",
+        "delta_ids",
+        "prev_state_events",
+    ]
+
     def __init__(self):
+        # The current state including the current event
         self.current_state_ids = None
+        # The current state excluding the current event
         self.prev_state_ids = None
         self.state_group = None
+
         self.rejected = False
         self.push_actions = []
+
+        # A previously persisted state group and a delta between that
+        # and this state.
+        self.prev_group = None
+        self.delta_ids = None
+
+        self.prev_state_events = None
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cbebd5b2f7..15c52774a2 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -86,6 +86,9 @@ class SlavedEventStore(BaseSlavedStore):
     _get_state_groups_from_groups = (
         StateStore.__dict__["_get_state_groups_from_groups"]
     )
+    _get_state_groups_from_groups_txn = (
+        DataStore._get_state_groups_from_groups_txn.__func__
+    )
     _get_state_group_from_group = (
         StateStore.__dict__["_get_state_group_from_group"]
     )
diff --git a/synapse/state.py b/synapse/state.py
index cd792afed1..4520fa0415 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -55,12 +55,15 @@ def _gen_state_id():
 
 
 class _StateCacheEntry(object):
-    __slots__ = ["state", "state_group", "state_id"]
+    __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]
 
-    def __init__(self, state, state_group):
+    def __init__(self, state, state_group, prev_group=None, delta_ids=None):
         self.state = state
         self.state_group = state_group
 
+        self.prev_group = prev_group
+        self.delta_ids = delta_ids
+
         # The `state_id` is a unique ID we generate that can be used as ID for
         # this collection of state. Usually this would be the same as the
         # state group, but on worker instances we can't generate a new state
@@ -245,11 +248,20 @@ class StateHandler(object):
             if key in context.prev_state_ids:
                 replaces = context.prev_state_ids[key]
                 event.unsigned["replaces_state"] = replaces
+
             context.current_state_ids = dict(context.prev_state_ids)
             context.current_state_ids[key] = event.event_id
+
+            context.prev_group = entry.prev_group
+            context.delta_ids = entry.delta_ids
+            if context.delta_ids is not None:
+                context.delta_ids[key] = event.event_id
         else:
             context.current_state_ids = context.prev_state_ids
 
+            context.prev_group = entry.prev_group
+            context.delta_ids = entry.delta_ids
+
         context.prev_state_events = []
         defer.returnValue(context)
 
@@ -283,6 +295,8 @@ class StateHandler(object):
             defer.returnValue(_StateCacheEntry(
                 state=state_list,
                 state_group=name,
+                prev_group=name,
+                delta_ids={},
             ))
 
         with (yield self.resolve_linearizer.queue(group_names)):
@@ -340,9 +354,24 @@ class StateHandler(object):
                 if hasattr(self.store, "get_next_state_group"):
                     state_group = self.store.get_next_state_group()
 
+            prev_group = None
+            delta_ids = None
+            for old_group, old_ids in state_groups_ids.items():
+                if not set(new_state.iterkeys()) - set(old_ids.iterkeys()):
+                    n_delta_ids = {
+                        k: v
+                        for k, v in new_state.items()
+                        if old_ids.get(k) != v
+                    }
+                    if not delta_ids or len(n_delta_ids) < len(delta_ids):
+                        prev_group = old_group
+                        delta_ids = n_delta_ids
+
             cache = _StateCacheEntry(
                 state=new_state,
                 state_group=state_group,
+                prev_group=prev_group,
+                delta_ids=delta_ids,
             )
 
             if self._state_cache is not None:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1a7d4c5199..7e9b351513 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore):
 
                 # insert into the state_group, state_groups_state and
                 # event_to_state_groups tables.
-                self._store_mult_state_groups_txn(txn, ((event, context),))
+                try:
+                    self._store_mult_state_groups_txn(txn, ((event, context),))
+                except Exception:
+                    logger.exception("")
+                    raise
 
                 metadata_json = encode_json(
                     event.internal_metadata.get_dict()
@@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore):
         )
         event_rows = txn.fetchall()
 
+        for event_id, state_key in event_rows:
+            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
@@ -1571,26 +1578,26 @@ class EventsStore(SQLBaseStore):
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
-        txn.execute(
-            "SELECT state_group FROM event_to_state_groups"
-            " INNER JOIN events USING (event_id)"
-            " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events"
-            "   INNER JOIN event_to_state_groups USING (event_id)"
-            "   WHERE room_id = ? AND topological_ordering < ?"
-            " )"
-            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (room_id, topological_ordering, topological_ordering)
-        )
-        state_rows = txn.fetchall()
-        txn.executemany(
-            "DELETE FROM state_groups_state WHERE state_group = ?",
-            state_rows
-        )
-        txn.executemany(
-            "DELETE FROM state_groups WHERE id = ?",
-            state_rows
-        )
+        # txn.execute(
+        #     "SELECT state_group FROM event_to_state_groups"
+        #     " INNER JOIN events USING (event_id)"
+        #     " WHERE state_group IN ("
+        #     "   SELECT DISTINCT state_group FROM events"
+        #     "   INNER JOIN event_to_state_groups USING (event_id)"
+        #     "   WHERE room_id = ? AND topological_ordering < ?"
+        #     " )"
+        #     " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
+        #     (room_id, topological_ordering, topological_ordering)
+        # )
+        # state_rows = txn.fetchall()
+        # txn.executemany(
+        #     "DELETE FROM state_groups_state WHERE state_group = ?",
+        #     state_rows
+        # )
+        # txn.executemany(
+        #     "DELETE FROM state_groups WHERE id = ?",
+        #     state_rows
+        # )
         # Delete all non-state
         txn.executemany(
             "DELETE FROM event_to_state_groups WHERE event_id = ?",
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b94ce7bea1..b1fbc4ffa5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 34
+SCHEMA_VERSION = 35
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql
new file mode 100644
index 0000000000..c4c244c169
--- /dev/null
+++ b/synapse/storage/schema/delta/35/state.sql
@@ -0,0 +1,21 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE state_group_edges(
+    state_group BIGINT NOT NULL,
+    prev_state_group BIGINT NOT NULL
+);
+
+CREATE INDEX state_group_edges_idx ON state_group_edges(state_group);
diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql
new file mode 100644
index 0000000000..97e5067ef4
--- /dev/null
+++ b/synapse/storage/schema/delta/35/state_dedupe.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('state_group_state_deduplication', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ec551b0b4f..ee8b763008 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,6 +16,7 @@
 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches import intern_string
+from synapse.storage.engines import PostgresEngine
 
 from twisted.internet import defer
 
@@ -24,6 +25,9 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+MAX_STATE_DELTA_HOPS = 100
+
+
 class StateStore(SQLBaseStore):
     """ Keeps track of the state at a given event.
 
@@ -43,6 +47,15 @@ class StateStore(SQLBaseStore):
       * `state_groups_state`: Maps state group to state events.
     """
 
+    STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
+
+    def __init__(self, hs):
+        super(StateStore, self).__init__(hs)
+        self.register_background_update_handler(
+            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
+            self._background_deduplicate_state,
+        )
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
@@ -103,11 +116,8 @@ class StateStore(SQLBaseStore):
             state_groups[event.event_id] = context.state_group
 
             if self._have_persisted_state_group_txn(txn, context.state_group):
-                logger.info("Already persisted state_group: %r", context.state_group)
                 continue
 
-            state_event_ids = dict(context.current_state_ids)
-
             self._simple_insert_txn(
                 txn,
                 table="state_groups",
@@ -118,20 +128,51 @@ class StateStore(SQLBaseStore):
                 },
             )
 
-            self._simple_insert_many_txn(
-                txn,
-                table="state_groups_state",
-                values=[
-                    {
+            # We persist as a delta if we can, while also ensuring the chain
+            # of deltas isn't tooo long, as otherwise read performance degrades.
+            if context.prev_group:
+                potential_hops = self._count_state_group_hops_txn(
+                    txn, context.prev_group
+                )
+            if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
+                self._simple_insert_txn(
+                    txn,
+                    table="state_group_edges",
+                    values={
                         "state_group": context.state_group,
-                        "room_id": event.room_id,
-                        "type": key[0],
-                        "state_key": key[1],
-                        "event_id": state_id,
-                    }
-                    for key, state_id in state_event_ids.items()
-                ],
-            )
+                        "prev_state_group": context.prev_group,
+                    },
+                )
+
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": context.state_group,
+                            "room_id": event.room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in context.delta_ids.items()
+                    ],
+                )
+            else:
+                self._simple_insert_many_txn(
+                    txn,
+                    table="state_groups_state",
+                    values=[
+                        {
+                            "state_group": context.state_group,
+                            "room_id": event.room_id,
+                            "type": key[0],
+                            "state_key": key[1],
+                            "event_id": state_id,
+                        }
+                        for key, state_id in context.current_state_ids.items()
+                    ],
+                )
 
         self._simple_insert_many_txn(
             txn,
@@ -145,6 +186,45 @@ class StateStore(SQLBaseStore):
             ],
         )
 
+    def _count_state_group_hops_txn(self, txn, state_group):
+        """Given a state group, count how many hops there are in the tree.
+
+        This is used to ensure the delta chains don't get too long.
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = ("""
+                WITH RECURSIVE state(state_group) AS (
+                    VALUES(?::bigint)
+                    UNION ALL
+                    SELECT prev_state_group FROM state_group_edges e, state s
+                    WHERE s.state_group = e.state_group
+                )
+                SELECT count(*) FROM state;
+            """)
+
+            txn.execute(sql, (state_group,))
+            row = txn.fetchone()
+            if row and row[0]:
+                return row[0]
+            else:
+                return 0
+        else:
+            next_group = state_group
+            count = 0
+
+            while next_group:
+                next_group = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_group_edges",
+                    keyvalues={"state_group": next_group},
+                    retcol="prev_state_group",
+                    allow_none=True,
+                )
+                if next_group:
+                    count += 1
+
+            return count
+
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
         if event_type and state_key is not None:
@@ -206,48 +286,97 @@ class StateStore(SQLBaseStore):
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
         """
-        def f(txn, groups):
-            if types is not None:
-                where_clause = "AND (%s)" % (
-                    " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
-                )
-            else:
-                where_clause = ""
-
-            sql = (
-                "SELECT state_group, event_id, type, state_key"
-                " FROM state_groups_state WHERE"
-                " state_group IN (%s) %s" % (
-                    ",".join("?" for _ in groups),
-                    where_clause,
-                )
-            )
-
-            args = list(groups)
-            if types is not None:
-                args.extend([i for typ in types for i in typ])
-
-            txn.execute(sql, args)
-            rows = self.cursor_to_dict(txn)
-
-            results = {group: {} for group in groups}
-            for row in rows:
-                key = (row["type"], row["state_key"])
-                results[row["state_group"]][key] = row["event_id"]
-            return results
-
         results = {}
 
         chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
         for chunk in chunks:
             res = yield self.runInteraction(
                 "_get_state_groups_from_groups",
-                f, chunk
+                self._get_state_groups_from_groups_txn, chunk, types,
             )
             results.update(res)
 
         defer.returnValue(results)
 
+    def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+        if types is not None:
+            where_clause = "AND (%s)" % (
+                " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
+            )
+        else:
+            where_clause = ""
+
+        results = {group: {} for group in groups}
+        if isinstance(self.database_engine, PostgresEngine):
+            # The below query walks the state_group tree so that the "state"
+            # table includes all state_groups in the tree. It then joins
+            # against `state_groups_state` to fetch the latest state.
+            # It assumes that previous state groups are always numerically
+            # lesser.
+            sql = ("""
+                WITH RECURSIVE state(state_group) AS (
+                    VALUES(?::bigint)
+                    UNION ALL
+                    SELECT prev_state_group FROM state_group_edges e, state s
+                    WHERE s.state_group = e.state_group
+                )
+                SELECT type, state_key, event_id FROM state_groups_state
+                WHERE ROW(type, state_key, state_group) IN (
+                    SELECT type, state_key, max(state_group) FROM state
+                    INNER JOIN state_groups_state USING (state_group)
+                    GROUP BY type, state_key
+                )
+                %s;
+            """) % (where_clause,)
+
+            for group in groups:
+                args = [group]
+                if types is not None:
+                    args.extend([i for typ in types for i in typ])
+
+                txn.execute(sql, args)
+                rows = self.cursor_to_dict(txn)
+                for row in rows:
+                    key = (row["type"], row["state_key"])
+                    results[group][key] = row["event_id"]
+        else:
+            for group in groups:
+                group_tree = [group]
+                next_group = group
+
+                while next_group:
+                    next_group = self._simple_select_one_onecol_txn(
+                        txn,
+                        table="state_group_edges",
+                        keyvalues={"state_group": next_group},
+                        retcol="prev_state_group",
+                        allow_none=True,
+                    )
+                    if next_group:
+                        group_tree.append(next_group)
+
+                sql = ("""
+                    SELECT type, state_key, event_id FROM state_groups_state
+                    INNER JOIN (
+                        SELECT type, state_key, max(state_group) as state_group
+                        FROM state_groups_state
+                        WHERE state_group IN (%s) %s
+                        GROUP BY type, state_key
+                    ) USING (type, state_key, state_group);
+                """) % (",".join("?" for _ in group_tree), where_clause,)
+
+                args = list(group_tree)
+                if types is not None:
+                    args.extend([i for typ in types for i in typ])
+
+                txn.execute(sql, args)
+                rows = self.cursor_to_dict(txn)
+                for row in rows:
+                    key = (row["type"], row["state_key"])
+                    results[group][key] = row["event_id"]
+
+        return results
+
     @defer.inlineCallbacks
     def get_state_for_events(self, event_ids, types):
         """Given a list of event_ids and type tuples, return a list of state
@@ -504,32 +633,127 @@ class StateStore(SQLBaseStore):
 
         defer.returnValue(results)
 
-    def get_all_new_state_groups(self, last_id, current_id, limit):
-        def get_all_new_state_groups_txn(txn):
-            sql = (
-                "SELECT id, room_id, event_id FROM state_groups"
-                " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
+    def get_next_state_group(self):
+        return self._state_groups_id_gen.get_next()
+
+    @defer.inlineCallbacks
+    def _background_deduplicate_state(self, progress, batch_size):
+        """This background update will slowly deduplicate state by reencoding
+        them as deltas.
+        """
+        last_state_group = progress.get("last_state_group", 0)
+        rows_inserted = progress.get("rows_inserted", 0)
+        max_group = progress.get("max_group", None)
+
+        if max_group is None:
+            rows = yield self._execute(
+                "_background_deduplicate_state", None,
+                "SELECT coalesce(max(id), 0) FROM state_groups",
             )
-            txn.execute(sql, (last_id, current_id, limit))
-            groups = txn.fetchall()
+            max_group = rows[0][0]
+
+        def reindex_txn(txn):
+            new_last_state_group = last_state_group
+            for count in xrange(batch_size):
+                txn.execute(
+                    "SELECT id, room_id FROM state_groups"
+                    " WHERE ? < id AND id <= ?"
+                    " ORDER BY id ASC"
+                    " LIMIT 1",
+                    (new_last_state_group, max_group,)
+                )
+                row = txn.fetchone()
+                if row:
+                    state_group, room_id = row
 
-            if not groups:
-                return ([], [])
+                if not row or not state_group:
+                    return True, count
 
-            lower_bound = groups[0][0]
-            upper_bound = groups[-1][0]
-            sql = (
-                "SELECT state_group, type, state_key, event_id"
-                " FROM state_groups_state"
-                " WHERE ? <= state_group AND state_group <= ?"
+                txn.execute(
+                    "SELECT coalesce(max(id), 0) FROM state_groups"
+                    " WHERE id < ? AND room_id = ?",
+                    (state_group, room_id,)
+                )
+                prev_group, = txn.fetchone()
+                new_last_state_group = state_group
+
+                if prev_group:
+                    potential_hops = self._count_state_group_hops_txn(
+                        txn, prev_group
+                    )
+                    if potential_hops >= MAX_STATE_DELTA_HOPS:
+                        # We want to ensure chains are at most this long,#
+                        # otherwise read performance degrades.
+                        continue
+
+                    prev_state = self._get_state_groups_from_groups_txn(
+                        txn, [prev_group], types=None
+                    )
+                    prev_state = prev_state.values()[0]
+
+                    curr_state = self._get_state_groups_from_groups_txn(
+                        txn, [state_group], types=None
+                    )
+                    curr_state = curr_state.values()[0]
+
+                    if not set(prev_state.keys()) - set(curr_state.keys()):
+                        # We can only do a delta if the current has a strict super set
+                        # of keys
+
+                        delta_state = {
+                            key: value for key, value in curr_state.items()
+                            if prev_state.get(key, None) != value
+                        }
+
+                        self._simple_insert_txn(
+                            txn,
+                            table="state_group_edges",
+                            values={
+                                "state_group": state_group,
+                                "prev_state_group": prev_group,
+                            }
+                        )
+
+                        self._simple_delete_txn(
+                            txn,
+                            table="state_groups_state",
+                            keyvalues={
+                                "state_group": state_group,
+                            }
+                        )
+
+                        self._simple_insert_many_txn(
+                            txn,
+                            table="state_groups_state",
+                            values=[
+                                {
+                                    "state_group": state_group,
+                                    "room_id": room_id,
+                                    "type": key[0],
+                                    "state_key": key[1],
+                                    "event_id": state_id,
+                                }
+                                for key, state_id in delta_state.items()
+                            ],
+                        )
+
+            progress = {
+                "last_state_group": state_group,
+                "rows_inserted": rows_inserted + batch_size,
+                "max_group": max_group,
+            }
+
+            self._background_update_progress_txn(
+                txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress
             )
 
-            txn.execute(sql, (lower_bound, upper_bound))
-            state_group_state = txn.fetchall()
-            return (groups, state_group_state)
-        return self.runInteraction(
-            "get_all_new_state_groups", get_all_new_state_groups_txn
+            return False, batch_size
+
+        finished, result = yield self.runInteraction(
+            self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn
         )
 
-    def get_next_state_group(self):
-        return self._state_groups_id_gen.get_next()
+        if finished:
+            yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME)
+
+        defer.returnValue(result)