diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/filtering.py | 9 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 87 |
2 files changed, 71 insertions, 25 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 7e767b9bf5..186831e118 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -117,6 +117,9 @@ ROOM_EVENT_FILTER_SCHEMA = { "lazy_load_members": { "type": "boolean" }, + "include_redundant_members": { + "type": "boolean" + }, } } @@ -267,6 +270,9 @@ class FilterCollection(object): def lazy_load_members(self): return self._room_state_filter.lazy_load_members() + def include_redundant_members(self): + return self._room_state_filter.include_redundant_members() + def filter_presence(self, events): return self._presence_filter.filter(events) @@ -426,6 +432,9 @@ class Filter(object): def lazy_load_members(self): return self.filter_json.get("lazy_load_members", False) + def include_redundant_members(self): + return self.filter_json.get("include_redundant_members", False) + def _matches_wildcard(actual_value, filter_value): if filter_value.endswith("*"): diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4ced3144c8..dff1f67dcb 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -26,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership from synapse.push.clientformat import format_push_rules_for_user from synapse.types import RoomStreamToken from synapse.util.async import concurrently_execute +from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func @@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) +# Store the cache that tracks which lazy-loaded members have been sent to a given +# client for no more than 30 minutes. +LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 + +# Remember the last 100 members we sent to a client for the purposes of +# avoiding redundantly sending the same lazy-loaded members to the client +LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 + SyncConfig = collections.namedtuple("SyncConfig", [ "user", @@ -182,6 +192,12 @@ class SyncHandler(object): self.response_cache = ResponseCache(hs, "sync") self.state = hs.get_state_handler() + # ExpiringCache((User, Device)) -> LruCache(state_key => event_id) + self.lazy_loaded_members_cache = ExpiringCache( + "lazy_loaded_members_cache", self.clock, + max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, + ) + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): """Get the sync for a client if we have new data for it now. Otherwise @@ -505,9 +521,13 @@ class SyncHandler(object): with Measure(self.clock, "compute_state_delta"): types = None - lazy_load_members = sync_config.filter_collection.lazy_load_members() filtered_types = None + lazy_load_members = sync_config.filter_collection.lazy_load_members() + include_redundant_members = ( + sync_config.filter_collection.include_redundant_members() + ) + if lazy_load_members: # We only request state for the members needed to display the # timeline: @@ -523,6 +543,11 @@ class SyncHandler(object): # only apply the filtering to room members filtered_types = [EventTypes.Member] + timeline_state = { + (event.type, event.state_key): event.event_id + for event in batch.events if event.is_state() + } + if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( @@ -543,11 +568,6 @@ class SyncHandler(object): state_ids = current_state_ids - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_ids, @@ -571,21 +591,6 @@ class SyncHandler(object): filtered_types=filtered_types, ) - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events if event.is_state() - } - - # TODO: optionally filter out redundant membership events at this - # point, to stop repeatedly sending members in every /sync as if - # the client isn't tracking them. - # When implemented, this should filter using event_ids (not mxids). - # In practice, limited syncs are - # relatively rare so it's not a total disaster to send redundant - # members down at this point. Redundant members are ones which - # repeatedly get sent down /sync because we don't know if the client - # is caching them or not. - state_ids = _calculate_state( timeline_contains=timeline_state, timeline_start=state_at_timeline_start, @@ -596,16 +601,48 @@ class SyncHandler(object): else: state_ids = {} if lazy_load_members: - # TODO: filter out redundant members based on their mxids (not their - # event_ids) at this point. We know we can do it based on mxid as this - # is an non-gappy incremental sync. - if types: state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, filtered_types=filtered_types, ) + if lazy_load_members and not include_redundant_members: + cache_key = (sync_config.user.to_string(), sync_config.device_id) + cache = self.lazy_loaded_members_cache.get(cache_key) + if cache is None: + logger.debug("creating LruCache for %r", cache_key) + cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) + self.lazy_loaded_members_cache[cache_key] = cache + else: + logger.debug("found LruCache for %r", cache_key) + + # if it's a new sync sequence, then assume the client has had + # amnesia and doesn't want any recent lazy-loaded members + # de-duplicated. + if since_token is None: + logger.debug("clearing LruCache for %r", cache_key) + cache.clear() + else: + # only send members which aren't in our LruCache (either + # because they're new to this client or have been pushed out + # of the cache) + logger.debug("filtering state from %r...", state_ids) + state_ids = { + t: event_id + for t, event_id in state_ids.iteritems() + if cache.get(t[1]) != event_id + } + logger.debug("...to %r", state_ids) + + # add any member IDs we are about to send into our LruCache + for t, event_id in itertools.chain( + state_ids.items(), + timeline_state.items(), + ): + if t[0] == EventTypes.Member: + cache.set(t[1], event_id) + state = {} if state_ids: state = yield self.store.get_events(list(state_ids.values())) |