author     Olivier Wilkinson (reivilibre) <oliverw@matrix.org>  2021-12-20 13:21:30 +0000
committer  Olivier Wilkinson (reivilibre) <oliverw@matrix.org>  2021-12-20 13:21:30 +0000
commit     9858a2ca6852a3421186825d3f371429c8dc7f1b (patch)
tree       e9812b6311685f942a2940730895954d0cbc224b
parent     Newsfile (diff)
download   synapse-9858a2ca6852a3421186825d3f371429c8dc7f1b.tar.xz
Add some logging and validation to help determine whether this all works or not :)
-rw-r--r--  synapse/storage/databases/state/store.py  |  106
1 file changed, 106 insertions, 0 deletions
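
The change follows a shadow-comparison pattern: the pre-existing implementation is kept as _OLD_get_state_for_groups and continues to serve results, the new code path runs alongside it, and module-level counters record how often the two agree ("Correct") and how much of each request the in-flight deduplication covered ("Effective"), with a summary logged every 1000 calls and a CRITICAL log on any mismatch. Below is a minimal, self-contained sketch of that pattern, assuming nothing Synapse-specific; old_impl, new_impl and get_state are illustrative stand-ins, not names from the patch.

import logging
from collections import Counter
from typing import Dict

logger = logging.getLogger("validation")

# Tally of "did the new code path return exactly the same result as the old one?"
validation_counter: "Counter[bool]" = Counter()
log_ticker = 0


def old_impl(group: int) -> Dict[str, str]:
    # Stand-in for the trusted, pre-existing code path.
    return {"m.room.create": "$create_event_id"}


def new_impl(group: int) -> Dict[str, str]:
    # Stand-in for the new code path being validated.
    return {"m.room.create": "$create_event_id"}


def get_state(group: int) -> Dict[str, str]:
    global log_ticker

    old_result = old_impl(group)
    new_result = new_impl(group)

    # Count agreement, and shout on any mismatch.
    validation_counter[old_result == new_result] += 1
    if old_result != new_result:
        logger.critical("NOT THE SAME for group %r", group)

    # Periodically summarise how the experiment is going.
    log_ticker += 1
    if log_ticker % 1000 == 0:
        logger.info("%d. Correct: %r", log_ticker, validation_counter)

    # While validating, keep returning the trusted old result.
    return old_result

Because the trusted result is always the one returned, the experiment cannot change behaviour; presumably, once the counters show sustained agreement, the old path and the counters can simply be removed.
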
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 602f3e8233..a5d13a361d 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+from collections import Counter
 from typing import (
     TYPE_CHECKING,
     Collection,
@@ -52,6 +53,11 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+validation_logger = logging.getLogger(f"{__name__}.validation")
+validation_counter: "Counter[bool]" = Counter()  # new result == old result?
+effectiveness_counter: "Counter[str]" = Counter()  # coverage by in-flight requests
+log_ticker = 0
+
 MAX_STATE_DELTA_HOPS = 100
 MAX_INFLIGHT_REQUESTS_PER_GROUP = 5
 
@@ -146,6 +152,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         self._state_group_inflight_requests: Dict[
             int, SortedDict[StateFilter, ObservableDeferred[StateMap[str]]]
         ] = {}
+        validation_logger.info("(Validation Logging present)")
 
         def get_max_state_group_txn(txn: Cursor) -> int:
             txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
@@ -402,6 +409,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             state_filter_left_over,
         ) = self._get_state_for_group_gather_inflight_requests(group, state_filter)
 
+        if state_filter_left_over == StateFilter.none():
+            effectiveness_counter["fully"] += 1  # in-flight requests covered everything
+        elif state_filter_left_over == state_filter:
+            effectiveness_counter["useless"] += 1  # in-flight requests covered nothing
+        else:
+            effectiveness_counter["partially"] += 1  # covered only part of the filter
+
         if state_filter_left_over != StateFilter.none():
             # Fetch remaining state
             remaining = await self._get_state_for_group_fire_request(
@@ -438,6 +452,14 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         Returns:
             Dict of state group to state map.
         """
+        global log_ticker, effectiveness_counter, validation_counter
+        groups = list(groups)  # materialise before both code paths use it
+        arg_groups = groups.copy()
+        arg_state_filter = state_filter
+        old_result_for_verification = await self._OLD_get_state_for_groups(
+            groups, state_filter
+        )
+
         state_filter = state_filter or StateFilter.all()
 
         member_filter, non_member_filter = state_filter.get_member_split()
@@ -474,6 +496,90 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         for group, group_result in zip(incomplete_groups, results_from_requests):
             state[group] = group_result
 
+        the_same = state == old_result_for_verification
+        validation_counter[the_same] += 1
+        if not the_same:
+            validation_logger.critical(
+                "NOT THE SAME: for groups %r SF %r", arg_groups, arg_state_filter
+            )
+
+        log_ticker += 1
+        if log_ticker % 1000 == 0:
+            validation_logger.info(
+                "%d. Correct: %r. Effective: %r",
+                log_ticker,
+                validation_counter,
+                effectiveness_counter,
+            )
+
+        return old_result_for_verification  # serve the trusted old result while validating
+
+    async def _OLD_get_state_for_groups(
+        self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
+    ) -> Dict[int, MutableStateMap[str]]:
+        """Gets the state at each of a list of state groups, optionally
+        filtering by type/state_key
+
+        Args:
+            groups: list of state groups for which we want
+                to get the state.
+            state_filter: The state filter used to fetch state
+                from the database.
+        Returns:
+            Dict of state group to state map.
+        """
+        state_filter = state_filter or StateFilter.all()
+
+        member_filter, non_member_filter = state_filter.get_member_split()
+
+        # Now we look them up in the member and non-member caches
+        (
+            non_member_state,
+            incomplete_groups_nm,
+        ) = self._get_state_for_groups_using_cache(
+            groups, self._state_group_cache, state_filter=non_member_filter
+        )
+
+        (member_state, incomplete_groups_m,) = self._get_state_for_groups_using_cache(
+            groups, self._state_group_members_cache, state_filter=member_filter
+        )
+
+        state = dict(non_member_state)
+        for group in groups:
+            state[group].update(member_state[group])
+
+        # Now fetch any missing groups from the database
+
+        incomplete_groups = incomplete_groups_m | incomplete_groups_nm
+
+        if not incomplete_groups:
+            return state
+
+        cache_sequence_nm = self._state_group_cache.sequence
+        cache_sequence_m = self._state_group_members_cache.sequence
+
+        # Help the cache hit ratio by expanding the filter a bit
+        db_state_filter = state_filter.return_expanded()
+
+        group_to_state_dict = await self._get_state_groups_from_groups(
+            list(incomplete_groups), state_filter=db_state_filter
+        )
+
+        # Now lets update the caches
+        self._insert_into_cache(
+            group_to_state_dict,
+            db_state_filter,
+            cache_seq_num_members=cache_sequence_m,
+            cache_seq_num_non_members=cache_sequence_nm,
+        )
+
+        # And finally update the result dict, by filtering out any extra
+        # stuff we pulled out of the database.
+        for group, group_state_dict in group_to_state_dict.items():
+            # We just replace any existing entries, as we will have loaded
+            # everything we need from the database anyway.
+            state[group] = state_filter.filter_state(group_state_dict)
+
         return state
 
     def _get_state_for_groups_using_cache(
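
The new messages go to a dedicated child logger, synapse.storage.databases.state.store.validation (derived from the module's __name__ and the file path above), so they can be dialled up or down independently of the rest of the storage layer. Below is a sketch of surfacing just that logger with the standard library's dictConfig; Synapse normally configures logging from its own YAML log config, so this standalone snippet is only an illustration of the logger name involved.

import logging.config

# Minimal sketch: route only the validation logger to the console at INFO so
# the periodic "Correct: ... Effective: ..." summaries are visible. The logger
# name is an assumption derived from the module path in the diff above.
logging.config.dictConfig(
    {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "brief": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"}
        },
        "handlers": {
            "console": {"class": "logging.StreamHandler", "formatter": "brief"}
        },
        "loggers": {
            "synapse.storage.databases.state.store.validation": {
                "level": "INFO",
                "handlers": ["console"],
            }
        },
    }
)
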