diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0f713ce038..809e9fece9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -399,7 +399,7 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event):
+ def get_state_after_event(self, event, types=None):
"""
Get the room state after the given event
@@ -409,14 +409,14 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+ state_ids = yield self.store.get_state_ids_for_event(event.event_id, types)
if event.is_state():
state_ids = state_ids.copy()
state_ids[(event.type, event.state_key)] = event.event_id
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position):
+ def get_state_at(self, room_id, stream_position, types=None):
""" Get the room state at a particular stream position
Args:
@@ -432,7 +432,7 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
- state = yield self.get_state_after_event(last_event)
+ state = yield self.get_state_after_event(last_event, types)
else:
# no events in this room - so presumably no state
@@ -441,7 +441,7 @@ class SyncHandler(object):
@defer.inlineCallbacks
def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
- full_state):
+ full_state, filter_members):
""" Works out the differnce in state between the start of the timeline
and the previous sync.
@@ -454,6 +454,8 @@ class SyncHandler(object):
be None.
now_token(str): Token of the end of the current batch.
full_state(bool): Whether to force returning the full state.
+ filter_members(bool): Whether to only return state for members
+ referenced in this timeline segment
Returns:
A deferred new event dictionary
@@ -464,18 +466,35 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+
+ types = None
+ if filter_members:
+ # We only request state for the members needed to display the
+ # timeline:
+ types = (
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about targets etc.
+ for event in batch.events
+ )
+ )
+ types.append((None, None)) # don't just filter to room members
+
+ # TODO: we should opportunistically deduplicate these members too
+ # within the same sync series (based on an in-memory cache)
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types
)
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token
+ room_id, stream_position=now_token, types=types
)
state_ids = current_state_ids
@@ -493,15 +512,15 @@ class SyncHandler(object):
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
+ room_id, stream_position=since_token, types=types
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types
)
timeline_state = {
@@ -1325,7 +1344,7 @@ class SyncHandler(object):
state = yield self.compute_state_delta(
room_id, batch, sync_config, since_token, now_token,
- full_state=full_state
+ full_state=full_state, filter_members=True
)
if room_builder.rtype == "joined":
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 2b325e1c1f..da6bb685fa 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -198,8 +198,15 @@ class StateGroupWorkerStore(SQLBaseStore):
def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
results = {group: {} for group in groups}
+
+ include_other_types = False
+
if types is not None:
- types = list(set(types)) # deduplicate types list
+ type_set = set(types)
+ if (None, None) in type_set:
+ include_other_types = True
+ type_set.remove((None, None))
+ types = list(type_set) # deduplicate types list
if isinstance(self.database_engine, PostgresEngine):
# Temporarily disable sequential scans in this transaction. This is
@@ -238,11 +245,21 @@ class StateGroupWorkerStore(SQLBaseStore):
if types:
clause_to_args = [
(
- "AND type = ? AND state_key = ?",
- (etype, state_key)
+ "AND type = ? AND state_key = ?" if state_key is not None else "AND type = ?",
+ (etype, state_key) if state_key is not None else (etype)
)
for etype, state_key in types
]
+
+ if include_other_types:
+ # XXX: check whether this slows postgres down like a list of
+ # ORs does too?
+ clause_to_args.append(
+ (
+ "AND type <> ? " * len(types),
+ [t for (t, _) in types]
+ )
+ )
else:
# If types is None we fetch all the state, and so just use an
# empty where clause with no extra args.
@@ -263,6 +280,10 @@ class StateGroupWorkerStore(SQLBaseStore):
where_clause = "AND (%s)" % (
" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
)
+ if include_other_types:
+ where_clause += " AND (%s)" % (
+ " AND ".join(["type <> ?"] * len(types)),
+ )
else:
where_clause = ""
@@ -449,17 +470,27 @@ class StateGroupWorkerStore(SQLBaseStore):
group: The state group to lookup
types (list): List of 2-tuples of the form (`type`, `state_key`),
where a `state_key` of `None` matches all state_keys for the
- `type`.
+ `type`. Presence of type of `None` indicates that types not
+ in the list should not be filtered out.
"""
is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {}
missing_types = set()
+ include_other_types = False
+
for typ, state_key in types:
key = (typ, state_key)
+
+ if typ is None:
+ include_other_types = True
+ next
+
if state_key is None:
type_to_key[typ] = None
+ # XXX: why do we mark the type as missing from our cache just
+ # because we weren't filtering on a specific value of state_key?
missing_types.add(key)
else:
if type_to_key.get(typ, object()) is not None:
@@ -478,7 +509,7 @@ class StateGroupWorkerStore(SQLBaseStore):
return True
if state_key in valid_state_keys:
return True
- return False
+ return include_other_types
got_all = is_all or not missing_types
@@ -507,6 +538,12 @@ class StateGroupWorkerStore(SQLBaseStore):
with matching types. `types` is a list of `(type, state_key)`, where
a `state_key` of None matches all state_keys. If `types` is None then
all events are returned.
+
+ XXX: is it really true that `state_key` of None in `types` matches all
+ state_keys? it looks like _get-some_state_from_cache does the right thing,
+ but _get_state_groups_from_groups_txn treats ths None is turned into
+ 'AND state_key = NULL' or similar (at least until i just fixed it) --Matthew
+ I've filed this as https://github.com/matrix-org/synapse/issues/2969
"""
if types:
types = frozenset(types)
|