Diffstat (limited to 'synapse/handlers')
 synapse/handlers/federation.py |   2 +-
 synapse/handlers/profile.py    |   9 ++++-
 synapse/handlers/room.py       |  12 +++++--
 synapse/handlers/sync.py       | 173 ++++++++++++++++++++++++++++++++++------
 4 files changed, 165 insertions(+), 31 deletions(-)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 145c1a21d4..49068c06d9 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1279,7 +1279,7 @@ class FederationHandler(BaseHandler):
     @log_function
     def on_make_leave_request(self, room_id, user_id):
         """ We've received a /make_leave/ request, so we create a partial
-        join event for the room and return that. We do *not* persist or
+        leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
         builder = self.event_builder_factory.new({
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 859f6d2b2e..cb5c6d587e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -18,6 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import AuthError, CodeMessageException, SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, get_domain_from_id
 
 from ._base import BaseHandler
@@ -41,7 +42,7 @@ class ProfileHandler(BaseHandler):
 
         if hs.config.worker_app is None:
             self.clock.looping_call(
-                self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
             )
 
     @defer.inlineCallbacks
@@ -254,6 +255,12 @@ class ProfileHandler(BaseHandler):
                     room_id, str(e.message)
                 )
 
+    def _start_update_remote_profile_cache(self):
+        return run_as_background_process(
+            "Update remote profile", self._update_remote_profile_cache,
+        )
+
+    @defer.inlineCallbacks
     def _update_remote_profile_cache(self):
         """Called periodically to check profiles of remote users we haven't
         checked in a while.
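The profile.py change follows a pattern worth noting: `Clock.looping_call` expects a plain callable, so the periodic work is handed off via `run_as_background_process`, which runs it under its own log context and records it in the background-process metrics. Below is a minimal sketch of the shape of this pattern; the stub `run_as_background_process` and the `ProfileCacheUpdater` class are illustrative stand-ins, not synapse's real implementations.

    from twisted.internet import defer

    def run_as_background_process(desc, func, *args, **kwargs):
        # Stand-in for synapse.metrics.background_process_metrics.
        # The real helper also sets up a named log context and updates
        # per-process metrics around the call.
        return defer.maybeDeferred(func, *args, **kwargs)

    class ProfileCacheUpdater(object):
        def _start_update_remote_profile_cache(self):
            # looping_call fires this plain function, which immediately
            # hands the decorated generator to the background wrapper.
            return run_as_background_process(
                "Update remote profile", self._update_remote_profile_cache,
            )

        @defer.inlineCallbacks
        def _update_remote_profile_cache(self):
            yield defer.succeed(None)  # placeholder for the real refresh

    updater = ProfileCacheUpdater()
    updater._start_update_remote_profile_cache()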
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6150b7e226..003b848c00 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,7 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
-from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
 from synapse.visibility import filter_events_for_client
 
@@ -418,8 +418,6 @@ class RoomContextHandler(object):
         before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
         users = yield self.store.get_users_in_room(room_id)
         is_peeking = user.to_string() not in users
 
@@ -462,11 +460,15 @@ class RoomContextHandler(object):
             )
             results["state"] = list(state[last_event_id].values())
 
-        results["start"] = now_token.copy_and_replace(
+        # We use a dummy token here as we only care about the room portion of
+        # the token, which we replace.
+        token = StreamToken.START
+
+        results["start"] = token.copy_and_replace(
             "room_key", results["start"]
         ).to_string()
 
-        results["end"] = now_token.copy_and_replace(
+        results["end"] = token.copy_and_replace(
             "room_key", results["end"]
         ).to_string()
 
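For context on the room.py change: the `/context` response only needs the `room_key` portion of its pagination tokens, so fetching the live current token from every event source was wasted work. The diff replaces it with the constant `StreamToken.START` and overwrites the room portion. A simplified, hypothetical mimic of the token type follows; the real `synapse.types.StreamToken` is also a namedtuple-based type but carries several more stream fields.

    import collections

    class StreamToken(collections.namedtuple("StreamToken",
                                             ["room_key", "presence_key"])):
        # Hypothetical two-field mimic of synapse.types.StreamToken.
        def copy_and_replace(self, key, new_value):
            return self._replace(**{key: new_value})

        def to_string(self):
            return "_".join(str(v) for v in self)

    StreamToken.START = StreamToken(room_key="s0", presence_key="0")

    # Only the room portion matters, so a dummy START token with the room
    # key swapped in serializes just as well as a freshly fetched token.
    start = StreamToken.START.copy_and_replace("room_key", "s12345")
    print(start.to_string())  # s12345_0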
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c24e35362a..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.types import RoomStreamToken
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
@@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
@@ -181,6 +192,12 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
+
     def wait_for_sync_for_user(self, sync_config, since_token=None,
                                timeout=0, full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -416,29 +433,44 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None, filtered_types=None):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types. Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(
+            event.event_id, types, filtered_types=filtered_types,
+        )
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types. Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -453,7 +485,9 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(
+                last_event, types, filtered_types=filtered_types,
+            )
 
         else:
             # no events in this room - so presumably no state
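The new `types`/`filtered_types` arguments are threaded through to the storage layer. Their intended semantics, as described by the docstrings above, can be modelled with a plain dict filter. This is an illustrative sketch only, not the actual implementation inside `get_state_ids_for_event`; the function name and sample data are made up for the example.

    def filter_state_ids(state_ids, types=None, filtered_types=None):
        # state_ids: {(event_type, state_key): event_id}
        if types is None:
            return dict(state_ids)  # no filtering at all
        wanted = set(types)
        return {
            (ev_type, state_key): event_id
            for (ev_type, state_key), event_id in state_ids.items()
            # types only constrain event types listed in filtered_types;
            # every other event type is returned unfiltered
            if (filtered_types is not None and ev_type not in filtered_types)
            or (ev_type, state_key) in wanted
            or (ev_type, None) in wanted  # state_key=None matches any key
        }

    state = {
        ("m.room.create", ""): "$create",
        ("m.room.member", "@alice:hs"): "$alice",
        ("m.room.member", "@bob:hs"): "$bob",
    }
    print(filter_state_ids(
        state,
        types=[("m.room.member", "@alice:hs")],
        filtered_types=["m.room.member"],
    ))
    # {('m.room.create', ''): '$create', ('m.room.member', '@alice:hs'): '$alice'}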
@@ -485,59 +519,129 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            filtered_types = None
+
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
+            if lazy_load_members:
+                # We only request state for the members needed to display the
+                # timeline:
+
+                types = [
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about invite targets etc.
+                        for event in batch.events
+                    )
+                ]
+
+                # only apply the filtering to room members
+                filtered_types = [EventTypes.Member]
+
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types,
+                        filtered_types=filtered_types,
                    )
+
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
                     previous={},
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
+                if lazy_load_members:
+                    if types:
+                        state_ids = yield self.store.get_state_ids_for_event(
+                            batch.events[0].event_id, types=types,
+                            filtered_types=filtered_types,
+                        )
+
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.lazy_loaded_members_cache.get(cache_key)
+                if cache is None:
+                    logger.debug("creating LruCache for %r", cache_key)
+                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+                    self.lazy_loaded_members_cache[cache_key] = cache
+                else:
+                    logger.debug("found LruCache for %r", cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
 
             state = {}
             if state_ids:
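The de-duplication step above keeps one small `LruCache` per `(user, device)` pair, mapping a member's `state_key` (their user ID) to the membership `event_id` last sent, so a member is re-sent only when the cached `event_id` differs or has been evicted. A rough, self-contained model of that filter follows; the tiny `LruCache` class here is a stand-in for `synapse.util.caches.lrucache.LruCache`, and the user/event IDs are invented.

    from collections import OrderedDict

    class LruCache(object):
        # Stand-in: an OrderedDict that evicts the least recently used
        # entry once max_size is exceeded.
        def __init__(self, max_size):
            self.max_size = max_size
            self.data = OrderedDict()

        def get(self, key, default=None):
            if key in self.data:
                self.data.move_to_end(key)
                return self.data[key]
            return default

        def set(self, key, value):
            self.data[key] = value
            self.data.move_to_end(key)
            while len(self.data) > self.max_size:
                self.data.popitem(last=False)

    cache = LruCache(100)
    cache.set("@alice:hs", "$alice_join")  # sent on an earlier sync

    state_ids = {
        ("m.room.member", "@alice:hs"): "$alice_join",  # redundant
        ("m.room.member", "@bob:hs"): "$bob_join",      # new to this client
    }
    # the same filter the diff applies on incremental syncs:
    state_ids = {
        t: event_id for t, event_id in state_ids.items()
        if cache.get(t[1]) != event_id
    }
    print(state_ids)  # only bob's membership survives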
@@ -1448,7 +1552,9 @@ def _action_has_highlight(actions):
     return False
 
 
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+    timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
     """Works out what state to include in a sync response.
 
     Args:
@@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         previous (dict): state at the end of the previous sync (or empty dict
             if this is an initial sync)
         current (dict): state at the end of the timeline
+        lazy_load_members (bool): whether to return members from timeline_start
+            or not. assumes that timeline_start has already been filtered to
+            include only the members the client needs to know about.
 
     Returns:
         dict
@@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
     }
 
     c_ids = set(e for e in current.values())
-    tc_ids = set(e for e in timeline_contains.values())
-    p_ids = set(e for e in previous.values())
     ts_ids = set(e for e in timeline_start.values())
+    p_ids = set(e for e in previous.values())
+    tc_ids = set(e for e in timeline_contains.values())
+
+    # If we are lazyloading room members, we explicitly add the membership events
+    # for the senders in the timeline into the state block returned by /sync,
+    # as we may not have sent them to the client before. We find these membership
+    # events by filtering them out of timeline_start, which has already been filtered
+    # to only include membership events for the senders in the timeline.
+    # In practice, we can do this by removing them from the p_ids list,
+    # which is the list of relevant state we know we have already sent to the client.
+    # see https://github.com/matrix-org/synapse/pull/2970
+    # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+    if lazy_load_members:
+        p_ids.difference_update(
+            e for t, e in timeline_start.iteritems()
+            if t[0] == EventTypes.Member
+        )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
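The final expression, `state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids`, reads as: everything in the current state or the state at timeline start, minus what the client already got at the previous sync, minus what the timeline itself delivers. The lazy-loading tweak removes timeline-start members from `p_ids`, forcing their membership events back into the state block. A toy walk-through of that arithmetic, with invented event IDs:

    # timeline_start has been pre-filtered to the one member (alice)
    # who actually speaks in the timeline
    c_ids = {"$create", "$power", "$alice_join"}   # current state
    ts_ids = {"$create", "$alice_join"}            # state at timeline start
    p_ids = {"$create", "$power", "$alice_join"}   # sent at previous sync
    tc_ids = set()                                 # state events in the timeline

    # without the lazy-loading tweak, nothing new would be sent:
    print(((c_ids | ts_ids) - p_ids) - tc_ids)     # set()

    # with lazy_load_members, alice's membership is removed from p_ids,
    # so her m.room.member event is included even though it was
    # technically sent before:
    p_ids = p_ids - {"$alice_join"}
    print(((c_ids | ts_ids) - p_ids) - tc_ids)     # {'$alice_join'}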