summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/api/filtering.py11
-rw-r--r--synapse/handlers/sync.py108
-rw-r--r--synapse/storage/state.py104
3 files changed, 184 insertions, 39 deletions
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 25346baa87..7e767b9bf5 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -113,7 +113,10 @@ ROOM_EVENT_FILTER_SCHEMA = {
         },
         "contains_url": {
             "type": "boolean"
-        }
+        },
+        "lazy_load_members": {
+            "type": "boolean"
+        },
     }
 }
 
@@ -261,6 +264,9 @@ class FilterCollection(object):
     def ephemeral_limit(self):
         return self._room_ephemeral_filter.limit()
 
+    def lazy_load_members(self):
+        return self._room_state_filter.lazy_load_members()
+
     def filter_presence(self, events):
         return self._presence_filter.filter(events)
 
@@ -417,6 +423,9 @@ class Filter(object):
     def limit(self):
         return self.filter_json.get("limit", 10)
 
+    def lazy_load_members(self):
+        return self.filter_json.get("lazy_load_members", False)
+
 
 def _matches_wildcard(actual_value, filter_value):
     if filter_value.endswith("*"):
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c24e35362a..0c21ac2c77 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -416,29 +417,38 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
-
+            types(list[(str|None, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.  Presence of type of `None`
+                indicates that types not in the list should not be filtered out.
+                May be None, which matches any key.
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(event.event_id, types)
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
+            types(list[(str|None, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.  Presence of type of `None`
+                indicates that types not in the list should not be filtered out.
+                May be None, which matches any key.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -453,7 +463,7 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(last_event, types)
 
         else:
             # no events in this room - so presumably no state
@@ -485,22 +495,60 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            member_state_ids = {}
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+
+            if lazy_load_members:
+                # We only request state for the members needed to display the
+                # timeline:
+
+                types = [
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about invite targets etc.
+                        for event in batch.events
+                    )
+                ]
+
+                # We can't remove redundant member types at this stage as it has
+                # to be done based on event_id, and we don't have the member
+                # event ids until we've pulled them out of the DB.
+
+                if not types:
+                    # an optimisation to stop needlessly trying to calculate
+                    # member_state_ids
+                    #
+                    # XXX: i can't remember what this trying to do. why would
+                    # types ever be []? --matthew
+                    lazy_load_members = False
+
+                types.append((None, None))  # don't just filter to room members
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types
                     )
+
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types
                     )
 
                     state_ids = current_state_ids
 
+                if lazy_load_members:
+                    member_state_ids = {
+                        t: state_ids[t]
+                        for t in state_ids if t[0] == EventTypes.Member
+                    }
+
                 timeline_state = {
                     (event.type, event.state_key): event.event_id
                     for event in batch.events if event.is_state()
@@ -509,22 +557,33 @@ class SyncHandler(object):
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
+                    timeline_start_members=member_state_ids,
                     previous={},
                     current=current_state_ids,
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types
                 )
 
+                if lazy_load_members:
+                    # TODO: filter out redundant members based on their event_ids
+                    # (not mxids) at this point. In practice, limited syncs are
+                    # relatively rare so it's not a total disaster to send redundant
+                    # members down at this point.
+                    member_state_ids = {
+                        t: state_at_timeline_start[t]
+                        for t in state_at_timeline_start if t[0] == EventTypes.Member
+                    }
+
                 timeline_state = {
                     (event.type, event.state_key): event.event_id
                     for event in batch.events if event.is_state()
@@ -533,11 +592,23 @@ class SyncHandler(object):
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
+                    timeline_start_members=member_state_ids,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
                 )
             else:
                 state_ids = {}
+                if lazy_load_members:
+                    # TODO: filter out redundant members based on their mxids (not their
+                    # event_ids) at this point. We know we can do it based on mxid as this
+                    # is an non-gappy incremental sync.
+
+                    # strip off the (None, None) and filter to just room members
+                    types = types[:-1]
+                    if types:
+                        state_ids = yield self.store.get_state_ids_for_event(
+                            batch.events[0].event_id, types=types
+                        )
 
         state = {}
         if state_ids:
@@ -1448,12 +1519,16 @@ def _action_has_highlight(actions):
     return False
 
 
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(timeline_contains, timeline_start, timeline_start_members,
+                     previous, current):
     """Works out what state to include in a sync response.
 
     Args:
         timeline_contains (dict): state in the timeline
         timeline_start (dict): state at the start of the timeline
+        timeline_start_members (dict): state at the start of the timeline
+            for room members who participate in this chunk of timeline.
+            Should always be a subset of timeline_start.
         previous (dict): state at the end of the previous sync (or empty dict
             if this is an initial sync)
         current (dict): state at the end of the timeline
@@ -1472,11 +1547,12 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
     }
 
     c_ids = set(e for e in current.values())
-    tc_ids = set(e for e in timeline_contains.values())
-    p_ids = set(e for e in previous.values())
     ts_ids = set(e for e in timeline_start.values())
+    tsm_ids = set(e for e in timeline_start_members.values())
+    p_ids = set(e for e in previous.values())
+    tc_ids = set(e for e in timeline_contains.values())
 
-    state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
+    state_ids = (((c_ids | ts_ids) - p_ids) - tc_ids) | tsm_ids
 
     return {
         event_id_to_key[e]: e for e in state_ids
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 89a05c4618..c5ff44fef7 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -186,7 +186,19 @@ class StateGroupWorkerStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def _get_state_groups_from_groups(self, groups, types):
-        """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
+        """Returns the state groups for a given set of groups, filtering on
+        types of state events.
+
+        Args:
+            groups(list[int]): list of state group IDs to query
+            types(list[str|None, str|None])|None: List of 2-tuples of the form
+                (`type`, `state_key`), where a `state_key` of `None` matches all
+                state_keys for the `type`. Presence of type of `None` indicates
+                that types not in the list should not be filtered out. If None,
+                all types are returned.
+
+        Returns:
+            dictionary state_group -> (dict of (type, state_key) -> event id)
         """
         results = {}
 
@@ -202,8 +214,18 @@ class StateGroupWorkerStore(SQLBaseStore):
 
     def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
         results = {group: {} for group in groups}
+
+        include_other_types = False
+
         if types is not None:
-            types = list(set(types))  # deduplicate types list
+            type_set = set(types)
+            if (None, None) in type_set:
+                # special case (None, None) to mean that other types should be
+                # returned - i.e. we were just filtering down the state keys
+                # for particular types.
+                include_other_types = True
+                type_set.remove((None, None))
+            types = list(type_set)  # deduplicate types list
 
         if isinstance(self.database_engine, PostgresEngine):
             # Temporarily disable sequential scans in this transaction. This is
@@ -250,6 +272,17 @@ class StateGroupWorkerStore(SQLBaseStore):
                     )
                     for etype, state_key in types
                 ]
+
+                if include_other_types:
+                    # XXX: check whether this slows postgres down like a list of
+                    # ORs does too?
+                    unique_types = set([t for (t, _) in types])
+                    clause_to_args.append(
+                        (
+                            "AND type <> ? " * len(unique_types),
+                            list(unique_types)
+                        )
+                    )
             else:
                 # If types is None we fetch all the state, and so just use an
                 # empty where clause with no extra args.
@@ -278,6 +311,14 @@ class StateGroupWorkerStore(SQLBaseStore):
                     else:
                         where_clauses.append("(type = ? AND state_key = ?)")
                         where_args.extend([typ[0], typ[1]])
+
+                if include_other_types:
+                    unique_types = set([t for (t, _) in types])
+                    where_clauses.append(
+                        "(" + " AND ".join(["type <> ?"] * len(unique_types)) + ")"
+                    )
+                    where_args.extend(list(unique_types))
+
                 where_clause = "AND (%s)" % (" OR ".join(where_clauses))
             else:
                 where_clause = ""
@@ -338,10 +379,12 @@ class StateGroupWorkerStore(SQLBaseStore):
         that are in the `types` list.
 
         Args:
-            event_ids (list)
-            types (list): List of (type, state_key) tuples which are used to
-                filter the state fetched. `state_key` may be None, which matches
-                any `state_key`
+            event_ids (list[string])
+            types (list[(str|None, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.  Presence of type of `None`
+                indicates that types not in the list should not be filtered out.
+                May be None, which matches any key.
 
         Returns:
             deferred: A list of dicts corresponding to the event_ids given.
@@ -377,9 +420,11 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         Args:
             event_ids(list(str)): events whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str|None, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.  Presence of type of `None`
+                indicates that types not in the list should not be filtered out.
+                May be None, which matches any key.
 
         Returns:
             A deferred dict from event_id -> (type, state_key) -> state_event
@@ -405,9 +450,11 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str|None, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.  Presence of type of `None`
+                indicates that types not in the list should not be filtered out.
+                May be None, which matches any key.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
@@ -422,9 +469,11 @@ class StateGroupWorkerStore(SQLBaseStore):
 
         Args:
             event_id(str): event whose state should be returned
-            types(list[(str, str)]|None): List of (type, state_key) tuples
-                which are used to filter the state fetched. May be None, which
-                matches any key
+            types(list[(str|None, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.  Presence of type of `None`
+                indicates that types not in the list should not be filtered out.
+                May be None, which matches any key.
 
         Returns:
             A deferred dict from (type, state_key) -> state_event
@@ -470,20 +519,30 @@ class StateGroupWorkerStore(SQLBaseStore):
         missing state.
 
         Args:
-            group: The state group to lookup
-            types (list): List of 2-tuples of the form (`type`, `state_key`),
-                where a `state_key` of `None` matches all state_keys for the
-                `type`.
+            group(int): The state group to lookup
+            types(list[str|None, str|None]): List of 2-tuples of the form
+                (`type`, `state_key`), where a `state_key` of `None` matches all
+                state_keys for the `type`. Presence of type of `None` indicates
+                that types not in the list should not be filtered out.
         """
         is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
 
         type_to_key = {}
         missing_types = set()
 
+        include_other_types = False
+
         for typ, state_key in types:
             key = (typ, state_key)
+
+            if typ is None:
+                include_other_types = True
+                next
+
             if state_key is None:
                 type_to_key[typ] = None
+                # XXX: why do we mark the type as missing from our cache just
+                # because we weren't filtering on a specific value of state_key?
                 missing_types.add(key)
             else:
                 if type_to_key.get(typ, object()) is not None:
@@ -497,7 +556,7 @@ class StateGroupWorkerStore(SQLBaseStore):
         def include(typ, state_key):
             valid_state_keys = type_to_key.get(typ, sentinel)
             if valid_state_keys is sentinel:
-                return False
+                return include_other_types
             if valid_state_keys is None:
                 return True
             if state_key in valid_state_keys:
@@ -533,13 +592,14 @@ class StateGroupWorkerStore(SQLBaseStore):
         Args:
             groups (iterable[int]): list of state groups for which we want
                 to get the state.
-            types (None|iterable[(str, None|str)]):
+            types (None|iterable[(None|str, None|str)]):
                 indicates the state type/keys required. If None, the whole
                 state is fetched and returned.
 
                 Otherwise, each entry should be a `(type, state_key)` tuple to
                 include in the response. A `state_key` of None is a wildcard
-                meaning that we require all state with that type.
+                meaning that we require all state with that type. A `type` of None
+                indicates that types not in the list should not be filtered out.
 
         Returns:
             Deferred[dict[int, dict[(type, state_key), EventBase]]]