diff options
-rw-r--r-- | synapse/handlers/sync.py | 43 | ||||
-rw-r--r-- | synapse/storage/state.py | 47 |
2 files changed, 73 insertions, 17 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0f713ce038..809e9fece9 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -399,7 +399,7 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def get_state_after_event(self, event): + def get_state_after_event(self, event, types=None): """ Get the room state after the given event @@ -409,14 +409,14 @@ class SyncHandler(object): Returns: A Deferred map from ((type, state_key)->Event) """ - state_ids = yield self.store.get_state_ids_for_event(event.event_id) + state_ids = yield self.store.get_state_ids_for_event(event.event_id, types) if event.is_state(): state_ids = state_ids.copy() state_ids[(event.type, event.state_key)] = event.event_id defer.returnValue(state_ids) @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position): + def get_state_at(self, room_id, stream_position, types=None): """ Get the room state at a particular stream position Args: @@ -432,7 +432,7 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] - state = yield self.get_state_after_event(last_event) + state = yield self.get_state_after_event(last_event, types) else: # no events in this room - so presumably no state @@ -441,7 +441,7 @@ class SyncHandler(object): @defer.inlineCallbacks def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token, - full_state): + full_state, filter_members): """ Works out the differnce in state between the start of the timeline and the previous sync. @@ -454,6 +454,8 @@ class SyncHandler(object): be None. now_token(str): Token of the end of the current batch. full_state(bool): Whether to force returning the full state. + filter_members(bool): Whether to only return state for members + referenced in this timeline segment Returns: A deferred new event dictionary @@ -464,18 +466,35 @@ class SyncHandler(object): # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): + + types = None + if filter_members: + # We only request state for the members needed to display the + # timeline: + types = ( + (EventTypes.Member, state_key) + for state_key in set( + event.sender # FIXME: we also care about targets etc. + for event in batch.events + ) + ) + types.append((None, None)) # don't just filter to room members + + # TODO: we should opportunistically deduplicate these members too + # within the same sync series (based on an in-memory cache) + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id + batch.events[-1].event_id, types=types ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id + batch.events[0].event_id, types=types ) else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token + room_id, stream_position=now_token, types=types ) state_ids = current_state_ids @@ -493,15 +512,15 @@ class SyncHandler(object): ) elif batch.limited: state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token + room_id, stream_position=since_token, types=types ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id + batch.events[-1].event_id, types=types ) state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id + batch.events[0].event_id, types=types ) timeline_state = { @@ -1325,7 +1344,7 @@ class SyncHandler(object): state = yield self.compute_state_delta( room_id, batch, sync_config, since_token, now_token, - full_state=full_state + full_state=full_state, filter_members=True ) if room_builder.rtype == "joined": diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 2b325e1c1f..da6bb685fa 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -198,8 +198,15 @@ class StateGroupWorkerStore(SQLBaseStore): def _get_state_groups_from_groups_txn(self, txn, groups, types=None): results = {group: {} for group in groups} + + include_other_types = False + if types is not None: - types = list(set(types)) # deduplicate types list + type_set = set(types) + if (None, None) in type_set: + include_other_types = True + type_set.remove((None, None)) + types = list(type_set) # deduplicate types list if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is @@ -238,11 +245,21 @@ class StateGroupWorkerStore(SQLBaseStore): if types: clause_to_args = [ ( - "AND type = ? AND state_key = ?", - (etype, state_key) + "AND type = ? AND state_key = ?" if state_key is not None else "AND type = ?", + (etype, state_key) if state_key is not None else (etype) ) for etype, state_key in types ] + + if include_other_types: + # XXX: check whether this slows postgres down like a list of + # ORs does too? + clause_to_args.append( + ( + "AND type <> ? " * len(types), + [t for (t, _) in types] + ) + ) else: # If types is None we fetch all the state, and so just use an # empty where clause with no extra args. @@ -263,6 +280,10 @@ class StateGroupWorkerStore(SQLBaseStore): where_clause = "AND (%s)" % ( " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), ) + if include_other_types: + where_clause += " AND (%s)" % ( + " AND ".join(["type <> ?"] * len(types)), + ) else: where_clause = "" @@ -449,17 +470,27 @@ class StateGroupWorkerStore(SQLBaseStore): group: The state group to lookup types (list): List of 2-tuples of the form (`type`, `state_key`), where a `state_key` of `None` matches all state_keys for the - `type`. + `type`. Presence of type of `None` indicates that types not + in the list should not be filtered out. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} missing_types = set() + include_other_types = False + for typ, state_key in types: key = (typ, state_key) + + if typ is None: + include_other_types = True + next + if state_key is None: type_to_key[typ] = None + # XXX: why do we mark the type as missing from our cache just + # because we weren't filtering on a specific value of state_key? missing_types.add(key) else: if type_to_key.get(typ, object()) is not None: @@ -478,7 +509,7 @@ class StateGroupWorkerStore(SQLBaseStore): return True if state_key in valid_state_keys: return True - return False + return include_other_types got_all = is_all or not missing_types @@ -507,6 +538,12 @@ class StateGroupWorkerStore(SQLBaseStore): with matching types. `types` is a list of `(type, state_key)`, where a `state_key` of None matches all state_keys. If `types` is None then all events are returned. + + XXX: is it really true that `state_key` of None in `types` matches all + state_keys? it looks like _get-some_state_from_cache does the right thing, + but _get_state_groups_from_groups_txn treats ths None is turned into + 'AND state_key = NULL' or similar (at least until i just fixed it) --Matthew + I've filed this as https://github.com/matrix-org/synapse/issues/2969 """ if types: types = frozenset(types) |