diff options
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 411 |
1 files changed, 210 insertions, 201 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 59d53f1050..5204073a38 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -50,16 +50,20 @@ logger = logging.getLogger(__name__) notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "") federation_presence_out_counter = Counter( - "synapse_handler_presence_federation_presence_out", "") + "synapse_handler_presence_federation_presence_out", "" +) presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "") timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "") -federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "") +federation_presence_counter = Counter( + "synapse_handler_presence_federation_presence", "" +) bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "") get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) notify_reason_counter = Counter( - "synapse_handler_presence_notify_reason", "", ["reason"]) + "synapse_handler_presence_notify_reason", "", ["reason"] +) state_transition_counter = Counter( "synapse_handler_presence_state_transition", "", ["from", "to"] ) @@ -90,7 +94,6 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class PresenceHandler(object): - def __init__(self, hs): """ @@ -110,31 +113,26 @@ class PresenceHandler(object): federation_registry = hs.get_federation_registry() - federation_registry.register_edu_handler( - "m.presence", self.incoming_presence - ) + federation_registry.register_edu_handler("m.presence", self.incoming_presence) active_presence = self.store.take_presence_startup_info() # A dictionary of the current state of users. This is prefilled with # non-offline presence from the DB. We should fetch from the DB if # we can't find a users presence in here. - self.user_to_current_state = { - state.user_id: state - for state in active_presence - } + self.user_to_current_state = {state.user_id: state for state in active_presence} LaterGauge( - "synapse_handlers_presence_user_to_current_state_size", "", [], - lambda: len(self.user_to_current_state) + "synapse_handlers_presence_user_to_current_state_size", + "", + [], + lambda: len(self.user_to_current_state), ) now = self.clock.time_msec() for state in active_presence: self.wheel_timer.insert( - now=now, - obj=state.user_id, - then=state.last_active_ts + IDLE_TIMER, + now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER ) self.wheel_timer.insert( now=now, @@ -158,7 +156,13 @@ class PresenceHandler(object): # have not yet been persisted self.unpersisted_users_changes = set() - hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown) + hs.get_reactor().addSystemEventTrigger( + "before", + "shutdown", + run_as_background_process, + "presence.on_shutdown", + self._on_shutdown, + ) self.serial_to_user = {} self._next_serial = 1 @@ -182,22 +186,26 @@ class PresenceHandler(object): # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. - self.clock.call_later( - 30, - self.clock.looping_call, - self._handle_timeouts, - 5000, - ) + def run_timeout_handler(): + return run_as_background_process( + "handle_presence_timeouts", self._handle_timeouts + ) - self.clock.call_later( - 60, - self.clock.looping_call, - self._persist_unpersisted_changes, - 60 * 1000, - ) + self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000) - LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], - lambda: len(self.wheel_timer)) + def run_persister(): + return run_as_background_process( + "persist_presence_changes", self._persist_unpersisted_changes + ) + + self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000) + + LaterGauge( + "synapse_handlers_presence_wheel_timer_size", + "", + [], + lambda: len(self.wheel_timer), + ) # Used to handle sending of presence to newly joined users/servers if hs.config.use_presence: @@ -225,14 +233,17 @@ class PresenceHandler(object): logger.info( "Performing _on_shutdown. Persisting %d unpersisted changes", - len(self.user_to_current_state) + len(self.user_to_current_state), ) if self.unpersisted_users_changes: - yield self.store.update_presence([ - self.user_to_current_state[user_id] - for user_id in self.unpersisted_users_changes - ]) + + yield self.store.update_presence( + [ + self.user_to_current_state[user_id] + for user_id in self.unpersisted_users_changes + ] + ) logger.info("Finished _on_shutdown") @defer.inlineCallbacks @@ -240,29 +251,14 @@ class PresenceHandler(object): """We periodically persist the unpersisted changes, as otherwise they may stack up and slow down shutdown times. """ - logger.info( - "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes", - len(self.unpersisted_users_changes) - ) - unpersisted = self.unpersisted_users_changes self.unpersisted_users_changes = set() if unpersisted: - yield self.store.update_presence([ - self.user_to_current_state[user_id] - for user_id in unpersisted - ]) - - logger.info("Finished _persist_unpersisted_changes") - - @defer.inlineCallbacks - def _update_states_and_catch_exception(self, new_states): - try: - res = yield self._update_states(new_states) - defer.returnValue(res) - except Exception: - logger.exception("Error updating presence") + logger.info("Persisting %d upersisted presence updates", len(unpersisted)) + yield self.store.update_presence( + [self.user_to_current_state[user_id] for user_id in unpersisted] + ) @defer.inlineCallbacks def _update_states(self, new_states): @@ -298,10 +294,11 @@ class PresenceHandler(object): ) new_state, should_notify, should_ping = handle_update( - prev_state, new_state, + prev_state, + new_state, is_mine=self.is_mine_id(user_id), wheel_timer=self.wheel_timer, - now=now + now=now, ) self.user_to_current_state[user_id] = new_state @@ -323,7 +320,8 @@ class PresenceHandler(object): self.unpersisted_users_changes -= set(to_notify.keys()) to_federation_ping = { - user_id: state for user_id, state in to_federation_ping.items() + user_id: state + for user_id, state in to_federation_ping.items() if user_id not in to_notify } if to_federation_ping: @@ -338,45 +336,39 @@ class PresenceHandler(object): logger.info("Handling presence timeouts") now = self.clock.time_msec() - try: - with Measure(self.clock, "presence_handle_timeouts"): - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = set(self.wheel_timer.fetch(now)) - - # Check whether the lists of syncing processes from an external - # process have expired. - expired_process_ids = [ - process_id for process_id, last_update - in self.external_process_last_updated_ms.items() - if now - last_update > EXTERNAL_PROCESS_EXPIRY - ] - for process_id in expired_process_ids: - users_to_check.update( - self.external_process_last_updated_ms.pop(process_id, ()) - ) - self.external_process_last_update.pop(process_id) + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = set(self.wheel_timer.fetch(now)) + + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id + for process_id, last_update in self.external_process_last_updated_ms.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_last_updated_ms.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in users_to_check - ] + states = [ + self.user_to_current_state.get(user_id, UserPresenceState.default(user_id)) + for user_id in users_to_check + ] - timers_fired_counter.inc(len(states)) + timers_fired_counter.inc(len(states)) - changes = handle_timeouts( - states, - is_mine_fn=self.is_mine_id, - syncing_user_ids=self.get_currently_syncing_users(), - now=now, - ) + changes = handle_timeouts( + states, + is_mine_fn=self.is_mine_id, + syncing_user_ids=self.get_currently_syncing_users(), + now=now, + ) - run_in_background(self._update_states_and_catch_exception, changes) - except Exception: - logger.exception("Exception in _handle_timeouts loop") + return self._update_states(changes) @defer.inlineCallbacks def bump_presence_active_time(self, user): @@ -393,9 +385,7 @@ class PresenceHandler(object): prev_state = yield self.current_state_for_user(user_id) - new_fields = { - "last_active_ts": self.clock.time_msec(), - } + new_fields = {"last_active_ts": self.clock.time_msec()} if prev_state.state == PresenceState.UNAVAILABLE: new_fields["state"] = PresenceState.ONLINE @@ -429,15 +419,23 @@ class PresenceHandler(object): if prev_state.state == PresenceState.OFFLINE: # If they're currently offline then bring them online, otherwise # just update the last sync times. - yield self._update_states([prev_state.copy_and_replace( - state=PresenceState.ONLINE, - last_active_ts=self.clock.time_msec(), - last_user_sync_ts=self.clock.time_msec(), - )]) + yield self._update_states( + [ + prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=self.clock.time_msec(), + last_user_sync_ts=self.clock.time_msec(), + ) + ] + ) else: - yield self._update_states([prev_state.copy_and_replace( - last_user_sync_ts=self.clock.time_msec(), - )]) + yield self._update_states( + [ + prev_state.copy_and_replace( + last_user_sync_ts=self.clock.time_msec() + ) + ] + ) @defer.inlineCallbacks def _end(): @@ -445,9 +443,13 @@ class PresenceHandler(object): self.user_to_num_current_syncs[user_id] -= 1 prev_state = yield self.current_state_for_user(user_id) - yield self._update_states([prev_state.copy_and_replace( - last_user_sync_ts=self.clock.time_msec(), - )]) + yield self._update_states( + [ + prev_state.copy_and_replace( + last_user_sync_ts=self.clock.time_msec() + ) + ] + ) except Exception: logger.exception("Error updating presence after sync") @@ -468,7 +470,8 @@ class PresenceHandler(object): """ if self.hs.config.use_presence: syncing_user_ids = { - user_id for user_id, count in self.user_to_num_current_syncs.items() + user_id + for user_id, count in self.user_to_num_current_syncs.items() if count } for user_ids in self.external_process_to_current_syncs.values(): @@ -478,7 +481,9 @@ class PresenceHandler(object): return set() @defer.inlineCallbacks - def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): + def update_external_syncs_row( + self, process_id, user_id, is_syncing, sync_time_msec + ): """Update the syncing users for an external process as a delta. Args: @@ -499,20 +504,22 @@ class PresenceHandler(object): updates = [] if is_syncing and user_id not in process_presence: if prev_state.state == PresenceState.OFFLINE: - updates.append(prev_state.copy_and_replace( - state=PresenceState.ONLINE, - last_active_ts=sync_time_msec, - last_user_sync_ts=sync_time_msec, - )) + updates.append( + prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=sync_time_msec, + last_user_sync_ts=sync_time_msec, + ) + ) else: - updates.append(prev_state.copy_and_replace( - last_user_sync_ts=sync_time_msec, - )) + updates.append( + prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) + ) process_presence.add(user_id) elif user_id in process_presence: - updates.append(prev_state.copy_and_replace( - last_user_sync_ts=sync_time_msec, - )) + updates.append( + prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) + ) if not is_syncing: process_presence.discard(user_id) @@ -536,12 +543,12 @@ class PresenceHandler(object): prev_states = yield self.current_state_for_users(process_presence) time_now_ms = self.clock.time_msec() - yield self._update_states([ - prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, - ) - for prev_state in itervalues(prev_states) - ]) + yield self._update_states( + [ + prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) + for prev_state in itervalues(prev_states) + ] + ) self.external_process_last_updated_ms.pop(process_id, None) @defer.inlineCallbacks @@ -573,8 +580,7 @@ class PresenceHandler(object): missing = [user_id for user_id, state in iteritems(states) if not state] if missing: new = { - user_id: UserPresenceState.default(user_id) - for user_id in missing + user_id: UserPresenceState.default(user_id) for user_id in missing } states.update(new) self.user_to_current_state.update(new) @@ -592,8 +598,10 @@ class PresenceHandler(object): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states] + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=[UserID.from_string(u) for u in users_to_states], ) self._push_to_remotes(states) @@ -604,8 +612,10 @@ class PresenceHandler(object): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states] + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=[UserID.from_string(u) for u in users_to_states], ) def _push_to_remotes(self, states): @@ -630,15 +640,15 @@ class PresenceHandler(object): user_id = push.get("user_id", None) if not user_id: logger.info( - "Got presence update from %r with no 'user_id': %r", - origin, push, + "Got presence update from %r with no 'user_id': %r", origin, push ) continue if get_domain_from_id(user_id) != origin: logger.info( "Got presence update from %r with bad 'user_id': %r", - origin, user_id, + origin, + user_id, ) continue @@ -646,14 +656,12 @@ class PresenceHandler(object): if not presence_state: logger.info( "Got presence update from %r with no 'presence_state': %r", - origin, push, + origin, + push, ) continue - new_fields = { - "state": presence_state, - "last_federation_update_ts": now, - } + new_fields = {"state": presence_state, "last_federation_update_ts": now} last_active_ago = push.get("last_active_ago", None) if last_active_ago is not None: @@ -671,10 +679,7 @@ class PresenceHandler(object): @defer.inlineCallbacks def get_state(self, target_user, as_event=False): - results = yield self.get_states( - [target_user.to_string()], - as_event=as_event, - ) + results = yield self.get_states([target_user.to_string()], as_event=as_event) defer.returnValue(results[0]) @@ -698,13 +703,15 @@ class PresenceHandler(object): now = self.clock.time_msec() if as_event: - defer.returnValue([ - { - "type": "m.presence", - "content": format_user_presence_state(state, now), - } - for state in updates - ]) + defer.returnValue( + [ + { + "type": "m.presence", + "content": format_user_presence_state(state, now), + } + for state in updates + ] + ) else: defer.returnValue(updates) @@ -716,7 +723,9 @@ class PresenceHandler(object): presence = state["presence"] valid_presence = ( - PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, ) if presence not in valid_presence: raise SynapseError(400, "Invalid presence state") @@ -725,9 +734,7 @@ class PresenceHandler(object): prev_state = yield self.current_state_for_user(user_id) - new_fields = { - "state": presence - } + new_fields = {"state": presence} if not ignore_status_msg: msg = status_msg if presence != PresenceState.OFFLINE else None @@ -833,14 +840,17 @@ class PresenceHandler(object): # joins. continue - event = yield self.store.get_event(event_id) - if event.content.get("membership") != Membership.JOIN: + event = yield self.store.get_event(event_id, allow_none=True) + if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id) - if prev_event.content.get("membership") == Membership.JOIN: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + if ( + prev_event + and prev_event.content.get("membership") == Membership.JOIN + ): # Ignore changes to join events. continue @@ -873,8 +883,7 @@ class PresenceHandler(object): hosts = set(host for host in hosts if host != self.server_name) self.federation.send_presence_to_destinations( - states=[state], - destinations=hosts, + states=[state], destinations=hosts ) else: # A remote user has joined the room, so we need to: @@ -900,7 +909,8 @@ class PresenceHandler(object): # default state. now = self.clock.time_msec() states = [ - state for state in states.values() + state + for state in states.values() if state.state != PresenceState.OFFLINE or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 or state.status_msg is not None @@ -908,8 +918,7 @@ class PresenceHandler(object): if states: self.federation.send_presence_to_destinations( - states=states, - destinations=[get_domain_from_id(user_id)], + states=states, destinations=[get_domain_from_id(user_id)] ) @@ -933,7 +942,10 @@ def should_notify(old_state, new_state): notify_reason_counter.labels("current_active_change").inc() return True - if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: + if ( + new_state.last_active_ts - old_state.last_active_ts + > LAST_ACTIVE_GRANULARITY + ): # Only notify about last active bumps if we're not currently acive if not new_state.currently_active: notify_reason_counter.labels("last_active_change_online").inc() @@ -954,9 +966,7 @@ def format_user_presence_state(state, now, include_user_id=True): The "user_id" is optional so that this function can be used to format presence updates for client /sync responses and for federation /send requests. """ - content = { - "presence": state.state, - } + content = {"presence": state.state} if include_user_id: content["user_id"] = state.user_id if state.last_active_ts: @@ -982,8 +992,15 @@ class PresenceEventSource(object): @defer.inlineCallbacks @log_function - def get_new_events(self, user, from_key, room_ids=None, include_offline=True, - explicit_room_id=None, **kwargs): + def get_new_events( + self, + user, + from_key, + room_ids=None, + include_offline=True, + explicit_room_id=None, + **kwargs + ): # The process for getting presence events are: # 1. Get the rooms the user is in. # 2. Get the list of user in the rooms. @@ -1026,7 +1043,7 @@ class PresenceEventSource(object): if from_key: user_ids_changed = stream_change_cache.get_entities_changed( - users_interested_in, from_key, + users_interested_in, from_key ) else: user_ids_changed = users_interested_in @@ -1036,10 +1053,16 @@ class PresenceEventSource(object): if include_offline: defer.returnValue((list(updates.values()), max_token)) else: - defer.returnValue(([ - s for s in itervalues(updates) - if s.state != PresenceState.OFFLINE - ], max_token)) + defer.returnValue( + ( + [ + s + for s in itervalues(updates) + if s.state != PresenceState.OFFLINE + ], + max_token, + ) + ) def get_current_key(self): return self.store.get_current_presence_token() @@ -1057,13 +1080,13 @@ class PresenceEventSource(object): users_interested_in.add(user_id) # So that we receive our own presence users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id, on_invalidate=cache_context.invalidate, + user_id, on_invalidate=cache_context.invalidate ) users_interested_in.update(users_who_share_room) if explicit_room_id: user_ids = yield self.store.get_users_in_room( - explicit_room_id, on_invalidate=cache_context.invalidate, + explicit_room_id, on_invalidate=cache_context.invalidate ) users_interested_in.update(user_ids) @@ -1119,9 +1142,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): if now - state.last_active_ts > IDLE_TIMER: # Currently online, but last activity ages ago so auto # idle - state = state.copy_and_replace( - state=PresenceState.UNAVAILABLE, - ) + state = state.copy_and_replace(state=PresenceState.UNAVAILABLE) changed = True elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: # So that we send down a notification that we've @@ -1141,8 +1162,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) if now - sync_or_active > SYNC_ONLINE_TIMEOUT: state = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, + state=PresenceState.OFFLINE, status_msg=None ) changed = True else: @@ -1151,10 +1171,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): # no one gets stuck online forever. if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: # The other side seems to have disappeared. - state = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, - ) + state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None) changed = True return state if changed else None @@ -1189,21 +1206,17 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): if new_state.state == PresenceState.ONLINE: # Idle timer wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_active_ts + IDLE_TIMER + now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER ) active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY - new_state = new_state.copy_and_replace( - currently_active=active, - ) + new_state = new_state.copy_and_replace(currently_active=active) if active: wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY + then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY, ) if new_state.state != PresenceState.OFFLINE: @@ -1211,29 +1224,25 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, ) last_federate = new_state.last_federation_update_ts if now - last_federate > FEDERATION_PING_INTERVAL: # Been a while since we've poked remote servers - new_state = new_state.copy_and_replace( - last_federation_update_ts=now, - ) + new_state = new_state.copy_and_replace(last_federation_update_ts=now) federation_ping = True else: wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, ) # Check whether the change was something worth notifying about if should_notify(prev_state, new_state): - new_state = new_state.copy_and_replace( - last_federation_update_ts=now, - ) + new_state = new_state.copy_and_replace(last_federation_update_ts=now) persist_and_notify = True return new_state, persist_and_notify, federation_ping |