| author | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2021-12-20 13:21:30 +0000 |
|---|---|---|
| committer | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2021-12-20 13:21:30 +0000 |
| commit | 9858a2ca6852a3421186825d3f371429c8dc7f1b (patch) | |
| tree | e9812b6311685f942a2940730895954d0cbc224b | |
| parent | Newsfile (diff) | |
| download | synapse-9858a2ca6852a3421186825d3f371429c8dc7f1b.tar.xz | |
Add some logging and validation to help determine whether this all works or not :)
| -rw-r--r-- | synapse/storage/databases/state/store.py | 106 |
1 file changed, 106 insertions, 0 deletions
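
As context for the diff below: the change wraps `_get_state_for_groups` in a shadow-comparison harness. It computes the result via the pre-existing code path (preserved here as `_OLD_get_state_for_groups`), computes it again via the new in-flight-request path, compares the two, and returns the trusted old result, while counters and a rate-limited log record how often the two agree. The following is a minimal standalone sketch of that pattern only; the names `old_impl`, `new_impl`, and `validated_call` are hypothetical stand-ins, not Synapse APIs.

```python
from collections import Counter
import logging

validation_logger = logging.getLogger("validation")
validation_counter: Counter = Counter()  # keyed by True/False: did the two paths agree?
log_ticker = 0


def old_impl(groups):
    # Stand-in for the trusted, pre-existing implementation.
    return {g: g * 2 for g in groups}


def new_impl(groups):
    # Stand-in for the new code path under validation.
    return {g: g * 2 for g in groups}


def validated_call(groups):
    global log_ticker
    old_result = old_impl(groups)  # compute the trusted result first
    new_result = new_impl(groups)  # then the result from the new path

    the_same = new_result == old_result
    validation_counter[the_same] += 1
    if not the_same:
        # Mismatches are logged loudly, with enough detail to reproduce them.
        validation_logger.critical("NOT THE SAME: for groups %r", groups)

    log_ticker += 1
    if log_ticker % 1000 == 0:
        # Periodic summary keeps the counters visible without log spam.
        validation_logger.info("%d. Correct: %r", log_ticker, validation_counter)

    # Callers keep getting the old, trusted result while validation runs.
    return old_result


print(validated_call([1, 2, 3]))  # {1: 2, 2: 4, 3: 6}
```
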
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 602f3e8233..a5d13a361d 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -13,6 +13,7 @@
 # limitations under the License.

 import logging
+from collections import Counter
 from typing import (
     TYPE_CHECKING,
     Collection,
@@ -52,6 +53,11 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

+validation_logger = logging.getLogger(f"{__name__}.validation")
+validation_counter: Counter[bool] = Counter()
+effectiveness_counter: Counter[str] = Counter()
+log_ticker = 0
+
 MAX_STATE_DELTA_HOPS = 100

 MAX_INFLIGHT_REQUESTS_PER_GROUP = 5
@@ -146,6 +152,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         self._state_group_inflight_requests: Dict[
             int, SortedDict[StateFilter, ObservableDeferred[StateMap[str]]]
         ] = {}
+        validation_logger.info("(Validation Logging present)")

         def get_max_state_group_txn(txn: Cursor) -> int:
             txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
@@ -402,6 +409,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             state_filter_left_over,
         ) = self._get_state_for_group_gather_inflight_requests(group, state_filter)

+        if state_filter_left_over == StateFilter.none():
+            effectiveness_counter["fully"] += 1
+        elif state_filter_left_over == state_filter:
+            effectiveness_counter["useless"] += 1
+        else:
+            effectiveness_counter["partially"] += 1
+
         if state_filter_left_over != StateFilter.none():
             # Fetch remaining state
             remaining = await self._get_state_for_group_fire_request(
@@ -438,6 +452,14 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         Returns:
             Dict of state group to state map.
         """
+        global log_ticker, effectiveness_counter, validation_counter
+        old_result_for_verification = await self._OLD_get_state_for_groups(
+            groups, state_filter
+        )
+        groups = list(groups)
+        arg_groups = groups.copy()
+        arg_state_filter = state_filter
+
         state_filter = state_filter or StateFilter.all()

         member_filter, non_member_filter = state_filter.get_member_split()
@@ -474,6 +496,90 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         for group, group_result in zip(incomplete_groups, results_from_requests):
             state[group] = group_result

+        the_same = state == old_result_for_verification
+        validation_counter[the_same] += 1
+        if not the_same:
+            validation_logger.critical(
+                "NOT THE SAME: for groups %r SF %r", arg_groups, arg_state_filter
+            )
+
+        log_ticker += 1
+        if log_ticker % 1000 == 0:
+            validation_logger.info(
+                "%d. Correct: %r. Effective: %r",
+                log_ticker,
+                validation_counter,
+                effectiveness_counter,
+            )
+
+        return old_result_for_verification
+
+    async def _OLD_get_state_for_groups(
+        self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
+    ) -> Dict[int, MutableStateMap[str]]:
+        """Gets the state at each of a list of state groups, optionally
+        filtering by type/state_key
+
+        Args:
+            groups: list of state groups for which we want
+                to get the state.
+            state_filter: The state filter used to fetch state
+                from the database.
+        Returns:
+            Dict of state group to state map.
+        """
+        state_filter = state_filter or StateFilter.all()
+
+        member_filter, non_member_filter = state_filter.get_member_split()
+
+        # Now we look them up in the member and non-member caches
+        (
+            non_member_state,
+            incomplete_groups_nm,
+        ) = self._get_state_for_groups_using_cache(
+            groups, self._state_group_cache, state_filter=non_member_filter
+        )
+
+        (member_state, incomplete_groups_m,) = self._get_state_for_groups_using_cache(
+            groups, self._state_group_members_cache, state_filter=member_filter
+        )
+
+        state = dict(non_member_state)
+        for group in groups:
+            state[group].update(member_state[group])
+
+        # Now fetch any missing groups from the database
+
+        incomplete_groups = incomplete_groups_m | incomplete_groups_nm
+
+        if not incomplete_groups:
+            return state
+
+        cache_sequence_nm = self._state_group_cache.sequence
+        cache_sequence_m = self._state_group_members_cache.sequence
+
+        # Help the cache hit ratio by expanding the filter a bit
+        db_state_filter = state_filter.return_expanded()
+
+        group_to_state_dict = await self._get_state_groups_from_groups(
+            list(incomplete_groups), state_filter=db_state_filter
+        )
+
+        # Now lets update the caches
+        self._insert_into_cache(
+            group_to_state_dict,
+            db_state_filter,
+            cache_seq_num_members=cache_sequence_m,
+            cache_seq_num_non_members=cache_sequence_nm,
+        )
+
+        # And finally update the result dict, by filtering out any extra
+        # stuff we pulled out of the database.
+        for group, group_state_dict in group_to_state_dict.items():
+            # We just replace any existing entries, as we will have loaded
+            # everything we need from the database anyway.
+            state[group] = state_filter.filter_state(group_state_dict)
+
         return state

     def _get_state_for_groups_using_cache(