diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/handlers/presence.py | 311 |
1 files changed, 158 insertions, 153 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 557fb5f83d..5204073a38 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -50,16 +50,20 @@ logger = logging.getLogger(__name__) notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "") federation_presence_out_counter = Counter( - "synapse_handler_presence_federation_presence_out", "") + "synapse_handler_presence_federation_presence_out", "" +) presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "") timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "") -federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "") +federation_presence_counter = Counter( + "synapse_handler_presence_federation_presence", "" +) bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "") get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"]) notify_reason_counter = Counter( - "synapse_handler_presence_notify_reason", "", ["reason"]) + "synapse_handler_presence_notify_reason", "", ["reason"] +) state_transition_counter = Counter( "synapse_handler_presence_state_transition", "", ["from", "to"] ) @@ -90,7 +94,6 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class PresenceHandler(object): - def __init__(self, hs): """ @@ -110,31 +113,26 @@ class PresenceHandler(object): federation_registry = hs.get_federation_registry() - federation_registry.register_edu_handler( - "m.presence", self.incoming_presence - ) + federation_registry.register_edu_handler("m.presence", self.incoming_presence) active_presence = self.store.take_presence_startup_info() # A dictionary of the current state of users. This is prefilled with # non-offline presence from the DB. We should fetch from the DB if # we can't find a users presence in here. - self.user_to_current_state = { - state.user_id: state - for state in active_presence - } + self.user_to_current_state = {state.user_id: state for state in active_presence} LaterGauge( - "synapse_handlers_presence_user_to_current_state_size", "", [], - lambda: len(self.user_to_current_state) + "synapse_handlers_presence_user_to_current_state_size", + "", + [], + lambda: len(self.user_to_current_state), ) now = self.clock.time_msec() for state in active_presence: self.wheel_timer.insert( - now=now, - obj=state.user_id, - then=state.last_active_ts + IDLE_TIMER, + now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER ) self.wheel_timer.insert( now=now, @@ -193,27 +191,21 @@ class PresenceHandler(object): "handle_presence_timeouts", self._handle_timeouts ) - self.clock.call_later( - 30, - self.clock.looping_call, - run_timeout_handler, - 5000, - ) + self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000) def run_persister(): return run_as_background_process( "persist_presence_changes", self._persist_unpersisted_changes ) - self.clock.call_later( - 60, - self.clock.looping_call, - run_persister, - 60 * 1000, - ) + self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000) - LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], - lambda: len(self.wheel_timer)) + LaterGauge( + "synapse_handlers_presence_wheel_timer_size", + "", + [], + lambda: len(self.wheel_timer), + ) # Used to handle sending of presence to newly joined users/servers if hs.config.use_presence: @@ -241,15 +233,17 @@ class PresenceHandler(object): logger.info( "Performing _on_shutdown. Persisting %d unpersisted changes", - len(self.user_to_current_state) + len(self.user_to_current_state), ) if self.unpersisted_users_changes: - yield self.store.update_presence([ - self.user_to_current_state[user_id] - for user_id in self.unpersisted_users_changes - ]) + yield self.store.update_presence( + [ + self.user_to_current_state[user_id] + for user_id in self.unpersisted_users_changes + ] + ) logger.info("Finished _on_shutdown") @defer.inlineCallbacks @@ -261,13 +255,10 @@ class PresenceHandler(object): self.unpersisted_users_changes = set() if unpersisted: - logger.info( - "Persisting %d upersisted presence updates", len(unpersisted) + logger.info("Persisting %d upersisted presence updates", len(unpersisted)) + yield self.store.update_presence( + [self.user_to_current_state[user_id] for user_id in unpersisted] ) - yield self.store.update_presence([ - self.user_to_current_state[user_id] - for user_id in unpersisted - ]) @defer.inlineCallbacks def _update_states(self, new_states): @@ -303,10 +294,11 @@ class PresenceHandler(object): ) new_state, should_notify, should_ping = handle_update( - prev_state, new_state, + prev_state, + new_state, is_mine=self.is_mine_id(user_id), wheel_timer=self.wheel_timer, - now=now + now=now, ) self.user_to_current_state[user_id] = new_state @@ -328,7 +320,8 @@ class PresenceHandler(object): self.unpersisted_users_changes -= set(to_notify.keys()) to_federation_ping = { - user_id: state for user_id, state in to_federation_ping.items() + user_id: state + for user_id, state in to_federation_ping.items() if user_id not in to_notify } if to_federation_ping: @@ -351,8 +344,8 @@ class PresenceHandler(object): # Check whether the lists of syncing processes from an external # process have expired. expired_process_ids = [ - process_id for process_id, last_update - in self.external_process_last_updated_ms.items() + process_id + for process_id, last_update in self.external_process_last_updated_ms.items() if now - last_update > EXTERNAL_PROCESS_EXPIRY ] for process_id in expired_process_ids: @@ -362,9 +355,7 @@ class PresenceHandler(object): self.external_process_last_update.pop(process_id) states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) + self.user_to_current_state.get(user_id, UserPresenceState.default(user_id)) for user_id in users_to_check ] @@ -394,9 +385,7 @@ class PresenceHandler(object): prev_state = yield self.current_state_for_user(user_id) - new_fields = { - "last_active_ts": self.clock.time_msec(), - } + new_fields = {"last_active_ts": self.clock.time_msec()} if prev_state.state == PresenceState.UNAVAILABLE: new_fields["state"] = PresenceState.ONLINE @@ -430,15 +419,23 @@ class PresenceHandler(object): if prev_state.state == PresenceState.OFFLINE: # If they're currently offline then bring them online, otherwise # just update the last sync times. - yield self._update_states([prev_state.copy_and_replace( - state=PresenceState.ONLINE, - last_active_ts=self.clock.time_msec(), - last_user_sync_ts=self.clock.time_msec(), - )]) + yield self._update_states( + [ + prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=self.clock.time_msec(), + last_user_sync_ts=self.clock.time_msec(), + ) + ] + ) else: - yield self._update_states([prev_state.copy_and_replace( - last_user_sync_ts=self.clock.time_msec(), - )]) + yield self._update_states( + [ + prev_state.copy_and_replace( + last_user_sync_ts=self.clock.time_msec() + ) + ] + ) @defer.inlineCallbacks def _end(): @@ -446,9 +443,13 @@ class PresenceHandler(object): self.user_to_num_current_syncs[user_id] -= 1 prev_state = yield self.current_state_for_user(user_id) - yield self._update_states([prev_state.copy_and_replace( - last_user_sync_ts=self.clock.time_msec(), - )]) + yield self._update_states( + [ + prev_state.copy_and_replace( + last_user_sync_ts=self.clock.time_msec() + ) + ] + ) except Exception: logger.exception("Error updating presence after sync") @@ -469,7 +470,8 @@ class PresenceHandler(object): """ if self.hs.config.use_presence: syncing_user_ids = { - user_id for user_id, count in self.user_to_num_current_syncs.items() + user_id + for user_id, count in self.user_to_num_current_syncs.items() if count } for user_ids in self.external_process_to_current_syncs.values(): @@ -479,7 +481,9 @@ class PresenceHandler(object): return set() @defer.inlineCallbacks - def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): + def update_external_syncs_row( + self, process_id, user_id, is_syncing, sync_time_msec + ): """Update the syncing users for an external process as a delta. Args: @@ -500,20 +504,22 @@ class PresenceHandler(object): updates = [] if is_syncing and user_id not in process_presence: if prev_state.state == PresenceState.OFFLINE: - updates.append(prev_state.copy_and_replace( - state=PresenceState.ONLINE, - last_active_ts=sync_time_msec, - last_user_sync_ts=sync_time_msec, - )) + updates.append( + prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=sync_time_msec, + last_user_sync_ts=sync_time_msec, + ) + ) else: - updates.append(prev_state.copy_and_replace( - last_user_sync_ts=sync_time_msec, - )) + updates.append( + prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) + ) process_presence.add(user_id) elif user_id in process_presence: - updates.append(prev_state.copy_and_replace( - last_user_sync_ts=sync_time_msec, - )) + updates.append( + prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) + ) if not is_syncing: process_presence.discard(user_id) @@ -537,12 +543,12 @@ class PresenceHandler(object): prev_states = yield self.current_state_for_users(process_presence) time_now_ms = self.clock.time_msec() - yield self._update_states([ - prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, - ) - for prev_state in itervalues(prev_states) - ]) + yield self._update_states( + [ + prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) + for prev_state in itervalues(prev_states) + ] + ) self.external_process_last_updated_ms.pop(process_id, None) @defer.inlineCallbacks @@ -574,8 +580,7 @@ class PresenceHandler(object): missing = [user_id for user_id, state in iteritems(states) if not state] if missing: new = { - user_id: UserPresenceState.default(user_id) - for user_id in missing + user_id: UserPresenceState.default(user_id) for user_id in missing } states.update(new) self.user_to_current_state.update(new) @@ -593,8 +598,10 @@ class PresenceHandler(object): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states] + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=[UserID.from_string(u) for u in users_to_states], ) self._push_to_remotes(states) @@ -605,8 +612,10 @@ class PresenceHandler(object): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states] + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=[UserID.from_string(u) for u in users_to_states], ) def _push_to_remotes(self, states): @@ -631,15 +640,15 @@ class PresenceHandler(object): user_id = push.get("user_id", None) if not user_id: logger.info( - "Got presence update from %r with no 'user_id': %r", - origin, push, + "Got presence update from %r with no 'user_id': %r", origin, push ) continue if get_domain_from_id(user_id) != origin: logger.info( "Got presence update from %r with bad 'user_id': %r", - origin, user_id, + origin, + user_id, ) continue @@ -647,14 +656,12 @@ class PresenceHandler(object): if not presence_state: logger.info( "Got presence update from %r with no 'presence_state': %r", - origin, push, + origin, + push, ) continue - new_fields = { - "state": presence_state, - "last_federation_update_ts": now, - } + new_fields = {"state": presence_state, "last_federation_update_ts": now} last_active_ago = push.get("last_active_ago", None) if last_active_ago is not None: @@ -672,10 +679,7 @@ class PresenceHandler(object): @defer.inlineCallbacks def get_state(self, target_user, as_event=False): - results = yield self.get_states( - [target_user.to_string()], - as_event=as_event, - ) + results = yield self.get_states([target_user.to_string()], as_event=as_event) defer.returnValue(results[0]) @@ -699,13 +703,15 @@ class PresenceHandler(object): now = self.clock.time_msec() if as_event: - defer.returnValue([ - { - "type": "m.presence", - "content": format_user_presence_state(state, now), - } - for state in updates - ]) + defer.returnValue( + [ + { + "type": "m.presence", + "content": format_user_presence_state(state, now), + } + for state in updates + ] + ) else: defer.returnValue(updates) @@ -717,7 +723,9 @@ class PresenceHandler(object): presence = state["presence"] valid_presence = ( - PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, ) if presence not in valid_presence: raise SynapseError(400, "Invalid presence state") @@ -726,9 +734,7 @@ class PresenceHandler(object): prev_state = yield self.current_state_for_user(user_id) - new_fields = { - "state": presence - } + new_fields = {"state": presence} if not ignore_status_msg: msg = status_msg if presence != PresenceState.OFFLINE else None @@ -877,8 +883,7 @@ class PresenceHandler(object): hosts = set(host for host in hosts if host != self.server_name) self.federation.send_presence_to_destinations( - states=[state], - destinations=hosts, + states=[state], destinations=hosts ) else: # A remote user has joined the room, so we need to: @@ -904,7 +909,8 @@ class PresenceHandler(object): # default state. now = self.clock.time_msec() states = [ - state for state in states.values() + state + for state in states.values() if state.state != PresenceState.OFFLINE or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 or state.status_msg is not None @@ -912,8 +918,7 @@ class PresenceHandler(object): if states: self.federation.send_presence_to_destinations( - states=states, - destinations=[get_domain_from_id(user_id)], + states=states, destinations=[get_domain_from_id(user_id)] ) @@ -937,7 +942,10 @@ def should_notify(old_state, new_state): notify_reason_counter.labels("current_active_change").inc() return True - if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: + if ( + new_state.last_active_ts - old_state.last_active_ts + > LAST_ACTIVE_GRANULARITY + ): # Only notify about last active bumps if we're not currently acive if not new_state.currently_active: notify_reason_counter.labels("last_active_change_online").inc() @@ -958,9 +966,7 @@ def format_user_presence_state(state, now, include_user_id=True): The "user_id" is optional so that this function can be used to format presence updates for client /sync responses and for federation /send requests. """ - content = { - "presence": state.state, - } + content = {"presence": state.state} if include_user_id: content["user_id"] = state.user_id if state.last_active_ts: @@ -986,8 +992,15 @@ class PresenceEventSource(object): @defer.inlineCallbacks @log_function - def get_new_events(self, user, from_key, room_ids=None, include_offline=True, - explicit_room_id=None, **kwargs): + def get_new_events( + self, + user, + from_key, + room_ids=None, + include_offline=True, + explicit_room_id=None, + **kwargs + ): # The process for getting presence events are: # 1. Get the rooms the user is in. # 2. Get the list of user in the rooms. @@ -1030,7 +1043,7 @@ class PresenceEventSource(object): if from_key: user_ids_changed = stream_change_cache.get_entities_changed( - users_interested_in, from_key, + users_interested_in, from_key ) else: user_ids_changed = users_interested_in @@ -1040,10 +1053,16 @@ class PresenceEventSource(object): if include_offline: defer.returnValue((list(updates.values()), max_token)) else: - defer.returnValue(([ - s for s in itervalues(updates) - if s.state != PresenceState.OFFLINE - ], max_token)) + defer.returnValue( + ( + [ + s + for s in itervalues(updates) + if s.state != PresenceState.OFFLINE + ], + max_token, + ) + ) def get_current_key(self): return self.store.get_current_presence_token() @@ -1061,13 +1080,13 @@ class PresenceEventSource(object): users_interested_in.add(user_id) # So that we receive our own presence users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id, on_invalidate=cache_context.invalidate, + user_id, on_invalidate=cache_context.invalidate ) users_interested_in.update(users_who_share_room) if explicit_room_id: user_ids = yield self.store.get_users_in_room( - explicit_room_id, on_invalidate=cache_context.invalidate, + explicit_room_id, on_invalidate=cache_context.invalidate ) users_interested_in.update(user_ids) @@ -1123,9 +1142,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): if now - state.last_active_ts > IDLE_TIMER: # Currently online, but last activity ages ago so auto # idle - state = state.copy_and_replace( - state=PresenceState.UNAVAILABLE, - ) + state = state.copy_and_replace(state=PresenceState.UNAVAILABLE) changed = True elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: # So that we send down a notification that we've @@ -1145,8 +1162,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) if now - sync_or_active > SYNC_ONLINE_TIMEOUT: state = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, + state=PresenceState.OFFLINE, status_msg=None ) changed = True else: @@ -1155,10 +1171,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): # no one gets stuck online forever. if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: # The other side seems to have disappeared. - state = state.copy_and_replace( - state=PresenceState.OFFLINE, - status_msg=None, - ) + state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None) changed = True return state if changed else None @@ -1193,21 +1206,17 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): if new_state.state == PresenceState.ONLINE: # Idle timer wheel_timer.insert( - now=now, - obj=user_id, - then=new_state.last_active_ts + IDLE_TIMER + now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER ) active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY - new_state = new_state.copy_and_replace( - currently_active=active, - ) + new_state = new_state.copy_and_replace(currently_active=active) if active: wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY + then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY, ) if new_state.state != PresenceState.OFFLINE: @@ -1215,29 +1224,25 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT + then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, ) last_federate = new_state.last_federation_update_ts if now - last_federate > FEDERATION_PING_INTERVAL: # Been a while since we've poked remote servers - new_state = new_state.copy_and_replace( - last_federation_update_ts=now, - ) + new_state = new_state.copy_and_replace(last_federation_update_ts=now) federation_ping = True else: wheel_timer.insert( now=now, obj=user_id, - then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT + then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT, ) # Check whether the change was something worth notifying about if should_notify(prev_state, new_state): - new_state = new_state.copy_and_replace( - last_federation_update_ts=now, - ) + new_state = new_state.copy_and_replace(last_federation_update_ts=now) persist_and_notify = True return new_state, persist_and_notify, federation_ping |