summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-08-25 02:25:38 +0200
committerMatthew Hodgson <matthew@matrix.org>2018-08-25 02:25:38 +0200
commita98eae5835064639dd85055f489ff9d1d43d9769 (patch)
tree79e3183675a5c34198c75989b824766c75936553 /synapse/handlers/sync.py
parentchangelog (diff)
parentMerge pull request #3758 from matrix-org/erikj/admin_contact (diff)
downloadsynapse-a98eae5835064639dd85055f489ff9d1d43d9769.tar.xz
Merge branch 'develop' into matthew/fix_overzealous_ll_state
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py250
1 files changed, 218 insertions, 32 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 52724fcacc..ee1809019e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -25,7 +25,9 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.types import RoomStreamToken
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
@@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
@@ -65,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "ephemeral",
     "account_data",
     "unread_notifications",
+    "summary",
 ])):
     __slots__ = []
 
@@ -174,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
 class SyncHandler(object):
 
     def __init__(self, hs):
+        self.hs_config = hs.config
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
@@ -181,20 +193,35 @@ class SyncHandler(object):
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
+
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
 
+    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
         Returns:
-            A Deferred SyncResult.
+            Deferred[SyncResult]
         """
-        return self.response_cache.wrap(
+        # If the user is not part of the mau group, then check that limits have
+        # not been exceeded (if not part of the group by this point, almost certain
+        # auth_blocking will occur)
+        user_id = sync_config.user.to_string()
+        yield self.auth.check_auth_blocking(user_id)
+
+        res = yield self.response_cache.wrap(
             sync_config.request_key,
             self._wait_for_sync_for_user,
             sync_config, since_token, timeout, full_state,
         )
+        defer.returnValue(res)
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@@ -479,9 +506,141 @@ class SyncHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    def compute_summary(self, room_id, sync_config, batch, state, now_token):
+        """ Works out a room summary block for this room, summarising the number
+        of joined members in the room, and providing the 'hero' members if the
+        room has no name so clients can consistently name rooms.  Also adds
+        state events to 'state' if needed to describe the heroes.
+
+        Args:
+            room_id(str):
+            sync_config(synapse.handlers.sync.SyncConfig):
+            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+                the room that will be sent to the user.
+            state(dict): dict of (type, state_key) -> Event as returned by
+                compute_state_delta
+            now_token(str): Token of the end of the current batch.
+
+        Returns:
+             A deferred dict describing the room summary
+        """
+
+        # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
+        last_events, _ = yield self.store.get_recent_event_ids_for_room(
+            room_id, end_token=now_token.room_key, limit=1,
+        )
+
+        if not last_events:
+            defer.returnValue(None)
+            return
+
+        last_event = last_events[-1]
+        state_ids = yield self.store.get_state_ids_for_event(
+            last_event.event_id, [
+                (EventTypes.Member, None),
+                (EventTypes.Name, ''),
+                (EventTypes.CanonicalAlias, ''),
+            ]
+        )
+
+        member_ids = {
+            state_key: event_id
+            for (t, state_key), event_id in state_ids.iteritems()
+            if t == EventTypes.Member
+        }
+        name_id = state_ids.get((EventTypes.Name, ''))
+        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+        summary = {}
+
+        # FIXME: it feels very heavy to load up every single membership event
+        # just to calculate the counts.
+        member_events = yield self.store.get_events(member_ids.values())
+
+        joined_user_ids = []
+        invited_user_ids = []
+
+        for ev in member_events.values():
+            if ev.content.get("membership") == Membership.JOIN:
+                joined_user_ids.append(ev.state_key)
+            elif ev.content.get("membership") == Membership.INVITE:
+                invited_user_ids.append(ev.state_key)
+
+        # TODO: only send these when they change.
+        summary["m.joined_member_count"] = len(joined_user_ids)
+        summary["m.invited_member_count"] = len(invited_user_ids)
+
+        if name_id or canonical_alias_id:
+            defer.returnValue(summary)
+
+        # FIXME: order by stream ordering, not alphabetic
+
+        me = sync_config.user.to_string()
+        if (joined_user_ids or invited_user_ids):
+            summary['m.heroes'] = sorted(
+                [
+                    user_id
+                    for user_id in (joined_user_ids + invited_user_ids)
+                    if user_id != me
+                ]
+            )[0:5]
+        else:
+            summary['m.heroes'] = sorted(
+                [user_id for user_id in member_ids.keys() if user_id != me]
+            )[0:5]
+
+        if not sync_config.filter_collection.lazy_load_members():
+            defer.returnValue(summary)
+
+        # ensure we send membership events for heroes if needed
+        cache_key = (sync_config.user.to_string(), sync_config.device_id)
+        cache = self.get_lazy_loaded_members_cache(cache_key)
+
+        # track which members the client should already know about via LL:
+        # Ones which are already in state...
+        existing_members = set(
+            user_id for (typ, user_id) in state.keys()
+            if typ == EventTypes.Member
+        )
+
+        # ...or ones which are in the timeline...
+        for ev in batch.events:
+            if ev.type == EventTypes.Member:
+                existing_members.add(ev.state_key)
+
+        # ...and then ensure any missing ones get included in state.
+        missing_hero_event_ids = [
+            member_ids[hero_id]
+            for hero_id in summary['m.heroes']
+            if (
+                cache.get(hero_id) != member_ids[hero_id] and
+                hero_id not in existing_members
+            )
+        ]
+
+        missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+        missing_hero_state = missing_hero_state.values()
+
+        for s in missing_hero_state:
+            cache.set(s.state_key, s.event_id)
+            state[(EventTypes.Member, s.state_key)] = s
+
+        defer.returnValue(summary)
+
+    def get_lazy_loaded_members_cache(self, cache_key):
+        cache = self.lazy_loaded_members_cache.get(cache_key)
+        if cache is None:
+            logger.debug("creating LruCache for %r", cache_key)
+            cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+            self.lazy_loaded_members_cache[cache_key] = cache
+        else:
+            logger.debug("found LruCache for %r", cache_key)
+        return cache
+
+    @defer.inlineCallbacks
     def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
                             full_state):
-        """ Works out the differnce in state between the start of the timeline
+        """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
         Args:
@@ -495,7 +654,7 @@ class SyncHandler(object):
             full_state(bool): Whether to force returning the full state.
 
         Returns:
-             A deferred new event dictionary
+             A deferred dict of (type, state_key) -> Event
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -505,9 +664,13 @@ class SyncHandler(object):
         with Measure(self.clock, "compute_state_delta"):
 
             types = None
-            lazy_load_members = sync_config.filter_collection.lazy_load_members()
             filtered_types = None
 
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
             if lazy_load_members:
                 # We only request state for the members needed to display the
                 # timeline:
@@ -523,6 +686,11 @@ class SyncHandler(object):
                 # only apply the filtering to room members
                 filtered_types = [EventTypes.Member]
 
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
@@ -543,11 +711,6 @@ class SyncHandler(object):
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
@@ -571,21 +734,6 @@ class SyncHandler(object):
                     filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
-                # TODO: optionally filter out redundant membership events at this
-                # point, to stop repeatedly sending members in every /sync as if
-                # the client isn't tracking them.
-                # When implemented, this should filter using event_ids (not mxids).
-                # In practice, limited syncs are
-                # relatively rare so it's not a total disaster to send redundant
-                # members down at this point. Redundant members are ones which
-                # repeatedly get sent down /sync because we don't know if the client
-                # is caching them or not.
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
@@ -596,16 +744,42 @@ class SyncHandler(object):
             else:
                 state_ids = {}
                 if lazy_load_members:
-                    # TODO: filter out redundant members based on their mxids (not their
-                    # event_ids) at this point. We know we can do it based on mxid as this
-                    # is an non-gappy incremental sync.
-
                     if types:
                         state_ids = yield self.store.get_state_ids_for_event(
                             batch.events[0].event_id, types=types,
                             filtered_types=None,  # we only want members!
                         )
 
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.get_lazy_loaded_members_cache(cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
+
         state = {}
         if state_ids:
             state = yield self.store.get_events(list(state_ids.values()))
@@ -687,7 +861,7 @@ class SyncHandler(object):
             since_token is None and
             sync_config.filter_collection.blocks_all_presence()
         )
-        if not block_all_presence_data:
+        if self.hs_config.use_presence and not block_all_presence_data:
             yield self._generate_sync_entry_for_presence(
                 sync_result_builder, newly_joined_rooms, newly_joined_users
             )
@@ -1379,7 +1553,6 @@ class SyncHandler(object):
             if events == [] and tags is None:
                 return
 
-        since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
 
@@ -1422,6 +1595,18 @@ class SyncHandler(object):
             full_state=full_state
         )
 
+        summary = {}
+        if (
+            sync_config.filter_collection.lazy_load_members() and
+            (
+                any(ev.type == EventTypes.Member for ev in batch.events) or
+                since_token is None
+            )
+        ):
+            summary = yield self.compute_summary(
+                room_id, sync_config, batch, state, now_token
+            )
+
         if room_builder.rtype == "joined":
             unread_notifications = {}
             room_sync = JoinedSyncResult(
@@ -1431,6 +1616,7 @@ class SyncHandler(object):
                 ephemeral=ephemeral,
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
+                summary=summary,
             )
 
             if room_sync or always_include: