diff options
author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2021-08-04 14:33:05 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2021-08-04 14:56:30 +0100 |
commit | b09de10dffa0de0614bfe36488f19224f3a0ba3f (patch) | |
tree | b96a957016d0ad3db556b12472ed80b38dc1070f | |
parent | Newsfile (diff) | |
download | synapse-b09de10dffa0de0614bfe36488f19224f3a0ba3f.tar.xz |
Remove _get_state_groups_from_groups_txn (callers now use the single-group _get_state_groups_from_group_txn)
-rw-r--r-- | synapse/storage/databases/state/bg_updates.py | 111 |
-rw-r--r-- | synapse/storage/databases/state/store.py | 2 |
2 files changed, 3 insertions, 110 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 7608bb67a5..85aa7a052a 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -76,109 +76,6 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): return count - def _get_state_groups_from_groups_txn( - self, txn, groups: List[int], state_filter: Optional[StateFilter] = None - ) -> Dict[int, "StateMap[str]"]: - state_filter = state_filter or StateFilter.all() - - results = {group: {} for group in groups} - - where_clause, where_args = state_filter.make_sql_filter_clause() - - # Unless the filter clause is empty, we're going to append it after an - # existing where clause - if where_clause: - where_clause = " AND (%s)" % (where_clause,) - - if isinstance(self.database_engine, PostgresEngine): - # Temporarily disable sequential scans in this transaction. This is - # a temporary hack until we can add the right indices in - txn.execute("SET LOCAL enable_seqscan=off") - - # The below query walks the state_group tree so that the "state" - # table includes all state_groups in the tree. It then joins - # against `state_groups_state` to fetch the latest state. - # It assumes that previous state groups are always numerically - # lesser. - # The PARTITION is used to get the event_id in the greatest state - # group for the given type, state_key. - # This may return multiple rows per (type, state_key), but last_value - # should be the same. 
- sql = """ - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT DISTINCT ON (type, state_key) - type, state_key, event_id - FROM state_groups_state - WHERE state_group IN ( - SELECT state_group FROM state - ) %s - ORDER BY type, state_key, state_group DESC - """ - - for group in groups: - args = [group] - args.extend(where_args) - - txn.execute(sql % (where_clause,), args) - for row in txn: - typ, state_key, event_id = row - key = (typ, state_key) - results[group][key] = event_id - else: - max_entries_returned = state_filter.max_entries_returned() - - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - for group in groups: - next_group = group - - while next_group: - # We did this before by getting the list of group ids, and - # then passing that list to sqlite to get latest event for - # each (type, state_key). However, that was terribly slow - # without the right indices (which we can't add until - # after we finish deduping state, which requires this func) - args = [next_group] - args.extend(where_args) - - txn.execute( - "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ? " + where_clause, - args, - ) - results[group].update( - ((typ, state_key), event_id) - for typ, state_key, event_id in txn - if (typ, state_key) not in results[group] - ) - - # If the number of entries in the (type,state_key)->event_id dict - # matches the number of (type,state_keys) types we were searching - # for, then we must have found them all, so no need to go walk - # further down the tree... UNLESS our types filter contained - # wildcards (i.e. 
Nones) in which case we have to do an exhaustive - # search - if ( - max_entries_returned is not None - and len(results[group]) == max_entries_returned - ): - break - - next_group = self.db_pool.simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - - return results - def _get_state_groups_from_group_txn( self, txn, group: int, state_filter: Optional[StateFilter] = None ) -> "StateMap[str]": @@ -366,14 +263,10 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): # otherwise read performance degrades. continue - prev_state = self._get_state_groups_from_groups_txn( - txn, [prev_group] - ) + prev_state = self._get_state_groups_from_group_txn(txn, prev_group) prev_state = prev_state[prev_group] - curr_state = self._get_state_groups_from_groups_txn( - txn, [state_group] - ) + curr_state = self._get_state_groups_from_group_txn(txn, state_group) curr_state = curr_state[state_group] if not set(prev_state.keys()) - set(curr_state.keys()): diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index e68a409597..7119323ed4 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -588,7 +588,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): # groups to non delta versions. for sg in remaining_state_groups: logger.info("[purge] de-delta-ing remaining state group %s", sg) - curr_state = self._get_state_groups_from_groups_txn(txn, [sg]) + curr_state = self._get_state_groups_from_group_txn(txn, sg) curr_state = curr_state[sg] self.db_pool.simple_delete_txn( |