diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c24e35362a..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
@@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -181,6 +192,12 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
+ # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+ self.lazy_loaded_members_cache = ExpiringCache(
+ "lazy_loaded_members_cache", self.clock,
+ max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+ )
+
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -416,29 +433,44 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event):
+ def get_state_after_event(self, event, types=None, filtered_types=None):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+ state_ids = yield self.store.get_state_ids_for_event(
+ event.event_id, types, filtered_types=filtered_types,
+ )
if event.is_state():
state_ids = state_ids.copy()
state_ids[(event.type, event.state_key)] = event.event_id
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position):
+ def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
@@ -453,7 +485,9 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
- state = yield self.get_state_after_event(last_event)
+ state = yield self.get_state_after_event(
+ last_event, types, filtered_types=filtered_types,
+ )
else:
# no events in this room - so presumably no state
@@ -485,59 +519,129 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+
+ types = None
+ filtered_types = None
+
+ lazy_load_members = sync_config.filter_collection.lazy_load_members()
+ include_redundant_members = (
+ sync_config.filter_collection.include_redundant_members()
+ )
+
+ if lazy_load_members:
+ # We only request state for the members needed to display the
+ # timeline:
+
+ types = [
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in batch.events
+ )
+ ]
+
+ # only apply the filtering to room members
+ filtered_types = [EventTypes.Member]
+
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events if event.is_state()
+ }
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
+
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token
+ room_id, stream_position=now_token, types=types,
+ filtered_types=filtered_types,
)
state_ids = current_state_ids
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
previous={},
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
+ room_id, stream_position=since_token, types=types,
+ filtered_types=filtered_types,
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
+ if lazy_load_members:
+ if types:
+ state_ids = yield self.store.get_state_ids_for_event(
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
+ )
+
+ if lazy_load_members and not include_redundant_members:
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.lazy_loaded_members_cache.get(cache_key)
+ if cache is None:
+ logger.debug("creating LruCache for %r", cache_key)
+ cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+ self.lazy_loaded_members_cache[cache_key] = cache
+ else:
+ logger.debug("found LruCache for %r", cache_key)
+
+ # if it's a new sync sequence, then assume the client has had
+ # amnesia and doesn't want any recent lazy-loaded members
+ # de-duplicated.
+ if since_token is None:
+ logger.debug("clearing LruCache for %r", cache_key)
+ cache.clear()
+ else:
+ # only send members which aren't in our LruCache (either
+ # because they're new to this client or have been pushed out
+ # of the cache)
+ logger.debug("filtering state from %r...", state_ids)
+ state_ids = {
+ t: event_id
+ for t, event_id in state_ids.iteritems()
+ if cache.get(t[1]) != event_id
+ }
+ logger.debug("...to %r", state_ids)
+
+ # add any member IDs we are about to send into our LruCache
+ for t, event_id in itertools.chain(
+ state_ids.items(),
+ timeline_state.items(),
+ ):
+ if t[0] == EventTypes.Member:
+ cache.set(t[1], event_id)
state = {}
if state_ids:
@@ -1448,7 +1552,9 @@ def _action_has_highlight(actions):
return False
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+ timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
"""Works out what state to include in a sync response.
Args:
@@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
previous (dict): state at the end of the previous sync (or empty dict
if this is an initial sync)
current (dict): state at the end of the timeline
+ lazy_load_members (bool): whether to return members from timeline_start
+ or not. assumes that timeline_start has already been filtered to
+ include only the members the client needs to know about.
Returns:
dict
@@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
}
c_ids = set(e for e in current.values())
- tc_ids = set(e for e in timeline_contains.values())
- p_ids = set(e for e in previous.values())
ts_ids = set(e for e in timeline_start.values())
+ p_ids = set(e for e in previous.values())
+ tc_ids = set(e for e in timeline_contains.values())
+
+ # If we are lazyloading room members, we explicitly add the membership events
+ # for the senders in the timeline into the state block returned by /sync,
+ # as we may not have sent them to the client before. We find these membership
+ # events by filtering them out of timeline_start, which has already been filtered
+ # to only include membership events for the senders in the timeline.
+ # In practice, we can do this by removing them from the p_ids list,
+ # which is the list of relevant state we know we have already sent to the client.
+ # see https://github.com/matrix-org/synapse/pull/2970
+ # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+ if lazy_load_members:
+ p_ids.difference_update(
+ e for t, e in timeline_start.iteritems()
+ if t[0] == EventTypes.Member
+ )
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
|