summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@arasphere.net>2018-09-12 00:50:39 +0100
committerGitHub <noreply@github.com>2018-09-12 00:50:39 +0100
commitb041115415191f8353177f7e0c1beec81be0c921 (patch)
treec473c520965ca49bb984eaa5ceaf9688a23471f5 /synapse
parentMerge pull request #3834 from mvgorcum/develop (diff)
downloadsynapse-b041115415191f8353177f7e0c1beec81be0c921.tar.xz
Speed up lazy loading (#3827)
* speed up room summaries by pulling their data from room_memberships rather than room state
* disable LL for incr syncs, and log incr sync stats  (#3840)
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py118
-rw-r--r--synapse/storage/events.py4
-rw-r--r--synapse/storage/roommember.py65
3 files changed, 157 insertions, 30 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7eed2fcc9b..23983a51ab 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.storage.roommember import MemberSummary
 from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -525,6 +526,8 @@ class SyncHandler(object):
              A deferred dict describing the room summary
         """
 
+        # FIXME: we could/should get this from room_stats when matthew/stats lands
+
         # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
         last_events, _ = yield self.store.get_recent_event_ids_for_room(
             room_id, end_token=now_token.room_key, limit=1,
@@ -537,44 +540,54 @@ class SyncHandler(object):
         last_event = last_events[-1]
         state_ids = yield self.store.get_state_ids_for_event(
             last_event.event_id, [
-                (EventTypes.Member, None),
                 (EventTypes.Name, ''),
                 (EventTypes.CanonicalAlias, ''),
             ]
         )
 
-        member_ids = {
-            state_key: event_id
-            for (t, state_key), event_id in iteritems(state_ids)
-            if t == EventTypes.Member
-        }
+        # this is heavily cached, thus: fast.
+        details = yield self.store.get_room_summary(room_id)
+
         name_id = state_ids.get((EventTypes.Name, ''))
         canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
 
         summary = {}
-
-        # FIXME: it feels very heavy to load up every single membership event
-        # just to calculate the counts.
-        member_events = yield self.store.get_events(member_ids.values())
-
-        joined_user_ids = []
-        invited_user_ids = []
-
-        for ev in member_events.values():
-            if ev.content.get("membership") == Membership.JOIN:
-                joined_user_ids.append(ev.state_key)
-            elif ev.content.get("membership") == Membership.INVITE:
-                invited_user_ids.append(ev.state_key)
+        empty_ms = MemberSummary([], 0)
 
         # TODO: only send these when they change.
-        summary["m.joined_member_count"] = len(joined_user_ids)
-        summary["m.invited_member_count"] = len(invited_user_ids)
+        summary["m.joined_member_count"] = (
+            details.get(Membership.JOIN, empty_ms).count
+        )
+        summary["m.invited_member_count"] = (
+            details.get(Membership.INVITE, empty_ms).count
+        )
 
         if name_id or canonical_alias_id:
             defer.returnValue(summary)
 
-        # FIXME: order by stream ordering, not alphabetic
+        joined_user_ids = [
+            r[0] for r in details.get(Membership.JOIN, empty_ms).members
+        ]
+        invited_user_ids = [
+            r[0] for r in details.get(Membership.INVITE, empty_ms).members
+        ]
+        gone_user_ids = (
+            [r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
+            [r[0] for r in details.get(Membership.BAN, empty_ms).members]
+        )
+
+        # FIXME: only build up a member_ids list for our heroes
+        member_ids = {}
+        for membership in (
+            Membership.JOIN,
+            Membership.INVITE,
+            Membership.LEAVE,
+            Membership.BAN
+        ):
+            for user_id, event_id in details.get(membership, empty_ms).members:
+                member_ids[user_id] = event_id
 
+        # FIXME: order by stream ordering rather than as returned by SQL
         me = sync_config.user.to_string()
         if (joined_user_ids or invited_user_ids):
             summary['m.heroes'] = sorted(
@@ -586,7 +599,11 @@ class SyncHandler(object):
             )[0:5]
         else:
             summary['m.heroes'] = sorted(
-                [user_id for user_id in member_ids.keys() if user_id != me]
+                [
+                    user_id
+                    for user_id in gone_user_ids
+                    if user_id != me
+                ]
             )[0:5]
 
         if not sync_config.filter_collection.lazy_load_members():
@@ -719,6 +736,26 @@ class SyncHandler(object):
                     lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
+                state_at_timeline_start = yield self.store.get_state_ids_for_event(
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
+                )
+
+                # for now, we disable LL for gappy syncs - see
+                # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
+                # N.B. this slows down incr syncs as we are now processing way
+                # more state in the server than if we were LLing.
+                #
+                # We still have to filter timeline_start to LL entries (above) in order
+                # for _calculate_state's LL logic to work, as we have to include LL
+                # members for timeline senders in case they weren't loaded in the initial
+                # sync.  We do this by (counterintuitively) by filtering timeline_start
+                # members to just be ones which were timeline senders, which then ensures
+                # all of the rest get included in the state block (if we need to know
+                # about them).
+                types = None
+                filtered_types = None
+
                 state_at_previous_sync = yield self.get_state_at(
                     room_id, stream_position=since_token, types=types,
                     filtered_types=filtered_types,
@@ -729,24 +766,21 @@ class SyncHandler(object):
                     filtered_types=filtered_types,
                 )
 
-                state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id, types=types,
-                    filtered_types=filtered_types,
-                )
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    # we have to include LL members in case LL initial sync missed them
                     lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
                 if lazy_load_members:
                     if types:
-                        # We're returning an incremental sync, with no "gap" since
-                        # the previous sync, so normally there would be no state to return
+                        # We're returning an incremental sync, with no
+                        # "gap" since the previous sync, so normally there would be
+                        # no state to return.
                         # But we're lazy-loading, so the client might need some more
                         # member events to understand the events in this timeline.
                         # So we fish out all the member events corresponding to the
@@ -1616,10 +1650,24 @@ class SyncHandler(object):
         )
 
         summary = {}
+
+        # we include a summary in room responses when we're lazy loading
+        # members (as the client otherwise doesn't have enough info to form
+        # the name itself).
         if (
             sync_config.filter_collection.lazy_load_members() and
             (
+                # we recalulate the summary:
+                #   if there are membership changes in the timeline, or
+                #   if membership has changed during a gappy sync, or
+                #   if this is an initial sync.
                 any(ev.type == EventTypes.Member for ev in batch.events) or
+                (
+                    # XXX: this may include false positives in the form of LL
+                    # members which have snuck into state
+                    batch.limited and
+                    any(t == EventTypes.Member for (t, k) in state)
+                ) or
                 since_token is None
             )
         ):
@@ -1649,6 +1697,16 @@ class SyncHandler(object):
                     unread_notifications["highlight_count"] = notifs["highlight_count"]
 
                 sync_result_builder.joined.append(room_sync)
+
+            if batch.limited:
+                user_id = sync_result_builder.sync_config.user.to_string()
+                logger.info(
+                    "Incremental syncing room %s for user %s with %d state events" % (
+                        room_id,
+                        user_id,
+                        len(state),
+                    )
+                )
         elif room_builder.rtype == "archived":
             room_sync = ArchivedSyncResult(
                 room_id=room_id,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8bf87f38f7..30ff87a4c4 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -930,6 +930,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                 )
 
                 self._invalidate_cache_and_stream(
+                    txn, self.get_room_summary, (room_id,)
+                )
+
+                self._invalidate_cache_and_stream(
                     txn, self.get_current_state_ids, (room_id,)
                 )
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 9b4e6d6aa8..0707f9a86a 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -51,6 +51,12 @@ ProfileInfo = namedtuple(
     "ProfileInfo", ("avatar_url", "display_name")
 )
 
+# "members" points to a truncated list of (user_id, event_id) tuples for users of
+# a given membership type, suitable for use in calculating heroes for a room.
+# "count" points to the total numberr of users of a given membership type.
+MemberSummary = namedtuple(
+    "MemberSummary", ("members", "count")
+)
 
 _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
 
@@ -82,6 +88,65 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             return [to_ascii(r[0]) for r in txn]
         return self.runInteraction("get_users_in_room", f)
 
+    @cached(max_entries=100000)
+    def get_room_summary(self, room_id):
+        """ Get the details of a room roughly suitable for use by the room
+        summary extension to /sync. Useful when lazy loading room members.
+        Args:
+            room_id (str): The room ID to query
+        Returns:
+            Deferred[dict[str, MemberSummary]:
+                dict of membership states, pointing to a MemberSummary named tuple.
+        """
+
+        def _get_room_summary_txn(txn):
+            # first get counts.
+            # We do this all in one transaction to keep the cache small.
+            # FIXME: get rid of this when we have room_stats
+            sql = """
+                SELECT count(*), m.membership FROM room_memberships as m
+                 INNER JOIN current_state_events as c
+                 ON m.event_id = c.event_id
+                 AND m.room_id = c.room_id
+                 AND m.user_id = c.state_key
+                 WHERE c.type = 'm.room.member' AND c.room_id = ?
+                 GROUP BY m.membership
+            """
+
+            txn.execute(sql, (room_id,))
+            res = {}
+            for count, membership in txn:
+                summary = res.setdefault(to_ascii(membership), MemberSummary([], count))
+
+            # we order by membership and then fairly arbitrarily by event_id so
+            # heroes are consistent
+            sql = """
+                SELECT m.user_id, m.membership, m.event_id
+                FROM room_memberships as m
+                 INNER JOIN current_state_events as c
+                 ON m.event_id = c.event_id
+                 AND m.room_id = c.room_id
+                 AND m.user_id = c.state_key
+                 WHERE c.type = 'm.room.member' AND c.room_id = ?
+                 ORDER BY
+                    CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
+                    m.event_id ASC
+                 LIMIT ?
+            """
+
+            # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
+            txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
+            for user_id, membership, event_id in txn:
+                summary = res[to_ascii(membership)]
+                # we will always have a summary for this membership type at this
+                # point given the summary currently contains the counts.
+                members = summary.members
+                members.append((to_ascii(user_id), to_ascii(event_id)))
+
+            return res
+
+        return self.runInteraction("get_room_summary", _get_room_summary_txn)
+
     @cached()
     def get_invited_rooms_for_user(self, user_id):
         """ Get all the rooms the user is invited to