summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/16066.bugfix1
-rw-r--r--changelog.d/16170.bugfix1
-rw-r--r--changelog.d/16170.misc1
-rw-r--r--changelog.d/16171.bugfix1
-rw-r--r--changelog.d/16171.misc1
-rw-r--r--changelog.d/16172.bugfix1
-rw-r--r--changelog.d/16172.misc1
-rw-r--r--synapse/api/presence.py43
-rw-r--r--synapse/handlers/presence.py279
-rw-r--r--tests/handlers/test_presence.py500
10 files changed, 765 insertions, 64 deletions
diff --git a/changelog.d/16066.bugfix b/changelog.d/16066.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16066.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16170.bugfix b/changelog.d/16170.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16170.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16170.misc b/changelog.d/16170.misc
deleted file mode 100644
index c950b54367..0000000000
--- a/changelog.d/16170.misc
+++ /dev/null
@@ -1 +0,0 @@
-Simplify presence code when using workers.
diff --git a/changelog.d/16171.bugfix b/changelog.d/16171.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16171.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16171.misc b/changelog.d/16171.misc
deleted file mode 100644
index 4d709cb56e..0000000000
--- a/changelog.d/16171.misc
+++ /dev/null
@@ -1 +0,0 @@
-Track per-device information in the presence code.
diff --git a/changelog.d/16172.bugfix b/changelog.d/16172.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16172.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16172.misc b/changelog.d/16172.misc
deleted file mode 100644
index 4d709cb56e..0000000000
--- a/changelog.d/16172.misc
+++ /dev/null
@@ -1 +0,0 @@
-Track per-device information in the presence code.
diff --git a/synapse/api/presence.py b/synapse/api/presence.py
index b80aa83cb3..b78f419994 100644
--- a/synapse/api/presence.py
+++ b/synapse/api/presence.py
@@ -20,18 +20,53 @@ from synapse.api.constants import PresenceState
 from synapse.types import JsonDict
 
 
+@attr.s(slots=True, auto_attribs=True)
+class UserDevicePresenceState:
+    """
+    Represents the current presence state of a user's device.
+
+    user_id: The user ID.
+    device_id: The user's device ID.
+    state: The presence state, see PresenceState.
+    last_active_ts: Time in msec that the device last interacted with server.
+    last_sync_ts: Time in msec that the device last *completed* a sync
+        (or event stream).
+    """
+
+    user_id: str
+    device_id: Optional[str]
+    state: str
+    last_active_ts: int
+    last_sync_ts: int
+
+    @classmethod
+    def default(
+        cls, user_id: str, device_id: Optional[str]
+    ) -> "UserDevicePresenceState":
+        """Returns a default presence state."""
+        return cls(
+            user_id=user_id,
+            device_id=device_id,
+            state=PresenceState.OFFLINE,
+            last_active_ts=0,
+            last_sync_ts=0,
+        )
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class UserPresenceState:
     """Represents the current presence state of the user.
 
-    user_id
-    last_active: Time in msec that the user last interacted with server.
-    last_federation_update: Time in msec since either a) we sent a presence
+    user_id: The user ID.
+    state: The presence state, see PresenceState.
+    last_active_ts: Time in msec that the user last interacted with server.
+    last_federation_update_ts: Time in msec since either a) we sent a presence
         update to other servers or b) we received a presence update, depending
         on if is a local user or not.
-    last_user_sync: Time in msec that the user last *completed* a sync
+    last_user_sync_ts: Time in msec that the user last *completed* a sync
         (or event stream).
     status_msg: User set status message.
+    currently_active: True if the user is currently syncing.
     """
 
     user_id: str
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index f31e18328b..80190838b7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -13,13 +13,56 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""This module is responsible for keeping track of presence status of local
+"""
+This module is responsible for keeping track of presence status of local
 and remote users.
 
 The methods that define policy are:
     - PresenceHandler._update_states
     - PresenceHandler._handle_timeouts
     - should_notify
+
+# Tracking local presence
+
+For local users, presence is tracked on a per-device basis. When a user has multiple
+devices the user presence state is derived by coalescing the presence from each
+device:
+
+    BUSY > ONLINE > UNAVAILABLE > OFFLINE
+
+The time that each device was last active and last synced is tracked in order to
+automatically downgrade a device's presence state:
+
+    A device may move from ONLINE -> UNAVAILABLE, if it has not been active for
+    a period of time.
+
+    A device may go from any state -> OFFLINE, if it is not active and has not
+    synced for a period of time.
+
+The timeouts are handled using a wheel timer, which has coarse buckets. Timings
+do not need to be exact.
+
+Generally a device's presence state is updated whenever a user syncs (via the
+set_presence parameter), when the presence API is called, or if "pro-active"
+events occur, including:
+
+* Sending an event, receipt, read marker.
+* Updating typing status.
+
+The busy state has special status that it cannot is not downgraded by a call to
+sync with a lower priority state *and* it takes a long period of time to transition
+to offline.
+
+# Persisting (and restoring) presence
+
+For all users, presence is persisted on a per-user basis. Data is kept in-memory
+and persisted periodically. When Synapse starts each worker loads the current
+presence state and then tracks the presence stream to keep itself up-to-date.
+
+When restoring presence for local users a pseudo-device is created to match the
+user state; this device follows the normal timeout logic (see above) and will
+automatically be replaced with any information from currently available devices.
+
 """
 import abc
 import contextlib
@@ -30,6 +73,7 @@ from contextlib import contextmanager
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     Callable,
     Collection,
@@ -49,7 +93,7 @@ from prometheus_client import Counter
 import synapse.metrics
 from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import UserDevicePresenceState, UserPresenceState
 from synapse.appservice import ApplicationService
 from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
@@ -162,6 +206,7 @@ class BasePresenceHandler(abc.ABC):
             self.VALID_PRESENCE += (PresenceState.BUSY,)
 
         active_presence = self.store.take_presence_startup_info()
+        # The combined status across all user devices.
         self.user_to_current_state = {state.user_id: state for state in active_presence}
 
     @abc.abstractmethod
@@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler):
             lambda: len(self.user_to_current_state),
         )
 
+        # The per-device presence state, maps user to devices to per-device presence state.
+        self._user_to_device_to_current_state: Dict[
+            str, Dict[Optional[str], UserDevicePresenceState]
+        ] = {}
+
         now = self.clock.time_msec()
         if self._presence_enabled:
             for state in self.user_to_current_state.values():
+                # Create a psuedo-device to properly handle time outs. This will
+                # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
+                pseudo_device_id = None
+                self._user_to_device_to_current_state[state.user_id] = {
+                    pseudo_device_id: UserDevicePresenceState(
+                        user_id=state.user_id,
+                        device_id=pseudo_device_id,
+                        state=state.state,
+                        last_active_ts=state.last_active_ts,
+                        last_sync_ts=state.last_user_sync_ts,
+                    )
+                }
+
                 self.wheel_timer.insert(
                     now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
                 )
@@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler):
 
         # Keeps track of the number of *ongoing* syncs on other processes.
         #
-        # While any sync is ongoing on another process the user will never
+        # While any sync is ongoing on another process the user's device will never
         # go offline.
         #
         # Each process has a unique identifier and an update frequency. If
@@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler):
 
         timers_fired_counter.inc(len(states))
 
-        syncing_user_ids = {
-            user_id
-            for (user_id, _), count in self._user_device_to_num_current_syncs.items()
+        # Set of user ID & device IDs which are currently syncing.
+        syncing_user_devices = {
+            user_id_device_id
+            for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
             if count
         }
-        syncing_user_ids.update(
-            user_id
-            for user_id, _ in itertools.chain(
-                *self.external_process_to_current_syncs.values()
-            )
+        syncing_user_devices.update(
+            itertools.chain(*self.external_process_to_current_syncs.values())
         )
 
         changes = handle_timeouts(
             states,
             is_mine_fn=self.is_mine_id,
-            syncing_user_ids=syncing_user_ids,
+            syncing_user_devices=syncing_user_devices,
+            user_to_devices=self._user_to_device_to_current_state,
             now=now,
         )
 
@@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler):
 
         bump_active_time_counter.inc()
 
-        prev_state = await self.current_state_for_user(user_id)
+        now = self.clock.time_msec()
+
+        # Update the device information & mark the device as online if it was
+        # unavailable.
+        devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+        device_state = devices.setdefault(
+            device_id,
+            UserDevicePresenceState.default(user_id, device_id),
+        )
+        device_state.last_active_ts = now
+        if device_state.state == PresenceState.UNAVAILABLE:
+            device_state.state = PresenceState.ONLINE
 
-        new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
-        if prev_state.state == PresenceState.UNAVAILABLE:
-            new_fields["state"] = PresenceState.ONLINE
+        # Update the user state, this will always update last_active_ts and
+        # might update the presence state.
+        prev_state = await self.current_state_for_user(user_id)
+        new_fields: Dict[str, Any] = {
+            "last_active_ts": now,
+            "state": _combine_device_states(devices.values()),
+        }
 
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
 
@@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler):
             if is_syncing and (user_id, device_id) not in process_presence:
                 process_presence.add((user_id, device_id))
             elif not is_syncing and (user_id, device_id) in process_presence:
+                devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+                device_state = devices.setdefault(
+                    device_id, UserDevicePresenceState.default(user_id, device_id)
+                )
+                device_state.last_sync_ts = sync_time_msec
+
                 new_state = prev_state.copy_and_replace(
                     last_user_sync_ts=sync_time_msec
                 )
@@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler):
             process_presence = self.external_process_to_current_syncs.pop(
                 process_id, set()
             )
-            prev_states = await self.current_state_for_users(
-                {user_id for user_id, device_id in process_presence}
-            )
+
             time_now_ms = self.clock.time_msec()
 
+            # Mark each device as having a last sync time.
+            updated_users = set()
+            for user_id, device_id in process_presence:
+                device_state = self._user_to_device_to_current_state.setdefault(
+                    user_id, {}
+                ).setdefault(
+                    device_id, UserDevicePresenceState.default(user_id, device_id)
+                )
+
+                device_state.last_sync_ts = time_now_ms
+                updated_users.add(user_id)
+
+            # Update each user (and insert into the appropriate timers to check if
+            # they've gone offline).
+            prev_states = await self.current_state_for_users(updated_users)
             await self._update_states(
                 [
                     prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
@@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler):
         if prev_state.state == PresenceState.BUSY and is_sync:
             presence = PresenceState.BUSY
 
+        # Update the device specific information.
+        devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+        device_state = devices.setdefault(
+            device_id,
+            UserDevicePresenceState.default(user_id, device_id),
+        )
+        device_state.state = presence
+        device_state.last_active_ts = now
+        if is_sync:
+            device_state.last_sync_ts = now
+
+        # Based on the state of each user's device calculate the new presence state.
+        presence = _combine_device_states(devices.values())
+
         new_fields = {"state": presence}
 
         if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
@@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
 def handle_timeouts(
     user_states: List[UserPresenceState],
     is_mine_fn: Callable[[str], bool],
-    syncing_user_ids: Set[str],
+    syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]],
+    user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]],
     now: int,
 ) -> List[UserPresenceState]:
     """Checks the presence of users that have timed out and updates as
@@ -1882,7 +1993,8 @@ def handle_timeouts(
     Args:
         user_states: List of UserPresenceState's to check.
         is_mine_fn: Function that returns if a user_id is ours
-        syncing_user_ids: Set of user_ids with active syncs.
+        syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
+        user_to_devices: A map of user ID to device ID to UserDevicePresenceState.
         now: Current time in ms.
 
     Returns:
@@ -1891,9 +2003,16 @@ def handle_timeouts(
     changes = {}  # Actual changes we need to notify people about
 
     for state in user_states:
-        is_mine = is_mine_fn(state.user_id)
-
-        new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
+        user_id = state.user_id
+        is_mine = is_mine_fn(user_id)
+
+        new_state = handle_timeout(
+            state,
+            is_mine,
+            syncing_user_devices,
+            user_to_devices.get(user_id, {}),
+            now,
+        )
         if new_state:
             changes[state.user_id] = new_state
 
@@ -1901,14 +2020,19 @@ def handle_timeouts(
 
 
 def handle_timeout(
-    state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
+    state: UserPresenceState,
+    is_mine: bool,
+    syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
+    user_devices: Dict[Optional[str], UserDevicePresenceState],
+    now: int,
 ) -> Optional[UserPresenceState]:
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
-        state
+        state: UserPresenceState to check.
         is_mine: Whether the user is ours
-        syncing_user_ids: Set of user_ids with active syncs.
+        syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
+        user_devices: A map of device ID to UserDevicePresenceState.
         now: Current time in ms.
 
     Returns:
@@ -1919,34 +2043,55 @@ def handle_timeout(
         return None
 
     changed = False
-    user_id = state.user_id
 
     if is_mine:
-        if state.state == PresenceState.ONLINE:
-            if now - state.last_active_ts > IDLE_TIMER:
-                # Currently online, but last activity ages ago so auto
-                # idle
-                state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
-                changed = True
-            elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
-                # So that we send down a notification that we've
-                # stopped updating.
+        # Check per-device whether the device should be considered idle or offline
+        # due to timeouts.
+        device_changed = False
+        offline_devices = []
+        for device_id, device_state in user_devices.items():
+            if device_state.state == PresenceState.ONLINE:
+                if now - device_state.last_active_ts > IDLE_TIMER:
+                    # Currently online, but last activity ages ago so auto
+                    # idle
+                    device_state.state = PresenceState.UNAVAILABLE
+                    device_changed = True
+
+            # If there are have been no sync for a while (and none ongoing),
+            # set presence to offline.
+            if (state.user_id, device_id) not in syncing_device_ids:
+                # If the user has done something recently but hasn't synced,
+                # don't set them as offline.
+                sync_or_active = max(
+                    device_state.last_sync_ts, device_state.last_active_ts
+                )
+
+                if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
+                    # Mark the device as going offline.
+                    offline_devices.append(device_id)
+                    device_changed = True
+
+        # Offline devices are not needed and do not add information.
+        for device_id in offline_devices:
+            user_devices.pop(device_id)
+
+        # If the presence state of the devices changed, then (maybe) update
+        # the user's overall presence state.
+        if device_changed:
+            new_presence = _combine_device_states(user_devices.values())
+            if new_presence != state.state:
+                state = state.copy_and_replace(state=new_presence)
                 changed = True
 
+        if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+            # So that we send down a notification that we've
+            # stopped updating.
+            changed = True
+
         if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
             # Need to send ping to other servers to ensure they don't
             # timeout and set us to offline
             changed = True
-
-        # If there are have been no sync for a while (and none ongoing),
-        # set presence to offline
-        if user_id not in syncing_user_ids:
-            # If the user has done something recently but hasn't synced,
-            # don't set them as offline.
-            sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
-            if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
-                state = state.copy_and_replace(state=PresenceState.OFFLINE)
-                changed = True
     else:
         # We expect to be poked occasionally by the other side.
         # This is to protect against forgetful/buggy servers, so that
@@ -2036,6 +2181,46 @@ def handle_update(
     return new_state, persist_and_notify, federation_ping
 
 
+PRESENCE_BY_PRIORITY = {
+    PresenceState.BUSY: 4,
+    PresenceState.ONLINE: 3,
+    PresenceState.UNAVAILABLE: 2,
+    PresenceState.OFFLINE: 1,
+}
+
+
+def _combine_device_states(
+    device_states: Iterable[UserDevicePresenceState],
+) -> str:
+    """
+    Find the device to use presence information from.
+
+    Orders devices by priority, then last_active_ts.
+
+    Args:
+        device_states: An iterable of device presence states
+
+    Return:
+        The combined presence state.
+    """
+
+    # Based on (all) the user's devices calculate the new presence state.
+    presence = PresenceState.OFFLINE
+    last_active_ts = -1
+
+    # Find the device to use the presence state of based on the presence priority,
+    # but tie-break with how recently the device has been seen.
+    for device_state in device_states:
+        if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
+            PRESENCE_BY_PRIORITY[presence],
+            last_active_ts,
+        ):
+            presence = device_state.state
+            last_active_ts = device_state.last_active_ts
+
+    return presence
+
+
 async def get_interested_parties(
     store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
 ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 88a16193a3..914415740a 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -21,7 +21,7 @@ from signedjson.key import generate_signing_key
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import EventTypes, Membership, PresenceState
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import UserDevicePresenceState, UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events.builder import EventBuilder
 from synapse.federation.sender import FederationSender
@@ -352,6 +352,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_idle_timer(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -362,8 +363,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         assert new_state is not None
@@ -376,6 +390,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         presence state into unavailable.
         """
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -386,8 +401,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         assert new_state is not None
@@ -396,6 +424,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_sync_timeout(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -406,8 +435,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         assert new_state is not None
@@ -416,6 +458,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_sync_online(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -426,9 +469,20 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
         new_state = handle_timeout(
-            state, is_mine=True, syncing_user_ids={user_id}, now=now
+            state,
+            is_mine=True,
+            syncing_device_ids={(user_id, device_id)},
+            user_devices={device_id: device_state},
+            now=now,
         )
 
         self.assertIsNotNone(new_state)
@@ -438,6 +492,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_federation_ping(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -449,14 +504,28 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         self.assertEqual(state, new_state)
 
     def test_no_timeout(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         now = 5000000
 
         state = UserPresenceState.default(user_id)
@@ -466,8 +535,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now,
             last_federation_update_ts=now,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNone(new_state)
 
@@ -485,8 +567,9 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             status_msg=status_msg,
         )
 
+        # Note that this is a remote user so we do not have their device information.
         new_state = handle_timeout(
-            state, is_mine=False, syncing_user_ids=set(), now=now
+            state, is_mine=False, syncing_device_ids=set(), user_devices={}, now=now
         )
 
         self.assertIsNotNone(new_state)
@@ -496,6 +579,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_last_active(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -507,8 +591,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_federation_update_ts=now,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         self.assertEqual(state, new_state)
@@ -579,7 +676,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
         [
             (PresenceState.BUSY, PresenceState.BUSY),
             (PresenceState.ONLINE, PresenceState.ONLINE),
-            (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
+            (PresenceState.UNAVAILABLE, PresenceState.ONLINE),
             # Offline syncs don't update the state.
             (PresenceState.OFFLINE, PresenceState.ONLINE),
         ]
@@ -800,6 +897,389 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
         # we should now be online
         self.assertEqual(state.state, PresenceState.ONLINE)
 
+    @parameterized.expand(
+        # A list of tuples of 4 strings:
+        #
+        # * The presence state of device 1.
+        # * The presence state of device 2.
+        # * The expected user presence state after both devices have synced.
+        # * The expected user presence state after device 1 has idled.
+        # * The expected user presence state after device 2 has idled.
+        # * True to use workers, False a monolith.
+        [
+            (*cases, workers)
+            for workers in (False, True)
+            for cases in [
+                # If both devices have the same state, online should eventually idle.
+                # Otherwise, the state doesn't change.
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                ),
+                # If the second device has a "lower" state it should fallback to it.
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                # If the second device has a "higher" state it should override.
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+            ]
+        ],
+        name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[5] else 'monolith'}",
+    )
+    @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
+    def test_set_presence_from_syncing_multi_device(
+        self,
+        dev_1_state: str,
+        dev_2_state: str,
+        expected_state_1: str,
+        expected_state_2: str,
+        expected_state_3: str,
+        test_with_workers: bool,
+    ) -> None:
+        """
+        Test the behaviour of multiple devices syncing at the same time.
+
+        Roughly the user's presence state should be set to the "highest" priority
+        of all the devices. When a device then goes offline its state should be
+        discarded and the next highest should win.
+
+        Note that these tests use the idle timer (and don't close the syncs), it
+        is unlikely that a *single* sync would last this long, but is close enough
+        to continually syncing with that current state.
+        """
+        user_id = f"@test:{self.hs.config.server.server_name}"
+
+        # By default, we call /sync against the main process.
+        worker_presence_handler = self.presence_handler
+        if test_with_workers:
+            # Create a worker and use it to handle /sync traffic instead.
+            # This is used to test that presence changes get replicated from workers
+            # to the main process correctly.
+            worker_to_sync_against = self.make_worker_hs(
+                "synapse.app.generic_worker", {"worker_name": "synchrotron"}
+            )
+            worker_presence_handler = worker_to_sync_against.get_presence_handler()
+
+        # 1. Sync with the first device.
+        self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-1",
+                affect_presence=dev_1_state != PresenceState.OFFLINE,
+                presence_state=dev_1_state,
+            ),
+            by=0.01,
+        )
+
+        # 2. Wait half the idle timer.
+        self.reactor.advance(IDLE_TIMER / 1000 / 2)
+        self.reactor.pump([0.1])
+
+        # 3. Sync with the second device.
+        self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-2",
+                affect_presence=dev_2_state != PresenceState.OFFLINE,
+                presence_state=dev_2_state,
+            ),
+            by=0.01,
+        )
+
+        # 4. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_1)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_1)
+
+        # When testing with workers, make another random sync (with any *different*
+        # user) to keep the process information from expiring.
+        #
+        # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER.
+        if test_with_workers:
+            with self.get_success(
+                worker_presence_handler.user_syncing(
+                    f"@other-user:{self.hs.config.server.server_name}",
+                    "dev-3",
+                    affect_presence=True,
+                    presence_state=PresenceState.ONLINE,
+                ),
+                by=0.01,
+            ):
+                pass
+
+        # 5. Advance such that the first device should be discarded (the idle timer),
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(IDLE_TIMER / 1000 / 2)
+        self.reactor.pump([0.01])
+
+        # 6. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_2)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_2)
+
+        # 7. Advance such that the second device should be discarded (half the idle timer),
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(IDLE_TIMER / 1000 / 2)
+        self.reactor.pump([0.1])
+
+        # 8. The devices are still "syncing" (the sync context managers were never
+        # closed), so might idle.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_3)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_3)
+
+    @parameterized.expand(
+        # A list of tuples of 4 strings:
+        #
+        # * The presence state of device 1.
+        # * The presence state of device 2.
+        # * The expected user presence state after both devices have synced.
+        # * The expected user presence state after device 1 has stopped syncing.
+        # * True to use workers, False a monolith.
+        [
+            (*cases, workers)
+            for workers in (False, True)
+            for cases in [
+                # If both devices have the same state, nothing exciting should happen.
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                ),
+                # If the second device has a "lower" state it should fallback to it.
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.OFFLINE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.OFFLINE,
+                ),
+                # If the second device has a "higher" state it should override.
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+            ]
+        ],
+        name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[4] else 'monolith'}",
+    )
+    @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
+    def test_set_presence_from_non_syncing_multi_device(
+        self,
+        dev_1_state: str,
+        dev_2_state: str,
+        expected_state_1: str,
+        expected_state_2: str,
+        test_with_workers: bool,
+    ) -> None:
+        """
+        Test the behaviour of multiple devices syncing at the same time.
+
+        Roughly the user's presence state should be set to the "highest" priority
+        of all the devices. When a device then goes offline its state should be
+        discarded and the next highest should win.
+
+        Note that these tests use the idle timer (and don't close the syncs), it
+        is unlikely that a *single* sync would last this long, but is close enough
+        to continually syncing with that current state.
+        """
+        user_id = f"@test:{self.hs.config.server.server_name}"
+
+        # By default, we call /sync against the main process.
+        worker_presence_handler = self.presence_handler
+        if test_with_workers:
+            # Create a worker and use it to handle /sync traffic instead.
+            # This is used to test that presence changes get replicated from workers
+            # to the main process correctly.
+            worker_to_sync_against = self.make_worker_hs(
+                "synapse.app.generic_worker", {"worker_name": "synchrotron"}
+            )
+            worker_presence_handler = worker_to_sync_against.get_presence_handler()
+
+        # 1. Sync with the first device.
+        sync_1 = self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-1",
+                affect_presence=dev_1_state != PresenceState.OFFLINE,
+                presence_state=dev_1_state,
+            ),
+            by=0.1,
+        )
+
+        # 2. Sync with the second device.
+        sync_2 = self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-2",
+                affect_presence=dev_2_state != PresenceState.OFFLINE,
+                presence_state=dev_2_state,
+            ),
+            by=0.1,
+        )
+
+        # 3. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_1)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_1)
+
+        # 4. Disconnect the first device.
+        with sync_1:
+            pass
+
+        # 5. Advance such that the first device should be discarded (the sync timeout),
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
+        self.reactor.pump([5])
+
+        # 6. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_2)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_2)
+
+        # 7. Disconnect the second device.
+        with sync_2:
+            pass
+
+        # 8. Advance such that the second device should be discarded (the sync timeout),
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
+        self.reactor.pump([5])
+
+        # 9. There are no more devices, should be offline.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, PresenceState.OFFLINE)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, PresenceState.OFFLINE)
+
     def test_set_presence_from_syncing_keeps_status(self) -> None:
         """Test that presence set by syncing retains status message"""
         status_msg = "I'm here!"