diff options
author | Eric Eastwood <contact@ericeastwood.com> | 2023-05-18 01:59:34 -0500 |
---|---|---|
committer | Eric Eastwood <contact@ericeastwood.com> | 2023-05-18 01:59:34 -0500 |
commit | ab576b6b6bc14ec28bfabe24b6836f109a9b12b0 (patch) | |
tree | dd36c0151bdf5a686d79564b3913f2631f4b9f0f | |
parent | Fix empty case (diff) | |
download | synapse-ab576b6b6bc14ec28bfabe24b6836f109a9b12b0.tar.xz |
Fix when the state_filter prevented us from returning any rows before
-rw-r--r-- | synapse/storage/databases/state/bg_updates.py | 64 |
1 files changed, 53 insertions, 11 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 05bce339e8..131314b128 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -131,6 +131,21 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): """ overall_select_query_args: List[Union[int, str]] = [] + # Make sure we always have a row that tells us if we linked up to another + # state group that we already processed (`state_group_reached`) regardless + # of whether we find any state according to the state_filter. + # + # We use a `UNION ALL` to make sure it is always the first row returned. + # `UNION` will merge and sort in with the rows from the next query + # otherwise. + overall_select_clause = """ + ( + SELECT NULL, NULL, NULL, state_group_reached + FROM sgs + ORDER BY state_group ASC + LIMIT 1 + ) UNION ALL (%s) + """ # This is an optimization to create a select clause per-condition. This # makes the query planner a lot smarter on what rows should pull out in the @@ -178,7 +193,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): """ ) - overall_select_clause = " UNION ".join(select_clause_list) + main_select_clause = " UNION ".join(select_clause_list) else: where_clause, where_args = state_filter.make_sql_filter_clause() # Unless the filter clause is empty, we're going to append it after an @@ -188,7 +203,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): overall_select_query_args.extend(where_args) - overall_select_clause = f""" + main_select_clause = f""" SELECT DISTINCT ON (type, state_key) type, state_key, event_id, state_group FROM state_groups_state @@ -209,6 +224,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): state_groups_we_have_already_fetched_string = ", ".join( [ + # TODO: Is this string manipulation safe? f"{state_group}::bigint" # We default to `[-1]` just to fill in the query with something # that will have no effct @@ -220,29 +236,54 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): sql % ( state_groups_we_have_already_fetched_string, + overall_select_clause % (main_select_clause,), + ), + args, + ) + + logger.info( + "sql=%s, args=%s", + sql + % ( + state_groups_we_have_already_fetched_string, overall_select_clause, ), args, ) - min_state_group: Optional[int] = None + # The first row is always our special `state_group_reached` row which + # tells us if we linked up to any other existing state_group that we + # already fetched and if so, which one we linked up to (see the `UNION + # ALL` above) + first_row = txn.fetchone() + if first_row: + _, _, _, state_group_reached = first_row + partial_state_map_for_state_group: MutableStateMap[str] = {} for row in txn: - typ, state_key, event_id, state_group = row + typ, state_key, event_id, _state_group = row + logger.info( + "row from db -> group=%s type=%s state_key=%s event_id=%s", + group, + typ, + state_key, + event_id, + ) key = (intern_string(typ), intern_string(state_key)) partial_state_map_for_state_group[key] = event_id - if min_state_group is None or state_group < min_state_group: - min_state_group = state_group + logger.info( + "group=%s state_group_reached=%s, partial_state_map_for_state_group=%s", + group, + state_group_reached, + partial_state_map_for_state_group, + ) # If we see a state group edge link to a previous state_group that we # already fetched from the database, link up the base state to the # partial state we retrieved from the database to build on top of. - if ( - min_state_group is not None - and results.get(min_state_group) is not None - ): - resultant_state_map = dict(results[min_state_group]) + if state_group_reached in results: + resultant_state_map = dict(results[state_group_reached]) resultant_state_map.update(partial_state_map_for_state_group) results[group] = resultant_state_map @@ -310,6 +351,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): allow_none=True, ) + logger.info("_get_state_groups_from_groups_txn results=%s", results) # The results shouldn't be considered mutable. return results |