summary refs log tree commit diff
diff options
context:
space:
mode:
author: Eric Eastwood <contact@ericeastwood.com> 2023-05-18 01:59:34 -0500
committer: Eric Eastwood <contact@ericeastwood.com> 2023-05-18 01:59:34 -0500
commit: ab576b6b6bc14ec28bfabe24b6836f109a9b12b0 (patch)
tree: dd36c0151bdf5a686d79564b3913f2631f4b9f0f
parent: Fix empty case (diff)
download: synapse-ab576b6b6bc14ec28bfabe24b6836f109a9b12b0.tar.xz
Fix when the state_filter prevented us from returning any rows before
-rw-r--r-- synapse/storage/databases/state/bg_updates.py 64
1 files changed, 53 insertions, 11 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 05bce339e8..131314b128 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -131,6 +131,21 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
             """
 
             overall_select_query_args: List[Union[int, str]] = []
+            # Make sure we always have a row that tells us if we linked up to another
+            # state group that we already processed (`state_group_reached`) regardless
+            # of whether we find any state according to the state_filter.
+            #
+            # We use a `UNION ALL` to make sure it is always the first row returned.
+            # `UNION` will merge and sort in with the rows from the next query
+            # otherwise.
+            overall_select_clause = """
+                (
+                    SELECT NULL, NULL, NULL, state_group_reached
+                    FROM sgs
+                    ORDER BY state_group ASC
+                    LIMIT 1
+                ) UNION ALL (%s)
+            """
 
             # This is an optimization to create a select clause per-condition. This
             # makes the query planner a lot smarter on what rows should pull out in the
@@ -178,7 +193,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
                         """
                     )
 
-                overall_select_clause = " UNION ".join(select_clause_list)
+                main_select_clause = " UNION ".join(select_clause_list)
             else:
                 where_clause, where_args = state_filter.make_sql_filter_clause()
                 # Unless the filter clause is empty, we're going to append it after an
@@ -188,7 +203,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
 
                 overall_select_query_args.extend(where_args)
 
-                overall_select_clause = f"""
+                main_select_clause = f"""
                     SELECT DISTINCT ON (type, state_key)
                         type, state_key, event_id, state_group
                     FROM state_groups_state
@@ -209,6 +224,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
 
                 state_groups_we_have_already_fetched_string = ", ".join(
                     [
+                        # TODO: Is this string manipulation safe?
                         f"{state_group}::bigint"
                         # We default to `[-1]` just to fill in the query with something
                         # that will have no effct
@@ -220,29 +236,54 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
                     sql
                     % (
                         state_groups_we_have_already_fetched_string,
+                        overall_select_clause % (main_select_clause,),
+                    ),
+                    args,
+                )
+
+                logger.info(
+                    "sql=%s, args=%s",
+                    sql
+                    % (
+                        state_groups_we_have_already_fetched_string,
                         overall_select_clause,
                     ),
                     args,
                 )
 
-                min_state_group: Optional[int] = None
+                # The first row is always our special `state_group_reached` row which
+                # tells us if we linked up to any other existing state_group that we
+                # already fetched and if so, which one we linked up to (see the `UNION
+                # ALL` above)
+                first_row = txn.fetchone()
+                if first_row:
+                    _, _, _, state_group_reached = first_row
+
                 partial_state_map_for_state_group: MutableStateMap[str] = {}
                 for row in txn:
-                    typ, state_key, event_id, state_group = row
+                    typ, state_key, event_id, _state_group = row
+                    logger.info(
+                        "row from db -> group=%s type=%s state_key=%s event_id=%s",
+                        group,
+                        typ,
+                        state_key,
+                        event_id,
+                    )
                     key = (intern_string(typ), intern_string(state_key))
                     partial_state_map_for_state_group[key] = event_id
 
-                    if min_state_group is None or state_group < min_state_group:
-                        min_state_group = state_group
+                logger.info(
+                    "group=%s state_group_reached=%s, partial_state_map_for_state_group=%s",
+                    group,
+                    state_group_reached,
+                    partial_state_map_for_state_group,
+                )
 
                 # If we see a state group edge link to a previous state_group that we
                 # already fetched from the database, link up the base state to the
                 # partial state we retrieved from the database to build on top of.
-                if (
-                    min_state_group is not None
-                    and results.get(min_state_group) is not None
-                ):
-                    resultant_state_map = dict(results[min_state_group])
+                if state_group_reached in results:
+                    resultant_state_map = dict(results[state_group_reached])
                     resultant_state_map.update(partial_state_map_for_state_group)
 
                     results[group] = resultant_state_map
@@ -310,6 +351,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
                         allow_none=True,
                     )
 
+        logger.info("_get_state_groups_from_groups_txn results=%s", results)
         # The results shouldn't be considered mutable.
         return results