summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py227
1 files changed, 176 insertions, 51 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7731de85c0..9bfceda88a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,13 @@ def partitionbool(l, func):
 
 class PresenceHandler(BaseHandler):
 
+    STATE_LEVELS = {
+        PresenceState.OFFLINE: 0,
+        PresenceState.UNAVAILABLE: 1,
+        PresenceState.ONLINE: 2,
+        PresenceState.FREE_FOR_CHAT: 3,
+    }
+
     def __init__(self, hs):
         super(PresenceHandler, self).__init__(hs)
 
@@ -135,7 +142,7 @@ class PresenceHandler(BaseHandler):
             return self._user_cachemap[user]
         else:
             statuscache = UserPresenceCache()
-            statuscache.update({"state": PresenceState.OFFLINE}, user)
+            statuscache.update({"presence": PresenceState.OFFLINE}, user)
             return statuscache
 
     def registered_user(self, user):
@@ -143,10 +150,6 @@ class PresenceHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def is_presence_visible(self, observer_user, observed_user):
-        defer.returnValue(True)
-        # return
-        # FIXME (erikj): This code path absolutely kills the database.
-
         assert(observed_user.is_mine)
 
         if observer_user == observed_user:
@@ -173,19 +176,24 @@ class PresenceHandler(BaseHandler):
                 observed_user=target_user
             )
 
-            if visible:
-                state = yield self.store.get_presence_state(
-                    target_user.localpart
-                )
-            else:
+            if not visible:
                 raise SynapseError(404, "Presence information not visible")
+            state = yield self.store.get_presence_state(target_user.localpart)
+            if "mtime" in state:
+                del state["mtime"]
+            state["presence"] = state["state"]
+
+            if target_user in self._user_cachemap:
+                state["last_active"] = (
+                    self._user_cachemap[target_user].get_state()["last_active"]
+                )
         else:
             # TODO(paul): Have remote server send us permissions set
             state = self._get_or_offline_usercache(target_user).get_state()
 
-        if "mtime" in state and (state["mtime"] is not None):
-            state["mtime_age"] = int(
-                self.clock.time_msec() - state.pop("mtime")
+        if "last_active" in state:
+            state["last_active_ago"] = int(
+                self.clock.time_msec() - state.pop("last_active")
             )
         defer.returnValue(state)
 
@@ -202,20 +210,33 @@ class PresenceHandler(BaseHandler):
         if target_user != auth_user:
             raise AuthError(400, "Cannot set another user's displayname")
 
-        # TODO(paul): Sanity-check 'state'
         if "status_msg" not in state:
             state["status_msg"] = None
 
         for k in state.keys():
-            if k not in ("state", "status_msg"):
+            if k not in ("presence", "state", "status_msg"):
                 raise SynapseError(
                     400, "Unexpected presence state key '%s'" % (k,)
                 )
 
+        # Handle legacy "state" key for now
+        if "state" in state:
+            state["presence"] = state.pop("state")
+
+        if state["presence"] not in self.STATE_LEVELS:
+            raise SynapseError(400, "'%s' is not a valid presence state" %
+                state["presence"]
+            )
+
         logger.debug("Updating presence state of %s to %s",
-                     target_user.localpart, state["state"])
+                     target_user.localpart, state["presence"])
 
         state_to_store = dict(state)
+        state_to_store["state"] = state_to_store.pop("presence")
+
+        statuscache=self._get_or_offline_usercache(target_user)
+        was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
+        now_level = self.STATE_LEVELS[state["presence"]]
 
         yield defer.DeferredList([
             self.store.set_presence_state(
@@ -226,9 +247,10 @@ class PresenceHandler(BaseHandler):
             ),
         ])
 
-        state["mtime"] = self.clock.time_msec()
+        if now_level > was_level:
+            state["last_active"] = self.clock.time_msec()
 
-        now_online = state["state"] != PresenceState.OFFLINE
+        now_online = state["presence"] != PresenceState.OFFLINE
         was_polling = target_user in self._user_cachemap
 
         if now_online and not was_polling:
@@ -240,6 +262,12 @@ class PresenceHandler(BaseHandler):
         #   we don't have to do this all the time
         self.changed_presencelike_data(target_user, state)
 
+    def bump_presence_active_time(self, user, now=None):
+        if now is None:
+            now = self.clock.time_msec()
+
+        self.changed_presencelike_data(user, {"last_active": now})
+
     def changed_presencelike_data(self, user, state):
         statuscache = self._get_or_make_usercache(user)
 
@@ -251,28 +279,27 @@ class PresenceHandler(BaseHandler):
     @log_function
     def started_user_eventstream(self, user):
         # TODO(paul): Use "last online" state
-        self.set_state(user, user, {"state": PresenceState.ONLINE})
+        self.set_state(user, user, {"presence": PresenceState.ONLINE})
 
     @log_function
     def stopped_user_eventstream(self, user):
         # TODO(paul): Save current state as "last online" state
-        self.set_state(user, user, {"state": PresenceState.OFFLINE})
+        self.set_state(user, user, {"presence": PresenceState.OFFLINE})
 
     @defer.inlineCallbacks
     def user_joined_room(self, user, room_id):
-
         if user.is_mine:
-            self.push_update_to_local_and_remote(
-                observed_user=user,
-                room_ids=[room_id],
-                statuscache=self._get_or_offline_usercache(user),
-            )
+            statuscache = self._get_or_make_usercache(user)
 
-        else:
-            self.push_update_to_clients(
+            # No actual update but we need to bump the serial anyway for the
+            # event source
+            self._user_cachemap_latest_serial += 1
+            statuscache.update({}, serial=self._user_cachemap_latest_serial)
+
+            self.push_update_to_local_and_remote(
                 observed_user=user,
                 room_ids=[room_id],
-                statuscache=self._get_or_offline_usercache(user),
+                statuscache=statuscache,
             )
 
         # We also want to tell them about current presence of people.
@@ -386,9 +413,9 @@ class PresenceHandler(BaseHandler):
             observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
             p["observed_user"] = observed_user
             p.update(self._get_or_offline_usercache(observed_user).get_state())
-            if "mtime" in p:
-                p["mtime_age"] = int(
-                    self.clock.time_msec() - p.pop("mtime")
+            if "last_active" in p:
+                p["last_active_ago"] = int(
+                    self.clock.time_msec() - p.pop("last_active")
                 )
 
         defer.returnValue(presence)
@@ -457,10 +484,6 @@ class PresenceHandler(BaseHandler):
     def _start_polling_local(self, user, target_user):
         target_localpart = target_user.localpart
 
-        if not self.is_presence_visible(observer_user=user,
-            observed_user=target_user):
-            return
-
         if target_localpart not in self._local_pushmap:
             self._local_pushmap[target_localpart] = set()
 
@@ -577,21 +600,30 @@ class PresenceHandler(BaseHandler):
     def _push_presence_remote(self, user, destination, state=None):
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
+            del state["mtime"]
+            state["presence"] = state["state"]
+
+            if user in self._user_cachemap:
+                state["last_active"] = (
+                    self._user_cachemap[user].get_state()["last_active"]
+                )
 
             yield self.distributor.fire(
                 "collect_presencelike_data", user, state
             )
 
-        if "mtime" in state:
+        if "last_active" in state:
             state = dict(state)
-            state["mtime_age"] = int(
-                self.clock.time_msec() - state.pop("mtime")
+            state["last_active_ago"] = int(
+                self.clock.time_msec() - state.pop("last_active")
             )
 
         user_state = {
             "user_id": user.to_string(),
         }
         user_state.update(**state)
+        if "state" in user_state and "presence" not in user_state:
+            user_state["presence"] = user_state["state"]
 
         yield self.federation.send_edu(
             destination=destination,
@@ -618,14 +650,29 @@ class PresenceHandler(BaseHandler):
             room_ids = yield rm_handler.get_rooms_for_user(user)
 
             if not observers and not room_ids:
-                break
+                continue
 
             state = dict(push)
             del state["user_id"]
 
-            if "mtime_age" in state:
-                state["mtime"] = int(
-                    self.clock.time_msec() - state.pop("mtime_age")
+            if "presence" in state:
+                # all is OK
+                pass
+            elif "state" in state:
+                # Legacy handling
+                state["presence"] = state["state"]
+            else:
+                logger.warning("Received a presence 'push' EDU from %s without"
+                    + " either a 'presence' or 'state' key", origin
+                )
+                continue
+
+            if "state" in state:
+                del state["state"]
+
+            if "last_active_ago" in state:
+                state["last_active"] = int(
+                    self.clock.time_msec() - state.pop("last_active_ago")
                 )
 
             statuscache = self._get_or_make_usercache(user)
@@ -640,7 +687,7 @@ class PresenceHandler(BaseHandler):
                 statuscache=statuscache,
             )
 
-            if state["state"] == PresenceState.OFFLINE:
+            if state["presence"] == PresenceState.OFFLINE:
                 del self._user_cachemap[user]
 
         for poll in content.get("poll", []):
@@ -673,10 +720,9 @@ class PresenceHandler(BaseHandler):
         yield defer.DeferredList(deferreds)
 
     @defer.inlineCallbacks
-    def push_update_to_local_and_remote(self, observed_user,
+    def push_update_to_local_and_remote(self, observed_user, statuscache,
                                         users_to_push=[], room_ids=[],
-                                        remote_domains=[],
-                                        statuscache=None):
+                                        remote_domains=[]):
 
         localusers, remoteusers = partitionbool(
             users_to_push,
@@ -722,6 +768,78 @@ class PresenceHandler(BaseHandler):
         )
 
 
+class PresenceEventSource(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.clock = hs.get_clock()
+
+    def get_new_events_for_user(self, user, from_key, limit):
+        from_key = int(from_key)
+
+        presence = self.hs.get_handlers().presence_handler
+        cachemap = presence._user_cachemap
+
+        # TODO(paul): limit, and filter by visibility
+        updates = [(k, cachemap[k]) for k in cachemap
+                   if from_key < cachemap[k].serial]
+
+        if updates:
+            clock = self.clock
+
+            latest_serial = max([x[1].serial for x in updates])
+            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
+
+            return ((data, latest_serial))
+        else:
+            return (([], presence._user_cachemap_latest_serial))
+
+    def get_current_key(self):
+        presence = self.hs.get_handlers().presence_handler
+        return presence._user_cachemap_latest_serial
+
+    def get_pagination_rows(self, user, pagination_config, key):
+        # TODO (erikj): Does this make sense? Ordering?
+
+        from_token = pagination_config.from_token
+        to_token = pagination_config.to_token
+
+        from_key = int(from_token.presence_key)
+
+        if to_token:
+            to_key = int(to_token.presence_key)
+        else:
+            to_key = -1
+
+        presence = self.hs.get_handlers().presence_handler
+        cachemap = presence._user_cachemap
+
+        # TODO(paul): limit, and filter by visibility
+        updates = [(k, cachemap[k]) for k in cachemap
+                   if to_key < cachemap[k].serial < from_key]
+
+        if updates:
+            clock = self.clock
+
+            earliest_serial = max([x[1].serial for x in updates])
+            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
+
+            if to_token:
+                next_token = to_token
+            else:
+                next_token = from_token
+
+            next_token = next_token.copy_and_replace(
+                "presence_key", earliest_serial
+            )
+            return ((data, next_token))
+        else:
+            if not to_token:
+                to_token = from_token.copy_and_replace(
+                    "presence_key", 0
+                )
+            return (([], to_token))
+
+
 class UserPresenceCache(object):
     """Store an observed user's state and status message.
 
@@ -733,6 +851,7 @@ class UserPresenceCache(object):
 
     def update(self, state, serial):
         assert("mtime_age" not in state)
+        assert("state" not in state)
 
         self.state.update(state)
         # Delete keys that are now 'None'
@@ -749,15 +868,21 @@ class UserPresenceCache(object):
 
     def get_state(self):
         # clone it so caller can't break our cache
-        return dict(self.state)
+        state = dict(self.state)
+
+        # Legacy handling
+        if "presence" in state:
+            state["state"] = state["presence"]
+
+        return state
 
     def make_event(self, user, clock):
         content = self.get_state()
         content["user_id"] = user.to_string()
 
-        if "mtime" in content:
-            content["mtime_age"] = int(
-                clock.time_msec() - content.pop("mtime")
+        if "last_active" in content:
+            content["last_active_ago"] = int(
+                clock.time_msec() - content.pop("last_active")
             )
 
         return {"type": "m.presence", "content": content}