summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-07-30 09:56:18 +0100
committerErik Johnston <erik@matrix.org>2018-07-30 09:56:18 +0100
commit143f1a2532e4a583c4f4f215334800cb63a4117f (patch)
tree2926318407e42844fcf70211f6596ccab919b184 /synapse/handlers
parentNewsfile (diff)
parentmake /context lazyload & filter aware (#3567) (diff)
downloadsynapse-143f1a2532e4a583c4f4f215334800cb63a4117f.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_fed_store
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/room.py24
-rw-r--r--synapse/handlers/room_member.py4
-rw-r--r--synapse/handlers/search.py2
-rw-r--r--synapse/handlers/sync.py173
6 files changed, 177 insertions, 30 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 06700d5038..617452be6c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1233,7 +1233,7 @@ class FederationHandler(BaseHandler):
     @log_function
     def on_make_leave_request(self, room_id, user_id):
         """ We've received a /make_leave/ request, so we create a partial
-        join event for the room and return that. We do *not* persist or
+        leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
         builder = self.event_builder_factory.new({
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 43692b83a8..cb5c6d587e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -256,7 +256,7 @@ class ProfileHandler(BaseHandler):
                 )
 
     def _start_update_remote_profile_cache(self):
-        run_as_background_process(
+        return run_as_background_process(
             "Update remote profile", self._update_remote_profile_cache,
         )
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 003b848c00..7b7804d9b2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 """Contains functions for performing events on rooms."""
+import itertools
 import logging
 import math
 import string
@@ -401,7 +402,7 @@ class RoomContextHandler(object):
         self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
-    def get_event_context(self, user, room_id, event_id, limit):
+    def get_event_context(self, user, room_id, event_id, limit, event_filter):
         """Retrieves events, pagination tokens and state around a given event
         in a room.
 
@@ -411,6 +412,8 @@ class RoomContextHandler(object):
             event_id (str)
             limit (int): The maximum number of events to return in total
                 (excluding state).
+            event_filter (Filter|None): the filter to apply to the events returned
+                (excluding the target event_id)
 
         Returns:
             dict, or None if the event isn't found
@@ -443,7 +446,7 @@ class RoomContextHandler(object):
             )
 
         results = yield self.store.get_events_around(
-            room_id, event_id, before_limit, after_limit
+            room_id, event_id, before_limit, after_limit, event_filter
         )
 
         results["events_before"] = yield filter_evts(results["events_before"])
@@ -455,8 +458,23 @@ class RoomContextHandler(object):
         else:
             last_event_id = event_id
 
+        types = None
+        filtered_types = None
+        if event_filter and event_filter.lazy_load_members():
+            members = set(ev.sender for ev in itertools.chain(
+                results["events_before"],
+                (results["event"],),
+                results["events_after"],
+            ))
+            filtered_types = [EventTypes.Member]
+            types = [(EventTypes.Member, member) for member in members]
+
+        # XXX: why do we return the state as of the last event rather than the
+        # first? Shouldn't we be consistent with /sync?
+        # https://github.com/matrix-org/matrix-doc/issues/687
+
         state = yield self.store.get_state_for_events(
-            [last_event_id], None
+            [last_event_id], types, filtered_types=filtered_types,
         )
         results["state"] = list(state[last_event_id].values())
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a832d91809..0d4a3f4677 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -708,6 +708,10 @@ class RoomMemberHandler(object):
             inviter_display_name = member_event.content.get("displayname", "")
             inviter_avatar_url = member_event.content.get("avatar_url", "")
 
+        # if user has no display name, default to their MXID
+        if not inviter_display_name:
+            inviter_display_name = user.to_string()
+
         canonical_room_alias = ""
         canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
         if canonical_alias_event:
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 69ae9731d5..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -287,7 +287,7 @@ class SearchHandler(BaseHandler):
             contexts = {}
             for event in allowed_events:
                 res = yield self.store.get_events_around(
-                    event.room_id, event.event_id, before_limit, after_limit
+                    event.room_id, event.event_id, before_limit, after_limit,
                 )
 
                 logger.info(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c24e35362a..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.types import RoomStreamToken
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
@@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
@@ -181,6 +192,12 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
+
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -416,29 +433,44 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None, filtered_types=None):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(
+            event.event_id, types, filtered_types=filtered_types,
+        )
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -453,7 +485,9 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(
+                last_event, types, filtered_types=filtered_types,
+            )
 
         else:
             # no events in this room - so presumably no state
@@ -485,59 +519,129 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            filtered_types = None
+
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
+            if lazy_load_members:
+                # We only request state for the members needed to display the
+                # timeline:
+
+                types = [
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about invite targets etc.
+                        for event in batch.events
+                    )
+                ]
+
+                # only apply the filtering to room members
+                filtered_types = [EventTypes.Member]
+
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
+
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
                     previous={},
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
+                if lazy_load_members:
+                    if types:
+                        state_ids = yield self.store.get_state_ids_for_event(
+                            batch.events[0].event_id, types=types,
+                            filtered_types=filtered_types,
+                        )
+
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.lazy_loaded_members_cache.get(cache_key)
+                if cache is None:
+                    logger.debug("creating LruCache for %r", cache_key)
+                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+                    self.lazy_loaded_members_cache[cache_key] = cache
+                else:
+                    logger.debug("found LruCache for %r", cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
 
         state = {}
         if state_ids:
@@ -1448,7 +1552,9 @@ def _action_has_highlight(actions):
     return False
 
 
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+    timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
     """Works out what state to include in a sync response.
 
     Args:
@@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         previous (dict): state at the end of the previous sync (or empty dict
             if this is an initial sync)
         current (dict): state at the end of the timeline
+        lazy_load_members (bool): whether to return members from timeline_start
+            or not.  assumes that timeline_start has already been filtered to
+            include only the members the client needs to know about.
 
     Returns:
         dict
@@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
     }
 
     c_ids = set(e for e in current.values())
-    tc_ids = set(e for e in timeline_contains.values())
-    p_ids = set(e for e in previous.values())
     ts_ids = set(e for e in timeline_start.values())
+    p_ids = set(e for e in previous.values())
+    tc_ids = set(e for e in timeline_contains.values())
+
+    # If we are lazyloading room members, we explicitly add the membership events
+    # for the senders in the timeline into the state block returned by /sync,
+    # as we may not have sent them to the client before.  We find these membership
+    # events by filtering them out of timeline_start, which has already been filtered
+    # to only include membership events for the senders in the timeline.
+    # In practice, we can do this by removing them from the p_ids list,
+    # which is the list of relevant state we know we have already sent to the client.
+    # see https://github.com/matrix-org/synapse/pull/2970
+    #            /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+    if lazy_load_members:
+        p_ids.difference_update(
+            e for t, e in timeline_start.iteritems()
+            if t[0] == EventTypes.Member
+        )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids