diff options
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 136 |
1 files changed, 78 insertions, 58 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c1257521c1..48b7431433 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,11 +20,14 @@ import logging from six import iteritems, itervalues +from prometheus_client import Counter + from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary +from synapse.storage.state import StateFilter from synapse.types import RoomStreamToken from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -36,6 +39,19 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) + +# Counts the number of times we returned a non-empty sync. `type` is one of +# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is +# "true" or "false" depending on if the request asked for lazy loaded members or +# not. +non_empty_sync_counter = Counter( + "synapse_handlers_sync_nonempty_total", + "Count of non empty sync responses. type is initial_sync/full_state_sync" + "/incremental_sync. lazy_loaded indicates if lazy loaded members were " + "enabled for that request.", + ["type", "lazy_loaded"], +) + # Store the cache that tracks which lazy-loaded members have been sent to a given # client for no more than 30 minutes. LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 @@ -227,14 +243,16 @@ class SyncHandler(object): @defer.inlineCallbacks def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state): + if since_token is None: + sync_type = "initial_sync" + elif full_state: + sync_type = "full_state_sync" + else: + sync_type = "incremental_sync" + context = LoggingContext.current_context() if context: - if since_token is None: - context.tag = "initial_sync" - elif full_state: - context.tag = "full_state_sync" - else: - context.tag = "incremental_sync" + context.tag = sync_type if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling @@ -242,7 +260,6 @@ class SyncHandler(object): result = yield self.current_sync_for_user( sync_config, since_token, full_state=full_state, ) - defer.returnValue(result) else: def current_sync_callback(before_token, after_token): return self.current_sync_for_user(sync_config, since_token) @@ -251,7 +268,15 @@ class SyncHandler(object): sync_config.user.to_string(), timeout, current_sync_callback, from_token=since_token, ) - defer.returnValue(result) + + if result: + if sync_config.filter_collection.lazy_load_members(): + lazy_loaded = "true" + else: + lazy_loaded = "false" + non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() + + defer.returnValue(result) def current_sync_for_user(self, sync_config, since_token=None, full_state=False): @@ -445,25 +470,20 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def get_state_after_event(self, event, types=None, filtered_types=None): + def get_state_after_event(self, event, state_filter=StateFilter.all()): """ Get the room state after the given event Args: event(synapse.events.EventBase): event of interest - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - May be None, which matches any key. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: A Deferred map from ((type, state_key)->Event) """ state_ids = yield self.store.get_state_ids_for_event( - event.event_id, types, filtered_types=filtered_types, + event.event_id, state_filter=state_filter, ) if event.is_state(): state_ids = state_ids.copy() @@ -471,18 +491,14 @@ class SyncHandler(object): defer.returnValue(state_ids) @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position, types=None, filtered_types=None): + def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()): """ Get the room state at a particular stream position Args: room_id(str): room for which to get state stream_position(StreamToken): point at which to get state - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: A Deferred map from ((type, state_key)->Event) @@ -498,7 +514,7 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] state = yield self.get_state_after_event( - last_event, types, filtered_types=filtered_types, + last_event, state_filter=state_filter, ) else: @@ -539,11 +555,12 @@ class SyncHandler(object): last_event = last_events[-1] state_ids = yield self.store.get_state_ids_for_event( - last_event.event_id, [ + last_event.event_id, + state_filter=StateFilter.from_types([ (EventTypes.Name, ''), (EventTypes.CanonicalAlias, ''), (EventTypes.RoomAvatar, ''), - ] + ]), ) # this is heavily cached, thus: fast. @@ -568,20 +585,20 @@ class SyncHandler(object): # haven't been "deleted" by blatting {} over the top. room_avatar_id = state_ids.get((EventTypes.RoomAvatar, '')) if room_avatar_id: - room_avatar = yield self.store.get_event(room_avatar_id, allow_none=False) + room_avatar = yield self.store.get_event(room_avatar_id, allow_none=True) if room_avatar and room_avatar.content: # we have a room avatar; check to see if we can skip heroes # because the room has an explicit name or canonical alias: name_id = state_ids.get((EventTypes.Name, '')) if name_id: - name = yield self.store.get_event(name_id, allow_none=False) + name = yield self.store.get_event(name_id, allow_none=True) if name and name.content: defer.returnValue(summary) canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, '')) if canonical_alias_id: canonical_alias = yield self.store.get_event( - canonical_alias_id, allow_none=False, + canonical_alias_id, allow_none=Tue, ) if canonical_alias and canonical_alias.content: defer.returnValue(summary) @@ -701,8 +718,7 @@ class SyncHandler(object): with Measure(self.clock, "compute_state_delta"): - types = None - filtered_types = None + members_to_fetch = None lazy_load_members = sync_config.filter_collection.lazy_load_members() include_redundant_members = ( @@ -713,16 +729,21 @@ class SyncHandler(object): # We only request state for the members needed to display the # timeline: - types = [ - (EventTypes.Member, state_key) - for state_key in set( - event.sender # FIXME: we also care about invite targets etc. - for event in batch.events - ) - ] + members_to_fetch = set( + event.sender # FIXME: we also care about invite targets etc. + for event in batch.events + ) + + if full_state: + # always make sure we LL ourselves so we know we're in the room + # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 + # We only need apply this on full state syncs given we disabled + # LL for incr syncs in #3840. + members_to_fetch.add(sync_config.user.to_string()) - # only apply the filtering to room members - filtered_types = [EventTypes.Member] + state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) + else: + state_filter = StateFilter.all() timeline_state = { (event.type, event.state_key): event.event_id @@ -732,19 +753,17 @@ class SyncHandler(object): if full_state: if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types, - filtered_types=filtered_types, + batch.events[-1].event_id, state_filter=state_filter, ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types, - filtered_types=filtered_types, + batch.events[0].event_id, state_filter=state_filter, ) else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token, types=types, - filtered_types=filtered_types, + room_id, stream_position=now_token, + state_filter=state_filter, ) state_ids = current_state_ids @@ -758,8 +777,7 @@ class SyncHandler(object): ) elif batch.limited: state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types, - filtered_types=filtered_types, + batch.events[0].event_id, state_filter=state_filter, ) # for now, we disable LL for gappy syncs - see @@ -774,17 +792,15 @@ class SyncHandler(object): # members to just be ones which were timeline senders, which then ensures # all of the rest get included in the state block (if we need to know # about them). - types = None - filtered_types = None + state_filter = StateFilter.all() state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token, types=types, - filtered_types=filtered_types, + room_id, stream_position=since_token, + state_filter=state_filter, ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types, - filtered_types=filtered_types, + batch.events[-1].event_id, state_filter=state_filter, ) state_ids = _calculate_state( @@ -798,7 +814,7 @@ class SyncHandler(object): else: state_ids = {} if lazy_load_members: - if types: + if members_to_fetch and batch.events: # We're returning an incremental sync, with no # "gap" since the previous sync, so normally there would be # no state to return. @@ -808,8 +824,12 @@ class SyncHandler(object): # timeline here, and then dedupe any redundant ones below. state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types, - filtered_types=None, # we only want members! + batch.events[0].event_id, + # we only want members! + state_filter=StateFilter.from_types( + (EventTypes.Member, member) + for member in members_to_fetch + ), ) if lazy_load_members and not include_redundant_members: |