summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@arasphere.net>2018-07-26 22:51:30 +0100
committerGitHub <noreply@github.com>2018-07-26 22:51:30 +0100
commita75231b507e025eaaa4f06d8932c04fa4e942d48 (patch)
tree1299b63f9624cc9b5d8c74dfb7653c0123e80618 /synapse
parentMerge pull request #3613 from matrix-org/rav/stop_using_event_edges_room_id (diff)
downloadsynapse-a75231b507e025eaaa4f06d8932c04fa4e942d48.tar.xz
Deduplicate redundant lazy-loaded members (#3331)
* attempt at deduplicating lazy-loaded members

as per the proposal; we can deduplicate redundant lazy-loaded members
which are sent in the same sync sequence. we do this heuristically
rather than requiring the client to somehow tell us which members it
has chosen to cache, by instead caching the last N members sent to
a client, and not sending them again.  For now we hardcode N to 100.
Each cache for a given (user,device) tuple is in turn cached for up to
X minutes (to avoid the caches building up).  For now we hardcode X to 30.

* add include_redundant_members filter option & make it work

* remove stale todo

* add tests for _get_some_state_from_cache

* incorporate review
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/filtering.py9
-rw-r--r--synapse/handlers/sync.py87
2 files changed, 71 insertions, 25 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 7e767b9bf5..186831e118 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -117,6 +117,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
         "lazy_load_members": {
             "type": "boolean"
         },
+        "include_redundant_members": {
+            "type": "boolean"
+        },
     }
 }
 
@@ -267,6 +270,9 @@ class FilterCollection(object):
     def lazy_load_members(self):
         return self._room_state_filter.lazy_load_members()
 
+    def include_redundant_members(self):
+        return self._room_state_filter.include_redundant_members()
+
     def filter_presence(self, events):
         return self._presence_filter.filter(events)
 
@@ -426,6 +432,9 @@ class Filter(object):
     def lazy_load_members(self):
         return self.filter_json.get("lazy_load_members", False)
 
+    def include_redundant_members(self):
+        return self.filter_json.get("include_redundant_members", False)
+
 
 def _matches_wildcard(actual_value, filter_value):
     if filter_value.endswith("*"):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 4ced3144c8..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -26,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.types import RoomStreamToken
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
@@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
@@ -182,6 +192,12 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
+
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -505,9 +521,13 @@ class SyncHandler(object):
         with Measure(self.clock, "compute_state_delta"):
 
             types = None
-            lazy_load_members = sync_config.filter_collection.lazy_load_members()
             filtered_types = None
 
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
             if lazy_load_members:
                 # We only request state for the members needed to display the
                 # timeline:
@@ -523,6 +543,11 @@ class SyncHandler(object):
                 # only apply the filtering to room members
                 filtered_types = [EventTypes.Member]
 
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
@@ -543,11 +568,6 @@ class SyncHandler(object):
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
@@ -571,21 +591,6 @@ class SyncHandler(object):
                     filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
-                # TODO: optionally filter out redundant membership events at this
-                # point, to stop repeatedly sending members in every /sync as if
-                # the client isn't tracking them.
-                # When implemented, this should filter using event_ids (not mxids).
-                # In practice, limited syncs are
-                # relatively rare so it's not a total disaster to send redundant
-                # members down at this point. Redundant members are ones which
-                # repeatedly get sent down /sync because we don't know if the client
-                # is caching them or not.
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
@@ -596,16 +601,48 @@ class SyncHandler(object):
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    # TODO: filter out redundant members based on their mxids (not their
-                    # event_ids) at this point. We know we can do it based on mxid as this
-                    # is an non-gappy incremental sync.
-
                     if types:
                         state_ids = yield self.store.get_state_ids_for_event(
                             batch.events[0].event_id, types=types,
                             filtered_types=filtered_types,
                         )
 
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.lazy_loaded_members_cache.get(cache_key)
+                if cache is None:
+                    logger.debug("creating LruCache for %r", cache_key)
+                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+                    self.lazy_loaded_members_cache[cache_key] = cache
+                else:
+                    logger.debug("found LruCache for %r", cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
+
         state = {}
         if state_ids:
             state = yield self.store.get_events(list(state_ids.values()))