summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/presence.py280
-rw-r--r--tests/handlers/test_presence.py373
2 files changed, 560 insertions, 93 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8831d83c56..0a061fe9b2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -217,54 +217,19 @@ class PresenceHandler(BaseHandler):
                 user_id, UserPresenceState.default(user_id)
             )
 
-            # If the users are ours then we want to set up a bunch of timers
-            # to time things out.
-            if self.hs.is_mine_id(user_id):
-                if new_state.state == PresenceState.ONLINE:
-                    # Idle timer
-                    self.wheel_timer.insert(
-                        now=now,
-                        obj=user_id,
-                        then=new_state.last_active_ts + IDLE_TIMER
-                    )
-
-                if new_state.state != PresenceState.OFFLINE:
-                    # User has stopped syncing
-                    self.wheel_timer.insert(
-                        now=now,
-                        obj=user_id,
-                        then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
-                    )
-
-                    last_federate = new_state.last_federation_update_ts
-                    if now - last_federate > FEDERATION_PING_INTERVAL:
-                        # Been a while since we've poked remote servers
-                        new_state = new_state.copy_and_replace(
-                            last_federation_update_ts=now,
-                        )
-                        to_federation_ping[user_id] = new_state
-
-            else:
-                self.wheel_timer.insert(
-                    now=now,
-                    obj=user_id,
-                    then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
-                )
+            new_state, should_notify, should_ping = handle_update(
+                prev_state, new_state,
+                is_mine=self.hs.is_mine_id(user_id),
+                wheel_timer=self.wheel_timer,
+                now=now
+            )
 
-            if new_state.state == PresenceState.ONLINE:
-                active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
-                new_state = new_state.copy_and_replace(
-                    currently_active=active,
-                )
+            self.user_to_current_state[user_id] = new_state
 
-            # Check whether the change was something worth notifying about
-            if should_notify(prev_state, new_state):
-                new_state.copy_and_replace(
-                    last_federation_update_ts=now,
-                )
+            if should_notify:
                 to_notify[user_id] = new_state
-
-            self.user_to_current_state[user_id] = new_state
+            elif should_ping:
+                to_federation_ping[user_id] = new_state
 
         # TODO: We should probably ensure there are no races hereafter
 
@@ -296,55 +261,22 @@ class PresenceHandler(BaseHandler):
         # take any action.
         users_to_check = self.wheel_timer.fetch(now)
 
-        changes = {}  # Actual changes we need to notify people about
-
-        for user_id in set(users_to_check):
-            state = self.user_to_current_state.get(user_id, None)
-            if not state:
-                continue
-
-            if state.state == PresenceState.OFFLINE:
-                # No timeouts are associated with offline states.
-                continue
+        states = [
+            self.user_to_current_state.get(
+                user_id, UserPresenceState.default(user_id)
+            )
+            for user_id in set(users_to_check)
+        ]
+
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.hs.is_mine_id,
+            user_to_current_state=self.user_to_current_state,
+            user_to_num_current_syncs=self.user_to_num_current_syncs,
+            now=now,
+        )
 
-            if self.hs.is_mine_id(user_id):
-                if state.state == PresenceState.ONLINE:
-                    if now - state.last_active_ts > IDLE_TIMER:
-                        # Currently online, but last activity ages ago so auto
-                        # idle
-                        changes[user_id] = state.copy_and_replace(
-                            state=PresenceState.UNAVAILABLE,
-                        )
-                    elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
-                        # So that we send down a notification that we've
-                        # stopped updating.
-                        changes[user_id] = state
-
-                if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
-                    # Need to send ping to other servers to ensure they don't
-                    # timeout and set us to offline
-                    changes[user_id] = state
-
-                # If there are have been no sync for a while (and none ongoing),
-                # set presence to offline
-                if not self.user_to_num_current_syncs.get(user_id, 0):
-                    if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
-                        changes[user_id] = state.copy_and_replace(
-                            state=PresenceState.OFFLINE,
-                            status_msg=None,
-                        )
-            else:
-                # We expect to be poked occaisonally by the other side.
-                # This is to protect against forgetful/buggy servers, so that
-                # no one gets stuck online forever.
-                if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
-                    # The other side seems to have disappeared.
-                    changes[user_id] = state.copy_and_replace(
-                        state=PresenceState.OFFLINE,
-                        status_msg=None,
-                    )
-
-        preserve_fn(self._update_states)(changes.values())
+        preserve_fn(self._update_states)(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -925,3 +857,165 @@ class PresenceEventSource(object):
 
     def get_pagination_rows(self, user, pagination_config, key):
         return self.get_new_events(user, from_key=None, include_offline=False)
+
+
+def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+    """Checks the presence of users that have timed out and updates as
+    appropriate.
+
+    Args:
+        user_states(list): List of UserPresenceState's to check.
+        is_mine_fn (fn): Function that returns if a user_id is ours
+        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+            active syncs.
+        now (int): Current time in ms.
+
+    Returns:
+        List of UserPresenceState updates
+    """
+    changes = {}  # Actual changes we need to notify people about
+
+    for state in user_states:
+        is_mine = is_mine_fn(state.user_id)
+
+        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        if new_state:
+            changes[state.user_id] = new_state
+
+    return changes.values()
+
+
+def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+    """Checks the presence of the user to see if any of the timers have elapsed
+
+    Args:
+        state (UserPresenceState)
+        is_mine (bool): Whether the user is ours
+        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+            active syncs.
+        now (int): Current time in ms.
+
+    Returns:
+        A UserPresenceState update or None if no update.
+    """
+    if state.state == PresenceState.OFFLINE:
+        # No timeouts are associated with offline states.
+        return None
+
+    changed = False
+    user_id = state.user_id
+
+    if is_mine:
+        if state.state == PresenceState.ONLINE:
+            if now - state.last_active_ts > IDLE_TIMER:
+                # Currently online, but last activity ages ago so auto
+                # idle
+                state = state.copy_and_replace(
+                    state=PresenceState.UNAVAILABLE,
+                )
+                changed = True
+            elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+                # So that we send down a notification that we've
+                # stopped updating.
+                changed = True
+
+        if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
+            # Need to send ping to other servers to ensure they don't
+            # timeout and set us to offline
+            changed = True
+
+        # If there are have been no sync for a while (and none ongoing),
+        # set presence to offline
+        if not user_to_num_current_syncs.get(user_id, 0):
+            if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+                state = state.copy_and_replace(
+                    state=PresenceState.OFFLINE,
+                    status_msg=None,
+                )
+                changed = True
+    else:
+        # We expect to be poked occaisonally by the other side.
+        # This is to protect against forgetful/buggy servers, so that
+        # no one gets stuck online forever.
+        if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
+            # The other side seems to have disappeared.
+            state = state.copy_and_replace(
+                state=PresenceState.OFFLINE,
+                status_msg=None,
+            )
+            changed = True
+
+    return state if changed else None
+
+
+def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
+    """Given a presence update:
+        1. Add any appropriate timers.
+        2. Check if we should notify anyone.
+
+    Args:
+        prev_state (UserPresenceState)
+        new_state (UserPresenceState)
+        is_mine (bool): Whether the user is ours
+        wheel_timer (WheelTimer)
+        now (int): Time now in ms
+
+    Returns:
+        3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
+            - new_state: is the state to actually persist
+            - persist_and_notify (bool): whether to persist and notify people
+            - federation_ping (bool): whether we should send a ping over federation
+    """
+    user_id = new_state.user_id
+
+    persist_and_notify = False
+    federation_ping = False
+
+    # If the users are ours then we want to set up a bunch of timers
+    # to time things out.
+    if is_mine:
+        if new_state.state == PresenceState.ONLINE:
+            # Idle timer
+            wheel_timer.insert(
+                now=now,
+                obj=user_id,
+                then=new_state.last_active_ts + IDLE_TIMER
+            )
+
+        if new_state.state != PresenceState.OFFLINE:
+            # User has stopped syncing
+            wheel_timer.insert(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+            )
+
+            last_federate = new_state.last_federation_update_ts
+            if now - last_federate > FEDERATION_PING_INTERVAL:
+                # Been a while since we've poked remote servers
+                new_state = new_state.copy_and_replace(
+                    last_federation_update_ts=now,
+                )
+                federation_ping = True
+
+    else:
+        wheel_timer.insert(
+            now=now,
+            obj=user_id,
+            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
+        )
+
+    if new_state.state == PresenceState.ONLINE:
+        active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
+        new_state = new_state.copy_and_replace(
+            currently_active=active,
+        )
+
+    # Check whether the change was something worth notifying about
+    if should_notify(prev_state, new_state):
+        new_state = new_state.copy_and_replace(
+            last_federation_update_ts=now,
+        )
+        persist_and_notify = True
+
+    return new_state, persist_and_notify, federation_ping
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
new file mode 100644
index 0000000000..197298db15
--- /dev/null
+++ b/tests/handlers/test_presence.py
@@ -0,0 +1,373 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from tests import unittest
+
+from mock import Mock, call
+
+from synapse.api.constants import PresenceState
+from synapse.handlers.presence import (
+    handle_update, handle_timeout,
+    IDLE_TIMER, SYNC_ONLINE_TIMEOUT, LAST_ACTIVE_GRANULARITY, FEDERATION_TIMEOUT,
+    FEDERATION_PING_INTERVAL,
+)
+from synapse.storage.presence import UserPresenceState
+
+
+class PresenceUpdateTestCase(unittest.TestCase):
+    def test_offline_to_online(self):
+        wheel_timer = Mock()
+        user_id = "@foo:bar"
+        now = 5000000
+
+        prev_state = UserPresenceState.default(user_id)
+        new_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+        )
+
+        state, persist_and_notify, federation_ping = handle_update(
+            prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
+        )
+
+        self.assertTrue(persist_and_notify)
+        self.assertTrue(state.currently_active)
+        self.assertEquals(new_state.state, state.state)
+        self.assertEquals(new_state.status_msg, state.status_msg)
+        self.assertEquals(state.last_federation_update_ts, now)
+
+        self.assertEquals(wheel_timer.insert.call_count, 2)
+        wheel_timer.insert.assert_has_calls([
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_active_ts + IDLE_TIMER
+            ),
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+            )
+        ], any_order=True)
+
+    def test_online_to_online(self):
+        wheel_timer = Mock()
+        user_id = "@foo:bar"
+        now = 5000000
+
+        prev_state = UserPresenceState.default(user_id)
+        prev_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+            currently_active=True,
+        )
+
+        new_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+        )
+
+        state, persist_and_notify, federation_ping = handle_update(
+            prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
+        )
+
+        self.assertFalse(persist_and_notify)
+        self.assertTrue(federation_ping)
+        self.assertTrue(state.currently_active)
+        self.assertEquals(new_state.state, state.state)
+        self.assertEquals(new_state.status_msg, state.status_msg)
+        self.assertEquals(state.last_federation_update_ts, now)
+
+        self.assertEquals(wheel_timer.insert.call_count, 2)
+        wheel_timer.insert.assert_has_calls([
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_active_ts + IDLE_TIMER
+            ),
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+            )
+        ], any_order=True)
+
+    def test_online_to_online_last_active(self):
+        wheel_timer = Mock()
+        user_id = "@foo:bar"
+        now = 5000000
+
+        prev_state = UserPresenceState.default(user_id)
+        prev_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
+            currently_active=True,
+        )
+
+        new_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+        )
+
+        state, persist_and_notify, federation_ping = handle_update(
+            prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
+        )
+
+        self.assertTrue(persist_and_notify)
+        self.assertFalse(state.currently_active)
+        self.assertEquals(new_state.state, state.state)
+        self.assertEquals(new_state.status_msg, state.status_msg)
+        self.assertEquals(state.last_federation_update_ts, now)
+
+        self.assertEquals(wheel_timer.insert.call_count, 2)
+        wheel_timer.insert.assert_has_calls([
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_active_ts + IDLE_TIMER
+            ),
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+            )
+        ], any_order=True)
+
+    def test_remote_ping_timer(self):
+        wheel_timer = Mock()
+        user_id = "@foo:bar"
+        now = 5000000
+
+        prev_state = UserPresenceState.default(user_id)
+        prev_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+        )
+
+        new_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+        )
+
+        state, persist_and_notify, federation_ping = handle_update(
+            prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
+        )
+
+        self.assertFalse(persist_and_notify)
+        self.assertFalse(federation_ping)
+        self.assertFalse(state.currently_active)
+        self.assertEquals(new_state.state, state.state)
+        self.assertEquals(new_state.status_msg, state.status_msg)
+
+        self.assertEquals(wheel_timer.insert.call_count, 1)
+        wheel_timer.insert.assert_has_calls([
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
+            ),
+        ], any_order=True)
+
+    def test_online_to_offline(self):
+        wheel_timer = Mock()
+        user_id = "@foo:bar"
+        now = 5000000
+
+        prev_state = UserPresenceState.default(user_id)
+        prev_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+            currently_active=True,
+        )
+
+        new_state = prev_state.copy_and_replace(
+            state=PresenceState.OFFLINE,
+        )
+
+        state, persist_and_notify, federation_ping = handle_update(
+            prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
+        )
+
+        self.assertTrue(persist_and_notify)
+        self.assertEquals(new_state.state, state.state)
+        self.assertEquals(state.last_federation_update_ts, now)
+
+        self.assertEquals(wheel_timer.insert.call_count, 0)
+
+    def test_online_to_idle(self):
+        wheel_timer = Mock()
+        user_id = "@foo:bar"
+        now = 5000000
+
+        prev_state = UserPresenceState.default(user_id)
+        prev_state = prev_state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+            currently_active=True,
+        )
+
+        new_state = prev_state.copy_and_replace(
+            state=PresenceState.UNAVAILABLE,
+        )
+
+        state, persist_and_notify, federation_ping = handle_update(
+            prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
+        )
+
+        self.assertTrue(persist_and_notify)
+        self.assertEquals(new_state.state, state.state)
+        self.assertEquals(state.last_federation_update_ts, now)
+        self.assertEquals(new_state.state, state.state)
+        self.assertEquals(new_state.status_msg, state.status_msg)
+
+        self.assertEquals(wheel_timer.insert.call_count, 1)
+        wheel_timer.insert.assert_has_calls([
+            call(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+            )
+        ], any_order=True)
+
+
+class PresenceTimeoutTestCase(unittest.TestCase):
+    def test_idle_timer(self):
+        user_id = "@foo:bar"
+        now = 5000000
+
+        state = UserPresenceState.default(user_id)
+        state = state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now - IDLE_TIMER - 1,
+            last_user_sync_ts=now,
+        )
+
+        new_state = handle_timeout(
+            state, is_mine=True, user_to_num_current_syncs={}, now=now
+        )
+
+        self.assertIsNotNone(new_state)
+        self.assertEquals(new_state.state, PresenceState.UNAVAILABLE)
+
+    def test_sync_timeout(self):
+        user_id = "@foo:bar"
+        now = 5000000
+
+        state = UserPresenceState.default(user_id)
+        state = state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+            last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
+        )
+
+        new_state = handle_timeout(
+            state, is_mine=True, user_to_num_current_syncs={}, now=now
+        )
+
+        self.assertIsNotNone(new_state)
+        self.assertEquals(new_state.state, PresenceState.OFFLINE)
+
+    def test_sync_online(self):
+        user_id = "@foo:bar"
+        now = 5000000
+
+        state = UserPresenceState.default(user_id)
+        state = state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1,
+            last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
+        )
+
+        new_state = handle_timeout(
+            state, is_mine=True, user_to_num_current_syncs={
+                user_id: 1,
+            }, now=now
+        )
+
+        self.assertIsNotNone(new_state)
+        self.assertEquals(new_state.state, PresenceState.ONLINE)
+
+    def test_federation_ping(self):
+        user_id = "@foo:bar"
+        now = 5000000
+
+        state = UserPresenceState.default(user_id)
+        state = state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+            last_user_sync_ts=now,
+            last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
+        )
+
+        new_state = handle_timeout(
+            state, is_mine=True, user_to_num_current_syncs={}, now=now
+        )
+
+        self.assertIsNotNone(new_state)
+        self.assertEquals(new_state, new_state)
+
+    def test_no_timeout(self):
+        user_id = "@foo:bar"
+        now = 5000000
+
+        state = UserPresenceState.default(user_id)
+        state = state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+            last_user_sync_ts=now,
+            last_federation_update_ts=now,
+        )
+
+        new_state = handle_timeout(
+            state, is_mine=True, user_to_num_current_syncs={}, now=now
+        )
+
+        self.assertIsNone(new_state)
+
+    def test_federation_timeout(self):
+        user_id = "@foo:bar"
+        now = 5000000
+
+        state = UserPresenceState.default(user_id)
+        state = state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now,
+            last_user_sync_ts=now,
+            last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
+        )
+
+        new_state = handle_timeout(
+            state, is_mine=False, user_to_num_current_syncs={}, now=now
+        )
+
+        self.assertIsNotNone(new_state)
+        self.assertEquals(new_state.state, PresenceState.OFFLINE)
+
+    def test_last_active(self):
+        user_id = "@foo:bar"
+        now = 5000000
+
+        state = UserPresenceState.default(user_id)
+        state = state.copy_and_replace(
+            state=PresenceState.ONLINE,
+            last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
+            last_user_sync_ts=now,
+            last_federation_update_ts=now,
+        )
+
+        new_state = handle_timeout(
+            state, is_mine=True, user_to_num_current_syncs={}, now=now
+        )
+
+        self.assertIsNotNone(new_state)
+        self.assertEquals(state, new_state)