diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index db43219d24..7158dd75e9 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -260,6 +260,9 @@ class FilterCollection(object):
def ephemeral_limit(self):
return self._room_ephemeral_filter.limit()
+ def lazy_load_members(self):
+ return self._room_state_filter.lazy_load_members()
+
def filter_presence(self, events):
return self._presence_filter.filter(events)
@@ -416,6 +419,9 @@ class Filter(object):
def limit(self):
return self.filter_json.get("limit", 10)
+ def lazy_load_members(self):
+ return self.filter_json.get("lazy_load_members", False)
+
def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d0c99c35e3..05bf6d46dd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -416,7 +417,7 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event):
+ def get_state_after_event(self, event, types=None):
"""
Get the room state after the given event
@@ -426,14 +427,14 @@ class SyncHandler(object):
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+ state_ids = yield self.store.get_state_ids_for_event(event.event_id, types)
if event.is_state():
state_ids = state_ids.copy()
state_ids[(event.type, event.state_key)] = event.event_id
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position):
+ def get_state_at(self, room_id, stream_position, types=None):
""" Get the room state at a particular stream position
Args:
@@ -449,7 +450,7 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
- state = yield self.get_state_after_event(last_event)
+ state = yield self.get_state_after_event(last_event, types)
else:
# no events in this room - so presumably no state
@@ -471,6 +472,8 @@ class SyncHandler(object):
be None.
now_token(str): Token of the end of the current batch.
full_state(bool): Whether to force returning the full state.
+ lazy_load_members(bool): Whether to only return state for members
+ referenced in this timeline segment
Returns:
A deferred new event dictionary
@@ -481,22 +484,57 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+
+ types = None
+ member_state_ids = {}
+ lazy_load_members = sync_config.filter_collection.lazy_load_members()
+
+ if lazy_load_members:
+ # We only request state for the members needed to display the
+ # timeline:
+
+ types = [
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about targets etc.
+ for event in batch.events
+ )
+ ]
+
+ # We can't remove redundant member types at this stage as it has
+ # to be done based on event_id, and we don't have the member
+ # event ids until we've pulled them out of the DB.
+
+ if not types:
+ # an optimisation to stop needlessly trying to calculate
+ # member_state_ids
+ lazy_load_members = False
+
+ types.append((None, None)) # don't just filter to room members
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types
)
+
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token
+ room_id, stream_position=now_token, types=types
)
state_ids = current_state_ids
+ if lazy_load_members:
+ member_state_ids = {
+ t: state_ids[t]
+ for t in state_ids if t[0] == EventTypes.Member
+ }
+
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
@@ -505,22 +543,29 @@ class SyncHandler(object):
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
+ timeline_start_members=member_state_ids,
previous={},
current=current_state_ids,
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
+ room_id, stream_position=since_token, types=types
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types
)
+ if lazy_load_members:
+ member_state_ids = {
+ t: state_at_timeline_start[t]
+ for t in state_at_timeline_start if t[0] == EventTypes.Member
+ }
+
timeline_state = {
(event.type, event.state_key): event.event_id
for event in batch.events if event.is_state()
@@ -529,11 +574,23 @@ class SyncHandler(object):
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
+ timeline_start_members=member_state_ids,
previous=state_at_previous_sync,
current=current_state_ids,
)
else:
state_ids = {}
+ if lazy_load_members:
+ # TODO: filter out redundant members based on their mxids (not their
+ # event_ids) at this point. We know we can do it based on mxid as this
+ # is an non-gappy incremental sync.
+
+ # strip off the (None, None) and filter to just room members
+ types = types[:-1]
+ if types:
+ state_ids = yield self.store.get_state_ids_for_event(
+ batch.events[0].event_id, types=types
+ )
state = {}
if state_ids:
@@ -1341,8 +1398,7 @@ class SyncHandler(object):
return
state = yield self.compute_state_delta(
- room_id, batch, sync_config, since_token, now_token,
- full_state=full_state
+ room_id, batch, sync_config, since_token, now_token, full_state=full_state
)
if room_builder.rtype == "joined":
@@ -1438,12 +1494,16 @@ def _action_has_highlight(actions):
return False
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(timeline_contains, timeline_start, timeline_start_members,
+ previous, current):
"""Works out what state to include in a sync response.
Args:
timeline_contains (dict): state in the timeline
timeline_start (dict): state at the start of the timeline
+ timeline_start_members (dict): state at the start of the timeline
+ for room members who participate in this chunk of timeline.
+ Should always be a subset of timeline_start.
previous (dict): state at the end of the previous sync (or empty dict
if this is an initial sync)
current (dict): state at the end of the timeline
@@ -1462,11 +1522,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
}
c_ids = set(e for e in current.values())
- tc_ids = set(e for e in timeline_contains.values())
- p_ids = set(e for e in previous.values())
ts_ids = set(e for e in timeline_start.values())
+ tsm_ids = set(e for e in timeline_start_members.values())
+ p_ids = set(e for e in previous.values())
+ tc_ids = set(e for e in timeline_contains.values())
- state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
+ state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | tsm_ids
return {
event_id_to_key[e]: e for e in state_ids
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ffa4246031..55159e64d0 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -198,8 +198,15 @@ class StateGroupWorkerStore(SQLBaseStore):
def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
results = {group: {} for group in groups}
+
+ include_other_types = False
+
if types is not None:
- types = list(set(types)) # deduplicate types list
+ type_set = set(types)
+ if (None, None) in type_set:
+ include_other_types = True
+ type_set.remove((None, None))
+ types = list(type_set) # deduplicate types list
if isinstance(self.database_engine, PostgresEngine):
# Temporarily disable sequential scans in this transaction. This is
@@ -246,6 +253,17 @@ class StateGroupWorkerStore(SQLBaseStore):
)
for etype, state_key in types
]
+
+ if include_other_types:
+ # XXX: check whether this slows postgres down like a list of
+ # ORs does too?
+ unique_types = set([t for (t, _) in types])
+ clause_to_args.append(
+ (
+ "AND type <> ? " * len(unique_types),
+ list(unique_types)
+ )
+ )
else:
# If types is None we fetch all the state, and so just use an
# empty where clause with no extra args.
@@ -274,6 +292,14 @@ class StateGroupWorkerStore(SQLBaseStore):
else:
where_clauses.append("(type = ? AND state_key = ?)")
where_args.extend([typ[0], typ[1]])
+
+ if include_other_types:
+ unique_types = set([t for (t, _) in types])
+ where_clauses.append(
+ "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")"
+ )
+ where_args.extend(list(unique_types))
+
where_clause = "AND (%s)" % (" OR ".join(where_clauses))
else:
where_clause = ""
@@ -469,17 +495,27 @@ class StateGroupWorkerStore(SQLBaseStore):
group: The state group to lookup
types (list): List of 2-tuples of the form (`type`, `state_key`),
where a `state_key` of `None` matches all state_keys for the
- `type`.
+ `type`. Presence of type of `None` indicates that types not
+ in the list should not be filtered out.
"""
is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {}
missing_types = set()
+ include_other_types = False
+
for typ, state_key in types:
key = (typ, state_key)
+
+ if typ is None:
+ include_other_types = True
+ next
+
if state_key is None:
type_to_key[typ] = None
+ # XXX: why do we mark the type as missing from our cache just
+ # because we weren't filtering on a specific value of state_key?
missing_types.add(key)
else:
if type_to_key.get(typ, object()) is not None:
@@ -493,7 +529,7 @@ class StateGroupWorkerStore(SQLBaseStore):
def include(typ, state_key):
valid_state_keys = type_to_key.get(typ, sentinel)
if valid_state_keys is sentinel:
- return False
+ return include_other_types
if valid_state_keys is None:
return True
if state_key in valid_state_keys:
|