diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index d0e015bf19..f5131fe291 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -332,6 +332,7 @@ class EventsPersistenceStorageController:
# store for now.
self.main_store = stores.main
self.state_store = stores.state
+ self._state_deletion_store = stores.state_deletion
assert stores.persist_events
self.persist_events_store = stores.persist_events
@@ -416,7 +417,7 @@ class EventsPersistenceStorageController:
set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
async def enqueue(
- item: Tuple[str, List[Tuple[EventBase, EventContext]]]
+ item: Tuple[str, List[Tuple[EventBase, EventContext]]],
) -> Dict[str, str]:
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
@@ -502,8 +503,15 @@ class EventsPersistenceStorageController:
"""
state = await self._calculate_current_state(room_id)
delta = await self._calculate_state_delta(room_id, state)
+ sliding_sync_table_changes = (
+ await self.persist_events_store._calculate_sliding_sync_table_changes(
+ room_id, [], delta
+ )
+ )
- await self.persist_events_store.update_current_state(room_id, delta)
+ await self.persist_events_store.update_current_state(
+ room_id, delta, sliding_sync_table_changes
+ )
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
"""Calculate the current state of a room, based on the forward extremities
@@ -542,7 +550,9 @@ class EventsPersistenceStorageController:
room_version,
state_maps_by_state_group,
event_map=None,
- state_res_store=StateResolutionStore(self.main_store),
+ state_res_store=StateResolutionStore(
+ self.main_store, self._state_deletion_store
+ ),
)
return await res.get_state(self._state_controller, StateFilter.all())
@@ -628,15 +638,20 @@ class EventsPersistenceStorageController:
room_id, [e for e, _ in chunk]
)
- await self.persist_events_store._persist_events_and_state_updates(
- room_id,
- chunk,
- state_delta_for_room=state_delta_for_room,
- new_forward_extremities=new_forward_extremities,
- use_negative_stream_ordering=backfilled,
- inhibit_local_membership_updates=backfilled,
- new_event_links=new_event_links,
- )
+ # Stop the state groups from being deleted while we're persisting
+ # them.
+ async with self._state_deletion_store.persisting_state_group_references(
+ events_and_contexts
+ ):
+ await self.persist_events_store._persist_events_and_state_updates(
+ room_id,
+ chunk,
+ state_delta_for_room=state_delta_for_room,
+ new_forward_extremities=new_forward_extremities,
+ use_negative_stream_ordering=backfilled,
+ inhibit_local_membership_updates=backfilled,
+ new_event_links=new_event_links,
+ )
return replaced_events
@@ -785,9 +800,9 @@ class EventsPersistenceStorageController:
)
# Remove any events which are prev_events of any existing events.
- existing_prevs: Collection[str] = (
- await self.persist_events_store._get_events_which_are_prevs(result)
- )
+ existing_prevs: Collection[
+ str
+ ] = await self.persist_events_store._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)
# Finally handle the case where the new events have soft-failed prev
@@ -855,8 +870,7 @@ class EventsPersistenceStorageController:
# This should only happen for outlier events.
if not ev.internal_metadata.is_outlier():
raise Exception(
- "Context for new event %s has no state "
- "group" % (ev.event_id,)
+ "Context for new event %s has no state group" % (ev.event_id,)
)
continue
if ctx.state_group_deltas:
@@ -958,7 +972,9 @@ class EventsPersistenceStorageController:
room_version,
state_groups,
events_map,
- state_res_store=StateResolutionStore(self.main_store),
+ state_res_store=StateResolutionStore(
+ self.main_store, self._state_deletion_store
+ ),
)
state_resolutions_during_persistence.inc()
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index e794b370c2..c2d4bf8290 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -21,10 +21,19 @@
import itertools
import logging
-from typing import TYPE_CHECKING, Set
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Mapping,
+ Optional,
+ Set,
+)
from synapse.logging.context import nested_logging_context
+from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.storage.database import LoggingTransaction
from synapse.storage.databases import Databases
+from synapse.types.storage import _BackgroundUpdates
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -38,12 +47,22 @@ class PurgeEventsStorageController:
def __init__(self, hs: "HomeServer", stores: Databases):
self.stores = stores
+ if hs.config.worker.run_background_tasks:
+ self._delete_state_loop_call = hs.get_clock().looping_call(
+ self._delete_state_groups_loop, 60 * 1000
+ )
+
+ self.stores.state.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
+ self._background_delete_unrefereneced_state_groups,
+ )
+
async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room"""
with nested_logging_context(room_id):
- state_groups_to_delete = await self.stores.main.purge_room(room_id)
- await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
+ await self.stores.main.purge_room(room_id)
+ await self.stores.state.purge_room_state(room_id)
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
@@ -68,11 +87,16 @@ class PurgeEventsStorageController:
logger.info("[purge] finding state groups that can be deleted")
sg_to_delete = await self._find_unreferenced_groups(state_groups)
- await self.stores.state.purge_unreferenced_state_groups(
- room_id, sg_to_delete
+ # Mark these state groups as pending deletion; they will actually
+ # get deleted automatically later.
+ await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
+ sg_to_delete
)
- async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
+ async def _find_unreferenced_groups(
+ self,
+ state_groups: Collection[int],
+ ) -> Set[int]:
"""Used when purging history to figure out which state groups can be
deleted.
@@ -118,6 +142,307 @@ class PurgeEventsStorageController:
next_to_search |= prevs
state_groups_seen |= prevs
+ # We also check to see if anything referencing the state groups is
+ # also unreferenced. This helps ensure that we delete unreferenced
+ # state groups; if we don't, then we will de-delta them when we
+ # delete the other state groups, leading to increased DB usage.
+ next_edges = await self.stores.state.get_next_state_groups(current_search)
+ nexts = set(next_edges.keys())
+ nexts -= state_groups_seen
+ next_to_search |= nexts
+ state_groups_seen |= nexts
+
to_delete = state_groups_seen - referenced_groups
return to_delete
+
+ @wrap_as_background_process("_delete_state_groups_loop")
+ async def _delete_state_groups_loop(self) -> None:
+ """Background task that deletes any state groups that may be pending
+ deletion."""
+
+ while True:
+ next_to_delete = await self.stores.state_deletion.get_next_state_group_collection_to_delete()
+ if next_to_delete is None:
+ break
+
+ (room_id, groups_to_sequences) = next_to_delete
+ made_progress = await self._delete_state_groups(
+ room_id, groups_to_sequences
+ )
+
+ # If no progress was made in deleting the state groups, then we
+ # break to allow a pause before trying again next time we get
+ # called.
+ if not made_progress:
+ break
+
+ async def _delete_state_groups(
+ self, room_id: str, groups_to_sequences: Mapping[int, int]
+ ) -> bool:
+ """Tries to delete the given state groups.
+
+ Returns:
+ Whether we made progress in deleting the state groups (or marking
+ them as referenced).
+ """
+
+ # We double check if any of the state groups have become referenced.
+ # This shouldn't happen, as any usages should cause the state group to
+ # be removed from pending deletion.
+ referenced_state_groups = await self.stores.main.get_referenced_state_groups(
+ groups_to_sequences
+ )
+
+ if referenced_state_groups:
+ # We mark any state groups that have become referenced as being
+ # used.
+ await self.stores.state_deletion.mark_state_groups_as_used(
+ referenced_state_groups
+ )
+
+ # Update list of state groups to remove referenced ones
+ groups_to_sequences = {
+ state_group: sequence_number
+ for state_group, sequence_number in groups_to_sequences.items()
+ if state_group not in referenced_state_groups
+ }
+
+ if not groups_to_sequences:
+ # We made progress here as long as we marked some state groups as
+ # now referenced.
+ return len(referenced_state_groups) > 0
+
+ return await self.stores.state.purge_unreferenced_state_groups(
+ room_id,
+ groups_to_sequences,
+ )
+
+ async def _background_delete_unrefereneced_state_groups(
+ self, progress: dict, batch_size: int
+ ) -> int:
+ """This background update will slowly delete any unreferenced state groups"""
+
+ last_checked_state_group = progress.get("last_checked_state_group")
+
+ if last_checked_state_group is None:
+ # This is the first run.
+ last_checked_state_group = (
+ await self.stores.state.db_pool.simple_select_one_onecol(
+ table="state_groups",
+ keyvalues={},
+ retcol="MAX(id)",
+ allow_none=True,
+ desc="get_max_state_group",
+ )
+ )
+ if last_checked_state_group is None:
+ # There are no state groups so the background process is finished.
+ await self.stores.state.db_pool.updates._end_background_update(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
+ )
+ return batch_size
+ last_checked_state_group += 1
+
+ (
+ last_checked_state_group,
+ final_batch,
+ ) = await self._delete_unreferenced_state_groups_batch(
+ last_checked_state_group,
+ batch_size,
+ )
+
+ if not final_batch:
+ # There are more state groups to check.
+ progress = {
+ "last_checked_state_group": last_checked_state_group,
+ }
+ await self.stores.state.db_pool.updates._background_update_progress(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
+ progress,
+ )
+ else:
+ # This background process is finished.
+ await self.stores.state.db_pool.updates._end_background_update(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
+ )
+
+ return batch_size
+
+ async def _delete_unreferenced_state_groups_batch(
+ self,
+ last_checked_state_group: int,
+ batch_size: int,
+ ) -> tuple[int, bool]:
+ """Looks for unreferenced state groups starting from the last state group
+ checked and marks them for deletion.
+
+ Args:
+ last_checked_state_group: The last state group that was checked.
+ batch_size: How many state groups to process in this iteration.
+
+ Returns:
+ (last_checked_state_group, final_batch)
+ """
+
+ # Find all state groups that can be deleted if any of the original set are deleted.
+ (
+ to_delete,
+ last_checked_state_group,
+ final_batch,
+ ) = await self._find_unreferenced_groups_for_background_deletion(
+ last_checked_state_group, batch_size
+ )
+
+ if len(to_delete) == 0:
+ return last_checked_state_group, final_batch
+
+ await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
+ to_delete
+ )
+
+ return last_checked_state_group, final_batch
+
+ async def _find_unreferenced_groups_for_background_deletion(
+ self,
+ last_checked_state_group: int,
+ batch_size: int,
+ ) -> tuple[Set[int], int, bool]:
+ """Used when deleting unreferenced state groups in the background to figure out
+ which state groups can be deleted.
+ To avoid increased DB usage due to de-deltaing state groups, this returns only
+ state groups which are free standing (ie. no shared edges with referenced groups) or
+ state groups which do not share edges which result in a future referenced group.
+
+ The following scenarios outline the possibilities based on state group data in
+ the DB.
+
+ ie. Free standing -> state groups 1-N would be returned:
+ SG_1
+ |
+ ...
+ |
+ SG_N
+
+ ie. Previous reference -> state groups 2-N would be returned:
+ SG_1 <- referenced by event
+ |
+ SG_2
+ |
+ ...
+ |
+ SG_N
+
+ ie. Future reference -> none of the following state groups would be returned:
+ SG_1
+ |
+ SG_2
+ |
+ ...
+ |
+ SG_N <- referenced by event
+
+ Args:
+ last_checked_state_group: The last state group that was checked.
+ batch_size: How many state groups to process in this iteration.
+
+ Returns:
+ (to_delete, last_checked_state_group, final_batch)
+ """
+
+ # If a state group's next edge is not pending deletion then we don't delete the state group.
+ # If there is no next edge or the next edges are all marked for deletion, then delete
+ # the state group.
+ # This holds since we walk backwards from the latest state groups, ensuring that
+ # we've already checked newer state groups for event references along the way.
+ def get_next_state_groups_marked_for_deletion_txn(
+ txn: LoggingTransaction,
+ ) -> tuple[dict[int, bool], dict[int, int]]:
+ state_group_sql = """
+ SELECT s.id, e.state_group, d.state_group
+ FROM (
+ SELECT id FROM state_groups
+ WHERE id < ? ORDER BY id DESC LIMIT ?
+ ) as s
+ LEFT JOIN state_group_edges AS e ON (s.id = e.prev_state_group)
+ LEFT JOIN state_groups_pending_deletion AS d ON (e.state_group = d.state_group)
+ """
+ txn.execute(state_group_sql, (last_checked_state_group, batch_size))
+
+ # Mapping from state group to whether we should delete it.
+ state_groups_to_deletion: dict[int, bool] = {}
+
+ # Mapping from state group to prev state group.
+ state_groups_to_prev: dict[int, int] = {}
+
+ for row in txn:
+ state_group = row[0]
+ next_edge = row[1]
+ pending_deletion = row[2]
+
+ if next_edge is not None:
+ state_groups_to_prev[next_edge] = state_group
+
+ if next_edge is not None and not pending_deletion:
+ # We have found an edge not marked for deletion.
+ # Check previous results to see if this group is part of a chain
+ # within this batch that qualifies for deletion.
+ # ie. batch contains:
+ # SG_1 -> SG_2 -> SG_3
+ # If SG_3 is a candidate for deletion, then SG_2 & SG_1 should also
+ # be, even though they have edges which may not be marked for
+ # deletion.
+ # This relies on SQL results being sorted in DESC order to work.
+ next_is_deletion_candidate = state_groups_to_deletion.get(next_edge)
+ if (
+ next_is_deletion_candidate is None
+ or not next_is_deletion_candidate
+ ):
+ state_groups_to_deletion[state_group] = False
+ else:
+ state_groups_to_deletion.setdefault(state_group, True)
+ else:
+ # This state group may be a candidate for deletion
+ state_groups_to_deletion.setdefault(state_group, True)
+
+ return state_groups_to_deletion, state_groups_to_prev
+
+ (
+ state_groups_to_deletion,
+ state_group_edges,
+ ) = await self.stores.state.db_pool.runInteraction(
+ "get_next_state_groups_marked_for_deletion",
+ get_next_state_groups_marked_for_deletion_txn,
+ )
+ deletion_candidates = {
+ state_group
+ for state_group, deletion in state_groups_to_deletion.items()
+ if deletion
+ }
+
+ final_batch = False
+ state_groups = state_groups_to_deletion.keys()
+ if len(state_groups) < batch_size:
+ final_batch = True
+ else:
+ last_checked_state_group = min(state_groups)
+
+ if len(state_groups) == 0:
+ return set(), last_checked_state_group, final_batch
+
+ # Determine if any of the remaining state groups are directly referenced.
+ referenced = await self.stores.main.get_referenced_state_groups(
+ deletion_candidates
+ )
+
+ # Remove state groups from deletion_candidates which are directly referenced or share a
+ # future edge with a referenced state group within this batch.
+ def filter_reference_chains(group: Optional[int]) -> None:
+ while group is not None:
+ deletion_candidates.discard(group)
+ group = state_group_edges.get(group)
+
+ for referenced_group in referenced:
+ filter_reference_chains(referenced_group)
+
+ return deletion_candidates, last_checked_state_group, final_batch
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index b50eb8868e..f28f5d7e03 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -234,8 +234,11 @@ class StateStorageController:
RuntimeError if we don't have a state group for one or more of the events
(ie they are outliers or unknown)
"""
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
await_full_state = True
- if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
+ if not state_filter.must_await_full_state(self._is_mine_id):
await_full_state = False
event_to_groups = await self.get_state_group_for_events(
@@ -244,7 +247,7 @@ class StateStorageController:
groups = set(event_to_groups.values())
group_to_state = await self.stores.state._get_state_for_groups(
- groups, state_filter or StateFilter.all()
+ groups, state_filter
)
state_event_map = await self.stores.main.get_events(
@@ -292,10 +295,11 @@ class StateStorageController:
RuntimeError if we don't have a state group for one or more of the events
(ie they are outliers or unknown)
"""
- if (
- await_full_state
- and state_filter
- and not state_filter.must_await_full_state(self._is_mine_id)
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
+ if await_full_state and not state_filter.must_await_full_state(
+ self._is_mine_id
):
# Full state is not required if the state filter is restrictive enough.
await_full_state = False
@@ -306,7 +310,7 @@ class StateStorageController:
groups = set(event_to_groups.values())
group_to_state = await self.stores.state._get_state_for_groups(
- groups, state_filter or StateFilter.all()
+ groups, state_filter
)
event_to_state = {
@@ -335,9 +339,10 @@ class StateStorageController:
RuntimeError if we don't have a state group for the event (ie it is an
outlier or is unknown)
"""
- state_map = await self.get_state_for_events(
- [event_id], state_filter or StateFilter.all()
- )
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
+ state_map = await self.get_state_for_events([event_id], state_filter)
return state_map[event_id]
@trace
@@ -365,9 +370,12 @@ class StateStorageController:
RuntimeError if we don't have a state group for the event (ie it is an
outlier or is unknown)
"""
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
state_map = await self.get_state_ids_for_events(
[event_id],
- state_filter or StateFilter.all(),
+ state_filter,
await_full_state=await_full_state,
)
return state_map[event_id]
@@ -388,9 +396,12 @@ class StateStorageController:
at the event and `state_filter` is not satisfied by partial state.
Defaults to `True`.
"""
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
state_ids = await self.get_state_ids_for_event(
event_id,
- state_filter=state_filter or StateFilter.all(),
+ state_filter=state_filter,
await_full_state=await_full_state,
)
@@ -426,6 +437,9 @@ class StateStorageController:
at the last event in the room before `stream_position` and
`state_filter` is not satisfied by partial state. Defaults to `True`.
"""
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
@@ -442,7 +456,7 @@ class StateStorageController:
if last_event_id:
state = await self.get_state_after_event(
last_event_id,
- state_filter=state_filter or StateFilter.all(),
+ state_filter=state_filter,
await_full_state=await_full_state,
)
@@ -500,9 +514,10 @@ class StateStorageController:
Returns:
Dict of state group to state map.
"""
- return await self.stores.state._get_state_for_groups(
- groups, state_filter or StateFilter.all()
- )
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
+ return await self.stores.state._get_state_for_groups(groups, state_filter)
@trace
@tag_args
@@ -583,12 +598,13 @@ class StateStorageController:
Returns:
The current state of the room.
"""
- if await_full_state and (
- not state_filter or state_filter.must_await_full_state(self._is_mine_id)
- ):
+ if state_filter is None:
+ state_filter = StateFilter.all()
+
+ if await_full_state and state_filter.must_await_full_state(self._is_mine_id):
await self._partial_state_room_tracker.await_full_state(room_id)
- if state_filter and not state_filter.is_full():
+ if state_filter is not None and not state_filter.is_full():
return await self.stores.main.get_partial_filtered_current_state_ids(
room_id, state_filter
)
|