diff --git a/changelog.d/6295.misc b/changelog.d/6295.misc
new file mode 100644
index 0000000000..a3e6b8296e
--- /dev/null
+++ b/changelog.d/6295.misc
@@ -0,0 +1 @@
+Split out state storage into separate data store.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 97f15a1c32..260a4351ca 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -127,7 +127,9 @@ class PaginationHandler(object):
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(room_id, token, delete_local_events)
+ yield self.storage.purge_events.purge_history(
+ room_id, token, delete_local_events
+ )
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
@@ -170,7 +172,7 @@ class PaginationHandler(object):
if joined:
raise SynapseError(400, "Users are still joined to this room")
- await self.store.purge_room(room_id)
+ await self.storage.purge_events.purge_room(room_id)
@defer.inlineCallbacks
def get_messages(
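For context: purging now goes through the high-level storage layer, which coordinates the main data store's purge_history with the separate state-group cleanup added below. A minimal sketch of what a caller looks like after this change (ExamplePurgeCaller is illustrative only; hs.get_datastore() and hs.get_storage() are the existing accessors used in tests/storage/test_purge.py at the end of this patch):

```python
from twisted.internet import defer


class ExamplePurgeCaller(object):
    """Illustrative only -- shows the shape of a caller, not Synapse code."""

    def __init__(self, hs):
        self.store = hs.get_datastore()   # low-level main data store
        self.storage = hs.get_storage()   # high-level Storage object

    @defer.inlineCallbacks
    def purge_before(self, room_id, token, delete_local_events=False):
        # PurgeEventsStorage coordinates purge_history on the main store
        # with the state-group cleanup that used to live inside it.
        yield self.storage.purge_events.purge_history(
            room_id, token, delete_local_events
        )
```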
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0a1a8cc1e5..0460fe8cc9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage
+from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage
__all__ = ["DataStores", "DataStore"]
@@ -46,6 +47,7 @@ class Storage(object):
self.main = stores.main
self.persistence = EventsPersistenceStorage(hs, stores)
+ self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores)
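Storage stays a thin composition layer: the main data store is exposed directly, and cross-store logic lives in small wrapper objects built over the full set of data stores. A rough sketch of how the layers are reached at runtime (hs is a HomeServer; the accessors are the ones used by the updated tests below):

```python
def get_purge_layers(hs):
    # Minimal sketch, assuming the accessors used in tests/storage/test_purge.py.
    store = hs.get_datastore()    # main DataStore: low-level, per-table methods
    storage = hs.get_storage()    # Storage: high-level, cross-store logic

    # The purge entry points now hang off the Storage object.
    return store, storage.purge_events
```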
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 301f8ea128..878f7568a6 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -1375,6 +1375,10 @@ class EventsStore(
if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their
state groups).
+
+ Returns:
+ Deferred[set[int]]: The set of state groups that are referenced by
+ deleted events.
"""
return self.runInteraction(
@@ -1511,11 +1515,10 @@ class EventsStore(
[(room_id, event_id) for event_id, in new_backwards_extrems],
)
- logger.info("[purge] finding redundant state groups")
+ logger.info("[purge] finding state groups referenced by deleted events")
# Get all state groups that are referenced by events that are to be
- # deleted. We then go and check if they are referenced by other events
- # or state groups, and if not we delete them.
+ # deleted.
txn.execute(
"""
SELECT DISTINCT state_group FROM events_to_purge
@@ -1528,60 +1531,6 @@ class EventsStore(
"[purge] found %i referenced state groups", len(referenced_state_groups)
)
- logger.info("[purge] finding state groups that can be deleted")
-
- _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
- state_groups_to_delete, remaining_state_groups = _
-
- logger.info(
- "[purge] found %i state groups to delete", len(state_groups_to_delete)
- )
-
- logger.info(
- "[purge] de-delta-ing %i remaining state groups",
- len(remaining_state_groups),
- )
-
- # Now we turn the state groups that reference to-be-deleted state
- # groups to non delta versions.
- for sg in remaining_state_groups:
- logger.info("[purge] de-delta-ing remaining state group %s", sg)
- curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
- curr_state = curr_state[sg]
-
- self._simple_delete_txn(
- txn, table="state_groups_state", keyvalues={"state_group": sg}
- )
-
- self._simple_delete_txn(
- txn, table="state_group_edges", keyvalues={"state_group": sg}
- )
-
- self._simple_insert_many_txn(
- txn,
- table="state_groups_state",
- values=[
- {
- "state_group": sg,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": state_id,
- }
- for key, state_id in iteritems(curr_state)
- ],
- )
-
- logger.info("[purge] removing redundant state groups")
- txn.executemany(
- "DELETE FROM state_groups_state WHERE state_group = ?",
- ((sg,) for sg in state_groups_to_delete),
- )
- txn.executemany(
- "DELETE FROM state_groups WHERE id = ?",
- ((sg,) for sg in state_groups_to_delete),
- )
-
logger.info("[purge] removing events from event_to_state_groups")
txn.execute(
"DELETE FROM event_to_state_groups "
@@ -1668,138 +1617,35 @@ class EventsStore(
logger.info("[purge] done")
- def _find_unreferenced_groups_during_purge(self, txn, state_groups):
- """Used when purging history to figure out which state groups can be
- deleted and which need to be de-delta'ed (due to one of its prev groups
- being scheduled for deletion).
-
- Args:
- txn
- state_groups (set[int]): Set of state groups referenced by events
- that are going to be deleted.
-
- Returns:
- tuple[set[int], set[int]]: The set of state groups that can be
- deleted and the set of state groups that need to be de-delta'ed
- """
- # Graph of state group -> previous group
- graph = {}
-
- # Set of events that we have found to be referenced by events
- referenced_groups = set()
-
- # Set of state groups we've already seen
- state_groups_seen = set(state_groups)
-
- # Set of state groups to handle next.
- next_to_search = set(state_groups)
- while next_to_search:
- # We bound size of groups we're looking up at once, to stop the
- # SQL query getting too big
- if len(next_to_search) < 100:
- current_search = next_to_search
- next_to_search = set()
- else:
- current_search = set(itertools.islice(next_to_search, 100))
- next_to_search -= current_search
-
- # Check if state groups are referenced
- sql = """
- SELECT DISTINCT state_group FROM event_to_state_groups
- LEFT JOIN events_to_purge AS ep USING (event_id)
- WHERE ep.event_id IS NULL AND
- """
- clause, args = make_in_list_sql_clause(
- txn.database_engine, "state_group", current_search
- )
- txn.execute(sql + clause, list(args))
-
- referenced = set(sg for sg, in txn)
- referenced_groups |= referenced
-
- # We don't continue iterating up the state group graphs for state
- # groups that are referenced.
- current_search -= referenced
-
- rows = self._simple_select_many_txn(
- txn,
- table="state_group_edges",
- column="prev_state_group",
- iterable=current_search,
- keyvalues={},
- retcols=("prev_state_group", "state_group"),
- )
-
- prevs = set(row["state_group"] for row in rows)
- # We don't bother re-handling groups we've already seen
- prevs -= state_groups_seen
- next_to_search |= prevs
- state_groups_seen |= prevs
-
- for row in rows:
- # Note: Each state group can have at most one prev group
- graph[row["state_group"]] = row["prev_state_group"]
-
- to_delete = state_groups_seen - referenced_groups
-
- to_dedelta = set()
- for sg in referenced_groups:
- prev_sg = graph.get(sg)
- if prev_sg and prev_sg in to_delete:
- to_dedelta.add(sg)
-
- return to_delete, to_dedelta
+ return referenced_state_groups
def purge_room(self, room_id):
"""Deletes all record of a room
Args:
- room_id (str):
+ room_id (str)
+
+ Returns:
+ Deferred[List[int]]: The list of state groups to delete.
"""
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id):
- # first we have to delete the state groups states
- logger.info("[purge] removing %s from state_groups_state", room_id)
-
+ # First we fetch all the state groups that should be deleted, before
+ # we delete the rows needed to find them.
txn.execute(
"""
- DELETE FROM state_groups_state WHERE state_group IN (
- SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
- WHERE events.room_id=?
- )
+ SELECT DISTINCT state_group FROM events
+ INNER JOIN event_to_state_groups USING(event_id)
+ WHERE events.room_id = ?
""",
(room_id,),
)
- # ... and the state group edges
- logger.info("[purge] removing %s from state_group_edges", room_id)
-
- txn.execute(
- """
- DELETE FROM state_group_edges WHERE state_group IN (
- SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
- WHERE events.room_id=?
- )
- """,
- (room_id,),
- )
-
- # ... and the state groups
- logger.info("[purge] removing %s from state_groups", room_id)
-
- txn.execute(
- """
- DELETE FROM state_groups WHERE id IN (
- SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
- WHERE events.room_id=?
- )
- """,
- (room_id,),
- )
+ state_groups = [row[0] for row in txn]
- # and then tables which lack an index on room_id but have one on event_id
+ # Now we delete from tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
@@ -1887,6 +1733,165 @@ class EventsStore(
logger.info("[purge] done")
+ return state_groups
+
+ def purge_unreferenced_state_groups(
+ self, room_id: str, state_groups_to_delete
+ ) -> defer.Deferred:
+ """Deletes state groups that are no longer referenced, and de-deltas any
+ state groups that reference them.
+
+ Args:
+ room_id: The room the state groups belong to (must all be in the
+ same room).
+ state_groups_to_delete (Collection[int]): Set of all state groups
+ to delete.
+ """
+
+ return self.runInteraction(
+ "purge_unreferenced_state_groups",
+ self._purge_unreferenced_state_groups,
+ room_id,
+ state_groups_to_delete,
+ )
+
+ def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
+ logger.info(
+ "[purge] found %i state groups to delete", len(state_groups_to_delete)
+ )
+
+ rows = self._simple_select_many_txn(
+ txn,
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ retcols=("state_group",),
+ )
+
+ remaining_state_groups = set(
+ row["state_group"]
+ for row in rows
+ if row["state_group"] not in state_groups_to_delete
+ )
+
+ logger.info(
+ "[purge] de-delta-ing %i remaining state groups",
+ len(remaining_state_groups),
+ )
+
+ # Now we turn the state groups that reference to-be-deleted state
+ # groups into non-delta versions.
+ for sg in remaining_state_groups:
+ logger.info("[purge] de-delta-ing remaining state group %s", sg)
+ curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
+ curr_state = curr_state[sg]
+
+ self._simple_delete_txn(
+ txn, table="state_groups_state", keyvalues={"state_group": sg}
+ )
+
+ self._simple_delete_txn(
+ txn, table="state_group_edges", keyvalues={"state_group": sg}
+ )
+
+ self._simple_insert_many_txn(
+ txn,
+ table="state_groups_state",
+ values=[
+ {
+ "state_group": sg,
+ "room_id": room_id,
+ "type": key[0],
+ "state_key": key[1],
+ "event_id": state_id,
+ }
+ for key, state_id in iteritems(curr_state)
+ ],
+ )
+
+ logger.info("[purge] removing redundant state groups")
+ txn.executemany(
+ "DELETE FROM state_groups_state WHERE state_group = ?",
+ ((sg,) for sg in state_groups_to_delete),
+ )
+ txn.executemany(
+ "DELETE FROM state_groups WHERE id = ?",
+ ((sg,) for sg in state_groups_to_delete),
+ )
+
+ @defer.inlineCallbacks
+ def get_previous_state_groups(self, state_groups):
+ """Fetch the previous groups of the given state groups.
+
+ Args:
+ state_groups (Iterable[int])
+
+ Returns:
+ Deferred[dict[int, int]]: mapping from state group to previous
+ state group.
+ """
+
+ rows = yield self._simple_select_many_batch(
+ table="state_group_edges",
+ column="prev_state_group",
+ iterable=state_groups,
+ keyvalues={},
+ retcols=("prev_state_group", "state_group"),
+ desc="get_previous_state_groups",
+ )
+
+ return {row["state_group"]: row["prev_state_group"] for row in rows}
+
+ def purge_room_state(self, room_id, state_groups_to_delete):
+ """Deletes all records of a room from the state tables
+
+ Args:
+ room_id (str):
+ state_groups_to_delete (list[int]): State groups to delete
+ """
+
+ return self.runInteraction(
+ "purge_room_state",
+ self._purge_room_state_txn,
+ room_id,
+ state_groups_to_delete,
+ )
+
+ def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
+ # first we have to delete the state groups states
+ logger.info("[purge] removing %s from state_groups_state", room_id)
+
+ self._simple_delete_many_txn(
+ txn,
+ table="state_groups_state",
+ column="state_group",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ )
+
+ # ... and the state group edges
+ logger.info("[purge] removing %s from state_group_edges", room_id)
+
+ self._simple_delete_many_txn(
+ txn,
+ table="state_group_edges",
+ column="state_group",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ )
+
+ # ... and the state groups
+ logger.info("[purge] removing %s from state_groups", room_id)
+
+ self._simple_delete_many_txn(
+ txn,
+ table="state_groups",
+ column="id",
+ iterable=state_groups_to_delete,
+ keyvalues={},
+ )
+
async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
"""
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 5c293e0fa3..6a90daea31 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -995,6 +995,29 @@ class StateGroupWorkerStore(
return self.runInteraction("store_state_group", _store_state_group_txn)
+ @defer.inlineCallbacks
+ def get_referenced_state_groups(self, state_groups):
+ """Check which of the given state groups are referenced by events.
+
+ Args:
+ state_groups (Iterable[int])
+
+ Returns:
+ Deferred[set[int]]: The subset of state groups that are
+ referenced.
+ """
+
+ rows = yield self._simple_select_many_batch(
+ table="event_to_state_groups",
+ column="state_group",
+ iterable=state_groups,
+ keyvalues={},
+ retcols=("DISTINCT state_group",),
+ desc="get_referenced_state_groups",
+ )
+
+ return set(row["state_group"] for row in rows)
+
class StateBackgroundUpdateStore(
StateGroupBackgroundUpdateStore, BackgroundUpdateStore
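get_referenced_state_groups returns the subset of the given groups that some event still maps to in event_to_state_groups; only groups outside that subset are candidates for deletion. A tiny self-contained illustration of the semantics (the table is modelled as a dict; this is not the real query):

```python
# event_to_state_groups modelled as event_id -> state_group.
event_to_state_groups = {"$event1": 10, "$event2": 11}

def get_referenced_state_groups(candidates):
    referenced = set(event_to_state_groups.values())
    return {sg for sg in candidates if sg in referenced}

assert get_referenced_state_groups({10, 12}) == {10}   # 12 is unreferenced
```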
diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py
new file mode 100644
index 0000000000..a368182034
--- /dev/null
+++ b/synapse/storage/purge_events.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+
+from twisted.internet import defer
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeEventsStorage(object):
+ """High level interface for purging rooms and event history.
+ """
+
+ def __init__(self, hs, stores):
+ self.stores = stores
+
+ @defer.inlineCallbacks
+ def purge_room(self, room_id: str):
+ """Deletes all records of a room
+ """
+
+ state_groups_to_delete = yield self.stores.main.purge_room(room_id)
+ yield self.stores.main.purge_room_state(room_id, state_groups_to_delete)
+
+ @defer.inlineCallbacks
+ def purge_history(self, room_id, token, delete_local_events):
+ """Deletes room history before a certain point
+
+ Args:
+ room_id (str):
+
+ token (str): A topological token to delete events before
+
+ delete_local_events (bool):
+ if True, we will delete local events as well as remote ones
+ (instead of just marking them as outliers and deleting their
+ state groups).
+ """
+ state_groups = yield self.stores.main.purge_history(
+ room_id, token, delete_local_events
+ )
+
+ logger.info("[purge] finding state groups that can be deleted")
+
+ sg_to_delete = yield self._find_unreferenced_groups(state_groups)
+
+ yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete)
+
+ @defer.inlineCallbacks
+ def _find_unreferenced_groups(self, state_groups):
+ """Used when purging history to figure out which state groups can be
+ deleted.
+
+ Args:
+ state_groups (set[int]): Set of state groups referenced by events
+ that are going to be deleted.
+
+ Returns:
+ Deferred[set[int]]: The set of state groups that can be deleted.
+ """
+ # Graph of state group -> previous group
+ graph = {}
+
+ # Set of state groups that we have found to be referenced by events
+ referenced_groups = set()
+
+ # Set of state groups we've already seen
+ state_groups_seen = set(state_groups)
+
+ # Set of state groups to handle next.
+ next_to_search = set(state_groups)
+ while next_to_search:
+ # We bound the number of groups we look up at once, to stop the
+ # SQL query getting too big
+ if len(next_to_search) < 100:
+ current_search = next_to_search
+ next_to_search = set()
+ else:
+ current_search = set(itertools.islice(next_to_search, 100))
+ next_to_search -= current_search
+
+ referenced = yield self.stores.main.get_referenced_state_groups(
+ current_search
+ )
+ referenced_groups |= referenced
+
+ # We don't continue iterating up the state group graphs for state
+ # groups that are referenced.
+ current_search -= referenced
+
+ edges = yield self.stores.main.get_previous_state_groups(current_search)
+
+ prevs = set(edges.values())
+ # We don't bother re-handling groups we've already seen
+ prevs -= state_groups_seen
+ next_to_search |= prevs
+ state_groups_seen |= prevs
+
+ graph.update(edges)
+
+ to_delete = state_groups_seen - referenced_groups
+
+ return to_delete
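The loop above walks up the prev-group graph, starting from the state groups of the purged events, stops at any group still referenced by surviving events, and deletes everything else it visited. A self-contained sketch of the same walk with the batching and Deferreds stripped out (helper and parameter names here are illustrative):

```python
def find_unreferenced_groups(start_groups, prev_group, referenced_by_events):
    """Return the state groups that can safely be deleted.

    start_groups:         state groups of the events being purged
    prev_group:           state_group -> previous state_group (state_group_edges)
    referenced_by_events: groups still referenced by surviving events
    """
    seen = set(start_groups)
    referenced = set()
    next_to_search = set(start_groups)
    while next_to_search:
        current = next_to_search
        next_to_search = set()

        # Don't walk upwards from groups that surviving events still use.
        still_used = current & referenced_by_events
        referenced |= still_used
        current -= still_used

        # Follow the prev-group edges of the remaining groups.
        prevs = {prev_group[sg] for sg in current if sg in prev_group}
        prevs -= seen
        next_to_search |= prevs
        seen |= prevs

    return seen - referenced


# Groups 3 -> 2 -> 1; group 2 is still used by an event we keep, so only
# group 3 can go (group 1 is never even visited, since the walk stops at 2).
assert find_unreferenced_groups({3}, {3: 2, 2: 1}, {2}) == {3}
```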
diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py
index 8e1ca8b738..d9f1b95cb0 100644
--- a/tests/rest/admin/test_admin.py
+++ b/tests/rest/admin/test_admin.py
@@ -628,10 +628,12 @@ class PurgeRoomTestCase(unittest.HomeserverTestCase):
"local_invites",
"room_account_data",
"room_tags",
+ "state_groups",
+ "state_groups_state",
):
count = self.get_success(
self.store._simple_select_one_onecol(
- table="events",
+ table=table,
keyvalues={"room_id": room_id},
retcol="COUNT(*)",
desc="test_purge_room",
diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py
index f671599cb8..b9fafaa1a6 100644
--- a/tests/storage/test_purge.py
+++ b/tests/storage/test_purge.py
@@ -40,23 +40,24 @@ class PurgeTests(HomeserverTestCase):
third = self.helper.send(self.room_id, body="test3")
last = self.helper.send(self.room_id, body="test4")
- storage = self.hs.get_datastore()
+ store = self.hs.get_datastore()
+ storage = self.hs.get_storage()
# Get the topological token
- event = storage.get_topological_token_for_event(last["event_id"])
+ event = store.get_topological_token_for_event(last["event_id"])
self.pump()
event = self.successResultOf(event)
# Purge everything before this topological token
- purge = storage.purge_history(self.room_id, event, True)
+ purge = storage.purge_events.purge_history(self.room_id, event, True)
self.pump()
self.assertEqual(self.successResultOf(purge), None)
# Try and get the events
- get_first = storage.get_event(first["event_id"])
- get_second = storage.get_event(second["event_id"])
- get_third = storage.get_event(third["event_id"])
- get_last = storage.get_event(last["event_id"])
+ get_first = store.get_event(first["event_id"])
+ get_second = store.get_event(second["event_id"])
+ get_third = store.get_event(third["event_id"])
+ get_last = store.get_event(last["event_id"])
self.pump()
# 1-3 should fail and last will succeed, meaning that 1-3 are deleted
|