summary refs log tree commit diff
path: root/synapse/storage/state.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-10-12 20:43:18 +0100
committerErik Johnston <erik@matrix.org>2018-10-19 15:48:15 +0100
commit47a9da28caa6a4f27d2df31f043971a2c9c7b555 (patch)
tree7cd23b92a8598ad93a85388ffe4c6a43e267a342 /synapse/storage/state.py
parentFix up comments (diff)
downloadsynapse-47a9da28caa6a4f27d2df31f043971a2c9c7b555.tar.xz
Batch process handling state groups
Diffstat (limited to 'synapse/storage/state.py')
-rw-r--r--synapse/storage/state.py112
1 files changed, 71 insertions, 41 deletions
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index f7cf5c86c9..0f86311ed4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1041,55 +1041,85 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
             return count
 
-    def _is_state_group_referenced(self, txn, state_group):
-        """Checks if a given state group is referenced, or is safe to delete.
+    def _find_unreferenced_groups(self, txn, state_groups):
+        """Used when purging history to figure out which state groups can be
+        deleted and which need to be de-delta'ed (due to their prev group
+        being scheduled for deletion).
 
-        A state group is referenced if it or any of its descendants are
-        pointed at by an event. (A descendant is a state_group whose chain of
-        prev_groups includes the given state_group.)
-        """
-
-        # We check this by doing a depth first search to look for any
-        # descendant referenced by `event_to_state_groups`.
-
-        # State groups we need to check, contains state groups that are
-        # descendants of `state_group`
-        state_groups_to_search = [state_group]
-
-        # Set of state groups we've already checked
-        state_groups_searched = set()
-
-        while state_groups_to_search:
-            state_group = state_groups_to_search.pop()  # Next state group to check
+        Args:
+            txn
+            state_groups (set[int]): Set of state groups referenced by events
+                that are going to be deleted.
 
-            is_referenced = self._simple_select_one_onecol_txn(
+        Returns:
+            tuple[set[int], set[int]]: The set of state groups that can be
+            deleted and the set of state groups that need to be de-delta'ed
+        """
+        # Graph of state group -> previous group
+        graph = {}
+
+        # Set of state groups that we have found to be referenced by events
+        referenced_groups = set()
+
+        # Set of state groups we've already seen
+        state_groups_seen = set(state_groups)
+
+        # Set of state groups to handle next.
+        next_to_search = set(state_groups)
+        while next_to_search:
+            # We bound the number of groups we look up at once, to stop the
+            # SQL query getting too big
+            if len(next_to_search) < 100:
+                current_search = next_to_search
+                next_to_search = set()
+            else:
+                lst = list(next_to_search)
+                current_search = set(lst[:100])
+                next_to_search = set(lst[100:])
+
+            # Check if state groups are referenced
+            sql = """
+                SELECT state_group, count(*) FROM event_to_state_groups
+                LEFT JOIN events_to_purge AS ep USING (event_id)
+                WHERE state_group IN (%s) AND ep.event_id IS NULL
+                GROUP BY state_group
+            """ % (",".join("?" for _ in current_search),)
+            txn.execute(sql, list(current_search))
+
+            referenced = set(sg for sg, cnt in txn if cnt > 0)
+            referenced_groups |= referenced
+
+            # We don't continue iterating up the state group graphs for state
+            # groups that are referenced.
+            current_search -= referenced
+
+            rows = self._simple_select_many_txn(
                 txn,
-                table="event_to_state_groups",
-                keyvalues={"state_group": state_group},
-                retcol="event_id",
-                allow_none=True,
+                table="state_group_edges",
+                column="prev_state_group",
+                iterable=current_search,
+                keyvalues={},
+                retcols=("prev_state_group", "state_group",),
             )
-            if is_referenced:
-                # A descendant is referenced by event_to_state_groups, so
-                # original state group is referenced.
-                return True
 
-            state_groups_searched.add(state_group)
+            next_to_search.update(row["state_group"] for row in rows)
+            # We don't bother re-handling groups we've already seen
+            next_to_search -= state_groups_seen
+            state_groups_seen |= next_to_search
 
-            # Find all children of current state group and add to search
-            references = self._simple_select_onecol_txn(
-                txn,
-                table="state_group_edges",
-                keyvalues={"prev_state_group": state_group},
-                retcol="state_group",
-            )
-            state_groups_to_search.extend(references)
+            for row in rows:
+                # Note: Each state group can have at most one prev group
+                graph[row["state_group"]] = row["prev_state_group"]
+
+        to_delete = state_groups_seen - referenced_groups
 
-            # Lets be paranoid and check for cycles
-            if state_groups_searched.intersection(references):
-                raise Exception("State group %s has cyclic dependency", state_group)
+        to_dedelta = set()
+        for sg in referenced_groups:
+            prev_sg = graph.get(sg)
+            if prev_sg and prev_sg in to_delete:
+                to_dedelta.add(sg)
 
-        return False
+        return to_delete, to_dedelta
 
 
 class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):