diff options
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 227 |
1 files changed, 176 insertions, 51 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7731de85c0..9bfceda88a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,13 @@ def partitionbool(l, func): class PresenceHandler(BaseHandler): + STATE_LEVELS = { + PresenceState.OFFLINE: 0, + PresenceState.UNAVAILABLE: 1, + PresenceState.ONLINE: 2, + PresenceState.FREE_FOR_CHAT: 3, + } + def __init__(self, hs): super(PresenceHandler, self).__init__(hs) @@ -135,7 +142,7 @@ class PresenceHandler(BaseHandler): return self._user_cachemap[user] else: statuscache = UserPresenceCache() - statuscache.update({"state": PresenceState.OFFLINE}, user) + statuscache.update({"presence": PresenceState.OFFLINE}, user) return statuscache def registered_user(self, user): @@ -143,10 +150,6 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): - defer.returnValue(True) - # return - # FIXME (erikj): This code path absolutely kills the database. - assert(observed_user.is_mine) if observer_user == observed_user: @@ -173,19 +176,24 @@ class PresenceHandler(BaseHandler): observed_user=target_user ) - if visible: - state = yield self.store.get_presence_state( - target_user.localpart - ) - else: + if not visible: raise SynapseError(404, "Presence information not visible") + state = yield self.store.get_presence_state(target_user.localpart) + if "mtime" in state: + del state["mtime"] + state["presence"] = state["state"] + + if target_user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[target_user].get_state()["last_active"] + ) else: # TODO(paul): Have remote server send us permissions set state = self._get_or_offline_usercache(target_user).get_state() - if "mtime" in state and (state["mtime"] is not None): - state["mtime_age"] = int( - self.clock.time_msec() - state.pop("mtime") + if "last_active" in state: + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") ) defer.returnValue(state) @@ -202,20 +210,33 @@ class PresenceHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") - # TODO(paul): Sanity-check 'state' if "status_msg" not in state: state["status_msg"] = None for k in state.keys(): - if k not in ("state", "status_msg"): + if k not in ("presence", "state", "status_msg"): raise SynapseError( 400, "Unexpected presence state key '%s'" % (k,) ) + # Handle legacy "state" key for now + if "state" in state: + state["presence"] = state.pop("state") + + if state["presence"] not in self.STATE_LEVELS: + raise SynapseError(400, "'%s' is not a valid presence state" % + state["presence"] + ) + logger.debug("Updating presence state of %s to %s", - target_user.localpart, state["state"]) + target_user.localpart, state["presence"]) state_to_store = dict(state) + state_to_store["state"] = state_to_store.pop("presence") + + statuscache=self._get_or_offline_usercache(target_user) + was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]] + now_level = self.STATE_LEVELS[state["presence"]] yield defer.DeferredList([ self.store.set_presence_state( @@ -226,9 +247,10 @@ class PresenceHandler(BaseHandler): ), ]) - state["mtime"] = self.clock.time_msec() + if now_level > was_level: + state["last_active"] = self.clock.time_msec() - now_online = state["state"] != PresenceState.OFFLINE + now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap if now_online and not was_polling: @@ -240,6 +262,12 @@ class PresenceHandler(BaseHandler): # we don't have to do this all the time self.changed_presencelike_data(target_user, state) + def bump_presence_active_time(self, user, now=None): + if now is None: + now = self.clock.time_msec() + + self.changed_presencelike_data(user, {"last_active": now}) + def changed_presencelike_data(self, user, state): statuscache = self._get_or_make_usercache(user) @@ -251,28 +279,27 @@ class PresenceHandler(BaseHandler): @log_function def started_user_eventstream(self, user): # TODO(paul): Use "last online" state - self.set_state(user, user, {"state": PresenceState.ONLINE}) + self.set_state(user, user, {"presence": PresenceState.ONLINE}) @log_function def stopped_user_eventstream(self, user): # TODO(paul): Save current state as "last online" state - self.set_state(user, user, {"state": PresenceState.OFFLINE}) + self.set_state(user, user, {"presence": PresenceState.OFFLINE}) @defer.inlineCallbacks def user_joined_room(self, user, room_id): - if user.is_mine: - self.push_update_to_local_and_remote( - observed_user=user, - room_ids=[room_id], - statuscache=self._get_or_offline_usercache(user), - ) + statuscache = self._get_or_make_usercache(user) - else: - self.push_update_to_clients( + # No actual update but we need to bump the serial anyway for the + # event source + self._user_cachemap_latest_serial += 1 + statuscache.update({}, serial=self._user_cachemap_latest_serial) + + self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], - statuscache=self._get_or_offline_usercache(user), + statuscache=statuscache, ) # We also want to tell them about current presence of people. @@ -386,9 +413,9 @@ class PresenceHandler(BaseHandler): observed_user = self.hs.parse_userid(p.pop("observed_user_id")) p["observed_user"] = observed_user p.update(self._get_or_offline_usercache(observed_user).get_state()) - if "mtime" in p: - p["mtime_age"] = int( - self.clock.time_msec() - p.pop("mtime") + if "last_active" in p: + p["last_active_ago"] = int( + self.clock.time_msec() - p.pop("last_active") ) defer.returnValue(presence) @@ -457,10 +484,6 @@ class PresenceHandler(BaseHandler): def _start_polling_local(self, user, target_user): target_localpart = target_user.localpart - if not self.is_presence_visible(observer_user=user, - observed_user=target_user): - return - if target_localpart not in self._local_pushmap: self._local_pushmap[target_localpart] = set() @@ -577,21 +600,30 @@ class PresenceHandler(BaseHandler): def _push_presence_remote(self, user, destination, state=None): if state is None: state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state["state"] + + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) yield self.distributor.fire( "collect_presencelike_data", user, state ) - if "mtime" in state: + if "last_active" in state: state = dict(state) - state["mtime_age"] = int( - self.clock.time_msec() - state.pop("mtime") + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") ) user_state = { "user_id": user.to_string(), } user_state.update(**state) + if "state" in user_state and "presence" not in user_state: + user_state["presence"] = user_state["state"] yield self.federation.send_edu( destination=destination, @@ -618,14 +650,29 @@ class PresenceHandler(BaseHandler): room_ids = yield rm_handler.get_rooms_for_user(user) if not observers and not room_ids: - break + continue state = dict(push) del state["user_id"] - if "mtime_age" in state: - state["mtime"] = int( - self.clock.time_msec() - state.pop("mtime_age") + if "presence" in state: + # all is OK + pass + elif "state" in state: + # Legacy handling + state["presence"] = state["state"] + else: + logger.warning("Received a presence 'push' EDU from %s without" + + " either a 'presence' or 'state' key", origin + ) + continue + + if "state" in state: + del state["state"] + + if "last_active_ago" in state: + state["last_active"] = int( + self.clock.time_msec() - state.pop("last_active_ago") ) statuscache = self._get_or_make_usercache(user) @@ -640,7 +687,7 @@ class PresenceHandler(BaseHandler): statuscache=statuscache, ) - if state["state"] == PresenceState.OFFLINE: + if state["presence"] == PresenceState.OFFLINE: del self._user_cachemap[user] for poll in content.get("poll", []): @@ -673,10 +720,9 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds) @defer.inlineCallbacks - def push_update_to_local_and_remote(self, observed_user, + def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], - remote_domains=[], - statuscache=None): + remote_domains=[]): localusers, remoteusers = partitionbool( users_to_push, @@ -722,6 +768,78 @@ class PresenceHandler(BaseHandler): ) +class PresenceEventSource(object): + def __init__(self, hs): + self.hs = hs + self.clock = hs.get_clock() + + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if from_key < cachemap[k].serial] + + if updates: + clock = self.clock + + latest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + return ((data, latest_serial)) + else: + return (([], presence._user_cachemap_latest_serial)) + + def get_current_key(self): + presence = self.hs.get_handlers().presence_handler + return presence._user_cachemap_latest_serial + + def get_pagination_rows(self, user, pagination_config, key): + # TODO (erikj): Does this make sense? Ordering? + + from_token = pagination_config.from_token + to_token = pagination_config.to_token + + from_key = int(from_token.presence_key) + + if to_token: + to_key = int(to_token.presence_key) + else: + to_key = -1 + + presence = self.hs.get_handlers().presence_handler + cachemap = presence._user_cachemap + + # TODO(paul): limit, and filter by visibility + updates = [(k, cachemap[k]) for k in cachemap + if to_key < cachemap[k].serial < from_key] + + if updates: + clock = self.clock + + earliest_serial = max([x[1].serial for x in updates]) + data = [x[1].make_event(user=x[0], clock=clock) for x in updates] + + if to_token: + next_token = to_token + else: + next_token = from_token + + next_token = next_token.copy_and_replace( + "presence_key", earliest_serial + ) + return ((data, next_token)) + else: + if not to_token: + to_token = from_token.copy_and_replace( + "presence_key", 0 + ) + return (([], to_token)) + + class UserPresenceCache(object): """Store an observed user's state and status message. @@ -733,6 +851,7 @@ class UserPresenceCache(object): def update(self, state, serial): assert("mtime_age" not in state) + assert("state" not in state) self.state.update(state) # Delete keys that are now 'None' @@ -749,15 +868,21 @@ class UserPresenceCache(object): def get_state(self): # clone it so caller can't break our cache - return dict(self.state) + state = dict(self.state) + + # Legacy handling + if "presence" in state: + state["state"] = state["presence"] + + return state def make_event(self, user, clock): content = self.get_state() content["user_id"] = user.to_string() - if "mtime" in content: - content["mtime_age"] = int( - clock.time_msec() - content.pop("mtime") + if "last_active" in content: + content["last_active_ago"] = int( + clock.time_msec() - content.pop("last_active") ) return {"type": "m.presence", "content": content} |