diff options
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 342 |
1 files changed, 151 insertions, 191 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 37e87fc054..59d53f1050 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -31,9 +31,11 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import PresenceState +import synapse.metrics +from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState from synapse.types import UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer @@ -98,6 +100,7 @@ class PresenceHandler(object): self.hs = hs self.is_mine = hs.is_mine self.is_mine_id = hs.is_mine_id + self.server_name = hs.hostname self.clock = hs.get_clock() self.store = hs.get_datastore() self.wheel_timer = WheelTimer() @@ -110,30 +113,6 @@ class PresenceHandler(object): federation_registry.register_edu_handler( "m.presence", self.incoming_presence ) - federation_registry.register_edu_handler( - "m.presence_invite", - lambda origin, content: self.invite_presence( - observed_user=UserID.from_string(content["observed_user"]), - observer_user=UserID.from_string(content["observer_user"]), - ) - ) - federation_registry.register_edu_handler( - "m.presence_accept", - lambda origin, content: self.accept_presence( - observed_user=UserID.from_string(content["observed_user"]), - observer_user=UserID.from_string(content["observer_user"]), - ) - ) - federation_registry.register_edu_handler( - "m.presence_deny", - lambda origin, content: self.deny_presence( - observed_user=UserID.from_string(content["observed_user"]), - observer_user=UserID.from_string(content["observer_user"]), - ) - ) - - distributor = hs.get_distributor() - distributor.observe("user_joined_room", self.user_joined_room) active_presence = self.store.take_presence_startup_info() @@ -220,6 +199,15 @@ class PresenceHandler(object): LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [], lambda: len(self.wheel_timer)) + # Used to handle sending of presence to newly joined users/servers + if hs.config.use_presence: + self.notifier.add_replication_callback(self.notify_new_event) + + # Presence is best effort and quickly heals itself, so lets just always + # stream from the current state when we restart. + self._event_pos = self.store.get_current_events_token() + self._event_processing = False + @defer.inlineCallbacks def _on_shutdown(self): """Gets called when shutting down. This lets us persist any updates that @@ -751,199 +739,178 @@ class PresenceHandler(object): yield self._update_states([prev_state.copy_and_replace(**new_fields)]) @defer.inlineCallbacks - def user_joined_room(self, user, room_id): - """Called (via the distributor) when a user joins a room. This funciton - sends presence updates to servers, either: - 1. the joining user is a local user and we send their presence to - all servers in the room. - 2. the joining user is a remote user and so we send presence for all - local users in the room. + def is_visible(self, observed_user, observer_user): + """Returns whether a user can see another user's presence. """ - # We only need to send presence to servers that don't have it yet. We - # don't need to send to local clients here, as that is done as part - # of the event stream/sync. - # TODO: Only send to servers not already in the room. - if self.is_mine(user): - state = yield self.current_state_for_user(user.to_string()) - - self._push_to_remotes([state]) - else: - user_ids = yield self.store.get_users_in_room(room_id) - user_ids = list(filter(self.is_mine_id, user_ids)) + observer_room_ids = yield self.store.get_rooms_for_user( + observer_user.to_string() + ) + observed_room_ids = yield self.store.get_rooms_for_user( + observed_user.to_string() + ) - states = yield self.current_state_for_users(user_ids) + if observer_room_ids & observed_room_ids: + defer.returnValue(True) - self._push_to_remotes(list(states.values())) + defer.returnValue(False) @defer.inlineCallbacks - def get_presence_list(self, observer_user, accepted=None): - """Returns the presence for all users in their presence list. + def get_all_presence_updates(self, last_id, current_id): """ - if not self.is_mine(observer_user): - raise SynapseError(400, "User is not hosted on this Home Server") - - presence_list = yield self.store.get_presence_list( - observer_user.localpart, accepted=accepted - ) + Gets a list of presence update rows from between the given stream ids. + Each row has: + - stream_id(str) + - user_id(str) + - state(str) + - last_active_ts(int) + - last_federation_update_ts(int) + - last_user_sync_ts(int) + - status_msg(int) + - currently_active(int) + """ + # TODO(markjh): replicate the unpersisted changes. + # This could use the in-memory stores for recent changes. + rows = yield self.store.get_all_presence_updates(last_id, current_id) + defer.returnValue(rows) - results = yield self.get_states( - target_user_ids=[row["observed_user_id"] for row in presence_list], - as_event=False, - ) + def notify_new_event(self): + """Called when new events have happened. Handles users and servers + joining rooms and require being sent presence. + """ - now = self.clock.time_msec() - results[:] = [format_user_presence_state(r, now) for r in results] + if self._event_processing: + return - is_accepted = { - row["observed_user_id"]: row["accepted"] for row in presence_list - } + @defer.inlineCallbacks + def _process_presence(): + assert not self._event_processing - for result in results: - result.update({ - "accepted": is_accepted, - }) + self._event_processing = True + try: + yield self._unsafe_process() + finally: + self._event_processing = False - defer.returnValue(results) + run_as_background_process("presence.notify_new_event", _process_presence) @defer.inlineCallbacks - def send_presence_invite(self, observer_user, observed_user): - """Sends a presence invite. - """ - yield self.store.add_presence_list_pending( - observer_user.localpart, observed_user.to_string() - ) + def _unsafe_process(self): + # Loop round handling deltas until we're up to date + while True: + with Measure(self.clock, "presence_delta"): + deltas = yield self.store.get_current_state_deltas(self._event_pos) + if not deltas: + return - if self.is_mine(observed_user): - yield self.invite_presence(observed_user, observer_user) - else: - yield self.federation.build_and_send_edu( - destination=observed_user.domain, - edu_type="m.presence_invite", - content={ - "observed_user": observed_user.to_string(), - "observer_user": observer_user.to_string(), - } - ) + yield self._handle_state_delta(deltas) + + self._event_pos = deltas[-1]["stream_id"] + + # Expose current event processing position to prometheus + synapse.metrics.event_processing_positions.labels("presence").set( + self._event_pos + ) @defer.inlineCallbacks - def invite_presence(self, observed_user, observer_user): - """Handles new presence invites. + def _handle_state_delta(self, deltas): + """Process current state deltas to find new joins that need to be + handled. """ - if not self.is_mine(observed_user): - raise SynapseError(400, "User is not hosted on this Home Server") + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + prev_event_id = delta["prev_event_id"] - # TODO: Don't auto accept - if self.is_mine(observer_user): - yield self.accept_presence(observed_user, observer_user) - else: - self.federation.build_and_send_edu( - destination=observer_user.domain, - edu_type="m.presence_accept", - content={ - "observed_user": observed_user.to_string(), - "observer_user": observer_user.to_string(), - } - ) + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) - state_dict = yield self.get_state(observed_user, as_event=False) - state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) + if typ != EventTypes.Member: + continue - self.federation.build_and_send_edu( - destination=observer_user.domain, - edu_type="m.presence", - content={ - "push": [state_dict] - } - ) + if event_id is None: + # state has been deleted, so this is not a join. We only care about + # joins. + continue - @defer.inlineCallbacks - def accept_presence(self, observed_user, observer_user): - """Handles a m.presence_accept EDU. Mark a presence invite from a - local or remote user as accepted in a local user's presence list. - Starts polling for presence updates from the local or remote user. - Args: - observed_user(UserID): The user to update in the presence list. - observer_user(UserID): The owner of the presence list to update. - """ - yield self.store.set_presence_list_accepted( - observer_user.localpart, observed_user.to_string() - ) + event = yield self.store.get_event(event_id) + if event.content.get("membership") != Membership.JOIN: + # We only care about joins + continue - @defer.inlineCallbacks - def deny_presence(self, observed_user, observer_user): - """Handle a m.presence_deny EDU. Removes a local or remote user from a - local user's presence list. - Args: - observed_user(UserID): The local or remote user to remove from the - list. - observer_user(UserID): The local owner of the presence list. - Returns: - A Deferred. - """ - yield self.store.del_presence_list( - observer_user.localpart, observed_user.to_string() - ) + if prev_event_id: + prev_event = yield self.store.get_event(prev_event_id) + if prev_event.content.get("membership") == Membership.JOIN: + # Ignore changes to join events. + continue - # TODO(paul): Inform the user somehow? + yield self._on_user_joined_room(room_id, state_key) @defer.inlineCallbacks - def drop(self, observed_user, observer_user): - """Remove a local or remote user from a local user's presence list and - unsubscribe the local user from updates that user. + def _on_user_joined_room(self, room_id, user_id): + """Called when we detect a user joining the room via the current state + delta stream. + Args: - observed_user(UserId): The local or remote user to remove from the - list. - observer_user(UserId): The local owner of the presence list. + room_id (str) + user_id (str) + Returns: - A Deferred. + Deferred """ - if not self.is_mine(observer_user): - raise SynapseError(400, "User is not hosted on this Home Server") - yield self.store.del_presence_list( - observer_user.localpart, observed_user.to_string() - ) + if self.is_mine_id(user_id): + # If this is a local user then we need to send their presence + # out to hosts in the room (who don't already have it) - # TODO: Inform the remote that we've dropped the presence list. + # TODO: We should be able to filter the hosts down to those that + # haven't previously seen the user - @defer.inlineCallbacks - def is_visible(self, observed_user, observer_user): - """Returns whether a user can see another user's presence. - """ - observer_room_ids = yield self.store.get_rooms_for_user( - observer_user.to_string() - ) - observed_room_ids = yield self.store.get_rooms_for_user( - observed_user.to_string() - ) + state = yield self.current_state_for_user(user_id) + hosts = yield self.state.get_current_hosts_in_room(room_id) - if observer_room_ids & observed_room_ids: - defer.returnValue(True) + # Filter out ourselves. + hosts = set(host for host in hosts if host != self.server_name) - accepted_observers = yield self.store.get_presence_list_observers_accepted( - observed_user.to_string() - ) + self.federation.send_presence_to_destinations( + states=[state], + destinations=hosts, + ) + else: + # A remote user has joined the room, so we need to: + # 1. Check if this is a new server in the room + # 2. If so send any presence they don't already have for + # local users in the room. - defer.returnValue(observer_user.to_string() in accepted_observers) + # TODO: We should be able to filter the users down to those that + # the server hasn't previously seen - @defer.inlineCallbacks - def get_all_presence_updates(self, last_id, current_id): - """ - Gets a list of presence update rows from between the given stream ids. - Each row has: - - stream_id(str) - - user_id(str) - - state(str) - - last_active_ts(int) - - last_federation_update_ts(int) - - last_user_sync_ts(int) - - status_msg(int) - - currently_active(int) - """ - # TODO(markjh): replicate the unpersisted changes. - # This could use the in-memory stores for recent changes. - rows = yield self.store.get_all_presence_updates(last_id, current_id) - defer.returnValue(rows) + # TODO: Check that this is actually a new server joining the + # room. + + user_ids = yield self.state.get_current_users_in_room(room_id) + user_ids = list(filter(self.is_mine_id, user_ids)) + + states = yield self.current_state_for_users(user_ids) + + # Filter out old presence, i.e. offline presence states where + # the user hasn't been active for a week. We can change this + # depending on what we want the UX to be, but at the least we + # should filter out offline presence where the state is just the + # default state. + now = self.clock.time_msec() + states = [ + state for state in states.values() + if state.state != PresenceState.OFFLINE + or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 + or state.status_msg is not None + ] + + if states: + self.federation.send_presence_to_destinations( + states=states, + destinations=[get_domain_from_id(user_id)], + ) def should_notify(old_state, new_state): @@ -1086,10 +1053,7 @@ class PresenceEventSource(object): updates for """ user_id = user.to_string() - plist = yield self.store.get_presence_list_accepted( - user.localpart, on_invalidate=cache_context.invalidate, - ) - users_interested_in = set(row["observed_user_id"] for row in plist) + users_interested_in = set() users_interested_in.add(user_id) # So that we receive our own presence users_who_share_room = yield self.store.get_users_who_share_room_with_user( @@ -1294,10 +1258,6 @@ def get_interested_parties(store, states): for room_id in room_ids: room_ids_to_states.setdefault(room_id, []).append(state) - plist = yield store.get_presence_list_observers_accepted(state.user_id) - for u in plist: - users_to_states.setdefault(u, []).append(state) - # Always notify self users_to_states.setdefault(state.user_id, []).append(state) |