summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2021-09-21 16:07:58 +0100
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2021-09-21 17:14:18 +0100
commit247f558c1c0a4dc850cd1259568f899f64ae4efa (patch)
tree21bf4353da0f421ec4840a73d41c616fba8518a4
parentAdd method to gather in-flight requests and calculate left-over filter (diff)
downloadsynapse-247f558c1c0a4dc850cd1259568f899f64ae4efa.tar.xz
Add method to get 1 state group, both using and updating in-flight cache
-rw-r--r--synapse/storage/databases/state/store.py52
1 files changed, 52 insertions, 0 deletions
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 6bc384c02c..5e85c5c40d 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -340,6 +340,58 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         return await request_deferred
 
+    async def _get_state_for_group_using_inflight_cache(
+        self, group: int, state_filter: StateFilter
+    ) -> MutableStateMap[str]:
+        """
+        Gets the state at a state group, potentially filtering by type and/or
+        state key.
+
+        Args:
+            group: ID of the state group for which we want to get state
+            state_filter: the state filter used to fetch state from the database
+        Returns:
+            state map
+        """
+
+        # first, figure out whether we can re-use any in-flight requests
+        # (and if so, what would be left over)
+        (
+            reusable_requests,
+            state_filter_left_over,
+        ) = self._get_state_for_group_gather_inflight_requests(group, state_filter)
+
+        fired_off_requests = []
+
+        if state_filter_left_over.types or state_filter_left_over.include_others:
+            # we need to fire off a request for remaining state
+            # ostd log contexts?
+            fired_off_requests.append(
+                make_deferred_yieldable(
+                    self._get_state_for_group_fire_request(group, state_filter)
+                )
+            )
+
+        for request in reusable_requests:
+            # Observe the requests that we want to re-use
+            # ostd log contexts?
+            fired_off_requests.append(make_deferred_yieldable(request.observe()))
+
+        # ostd is this right wrt. log context rules?
+        gathered = await make_deferred_yieldable(
+            defer.gatherResults(
+                fired_off_requests, consumeErrors=True  # ostd not sure this is wanted
+            )
+        )
+
+        # assemble our result.
+        assembled_state: MutableStateMap[str] = {}
+        for result_piece in gathered:
+            assembled_state.update(result_piece)
+
+        # Filter out any state that may be more than what we asked for.
+        return state_filter.filter_state(assembled_state)
+
     async def _get_state_for_groups(
         self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
     ) -> Dict[int, MutableStateMap[str]]: