summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@arasphere.net>2018-08-16 10:46:50 +0200
committerRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2018-08-16 09:46:50 +0100
commit3f543dc021dd9456c8ed2da7a1e4769a68c07729 (patch)
treea95c6c0436fe5715ccc096aeeacdbc459bf451b8 /synapse
parentMerge pull request #3686 from matrix-org/rav/changelog_links_to_prs (diff)
downloadsynapse-3f543dc021dd9456c8ed2da7a1e4769a68c07729.tar.xz
initial cut at a room summary API (#3574)
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py159
-rw-r--r--synapse/rest/client/v2_alpha/sync.py1
-rw-r--r--synapse/storage/_base.py7
-rw-r--r--synapse/storage/state.py5
-rw-r--r--synapse/storage/stream.py4
5 files changed, 158 insertions, 18 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3b21a04a5d..ac3edf0cc9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -75,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "ephemeral",
     "account_data",
     "unread_notifications",
+    "summary",
 ])):
     __slots__ = []
 
@@ -504,9 +505,141 @@ class SyncHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    def compute_summary(self, room_id, sync_config, batch, state, now_token):
+        """ Works out a room summary block for this room, summarising the number
+        of joined members in the room, and providing the 'hero' members if the
+        room has no name so clients can consistently name rooms.  Also adds
+        state events to 'state' if needed to describe the heroes.
+
+        Args:
+            room_id(str):
+            sync_config(synapse.handlers.sync.SyncConfig):
+            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+                the room that will be sent to the user.
+            state(dict): dict of (type, state_key) -> Event as returned by
+                compute_state_delta
+            now_token(str): Token of the end of the current batch.
+
+        Returns:
+             A deferred dict describing the room summary
+        """
+
+        # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
+        last_events, _ = yield self.store.get_recent_event_ids_for_room(
+            room_id, end_token=now_token.room_key, limit=1,
+        )
+
+        if not last_events:
+            defer.returnValue(None)
+            return
+
+        last_event = last_events[-1]
+        state_ids = yield self.store.get_state_ids_for_event(
+            last_event.event_id, [
+                (EventTypes.Member, None),
+                (EventTypes.Name, ''),
+                (EventTypes.CanonicalAlias, ''),
+            ]
+        )
+
+        member_ids = {
+            state_key: event_id
+            for (t, state_key), event_id in state_ids.iteritems()
+            if t == EventTypes.Member
+        }
+        name_id = state_ids.get((EventTypes.Name, ''))
+        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+        summary = {}
+
+        # FIXME: it feels very heavy to load up every single membership event
+        # just to calculate the counts.
+        member_events = yield self.store.get_events(member_ids.values())
+
+        joined_user_ids = []
+        invited_user_ids = []
+
+        for ev in member_events.values():
+            if ev.content.get("membership") == Membership.JOIN:
+                joined_user_ids.append(ev.state_key)
+            elif ev.content.get("membership") == Membership.INVITE:
+                invited_user_ids.append(ev.state_key)
+
+        # TODO: only send these when they change.
+        summary["m.joined_member_count"] = len(joined_user_ids)
+        summary["m.invited_member_count"] = len(invited_user_ids)
+
+        if name_id or canonical_alias_id:
+            defer.returnValue(summary)
+
+        # FIXME: order by stream ordering, not alphabetic
+
+        me = sync_config.user.to_string()
+        if (joined_user_ids or invited_user_ids):
+            summary['m.heroes'] = sorted(
+                [
+                    user_id
+                    for user_id in (joined_user_ids + invited_user_ids)
+                    if user_id != me
+                ]
+            )[0:5]
+        else:
+            summary['m.heroes'] = sorted(
+                [user_id for user_id in member_ids.keys() if user_id != me]
+            )[0:5]
+
+        if not sync_config.filter_collection.lazy_load_members():
+            defer.returnValue(summary)
+
+        # ensure we send membership events for heroes if needed
+        cache_key = (sync_config.user.to_string(), sync_config.device_id)
+        cache = self.get_lazy_loaded_members_cache(cache_key)
+
+        # track which members the client should already know about via LL:
+        # Ones which are already in state...
+        existing_members = set(
+            user_id for (typ, user_id) in state.keys()
+            if typ == EventTypes.Member
+        )
+
+        # ...or ones which are in the timeline...
+        for ev in batch.events:
+            if ev.type == EventTypes.Member:
+                existing_members.add(ev.state_key)
+
+        # ...and then ensure any missing ones get included in state.
+        missing_hero_event_ids = [
+            member_ids[hero_id]
+            for hero_id in summary['m.heroes']
+            if (
+                cache.get(hero_id) != member_ids[hero_id] and
+                hero_id not in existing_members
+            )
+        ]
+
+        missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+        missing_hero_state = missing_hero_state.values()
+
+        for s in missing_hero_state:
+            cache.set(s.state_key, s.event_id)
+            state[(EventTypes.Member, s.state_key)] = s
+
+        defer.returnValue(summary)
+
+    def get_lazy_loaded_members_cache(self, cache_key):
+        cache = self.lazy_loaded_members_cache.get(cache_key)
+        if cache is None:
+            logger.debug("creating LruCache for %r", cache_key)
+            cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+            self.lazy_loaded_members_cache[cache_key] = cache
+        else:
+            logger.debug("found LruCache for %r", cache_key)
+        return cache
+
+    @defer.inlineCallbacks
     def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
                             full_state):
-        """ Works out the differnce in state between the start of the timeline
+        """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
         Args:
@@ -520,7 +653,7 @@ class SyncHandler(object):
             full_state(bool): Whether to force returning the full state.
 
         Returns:
-             A deferred new event dictionary
+             A deferred dict of (type, state_key) -> Event
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -618,13 +751,7 @@ class SyncHandler(object):
 
             if lazy_load_members and not include_redundant_members:
                 cache_key = (sync_config.user.to_string(), sync_config.device_id)
-                cache = self.lazy_loaded_members_cache.get(cache_key)
-                if cache is None:
-                    logger.debug("creating LruCache for %r", cache_key)
-                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
-                    self.lazy_loaded_members_cache[cache_key] = cache
-                else:
-                    logger.debug("found LruCache for %r", cache_key)
+                cache = self.get_lazy_loaded_members_cache(cache_key)
 
                 # if it's a new sync sequence, then assume the client has had
                 # amnesia and doesn't want any recent lazy-loaded members
@@ -1425,7 +1552,6 @@ class SyncHandler(object):
             if events == [] and tags is None:
                 return
 
-        since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
 
@@ -1468,6 +1594,18 @@ class SyncHandler(object):
             full_state=full_state
         )
 
+        summary = {}
+        if (
+            sync_config.filter_collection.lazy_load_members() and
+            (
+                any(ev.type == EventTypes.Member for ev in batch.events) or
+                since_token is None
+            )
+        ):
+            summary = yield self.compute_summary(
+                room_id, sync_config, batch, state, now_token
+            )
+
         if room_builder.rtype == "joined":
             unread_notifications = {}
             room_sync = JoinedSyncResult(
@@ -1477,6 +1615,7 @@ class SyncHandler(object):
                 ephemeral=ephemeral,
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
+                summary=summary,
             )
 
             if room_sync or always_include:
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 8aa06faf23..1275baa1ba 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -370,6 +370,7 @@ class SyncRestServlet(RestServlet):
             ephemeral_events = room.ephemeral
             result["ephemeral"] = {"events": ephemeral_events}
             result["unread_notifications"] = room.unread_notifications
+            result["summary"] = room.summary
 
         return result
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 44f37b4c1e..08dffd774f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1150,17 +1150,16 @@ class SQLBaseStore(object):
         defer.returnValue(retval)
 
     def get_user_count_txn(self, txn):
-        """Get a total number of registerd users in the users list.
+        """Get a total number of registered users in the users list.
 
         Args:
             txn : Transaction object
         Returns:
-            defer.Deferred: resolves to int
+            int : number of users
         """
         sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
         txn.execute(sql_count)
-        count = txn.fetchone()[0]
-        defer.returnValue(count)
+        return txn.fetchone()[0]
 
     def _simple_search_list(self, table, term, col, retcols,
                             desc="_simple_search_list"):
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 754dfa6973..dd03c4168b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -480,7 +480,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     @defer.inlineCallbacks
     def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
         """
-        Get the state dicts corresponding to a list of events
+        Get the state dicts corresponding to a list of events, containing the event_ids
+        of the state events (as opposed to the events themselves)
 
         Args:
             event_ids(list(str)): events whose state should be returned
@@ -493,7 +494,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 If None, `types` filtering is applied to all events.
 
         Returns:
-            A deferred dict from event_id -> (type, state_key) -> state_event
+            A deferred dict from event_id -> (type, state_key) -> event_id
         """
         event_to_groups = yield self._get_state_group_for_events(
             event_ids,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b9f2b74ac6..4c296d72c0 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -348,7 +348,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             end_token (str): The stream token representing now.
 
         Returns:
-            Deferred[tuple[list[FrozenEvent],  str]]: Returns a list of
+            Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
             events and a token pointing to the start of the returned
             events.
             The events returned are in ascending order.
@@ -379,7 +379,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             end_token (str): The stream token representing now.
 
         Returns:
-            Deferred[tuple[list[_EventDictReturn],  str]]: Returns a list of
+            Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
             _EventDictReturn and a token pointing to the start of the returned
             events.
             The events returned are in ascending order.