diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/presence.py | 131 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 16 |
2 files changed, 84 insertions, 63 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 4197311a97..aed640450f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -31,6 +31,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function +from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer from synapse.types import UserID import synapse.metrics @@ -209,57 +210,60 @@ class PresenceHandler(BaseHandler): """ now = self.clock.time_msec() - # NOTE: We purposefully don't yield between now and when we've - # calculated what we want to do with the new states, to avoid races. + with Measure(self.clock, "presence_update_states"): - to_notify = {} # Changes we want to notify everyone about - to_federation_ping = {} # These need sending keep-alives - for new_state in new_states: - user_id = new_state.user_id + # NOTE: We purposefully don't yield between now and when we've + # calculated what we want to do with the new states, to avoid races. - # Its fine to not hit the database here, as the only thing not in - # the current state cache are OFFLINE states, where the only field - # of interest is last_active which is safe enough to assume is 0 - # here. - prev_state = self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) + to_notify = {} # Changes we want to notify everyone about + to_federation_ping = {} # These need sending keep-alives - new_state, should_notify, should_ping = handle_update( - prev_state, new_state, - is_mine=self.hs.is_mine_id(user_id), - wheel_timer=self.wheel_timer, - now=now - ) + for new_state in new_states: + user_id = new_state.user_id - self.user_to_current_state[user_id] = new_state + # Its fine to not hit the database here, as the only thing not in + # the current state cache are OFFLINE states, where the only field + # of interest is last_active which is safe enough to assume is 0 + # here. + prev_state = self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) - if should_notify: - to_notify[user_id] = new_state - elif should_ping: - to_federation_ping[user_id] = new_state + new_state, should_notify, should_ping = handle_update( + prev_state, new_state, + is_mine=self.hs.is_mine_id(user_id), + wheel_timer=self.wheel_timer, + now=now + ) - # TODO: We should probably ensure there are no races hereafter + self.user_to_current_state[user_id] = new_state - presence_updates_counter.inc_by(len(new_states)) + if should_notify: + to_notify[user_id] = new_state + elif should_ping: + to_federation_ping[user_id] = new_state - if to_notify: - notified_presence_counter.inc_by(len(to_notify)) - yield self._persist_and_notify(to_notify.values()) + # TODO: We should probably ensure there are no races hereafter - self.unpersisted_users_changes |= set(s.user_id for s in new_states) - self.unpersisted_users_changes -= set(to_notify.keys()) + presence_updates_counter.inc_by(len(new_states)) - to_federation_ping = { - user_id: state for user_id, state in to_federation_ping.items() - if user_id not in to_notify - } - if to_federation_ping: - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) + if to_notify: + notified_presence_counter.inc_by(len(to_notify)) + yield self._persist_and_notify(to_notify.values()) + + self.unpersisted_users_changes |= set(s.user_id for s in new_states) + self.unpersisted_users_changes -= set(to_notify.keys()) + + to_federation_ping = { + user_id: state for user_id, state in to_federation_ping.items() + if user_id not in to_notify + } + if to_federation_ping: + _, _, hosts_to_states = yield self._get_interested_parties( + to_federation_ping.values() + ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(hosts_to_states) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -267,26 +271,27 @@ class PresenceHandler(BaseHandler): """ now = self.clock.time_msec() - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = self.wheel_timer.fetch(now) + with Measure(self.clock, "presence_handle_timeouts"): + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = self.wheel_timer.fetch(now) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in set(users_to_check) - ] + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in set(users_to_check) + ] - timers_fired_counter.inc_by(len(states)) + timers_fired_counter.inc_by(len(states)) - changes = handle_timeouts( - states, - is_mine_fn=self.hs.is_mine_id, - user_to_num_current_syncs=self.user_to_num_current_syncs, - now=now, - ) + changes = handle_timeouts( + states, + is_mine_fn=self.hs.is_mine_id, + user_to_num_current_syncs=self.user_to_num_current_syncs, + now=now, + ) preserve_fn(self._update_states)(changes) @@ -423,13 +428,21 @@ class PresenceHandler(BaseHandler): hosts_to_states = {} for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states) + if not local_states: + continue + hosts = yield self.store.get_joined_hosts_for_room(room_id) for host in hosts: - hosts_to_states.setdefault(host, []).extend(states) + hosts_to_states.setdefault(host, []).extend(local_states) for user_id, states in users_to_states.items(): + local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states) + if not local_states: + continue + host = UserID.from_string(user_id).domain - hosts_to_states.setdefault(host, []).extend(states) + hosts_to_states.setdefault(host, []).extend(local_states) # TODO: de-dup hosts_to_states, as a single host might have multiple # of same presence diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 75e4ffb3ab..c87ff75c05 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -843,16 +843,20 @@ class SyncHandler(BaseHandler): # TODO(mjark) Check for new redactions in the state events. with Measure(self.clock, "compute_state_delta"): - current_state = yield self.get_state_at( - room_id, stream_position=now_token - ) - if full_state: if batch: + current_state = yield self.store.get_state_for_event( + batch.events[-1].event_id + ) + state = yield self.store.get_state_for_event( batch.events[0].event_id ) else: + current_state = yield self.get_state_at( + room_id, stream_position=now_token + ) + state = current_state timeline_state = { @@ -871,6 +875,10 @@ class SyncHandler(BaseHandler): room_id, stream_position=since_token ) + current_state = yield self.store.get_state_for_event( + batch.events[-1].event_id + ) + state_at_timeline_start = yield self.store.get_state_for_event( batch.events[0].event_id ) |