diff options
author | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2021-09-21 16:07:58 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2021-09-21 17:14:18 +0100 |
commit | 247f558c1c0a4dc850cd1259568f899f64ae4efa (patch) | |
tree | 21bf4353da0f421ec4840a73d41c616fba8518a4 | |
parent | Add method to gather in-flight requests and calculate left-over filter (diff) | |
download | synapse-247f558c1c0a4dc850cd1259568f899f64ae4efa.tar.xz |
Add method to get 1 state group, both using and updating in-flight cache
-rw-r--r-- | synapse/storage/databases/state/store.py | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 6bc384c02c..5e85c5c40d 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -340,6 +340,58 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return await request_deferred + async def _get_state_for_group_using_inflight_cache( + self, group: int, state_filter: StateFilter + ) -> MutableStateMap[str]: + """ + Gets the state at a state group, potentially filtering by type and/or + state key. + + Args: + group: ID of the state group for which we want to get state + state_filter: the state filter used to fetch state from the database + Returns: + state map + """ + + # first, figure out whether we can re-use any in-flight requests + # (and if so, what would be left over) + ( + reusable_requests, + state_filter_left_over, + ) = self._get_state_for_group_gather_inflight_requests(group, state_filter) + + fired_off_requests = [] + + if state_filter_left_over.types or state_filter_left_over.include_others: + # we need to fire off a request for remaining state + # ostd log contexts? + fired_off_requests.append( + make_deferred_yieldable( + self._get_state_for_group_fire_request(group, state_filter) + ) + ) + + for request in reusable_requests: + # Observe the requests that we want to re-use + # ostd log contexts? + fired_off_requests.append(make_deferred_yieldable(request.observe())) + + # ostd is this right wrt. log context rules? + gathered = await make_deferred_yieldable( + defer.gatherResults( + fired_off_requests, consumeErrors=True # ostd not sure this is wanted + ) + ) + + # assemble our result. + assembled_state: MutableStateMap[str] = {} + for result_piece in gathered: + assembled_state.update(result_piece) + + # Filter out any state that may be more than what we asked for. + return state_filter.filter_state(assembled_state) + async def _get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None ) -> Dict[int, MutableStateMap[str]]: |