From 9b334b3f97057ac145622d2e4d0ad036ef27b468 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 11 Mar 2018 20:01:41 +0000 Subject: WIP experiment in lazyloading room members --- synapse/storage/state.py | 47 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 5 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2b325e1c1f..da6bb685fa 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -198,8 +198,15 @@ class StateGroupWorkerStore(SQLBaseStore): def _get_state_groups_from_groups_txn(self, txn, groups, types=None): results = {group: {} for group in groups} + + include_other_types = False + if types is not None: - types = list(set(types)) # deduplicate types list + type_set = set(types) + if (None, None) in type_set: + include_other_types = True + type_set.remove((None, None)) + types = list(type_set) # deduplicate types list if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is @@ -238,11 +245,21 @@ class StateGroupWorkerStore(SQLBaseStore): if types: clause_to_args = [ ( - "AND type = ? AND state_key = ?", - (etype, state_key) + "AND type = ? AND state_key = ?" if state_key is not None else "AND type = ?", + (etype, state_key) if state_key is not None else (etype) ) for etype, state_key in types ] + + if include_other_types: + # XXX: check whether this slows postgres down like a list of + # ORs does too? + clause_to_args.append( + ( + "AND type <> ? " * len(types), + [t for (t, _) in types] + ) + ) else: # If types is None we fetch all the state, and so just use an # empty where clause with no extra args. @@ -263,6 +280,10 @@ class StateGroupWorkerStore(SQLBaseStore): where_clause = "AND (%s)" % ( " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), ) + if include_other_types: + where_clause += " AND (%s)" % ( + " AND ".join(["type <> ?"] * len(types)), + ) else: where_clause = "" @@ -449,17 +470,27 @@ class StateGroupWorkerStore(SQLBaseStore): group: The state group to lookup types (list): List of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all state_keys for the - `type`. + `type`. Presence of type of `None` indicates that types not + in the list should not be filtered out. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} missing_types = set() + include_other_types = False + for typ, state_key in types: key = (typ, state_key) + + if typ is None: + include_other_types = True + next + if state_key is None: type_to_key[typ] = None + # XXX: why do we mark the type as missing from our cache just + # because we weren't filtering on a specific value of state_key? missing_types.add(key) else: if type_to_key.get(typ, object()) is not None: @@ -478,7 +509,7 @@ class StateGroupWorkerStore(SQLBaseStore): return True if state_key in valid_state_keys: return True - return False + return include_other_types got_all = is_all or not missing_types @@ -507,6 +538,12 @@ class StateGroupWorkerStore(SQLBaseStore): with matching types. `types` is a list of `(type, state_key)`, where a `state_key` of None matches all state_keys. If `types` is None then all events are returned. + + XXX: is it really true that `state_key` of None in `types` matches all + state_keys? it looks like _get-some_state_from_cache does the right thing, + but _get_state_groups_from_groups_txn treats ths None is turned into + 'AND state_key = NULL' or similar (at least until i just fixed it) --Matthew + I've filed this as https://github.com/matrix-org/synapse/issues/2969 """ if types: types = frozenset(types) -- cgit 1.5.1 From 87133652657c5073616419b0afc533eac6ae6750 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 11 Mar 2018 20:10:25 +0000 Subject: typos --- synapse/handlers/sync.py | 4 ++-- synapse/storage/state.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 809e9fece9..fa730ca760 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -471,13 +471,13 @@ class SyncHandler(object): if filter_members: # We only request state for the members needed to display the # timeline: - types = ( + types = [ (EventTypes.Member, state_key) for state_key in set( event.sender # FIXME: we also care about targets etc. for event in batch.events ) - ) + ] types.append((None, None)) # don't just filter to room members # TODO: we should opportunistically deduplicate these members too diff --git a/synapse/storage/state.py b/synapse/storage/state.py index da6bb685fa..0238200286 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -301,6 +301,8 @@ class StateGroupWorkerStore(SQLBaseStore): args = [next_group] if types: args.extend(i for typ in types for i in typ) + if include_other_types: + args.extend(typ for (typ, _) in types) txn.execute( "SELECT type, state_key, event_id FROM state_groups_state" -- cgit 1.5.1 From 97c0496cfa89b037d89fccd05dd03442b80e07fc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 12 Mar 2018 00:27:06 +0000 Subject: fix sqlite where clause --- synapse/storage/state.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0238200286..b796d3c995 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -277,13 +277,14 @@ class StateGroupWorkerStore(SQLBaseStore): results[group][key] = event_id else: if types is not None: - where_clause = "AND (%s)" % ( + where_clause = "AND (%s" % ( " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), ) if include_other_types: - where_clause += " AND (%s)" % ( + where_clause += " OR (%s)" % ( " AND ".join(["type <> ?"] * len(types)), ) + where_clause += ")" else: where_clause = "" -- cgit 1.5.1 From fdedcd1f4ddeaa3ed5bfd3c05ab2977b4e8ed457 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 12 Mar 2018 01:39:06 +0000 Subject: correctly handle None state_keys and fix include_other_types thinko --- synapse/storage/state.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b796d3c995..405e6b6770 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -276,15 +276,23 @@ class StateGroupWorkerStore(SQLBaseStore): key = (typ, state_key) results[group][key] = event_id else: + where_args = [] if types is not None: - where_clause = "AND (%s" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) + where_clause = "AND (" + for typ in types: + if typ[1] is None: + where_clause += "(type = ?) OR " + where_args.extend(typ[0]) + else: + where_clause += "(type = ? AND state_key = ?) OR " + where_args.extend([typ[0], typ[1]]) + if include_other_types: - where_clause += " OR (%s)" % ( + where_clause += "(%s) OR " % ( " AND ".join(["type <> ?"] * len(types)), ) - where_clause += ")" + where_args.extend(t for (t, _) in types) + where_clause += "0)" # 0 to terminate the last OR else: where_clause = "" @@ -301,9 +309,7 @@ class StateGroupWorkerStore(SQLBaseStore): # after we finish deduping state, which requires this func) args = [next_group] if types: - args.extend(i for typ in types for i in typ) - if include_other_types: - args.extend(typ for (typ, _) in types) + args.extend(where_args) txn.execute( "SELECT type, state_key, event_id FROM state_groups_state" @@ -507,12 +513,12 @@ class StateGroupWorkerStore(SQLBaseStore): def include(typ, state_key): valid_state_keys = type_to_key.get(typ, sentinel) if valid_state_keys is sentinel: - return False + return include_other_types if valid_state_keys is None: return True if state_key in valid_state_keys: return True - return include_other_types + return False got_all = is_all or not missing_types -- cgit 1.5.1 From 1b1c13777154b5b0cf8bf8cf809381f889a2a82d Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 17:52:52 +0000 Subject: fix bug #2926 --- synapse/storage/state.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2b325e1c1f..783cebb351 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -240,6 +240,10 @@ class StateGroupWorkerStore(SQLBaseStore): ( "AND type = ? AND state_key = ?", (etype, state_key) + ) if state_key is not None else + ( + "AND type = ?", + (etype) ) for etype, state_key in types ] @@ -259,10 +263,19 @@ class StateGroupWorkerStore(SQLBaseStore): key = (typ, state_key) results[group][key] = event_id else: + where_args = [] if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) + where_clause = "AND (" + for typ in types: + if typ[1] is None: + where_clause += "(type = ?)" + where_args.extend(typ[0]) + else: + where_clause += "(type = ? AND state_key = ?)" + where_args.extend([typ[0], typ[1]]) + if typ != types[-1]: + where_clause += " OR " + where_clause += ")" else: where_clause = "" @@ -279,7 +292,7 @@ class StateGroupWorkerStore(SQLBaseStore): # after we finish deduping state, which requires this func) args = [next_group] if types: - args.extend(i for typ in types for i in typ) + args.extend(where_args) txn.execute( "SELECT type, state_key, event_id FROM state_groups_state" -- cgit 1.5.1 From 52f7e23c7276b2848aa5291d8b78875b7c32a658 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 18:07:55 +0000 Subject: PR feedbackz --- synapse/storage/state.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 783cebb351..77259a3141 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -240,10 +240,9 @@ class StateGroupWorkerStore(SQLBaseStore): ( "AND type = ? AND state_key = ?", (etype, state_key) - ) if state_key is not None else - ( + ) if state_key is not None else ( "AND type = ?", - (etype) + (etype,) ) for etype, state_key in types ] -- cgit 1.5.1 From b2aba9e43053f9f297671fce0051bfc18a8b655a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 18:13:44 +0000 Subject: build where_clause sanely --- synapse/storage/state.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 77259a3141..82740266bf 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -263,18 +263,16 @@ class StateGroupWorkerStore(SQLBaseStore): results[group][key] = event_id else: where_args = [] + where_clauses = [] if types is not None: - where_clause = "AND (" for typ in types: if typ[1] is None: - where_clause += "(type = ?)" + where_clauses.append("(type = ?)") where_args.extend(typ[0]) else: - where_clause += "(type = ? AND state_key = ?)" + where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) - if typ != types[-1]: - where_clause += " OR " - where_clause += ")" + where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: where_clause = "" -- cgit 1.5.1 From 865377a70d9d7db27b89348d2ebbd394f701c490 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 19:45:36 +0000 Subject: disable optimisation for searching for state groups when type filter includes wildcards on state_key --- synapse/storage/state.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 82740266bf..39f73afaa2 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -264,11 +264,13 @@ class StateGroupWorkerStore(SQLBaseStore): else: where_args = [] where_clauses = [] + wildcard_types = False if types is not None: for typ in types: if typ[1] is None: where_clauses.append("(type = ?)") where_args.extend(typ[0]) + wildcard_types = True else: where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) @@ -302,9 +304,17 @@ class StateGroupWorkerStore(SQLBaseStore): if (typ, state_key) not in results[group] ) - # If the lengths match then we must have all the types, - # so no need to go walk further down the tree. - if types is not None and len(results[group]) == len(types): + # If the number of entries inthe (type,state_key)->event_id dict + # matches the number of (type,state_keys) types we were searching + # for, then we must have found them all, so no need to go walk + # further down the tree... UNLESS our types filter contained + # wildcards (i.e. Nones) in which case we have to do an exhaustive + # search + if ( + types is not None and + not wildcard_types and + len(results[group]) == len(types) + ): break next_group = self._simple_select_one_onecol_txn( -- cgit 1.5.1 From afbf4d3dccb3d18276e4b119b0267490ca522b4b Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 19:48:04 +0000 Subject: typoe --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 39f73afaa2..ffa4246031 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -304,7 +304,7 @@ class StateGroupWorkerStore(SQLBaseStore): if (typ, state_key) not in results[group] ) - # If the number of entries inthe (type,state_key)->event_id dict + # If the number of entries in the (type,state_key)->event_id dict # matches the number of (type,state_keys) types we were searching # for, then we must have found them all, so no need to go walk # further down the tree... UNLESS our types filter contained -- cgit 1.5.1 From f0f9a0605b1bddc1b01d1bbb6af93f00763b8496 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 22:12:15 +0000 Subject: remove comment now #2969 is fixed --- synapse/storage/state.py | 6 ------ 1 file changed, 6 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 4ab16e18b2..4291cde7ab 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -561,12 +561,6 @@ class StateGroupWorkerStore(SQLBaseStore): with matching types. `types` is a list of `(type, state_key)`, where a `state_key` of None matches all state_keys. If `types` is None then all events are returned. - - XXX: is it really true that `state_key` of None in `types` matches all - state_keys? it looks like _get-some_state_from_cache does the right thing, - but _get_state_groups_from_groups_txn treats ths None is turned into - 'AND state_key = NULL' or similar (at least until i just fixed it) --Matthew - I've filed this as https://github.com/matrix-org/synapse/issues/2969 """ if types: types = frozenset(types) -- cgit 1.5.1 From ccca02846d07124f537b0c475308f9a26bfb3fb1 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 13 Mar 2018 22:31:41 +0000 Subject: make it work --- synapse/handlers/sync.py | 6 +++--- synapse/storage/state.py | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c754cfdeeb..c05e3d107f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -498,7 +498,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_ids[t] - for t in state_ids if t[0] == EventTypes.member + for t in state_ids if t[0] == EventTypes.Member } else: @@ -511,7 +511,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_ids[t] - for t in state_ids if t[0] == EventTypes.member + for t in state_ids if t[0] == EventTypes.Member } timeline_state = { @@ -542,7 +542,7 @@ class SyncHandler(object): if filter_members: member_state_ids = { t: state_at_timeline_start[t] - for t in state_ids if t[0] == EventTypes.member + for t in state_ids if t[0] == EventTypes.Member } timeline_state = { diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 4291cde7ab..9c9994c073 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -257,10 +257,11 @@ class StateGroupWorkerStore(SQLBaseStore): if include_other_types: # XXX: check whether this slows postgres down like a list of # ORs does too? + unique_types = set([ t for (t, _) in types ]) clause_to_args.append( ( - "AND type <> ? " * len(types), - [t for (t, _) in types] + "AND type <> ? " * len(unique_types), + list(unique_types) ) ) else: @@ -293,10 +294,11 @@ class StateGroupWorkerStore(SQLBaseStore): where_args.extend([typ[0], typ[1]]) if include_other_types: + unique_types = set([ t for (t, _) in types ]) where_clauses.append( - "(" + " AND ".join(["type <> ?"] * len(types)) + ")" + "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" ) - where_args.extend(t for (t, _) in types) + where_args.extend(list(unique_types)) where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: -- cgit 1.5.1 From 9f77001e2747c36046e136c5d3c706c0aef54b15 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 14 Mar 2018 00:07:47 +0000 Subject: pep8 --- synapse/storage/state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 9c9994c073..55159e64d0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -257,7 +257,7 @@ class StateGroupWorkerStore(SQLBaseStore): if include_other_types: # XXX: check whether this slows postgres down like a list of # ORs does too? - unique_types = set([ t for (t, _) in types ]) + unique_types = set([t for (t, _) in types]) clause_to_args.append( ( "AND type <> ? " * len(unique_types), @@ -294,7 +294,7 @@ class StateGroupWorkerStore(SQLBaseStore): where_args.extend([typ[0], typ[1]]) if include_other_types: - unique_types = set([ t for (t, _) in types ]) + unique_types = set([t for (t, _) in types]) where_clauses.append( "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" ) -- cgit 1.5.1 From a6c8f7c875348ff8d63a7032c2f73a08551c516c Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 29 May 2018 01:09:55 +0100 Subject: add pydoc --- synapse/handlers/sync.py | 18 ++++++++---- synapse/storage/state.py | 76 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 67 insertions(+), 27 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 05bf6d46dd..8e38078332 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -423,7 +423,11 @@ class SyncHandler(object): Args: event(synapse.events.EventBase): event of interest - + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A Deferred map from ((type, state_key)->Event) """ @@ -440,6 +444,11 @@ class SyncHandler(object): Args: room_id(str): room for which to get state stream_position(StreamToken): point at which to get state + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A Deferred map from ((type, state_key)->Event) @@ -472,8 +481,6 @@ class SyncHandler(object): be None. now_token(str): Token of the end of the current batch. full_state(bool): Whether to force returning the full state. - lazy_load_members(bool): Whether to only return state for members - referenced in this timeline segment Returns: A deferred new event dictionary @@ -496,7 +503,7 @@ class SyncHandler(object): types = [ (EventTypes.Member, state_key) for state_key in set( - event.sender # FIXME: we also care about targets etc. + event.sender # FIXME: we also care about invite targets etc. for event in batch.events ) ] @@ -1398,7 +1405,8 @@ class SyncHandler(object): return state = yield self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, full_state=full_state + room_id, batch, sync_config, since_token, now_token, + full_state=full_state ) if room_builder.rtype == "joined": diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 55159e64d0..63b6834202 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -182,7 +182,19 @@ class StateGroupWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): - """Returns dictionary state_group -> (dict of (type, state_key) -> event id) + """Returns the state groups for a given set of groups, filtering on + types of state events. + + Args: + groups(list[int]): list of state group IDs to query + types(list[str|None, str|None])|None: List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. Presence of type of `None` indicates + that types not in the list should not be filtered out. If None, + all types are returned. + + Returns: + dictionary state_group -> (dict of (type, state_key) -> event id) """ results = {} @@ -204,6 +216,9 @@ class StateGroupWorkerStore(SQLBaseStore): if types is not None: type_set = set(types) if (None, None) in type_set: + # special case (None, None) to mean that other types should be + # returned - i.e. we were just filtering down the state keys + # for particular types. include_other_types = True type_set.remove((None, None)) types = list(type_set) # deduplicate types list @@ -360,10 +375,12 @@ class StateGroupWorkerStore(SQLBaseStore): that are in the `types` list. Args: - event_ids (list) - types (list): List of (type, state_key) tuples which are used to - filter the state fetched. `state_key` may be None, which matches - any `state_key` + event_ids (list[string]) + types (list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: deferred: A list of dicts corresponding to the event_ids given. @@ -399,9 +416,11 @@ class StateGroupWorkerStore(SQLBaseStore): Args: event_ids(list(str)): events whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A deferred dict from event_id -> (type, state_key) -> state_event @@ -427,9 +446,11 @@ class StateGroupWorkerStore(SQLBaseStore): Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A deferred dict from (type, state_key) -> state_event @@ -444,9 +465,11 @@ class StateGroupWorkerStore(SQLBaseStore): Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. Presence of type of `None` + indicates that types not in the list should not be filtered out. + May be None, which matches any key. Returns: A deferred dict from (type, state_key) -> state_event @@ -492,11 +515,11 @@ class StateGroupWorkerStore(SQLBaseStore): missing state. Args: - group: The state group to lookup - types (list): List of 2-tuples of the form (`type`, `state_key`), - where a `state_key` of `None` matches all state_keys for the - `type`. Presence of type of `None` indicates that types not - in the list should not be filtered out. + group(int): The state group to lookup + types(list[str|None, str|None]): List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. Presence of type of `None` indicates + that types not in the list should not be filtered out. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) @@ -560,9 +583,18 @@ class StateGroupWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_state_for_groups(self, groups, types=None): """Given list of groups returns dict of group -> list of state events - with matching types. `types` is a list of `(type, state_key)`, where - a `state_key` of None matches all state_keys. If `types` is None then - all events are returned. + with matching types. + + Args: + groups(list[int]): list of groups whose state to query + types(list[str|None, str|None]|None): List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. Presence of type of `None` indicates + that types not in the list should not be filtered out. If None, + all events are returned. + + Returns: + dict of group -> list of state events """ if types: types = frozenset(types) -- cgit 1.5.1 From 924eb34d9428a4163a03249abbb6f40d4baa29c6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Jul 2018 18:32:02 +0100 Subject: add a filtered_types param to limit filtering to specific types --- synapse/handlers/sync.py | 65 +++++++++++++++------------ synapse/storage/state.py | 113 +++++++++++++++++++++++++---------------------- 2 files changed, 96 insertions(+), 82 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0c21ac2c77..cb711b8758 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -417,38 +417,44 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def get_state_after_event(self, event, types=None): + def get_state_after_event(self, event, types=None, filtered_types=None): """ Get the room state after the given event Args: event(synapse.events.EventBase): event of interest - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + Returns: A Deferred map from ((type, state_key)->Event) """ - state_ids = yield self.store.get_state_ids_for_event(event.event_id, types) + state_ids = yield self.store.get_state_ids_for_event( + event.event_id, types, filtered_types=filtered_types + ) if event.is_state(): state_ids = state_ids.copy() state_ids[(event.type, event.state_key)] = event.event_id defer.returnValue(state_ids) @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position, types=None): + def get_state_at(self, room_id, stream_position, types=None, filtered_types=None): """ Get the room state at a particular stream position Args: room_id(str): room for which to get state stream_position(StreamToken): point at which to get state - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. - May be None, which matches any key. + all events are returned of the given type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A Deferred map from ((type, state_key)->Event) @@ -463,7 +469,9 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] - state = yield self.get_state_after_event(last_event, types) + state = yield self.get_state_after_event( + last_event, types, filtered_types=filtered_types + ) else: # no events in this room - so presumably no state @@ -499,6 +507,7 @@ class SyncHandler(object): types = None member_state_ids = {} lazy_load_members = sync_config.filter_collection.lazy_load_members() + filtered_types = None if lazy_load_members: # We only request state for the members needed to display the @@ -516,29 +525,25 @@ class SyncHandler(object): # to be done based on event_id, and we don't have the member # event ids until we've pulled them out of the DB. - if not types: - # an optimisation to stop needlessly trying to calculate - # member_state_ids - # - # XXX: i can't remember what this trying to do. why would - # types ever be []? --matthew - lazy_load_members = False - - types.append((None, None)) # don't just filter to room members + # only apply the filtering to room members + filtered_types = [EventTypes.Member] if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types + batch.events[-1].event_id, types=types, + filtered_types=filtered_types ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types + batch.events[0].event_id, types=types, + filtered_types=filtered_types ) else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token, types=types + room_id, stream_position=now_token, types=types, + filtered_types=filtered_types ) state_ids = current_state_ids @@ -563,15 +568,18 @@ class SyncHandler(object): ) elif batch.limited: state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token, types=types + room_id, stream_position=since_token, types=types, + filtered_types=filtered_types ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types + batch.events[-1].event_id, types=types, + filtered_types=filtered_types ) state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types + batch.events[0].event_id, types=types, + filtered_types=filtered_types ) if lazy_load_members: @@ -603,11 +611,10 @@ class SyncHandler(object): # event_ids) at this point. We know we can do it based on mxid as this # is an non-gappy incremental sync. - # strip off the (None, None) and filter to just room members - types = types[:-1] if types: state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types + batch.events[0].event_id, types=types, + filtered_types=filtered_types ) state = {} diff --git a/synapse/storage/state.py b/synapse/storage/state.py index c5ff44fef7..ee531a2ce0 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -185,7 +185,7 @@ class StateGroupWorkerStore(SQLBaseStore): }) @defer.inlineCallbacks - def _get_state_groups_from_groups(self, groups, types): + def _get_state_groups_from_groups(self, groups, types, filtered_types=None): """Returns the state groups for a given set of groups, filtering on types of state events. @@ -193,9 +193,10 @@ class StateGroupWorkerStore(SQLBaseStore): groups(list[int]): list of state group IDs to query types(list[str|None, str|None])|None: List of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all - state_keys for the `type`. Presence of type of `None` indicates - that types not in the list should not be filtered out. If None, - all types are returned. + state_keys for the `type`. If None, all types are returned. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: dictionary state_group -> (dict of (type, state_key) -> event id) @@ -206,26 +207,21 @@ class StateGroupWorkerStore(SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, + self._get_state_groups_from_groups_txn, chunk, types, filtered_types ) results.update(res) defer.returnValue(results) - def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + def _get_state_groups_from_groups_txn( + self, txn, groups, types=None, filtered_types=None + ): results = {group: {} for group in groups} - include_other_types = False + include_other_types = False if filtered_types is None else True if types is not None: - type_set = set(types) - if (None, None) in type_set: - # special case (None, None) to mean that other types should be - # returned - i.e. we were just filtering down the state keys - # for particular types. - include_other_types = True - type_set.remove((None, None)) - types = list(type_set) # deduplicate types list + types = list(set(types)) # deduplicate types list if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is @@ -276,7 +272,7 @@ class StateGroupWorkerStore(SQLBaseStore): if include_other_types: # XXX: check whether this slows postgres down like a list of # ORs does too? - unique_types = set([t for (t, _) in types]) + unique_types = set(filtered_types) clause_to_args.append( ( "AND type <> ? " * len(unique_types), @@ -313,7 +309,7 @@ class StateGroupWorkerStore(SQLBaseStore): where_args.extend([typ[0], typ[1]]) if include_other_types: - unique_types = set([t for (t, _) in types]) + unique_types = set(filtered_types) where_clauses.append( "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" ) @@ -373,18 +369,20 @@ class StateGroupWorkerStore(SQLBaseStore): return results @defer.inlineCallbacks - def get_state_for_events(self, event_ids, types): + def get_state_for_events(self, event_ids, types, filtered_types): """Given a list of event_ids and type tuples, return a list of state dicts for each event. The state dicts will only have the type/state_keys that are in the `types` list. Args: event_ids (list[string]) - types (list[(str|None, str|None)]|None): List of (type, state_key) tuples + types (list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: deferred: A list of dicts corresponding to the event_ids given. @@ -395,7 +393,7 @@ class StateGroupWorkerStore(SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) state_event_map = yield self.get_events( [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], @@ -414,17 +412,19 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_ids_for_events(self, event_ids, types=None): + def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): """ Get the state dicts corresponding to a list of events Args: event_ids(list(str)): events whose state should be returned - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from event_id -> (type, state_key) -> state_event @@ -434,7 +434,7 @@ class StateGroupWorkerStore(SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) event_to_state = { event_id: group_to_state[group] @@ -444,41 +444,45 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_for_event(self, event_id, types=None): + def get_state_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_for_events([event_id], types) + state_map = yield self.get_state_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @defer.inlineCallbacks - def get_state_ids_for_event(self, event_id, types=None): + def get_state_ids_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str|None, str|None)]|None): List of (type, state_key) tuples + types(list[(str, str|None)]|None): List of (type, state_key) tuples which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. Presence of type of `None` - indicates that types not in the list should not be filtered out. + all events are returned of the given type. May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_ids_for_events([event_id], types) + state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @cached(max_entries=50000) @@ -509,7 +513,7 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) - def _get_some_state_from_cache(self, group, types): + def _get_some_state_from_cache(self, group, types, filtered_types=None): """Checks if group is in cache. See `_get_state_for_groups` Returns 3-tuple (`state_dict`, `missing_types`, `got_all`). @@ -520,29 +524,30 @@ class StateGroupWorkerStore(SQLBaseStore): Args: group(int): The state group to lookup - types(list[str|None, str|None]): List of 2-tuples of the form + types(list[str, str|None]): List of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all - state_keys for the `type`. Presence of type of `None` indicates - that types not in the list should not be filtered out. + state_keys for the `type`. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} + + # tracks which of the requested types are missing from our cache missing_types = set() - include_other_types = False + include_other_types = True if filtered_types is None else False for typ, state_key in types: key = (typ, state_key) - if typ is None: - include_other_types = True - next - if state_key is None: type_to_key[typ] = None # XXX: why do we mark the type as missing from our cache just # because we weren't filtering on a specific value of state_key? + # is it because the cache doesn't handle wildcards? missing_types.add(key) else: if type_to_key.get(typ, object()) is not None: @@ -556,7 +561,7 @@ class StateGroupWorkerStore(SQLBaseStore): def include(typ, state_key): valid_state_keys = type_to_key.get(typ, sentinel) if valid_state_keys is sentinel: - return include_other_types + return include_other_types and typ not in filtered_types if valid_state_keys is None: return True if state_key in valid_state_keys: @@ -585,21 +590,23 @@ class StateGroupWorkerStore(SQLBaseStore): return state_dict_ids, is_all @defer.inlineCallbacks - def _get_state_for_groups(self, groups, types=None): + def _get_state_for_groups(self, groups, types=None, filtered_types=None): """Gets the state at each of a list of state groups, optionally filtering by type/state_key Args: groups (iterable[int]): list of state groups for which we want to get the state. - types (None|iterable[(None|str, None|str)]): + types (None|iterable[(None, None|str)]): indicates the state type/keys required. If None, the whole state is fetched and returned. Otherwise, each entry should be a `(type, state_key)` tuple to include in the response. A `state_key` of None is a wildcard - meaning that we require all state with that type. A `type` of None - indicates that types not in the list should not be filtered out. + meaning that we require all state with that type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: Deferred[dict[int, dict[(type, state_key), EventBase]]] @@ -612,7 +619,7 @@ class StateGroupWorkerStore(SQLBaseStore): if types is not None: for group in set(groups): state_dict_ids, _, got_all = self._get_some_state_from_cache( - group, types, + group, types, filtered_types ) results[group] = state_dict_ids @@ -645,7 +652,7 @@ class StateGroupWorkerStore(SQLBaseStore): types_to_fetch = types group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch, + missing_groups, types_to_fetch, filtered_types ) for group, group_state_dict in iteritems(group_to_state_dict): -- cgit 1.5.1 From bcaec2915ac74937171e27d507b8f9c0e39d3677 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Jul 2018 19:03:50 +0100 Subject: incorporate review --- synapse/handlers/sync.py | 44 +++++++++++++++++++++++++++----------------- synapse/storage/state.py | 7 ++++--- 2 files changed, 31 insertions(+), 20 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cb711b8758..b597f94cf6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -435,7 +435,7 @@ class SyncHandler(object): A Deferred map from ((type, state_key)->Event) """ state_ids = yield self.store.get_state_ids_for_event( - event.event_id, types, filtered_types=filtered_types + event.event_id, types, filtered_types=filtered_types, ) if event.is_state(): state_ids = state_ids.copy() @@ -470,7 +470,7 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] state = yield self.get_state_after_event( - last_event, types, filtered_types=filtered_types + last_event, types, filtered_types=filtered_types, ) else: @@ -505,7 +505,6 @@ class SyncHandler(object): with Measure(self.clock, "compute_state_delta"): types = None - member_state_ids = {} lazy_load_members = sync_config.filter_collection.lazy_load_members() filtered_types = None @@ -521,10 +520,6 @@ class SyncHandler(object): ) ] - # We can't remove redundant member types at this stage as it has - # to be done based on event_id, and we don't have the member - # event ids until we've pulled them out of the DB. - # only apply the filtering to room members filtered_types = [EventTypes.Member] @@ -532,27 +527,32 @@ class SyncHandler(object): if batch: current_state_ids = yield self.store.get_state_ids_for_event( batch.events[-1].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) else: current_state_ids = yield self.get_state_at( room_id, stream_position=now_token, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state_ids = current_state_ids + # track the membership state events as of the beginning of this + # timeline sequence, so they can be filtered out of the state + # if we are lazy loading members. if lazy_load_members: member_state_ids = { t: state_ids[t] for t in state_ids if t[0] == EventTypes.Member } + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id @@ -569,28 +569,38 @@ class SyncHandler(object): elif batch.limited: state_at_previous_sync = yield self.get_state_at( room_id, stream_position=since_token, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) current_state_ids = yield self.store.get_state_ids_for_event( batch.events[-1].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state_at_timeline_start = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) + # track the membership state events as of the beginning of this + # timeline sequence, so they can be filtered out of the state + # if we are lazy loading members. if lazy_load_members: - # TODO: filter out redundant members based on their event_ids - # (not mxids) at this point. In practice, limited syncs are + # TODO: optionally filter out redundant membership events at this + # point, to stop repeatedly sending members in every /sync as if + # the client isn't tracking them. + # When implement, this should filter using event_ids (not mxids). + # In practice, limited syncs are # relatively rare so it's not a total disaster to send redundant - # members down at this point. + # members down at this point. Redundant members are ones which + # repeatedly get sent down /sync because we don't know if the client + # is caching them or not. member_state_ids = { t: state_at_timeline_start[t] for t in state_at_timeline_start if t[0] == EventTypes.Member } + else: + member_state_ids = {} timeline_state = { (event.type, event.state_key): event.event_id @@ -614,7 +624,7 @@ class SyncHandler(object): if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, - filtered_types=filtered_types + filtered_types=filtered_types, ) state = {} diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ee531a2ce0..75c6366e7a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -545,9 +545,10 @@ class StateGroupWorkerStore(SQLBaseStore): if state_key is None: type_to_key[typ] = None - # XXX: why do we mark the type as missing from our cache just - # because we weren't filtering on a specific value of state_key? - # is it because the cache doesn't handle wildcards? + # we mark the type as missing from the cache because + # when the cache was populated it might have been done with a + # restricted set of state_keys, so the wildcard will not work + # and the cache may be incomplete. missing_types.add(key) else: if type_to_key.get(typ, object()) is not None: -- cgit 1.5.1 From 2f558300cc648e633342746dc7b42a36fcb6b32e Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Jul 2018 19:22:27 +0100 Subject: fix thinkos; unbreak tests --- synapse/storage/state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 75c6366e7a..f09be7172d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -369,7 +369,7 @@ class StateGroupWorkerStore(SQLBaseStore): return results @defer.inlineCallbacks - def get_state_for_events(self, event_ids, types, filtered_types): + def get_state_for_events(self, event_ids, types, filtered_types=None): """Given a list of event_ids and type tuples, return a list of state dicts for each event. The state dicts will only have the type/state_keys that are in the `types` list. @@ -538,7 +538,7 @@ class StateGroupWorkerStore(SQLBaseStore): # tracks which of the requested types are missing from our cache missing_types = set() - include_other_types = True if filtered_types is None else False + include_other_types = False if filtered_types is None else True for typ, state_key in types: key = (typ, state_key) -- cgit 1.5.1 From 254fb430d1662c93c56c2abbd6984e07fb04c36b Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 19:21:20 +0100 Subject: incorporate review --- synapse/handlers/sync.py | 67 +++++++++++++++++++----------------------------- synapse/storage/state.py | 20 ++++++--------- 2 files changed, 35 insertions(+), 52 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b597f94cf6..5689ad2f58 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -543,17 +543,6 @@ class SyncHandler(object): state_ids = current_state_ids - # track the membership state events as of the beginning of this - # timeline sequence, so they can be filtered out of the state - # if we are lazy loading members. - if lazy_load_members: - member_state_ids = { - t: state_ids[t] - for t in state_ids if t[0] == EventTypes.Member - } - else: - member_state_ids = {} - timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() @@ -562,9 +551,9 @@ class SyncHandler(object): state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, - timeline_start_members=member_state_ids, previous={}, current=current_state_ids, + lazy_load_members=lazy_load_members, ) elif batch.limited: state_at_previous_sync = yield self.get_state_at( @@ -582,37 +571,27 @@ class SyncHandler(object): filtered_types=filtered_types, ) - # track the membership state events as of the beginning of this - # timeline sequence, so they can be filtered out of the state - # if we are lazy loading members. - if lazy_load_members: - # TODO: optionally filter out redundant membership events at this - # point, to stop repeatedly sending members in every /sync as if - # the client isn't tracking them. - # When implement, this should filter using event_ids (not mxids). - # In practice, limited syncs are - # relatively rare so it's not a total disaster to send redundant - # members down at this point. Redundant members are ones which - # repeatedly get sent down /sync because we don't know if the client - # is caching them or not. - member_state_ids = { - t: state_at_timeline_start[t] - for t in state_at_timeline_start if t[0] == EventTypes.Member - } - else: - member_state_ids = {} - timeline_state = { (event.type, event.state_key): event.event_id for event in batch.events if event.is_state() } + # TODO: optionally filter out redundant membership events at this + # point, to stop repeatedly sending members in every /sync as if + # the client isn't tracking them. + # When implemented, this should filter using event_ids (not mxids). + # In practice, limited syncs are + # relatively rare so it's not a total disaster to send redundant + # members down at this point. Redundant members are ones which + # repeatedly get sent down /sync because we don't know if the client + # is caching them or not. + state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_at_timeline_start, - timeline_start_members=member_state_ids, previous=state_at_previous_sync, current=current_state_ids, + lazy_load_members=lazy_load_members, ) else: state_ids = {} @@ -1536,16 +1515,14 @@ def _action_has_highlight(actions): return False -def _calculate_state(timeline_contains, timeline_start, timeline_start_members, - previous, current): +def _calculate_state( + timeline_contains, timeline_start, previous, current, lazy_load_members, +): """Works out what state to include in a sync response. Args: timeline_contains (dict): state in the timeline timeline_start (dict): state at the start of the timeline - timeline_start_members (dict): state at the start of the timeline - for room members who participate in this chunk of timeline. - Should always be a subset of timeline_start. previous (dict): state at the end of the previous sync (or empty dict if this is an initial sync) current (dict): state at the end of the timeline @@ -1565,11 +1542,21 @@ def _calculate_state(timeline_contains, timeline_start, timeline_start_members, c_ids = set(e for e in current.values()) ts_ids = set(e for e in timeline_start.values()) - tsm_ids = set(e for e in timeline_start_members.values()) p_ids = set(e for e in previous.values()) tc_ids = set(e for e in timeline_contains.values()) - state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | tsm_ids + # track the membership events in the state as of the start of the timeline + # so we can add them back in to the state if we're lazyloading. We don't + # add them into state if they're already contained in the timeline. + if lazy_load_members: + ll_ids = set( + e for t, e in timeline_start.iteritems() + if t[0] == EventTypes.Member and e not in tc_ids + ) + else: + ll_ids = set() + + state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | ll_ids return { event_id_to_key[e]: e for e in state_ids diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f09be7172d..40ca8bd2a2 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -191,10 +191,10 @@ class StateGroupWorkerStore(SQLBaseStore): Args: groups(list[int]): list of state group IDs to query - types(list[str|None, str|None])|None: List of 2-tuples of the form + types (Iterable[str, str|None]|None): list of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all state_keys for the `type`. If None, all types are returned. - filtered_types(list[str]|None): Only apply filtering via `types` to this + filtered_types(Iterable[str]|None): Only apply filtering via `types` to this list of event types. Other types of events are returned unfiltered. If None, `types` filtering is applied to all events. @@ -207,19 +207,17 @@ class StateGroupWorkerStore(SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, filtered_types + self._get_state_groups_from_groups_txn, chunk, types, filtered_types, ) results.update(res) defer.returnValue(results) def _get_state_groups_from_groups_txn( - self, txn, groups, types=None, filtered_types=None + self, txn, groups, types=None, filtered_types=None, ): results = {group: {} for group in groups} - include_other_types = False if filtered_types is None else True - if types is not None: types = list(set(types)) # deduplicate types list @@ -269,7 +267,7 @@ class StateGroupWorkerStore(SQLBaseStore): for etype, state_key in types ] - if include_other_types: + if filtered_types is not None: # XXX: check whether this slows postgres down like a list of # ORs does too? unique_types = set(filtered_types) @@ -308,7 +306,7 @@ class StateGroupWorkerStore(SQLBaseStore): where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) - if include_other_types: + if filtered_types is not None: unique_types = set(filtered_types) where_clauses.append( "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" @@ -538,8 +536,6 @@ class StateGroupWorkerStore(SQLBaseStore): # tracks which of the requested types are missing from our cache missing_types = set() - include_other_types = False if filtered_types is None else True - for typ, state_key in types: key = (typ, state_key) @@ -562,7 +558,7 @@ class StateGroupWorkerStore(SQLBaseStore): def include(typ, state_key): valid_state_keys = type_to_key.get(typ, sentinel) if valid_state_keys is sentinel: - return include_other_types and typ not in filtered_types + return filtered_types is not None and typ not in filtered_types if valid_state_keys is None: return True if state_key in valid_state_keys: @@ -598,7 +594,7 @@ class StateGroupWorkerStore(SQLBaseStore): Args: groups (iterable[int]): list of state groups for which we want to get the state. - types (None|iterable[(None, None|str)]): + types (None|iterable[(str, None|str)]): indicates the state type/keys required. If None, the whole state is fetched and returned. -- cgit 1.5.1 From efcdacad7d1b7f52f879179701c7e0d9b763511f Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 22:41:05 +0100 Subject: handle case where types is [] on postgres correctly --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 40ca8bd2a2..f99d3871e4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -255,7 +255,7 @@ class StateGroupWorkerStore(SQLBaseStore): # Turns out that postgres doesn't like doing a list of OR's and # is about 1000x slower, so we just issue a query for each specific # type seperately. - if types: + if types is not None: clause_to_args = [ ( "AND type = ? AND state_key = ?", -- cgit 1.5.1 From cd241d6bda01a761fbe1ca29727dacd918fb8975 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 12:39:40 +0100 Subject: incorporate more review --- synapse/handlers/sync.py | 12 +++++++++--- synapse/storage/state.py | 36 +++++++++--------------------------- tests/storage/test_state.py | 9 +++++++++ 3 files changed, 27 insertions(+), 30 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5689ad2f58..e5a2329d73 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1526,6 +1526,9 @@ def _calculate_state( previous (dict): state at the end of the previous sync (or empty dict if this is an initial sync) current (dict): state at the end of the timeline + lazy_load_members (bool): whether to return members from timeline_start + or not. assumes that timeline_start has already been filtered to + include only the members the client needs to know about. Returns: dict @@ -1545,9 +1548,12 @@ def _calculate_state( p_ids = set(e for e in previous.values()) tc_ids = set(e for e in timeline_contains.values()) - # track the membership events in the state as of the start of the timeline - # so we can add them back in to the state if we're lazyloading. We don't - # add them into state if they're already contained in the timeline. + # If we are lazyloading room members, we explicitly add the membership events + # for the senders in the timeline into the state block returned by /sync, + # as we may not have sent them to the client before. We find these membership + # events by filtering them out of timeline_start, which has already been filtered + # to only include membership events for the senders in the timeline. + if lazy_load_members: ll_ids = set( e for t, e in timeline_start.iteritems() diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f99d3871e4..1413a6f910 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -185,7 +185,7 @@ class StateGroupWorkerStore(SQLBaseStore): }) @defer.inlineCallbacks - def _get_state_groups_from_groups(self, groups, types, filtered_types=None): + def _get_state_groups_from_groups(self, groups, types): """Returns the state groups for a given set of groups, filtering on types of state events. @@ -194,9 +194,6 @@ class StateGroupWorkerStore(SQLBaseStore): types (Iterable[str, str|None]|None): list of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all state_keys for the `type`. If None, all types are returned. - filtered_types(Iterable[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. Returns: dictionary state_group -> (dict of (type, state_key) -> event id) @@ -207,14 +204,14 @@ class StateGroupWorkerStore(SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, filtered_types, + self._get_state_groups_from_groups_txn, chunk, types, ) results.update(res) defer.returnValue(results) def _get_state_groups_from_groups_txn( - self, txn, groups, types=None, filtered_types=None, + self, txn, groups, types=None, ): results = {group: {} for group in groups} @@ -266,17 +263,6 @@ class StateGroupWorkerStore(SQLBaseStore): ) for etype, state_key in types ] - - if filtered_types is not None: - # XXX: check whether this slows postgres down like a list of - # ORs does too? - unique_types = set(filtered_types) - clause_to_args.append( - ( - "AND type <> ? " * len(unique_types), - list(unique_types) - ) - ) else: # If types is None we fetch all the state, and so just use an # empty where clause with no extra args. @@ -306,13 +292,6 @@ class StateGroupWorkerStore(SQLBaseStore): where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) - if filtered_types is not None: - unique_types = set(filtered_types) - where_clauses.append( - "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")" - ) - where_args.extend(list(unique_types)) - where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: where_clause = "" @@ -643,13 +622,13 @@ class StateGroupWorkerStore(SQLBaseStore): # cache. Hence, if we are doing a wildcard lookup, populate the # cache fully so that we can do an efficient lookup next time. - if types and any(k is None for (t, k) in types): + if filtered_types or (types and any(k is None for (t, k) in types)): types_to_fetch = None else: types_to_fetch = types group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch, filtered_types + missing_groups, types_to_fetch ) for group, group_state_dict in iteritems(group_to_state_dict): @@ -659,7 +638,10 @@ class StateGroupWorkerStore(SQLBaseStore): if types: for k, v in iteritems(group_state_dict): (typ, _) = k - if k in types or (typ, None) in types: + if ( + (k in types or (typ, None) in types) or + (filtered_types and typ not in filtered_types) + ): state_dict[k] = v else: state_dict.update(group_state_dict) diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 8924ba9f7f..b2f314e9db 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -158,3 +158,12 @@ class StateStoreTestCase(tests.unittest.TestCase): (e2.type, e2.state_key): e2, (e3.type, e3.state_key): e3, }, state) + + state = yield self.store.get_state_for_event( + e5.event_id, [], filtered_types=[EventTypes.Member], + ) + + self.assertStateMapEqual({ + (e1.type, e1.state_key): e1, + (e2.type, e2.state_key): e2, + }, state) -- cgit 1.5.1 From e22700c3dd929f9f0953b3d2b37e503eece82b38 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 13:59:07 +0100 Subject: consider non-filter_type types as wildcards, thus missing from the state-group-cache --- synapse/storage/state.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1413a6f910..86f2c2e6b1 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -518,7 +518,10 @@ class StateGroupWorkerStore(SQLBaseStore): for typ, state_key in types: key = (typ, state_key) - if state_key is None: + if ( + state_key is None or + filtered_types is not None and typ not in filtered_types + ): type_to_key[typ] = None # we mark the type as missing from the cache because # when the cache was populated it might have been done with a -- cgit 1.5.1 From cb5c37a57c387a74f6079008870cf024e674dfe5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 24 Jul 2018 20:34:45 +0100 Subject: handle the edge case for _get_some_state_from_cache where types is [] --- synapse/storage/state.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 86f2c2e6b1..989977c644 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -547,7 +547,13 @@ class StateGroupWorkerStore(SQLBaseStore): return True return False - got_all = is_all or not missing_types + if types == [] and filtered_types is not None: + # special wildcard case for empty type-list but an explicit filtered_types + # which means that we'll try to return all types which aren't in the + # filtered_types list. missing_types will always be empty, so we ignore it. + got_all = is_all + else: + got_all = is_all or not missing_types return { k: v for k, v in iteritems(state_dict_ids) -- cgit 1.5.1 From 7d9fb88617f475ac7e064c5cccc8d78dbd78d2a3 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 25 Jul 2018 16:15:33 +0100 Subject: incorporate more review. --- synapse/storage/state.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 989977c644..e38427bf9d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -493,12 +493,6 @@ class StateGroupWorkerStore(SQLBaseStore): def _get_some_state_from_cache(self, group, types, filtered_types=None): """Checks if group is in cache. See `_get_state_for_groups` - Returns 3-tuple (`state_dict`, `missing_types`, `got_all`). - `missing_types` is the list of types that aren't in the cache for that - group. `got_all` is a bool indicating if we successfully retrieved all - requests state from the cache, if False we need to query the DB for the - missing state. - Args: group(int): The state group to lookup types(list[str, str|None]): List of 2-tuples of the form @@ -507,6 +501,11 @@ class StateGroupWorkerStore(SQLBaseStore): filtered_types(list[str]|None): Only apply filtering via `types` to this list of event types. Other types of events are returned unfiltered. If None, `types` filtering is applied to all events. + + Returns 2-tuple (`state_dict`, `got_all`). + `got_all` is a bool indicating if we successfully retrieved all + requests state from the cache, if False we need to query the DB for the + missing state. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) @@ -520,7 +519,7 @@ class StateGroupWorkerStore(SQLBaseStore): if ( state_key is None or - filtered_types is not None and typ not in filtered_types + (filtered_types is not None and typ not in filtered_types) ): type_to_key[typ] = None # we mark the type as missing from the cache because @@ -547,18 +546,17 @@ class StateGroupWorkerStore(SQLBaseStore): return True return False - if types == [] and filtered_types is not None: - # special wildcard case for empty type-list but an explicit filtered_types - # which means that we'll try to return all types which aren't in the - # filtered_types list. missing_types will always be empty, so we ignore it. - got_all = is_all - else: - got_all = is_all or not missing_types + got_all = is_all + if not got_all: + # the cache is incomplete. We may still have got all the results we need, if + # we don't have any wildcards in the match list. + if not missing_types and filtered_types is None: + got_all = True return { k: v for k, v in iteritems(state_dict_ids) if include(k[0], k[1]) - }, missing_types, got_all + }, got_all def _get_all_state_from_cache(self, group): """Checks if group is in cache. See `_get_state_for_groups` @@ -603,7 +601,7 @@ class StateGroupWorkerStore(SQLBaseStore): missing_groups = [] if types is not None: for group in set(groups): - state_dict_ids, _, got_all = self._get_some_state_from_cache( + state_dict_ids, got_all = self._get_some_state_from_cache( group, types, filtered_types ) results[group] = state_dict_ids -- cgit 1.5.1 From bc7944e6d2ea0076badd0eba414e1ba7020eb1e6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 25 Jul 2018 23:36:31 +0100 Subject: switch missing_types to be a bool --- synapse/storage/state.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage/state.py') diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e38427bf9d..b27b3ae144 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -511,8 +511,8 @@ class StateGroupWorkerStore(SQLBaseStore): type_to_key = {} - # tracks which of the requested types are missing from our cache - missing_types = set() + # tracks whether any of ourrequested types are missing from the cache + missing_types = False for typ, state_key in types: key = (typ, state_key) @@ -526,13 +526,13 @@ class StateGroupWorkerStore(SQLBaseStore): # when the cache was populated it might have been done with a # restricted set of state_keys, so the wildcard will not work # and the cache may be incomplete. - missing_types.add(key) + missing_types = True else: if type_to_key.get(typ, object()) is not None: type_to_key.setdefault(typ, set()).add(state_key) if key not in state_dict_ids and key not in known_absent: - missing_types.add(key) + missing_types = True sentinel = object() -- cgit 1.5.1