summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py411
1 files changed, 210 insertions, 201 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 59d53f1050..5204073a38 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -50,16 +50,20 @@ logger = logging.getLogger(__name__)
 
 notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
 federation_presence_out_counter = Counter(
-    "synapse_handler_presence_federation_presence_out", "")
+    "synapse_handler_presence_federation_presence_out", ""
+)
 presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
 timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
-federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
+federation_presence_counter = Counter(
+    "synapse_handler_presence_federation_presence", ""
+)
 bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
 
 get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
 
 notify_reason_counter = Counter(
-    "synapse_handler_presence_notify_reason", "", ["reason"])
+    "synapse_handler_presence_notify_reason", "", ["reason"]
+)
 state_transition_counter = Counter(
     "synapse_handler_presence_state_transition", "", ["from", "to"]
 )
@@ -90,7 +94,6 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
 class PresenceHandler(object):
-
     def __init__(self, hs):
         """
 
@@ -110,31 +113,26 @@ class PresenceHandler(object):
 
         federation_registry = hs.get_federation_registry()
 
-        federation_registry.register_edu_handler(
-            "m.presence", self.incoming_presence
-        )
+        federation_registry.register_edu_handler("m.presence", self.incoming_presence)
 
         active_presence = self.store.take_presence_startup_info()
 
         # A dictionary of the current state of users. This is prefilled with
         # non-offline presence from the DB. We should fetch from the DB if
         # we can't find a users presence in here.
-        self.user_to_current_state = {
-            state.user_id: state
-            for state in active_presence
-        }
+        self.user_to_current_state = {state.user_id: state for state in active_presence}
 
         LaterGauge(
-            "synapse_handlers_presence_user_to_current_state_size", "", [],
-            lambda: len(self.user_to_current_state)
+            "synapse_handlers_presence_user_to_current_state_size",
+            "",
+            [],
+            lambda: len(self.user_to_current_state),
         )
 
         now = self.clock.time_msec()
         for state in active_presence:
             self.wheel_timer.insert(
-                now=now,
-                obj=state.user_id,
-                then=state.last_active_ts + IDLE_TIMER,
+                now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
             )
             self.wheel_timer.insert(
                 now=now,
@@ -158,7 +156,13 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger(
+            "before",
+            "shutdown",
+            run_as_background_process,
+            "presence.on_shutdown",
+            self._on_shutdown,
+        )
 
         self.serial_to_user = {}
         self._next_serial = 1
@@ -182,22 +186,26 @@ class PresenceHandler(object):
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
-        self.clock.call_later(
-            30,
-            self.clock.looping_call,
-            self._handle_timeouts,
-            5000,
-        )
+        def run_timeout_handler():
+            return run_as_background_process(
+                "handle_presence_timeouts", self._handle_timeouts
+            )
 
-        self.clock.call_later(
-            60,
-            self.clock.looping_call,
-            self._persist_unpersisted_changes,
-            60 * 1000,
-        )
+        self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
 
-        LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
-                   lambda: len(self.wheel_timer))
+        def run_persister():
+            return run_as_background_process(
+                "persist_presence_changes", self._persist_unpersisted_changes
+            )
+
+        self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
+
+        LaterGauge(
+            "synapse_handlers_presence_wheel_timer_size",
+            "",
+            [],
+            lambda: len(self.wheel_timer),
+        )
 
         # Used to handle sending of presence to newly joined users/servers
         if hs.config.use_presence:
@@ -225,14 +233,17 @@ class PresenceHandler(object):
 
         logger.info(
             "Performing _on_shutdown. Persisting %d unpersisted changes",
-            len(self.user_to_current_state)
+            len(self.user_to_current_state),
         )
 
         if self.unpersisted_users_changes:
-            yield self.store.update_presence([
-                self.user_to_current_state[user_id]
-                for user_id in self.unpersisted_users_changes
-            ])
+
+            yield self.store.update_presence(
+                [
+                    self.user_to_current_state[user_id]
+                    for user_id in self.unpersisted_users_changes
+                ]
+            )
         logger.info("Finished _on_shutdown")
 
     @defer.inlineCallbacks
@@ -240,29 +251,14 @@ class PresenceHandler(object):
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """
-        logger.info(
-            "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
-            len(self.unpersisted_users_changes)
-        )
-
         unpersisted = self.unpersisted_users_changes
         self.unpersisted_users_changes = set()
 
         if unpersisted:
-            yield self.store.update_presence([
-                self.user_to_current_state[user_id]
-                for user_id in unpersisted
-            ])
-
-        logger.info("Finished _persist_unpersisted_changes")
-
-    @defer.inlineCallbacks
-    def _update_states_and_catch_exception(self, new_states):
-        try:
-            res = yield self._update_states(new_states)
-            defer.returnValue(res)
-        except Exception:
-            logger.exception("Error updating presence")
+            logger.info("Persisting %d upersisted presence updates", len(unpersisted))
+            yield self.store.update_presence(
+                [self.user_to_current_state[user_id] for user_id in unpersisted]
+            )
 
     @defer.inlineCallbacks
     def _update_states(self, new_states):
@@ -298,10 +294,11 @@ class PresenceHandler(object):
                 )
 
                 new_state, should_notify, should_ping = handle_update(
-                    prev_state, new_state,
+                    prev_state,
+                    new_state,
                     is_mine=self.is_mine_id(user_id),
                     wheel_timer=self.wheel_timer,
-                    now=now
+                    now=now,
                 )
 
                 self.user_to_current_state[user_id] = new_state
@@ -323,7 +320,8 @@ class PresenceHandler(object):
             self.unpersisted_users_changes -= set(to_notify.keys())
 
             to_federation_ping = {
-                user_id: state for user_id, state in to_federation_ping.items()
+                user_id: state
+                for user_id, state in to_federation_ping.items()
                 if user_id not in to_notify
             }
             if to_federation_ping:
@@ -338,45 +336,39 @@ class PresenceHandler(object):
         logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        try:
-            with Measure(self.clock, "presence_handle_timeouts"):
-                # Fetch the list of users that *may* have timed out. Things may have
-                # changed since the timeout was set, so we won't necessarily have to
-                # take any action.
-                users_to_check = set(self.wheel_timer.fetch(now))
-
-                # Check whether the lists of syncing processes from an external
-                # process have expired.
-                expired_process_ids = [
-                    process_id for process_id, last_update
-                    in self.external_process_last_updated_ms.items()
-                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
-                ]
-                for process_id in expired_process_ids:
-                    users_to_check.update(
-                        self.external_process_last_updated_ms.pop(process_id, ())
-                    )
-                    self.external_process_last_update.pop(process_id)
+        # Fetch the list of users that *may* have timed out. Things may have
+        # changed since the timeout was set, so we won't necessarily have to
+        # take any action.
+        users_to_check = set(self.wheel_timer.fetch(now))
+
+        # Check whether the lists of syncing processes from an external
+        # process have expired.
+        expired_process_ids = [
+            process_id
+            for process_id, last_update in self.external_process_last_updated_ms.items()
+            if now - last_update > EXTERNAL_PROCESS_EXPIRY
+        ]
+        for process_id in expired_process_ids:
+            users_to_check.update(
+                self.external_process_last_updated_ms.pop(process_id, ())
+            )
+            self.external_process_last_update.pop(process_id)
 
-                states = [
-                    self.user_to_current_state.get(
-                        user_id, UserPresenceState.default(user_id)
-                    )
-                    for user_id in users_to_check
-                ]
+        states = [
+            self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
+            for user_id in users_to_check
+        ]
 
-                timers_fired_counter.inc(len(states))
+        timers_fired_counter.inc(len(states))
 
-                changes = handle_timeouts(
-                    states,
-                    is_mine_fn=self.is_mine_id,
-                    syncing_user_ids=self.get_currently_syncing_users(),
-                    now=now,
-                )
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.is_mine_id,
+            syncing_user_ids=self.get_currently_syncing_users(),
+            now=now,
+        )
 
-            run_in_background(self._update_states_and_catch_exception, changes)
-        except Exception:
-            logger.exception("Exception in _handle_timeouts loop")
+        return self._update_states(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -393,9 +385,7 @@ class PresenceHandler(object):
 
         prev_state = yield self.current_state_for_user(user_id)
 
-        new_fields = {
-            "last_active_ts": self.clock.time_msec(),
-        }
+        new_fields = {"last_active_ts": self.clock.time_msec()}
         if prev_state.state == PresenceState.UNAVAILABLE:
             new_fields["state"] = PresenceState.ONLINE
 
@@ -429,15 +419,23 @@ class PresenceHandler(object):
             if prev_state.state == PresenceState.OFFLINE:
                 # If they're currently offline then bring them online, otherwise
                 # just update the last sync times.
-                yield self._update_states([prev_state.copy_and_replace(
-                    state=PresenceState.ONLINE,
-                    last_active_ts=self.clock.time_msec(),
-                    last_user_sync_ts=self.clock.time_msec(),
-                )])
+                yield self._update_states(
+                    [
+                        prev_state.copy_and_replace(
+                            state=PresenceState.ONLINE,
+                            last_active_ts=self.clock.time_msec(),
+                            last_user_sync_ts=self.clock.time_msec(),
+                        )
+                    ]
+                )
             else:
-                yield self._update_states([prev_state.copy_and_replace(
-                    last_user_sync_ts=self.clock.time_msec(),
-                )])
+                yield self._update_states(
+                    [
+                        prev_state.copy_and_replace(
+                            last_user_sync_ts=self.clock.time_msec()
+                        )
+                    ]
+                )
 
         @defer.inlineCallbacks
         def _end():
@@ -445,9 +443,13 @@ class PresenceHandler(object):
                 self.user_to_num_current_syncs[user_id] -= 1
 
                 prev_state = yield self.current_state_for_user(user_id)
-                yield self._update_states([prev_state.copy_and_replace(
-                    last_user_sync_ts=self.clock.time_msec(),
-                )])
+                yield self._update_states(
+                    [
+                        prev_state.copy_and_replace(
+                            last_user_sync_ts=self.clock.time_msec()
+                        )
+                    ]
+                )
             except Exception:
                 logger.exception("Error updating presence after sync")
 
@@ -468,7 +470,8 @@ class PresenceHandler(object):
         """
         if self.hs.config.use_presence:
             syncing_user_ids = {
-                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                user_id
+                for user_id, count in self.user_to_num_current_syncs.items()
                 if count
             }
             for user_ids in self.external_process_to_current_syncs.values():
@@ -478,7 +481,9 @@ class PresenceHandler(object):
             return set()
 
     @defer.inlineCallbacks
-    def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+    def update_external_syncs_row(
+        self, process_id, user_id, is_syncing, sync_time_msec
+    ):
         """Update the syncing users for an external process as a delta.
 
         Args:
@@ -499,20 +504,22 @@ class PresenceHandler(object):
             updates = []
             if is_syncing and user_id not in process_presence:
                 if prev_state.state == PresenceState.OFFLINE:
-                    updates.append(prev_state.copy_and_replace(
-                        state=PresenceState.ONLINE,
-                        last_active_ts=sync_time_msec,
-                        last_user_sync_ts=sync_time_msec,
-                    ))
+                    updates.append(
+                        prev_state.copy_and_replace(
+                            state=PresenceState.ONLINE,
+                            last_active_ts=sync_time_msec,
+                            last_user_sync_ts=sync_time_msec,
+                        )
+                    )
                 else:
-                    updates.append(prev_state.copy_and_replace(
-                        last_user_sync_ts=sync_time_msec,
-                    ))
+                    updates.append(
+                        prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+                    )
                 process_presence.add(user_id)
             elif user_id in process_presence:
-                updates.append(prev_state.copy_and_replace(
-                    last_user_sync_ts=sync_time_msec,
-                ))
+                updates.append(
+                    prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+                )
 
             if not is_syncing:
                 process_presence.discard(user_id)
@@ -536,12 +543,12 @@ class PresenceHandler(object):
             prev_states = yield self.current_state_for_users(process_presence)
             time_now_ms = self.clock.time_msec()
 
-            yield self._update_states([
-                prev_state.copy_and_replace(
-                    last_user_sync_ts=time_now_ms,
-                )
-                for prev_state in itervalues(prev_states)
-            ])
+            yield self._update_states(
+                [
+                    prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
+                    for prev_state in itervalues(prev_states)
+                ]
+            )
             self.external_process_last_updated_ms.pop(process_id, None)
 
     @defer.inlineCallbacks
@@ -573,8 +580,7 @@ class PresenceHandler(object):
             missing = [user_id for user_id, state in iteritems(states) if not state]
             if missing:
                 new = {
-                    user_id: UserPresenceState.default(user_id)
-                    for user_id in missing
+                    user_id: UserPresenceState.default(user_id) for user_id in missing
                 }
                 states.update(new)
                 self.user_to_current_state.update(new)
@@ -592,8 +598,10 @@ class PresenceHandler(object):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states]
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states],
         )
 
         self._push_to_remotes(states)
@@ -604,8 +612,10 @@ class PresenceHandler(object):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states]
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states],
         )
 
     def _push_to_remotes(self, states):
@@ -630,15 +640,15 @@ class PresenceHandler(object):
             user_id = push.get("user_id", None)
             if not user_id:
                 logger.info(
-                    "Got presence update from %r with no 'user_id': %r",
-                    origin, push,
+                    "Got presence update from %r with no 'user_id': %r", origin, push
                 )
                 continue
 
             if get_domain_from_id(user_id) != origin:
                 logger.info(
                     "Got presence update from %r with bad 'user_id': %r",
-                    origin, user_id,
+                    origin,
+                    user_id,
                 )
                 continue
 
@@ -646,14 +656,12 @@ class PresenceHandler(object):
             if not presence_state:
                 logger.info(
                     "Got presence update from %r with no 'presence_state': %r",
-                    origin, push,
+                    origin,
+                    push,
                 )
                 continue
 
-            new_fields = {
-                "state": presence_state,
-                "last_federation_update_ts": now,
-            }
+            new_fields = {"state": presence_state, "last_federation_update_ts": now}
 
             last_active_ago = push.get("last_active_ago", None)
             if last_active_ago is not None:
@@ -671,10 +679,7 @@ class PresenceHandler(object):
 
     @defer.inlineCallbacks
     def get_state(self, target_user, as_event=False):
-        results = yield self.get_states(
-            [target_user.to_string()],
-            as_event=as_event,
-        )
+        results = yield self.get_states([target_user.to_string()], as_event=as_event)
 
         defer.returnValue(results[0])
 
@@ -698,13 +703,15 @@ class PresenceHandler(object):
 
         now = self.clock.time_msec()
         if as_event:
-            defer.returnValue([
-                {
-                    "type": "m.presence",
-                    "content": format_user_presence_state(state, now),
-                }
-                for state in updates
-            ])
+            defer.returnValue(
+                [
+                    {
+                        "type": "m.presence",
+                        "content": format_user_presence_state(state, now),
+                    }
+                    for state in updates
+                ]
+            )
         else:
             defer.returnValue(updates)
 
@@ -716,7 +723,9 @@ class PresenceHandler(object):
         presence = state["presence"]
 
         valid_presence = (
-            PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE
+            PresenceState.ONLINE,
+            PresenceState.UNAVAILABLE,
+            PresenceState.OFFLINE,
         )
         if presence not in valid_presence:
             raise SynapseError(400, "Invalid presence state")
@@ -725,9 +734,7 @@ class PresenceHandler(object):
 
         prev_state = yield self.current_state_for_user(user_id)
 
-        new_fields = {
-            "state": presence
-        }
+        new_fields = {"state": presence}
 
         if not ignore_status_msg:
             msg = status_msg if presence != PresenceState.OFFLINE else None
@@ -833,14 +840,17 @@ class PresenceHandler(object):
                 # joins.
                 continue
 
-            event = yield self.store.get_event(event_id)
-            if event.content.get("membership") != Membership.JOIN:
+            event = yield self.store.get_event(event_id, allow_none=True)
+            if not event or event.content.get("membership") != Membership.JOIN:
                 # We only care about joins
                 continue
 
             if prev_event_id:
-                prev_event = yield self.store.get_event(prev_event_id)
-                if prev_event.content.get("membership") == Membership.JOIN:
+                prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+                if (
+                    prev_event
+                    and prev_event.content.get("membership") == Membership.JOIN
+                ):
                     # Ignore changes to join events.
                     continue
 
@@ -873,8 +883,7 @@ class PresenceHandler(object):
             hosts = set(host for host in hosts if host != self.server_name)
 
             self.federation.send_presence_to_destinations(
-                states=[state],
-                destinations=hosts,
+                states=[state], destinations=hosts
             )
         else:
             # A remote user has joined the room, so we need to:
@@ -900,7 +909,8 @@ class PresenceHandler(object):
             # default state.
             now = self.clock.time_msec()
             states = [
-                state for state in states.values()
+                state
+                for state in states.values()
                 if state.state != PresenceState.OFFLINE
                 or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
                 or state.status_msg is not None
@@ -908,8 +918,7 @@ class PresenceHandler(object):
 
             if states:
                 self.federation.send_presence_to_destinations(
-                    states=states,
-                    destinations=[get_domain_from_id(user_id)],
+                    states=states, destinations=[get_domain_from_id(user_id)]
                 )
 
 
@@ -933,7 +942,10 @@ def should_notify(old_state, new_state):
             notify_reason_counter.labels("current_active_change").inc()
             return True
 
-        if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+        if (
+            new_state.last_active_ts - old_state.last_active_ts
+            > LAST_ACTIVE_GRANULARITY
+        ):
             # Only notify about last active bumps if we're not currently acive
             if not new_state.currently_active:
                 notify_reason_counter.labels("last_active_change_online").inc()
@@ -954,9 +966,7 @@ def format_user_presence_state(state, now, include_user_id=True):
     The "user_id" is optional so that this function can be used to format presence
     updates for client /sync responses and for federation /send requests.
     """
-    content = {
-        "presence": state.state,
-    }
+    content = {"presence": state.state}
     if include_user_id:
         content["user_id"] = state.user_id
     if state.last_active_ts:
@@ -982,8 +992,15 @@ class PresenceEventSource(object):
 
     @defer.inlineCallbacks
     @log_function
-    def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
-                       explicit_room_id=None, **kwargs):
+    def get_new_events(
+        self,
+        user,
+        from_key,
+        room_ids=None,
+        include_offline=True,
+        explicit_room_id=None,
+        **kwargs
+    ):
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1026,7 +1043,7 @@ class PresenceEventSource(object):
 
                 if from_key:
                     user_ids_changed = stream_change_cache.get_entities_changed(
-                        users_interested_in, from_key,
+                        users_interested_in, from_key
                     )
                 else:
                     user_ids_changed = users_interested_in
@@ -1036,10 +1053,16 @@ class PresenceEventSource(object):
         if include_offline:
             defer.returnValue((list(updates.values()), max_token))
         else:
-            defer.returnValue(([
-                s for s in itervalues(updates)
-                if s.state != PresenceState.OFFLINE
-            ], max_token))
+            defer.returnValue(
+                (
+                    [
+                        s
+                        for s in itervalues(updates)
+                        if s.state != PresenceState.OFFLINE
+                    ],
+                    max_token,
+                )
+            )
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
@@ -1057,13 +1080,13 @@ class PresenceEventSource(object):
         users_interested_in.add(user_id)  # So that we receive our own presence
 
         users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-            user_id, on_invalidate=cache_context.invalidate,
+            user_id, on_invalidate=cache_context.invalidate
         )
         users_interested_in.update(users_who_share_room)
 
         if explicit_room_id:
             user_ids = yield self.store.get_users_in_room(
-                explicit_room_id, on_invalidate=cache_context.invalidate,
+                explicit_room_id, on_invalidate=cache_context.invalidate
             )
             users_interested_in.update(user_ids)
 
@@ -1119,9 +1142,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
             if now - state.last_active_ts > IDLE_TIMER:
                 # Currently online, but last activity ages ago so auto
                 # idle
-                state = state.copy_and_replace(
-                    state=PresenceState.UNAVAILABLE,
-                )
+                state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
                 changed = True
             elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
                 # So that we send down a notification that we've
@@ -1141,8 +1162,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
             sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
             if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
-                    state=PresenceState.OFFLINE,
-                    status_msg=None,
+                    state=PresenceState.OFFLINE, status_msg=None
                 )
                 changed = True
     else:
@@ -1151,10 +1171,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
         # no one gets stuck online forever.
         if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
             # The other side seems to have disappeared.
-            state = state.copy_and_replace(
-                state=PresenceState.OFFLINE,
-                status_msg=None,
-            )
+            state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None)
             changed = True
 
     return state if changed else None
@@ -1189,21 +1206,17 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
         if new_state.state == PresenceState.ONLINE:
             # Idle timer
             wheel_timer.insert(
-                now=now,
-                obj=user_id,
-                then=new_state.last_active_ts + IDLE_TIMER
+                now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
             )
 
             active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
-            new_state = new_state.copy_and_replace(
-                currently_active=active,
-            )
+            new_state = new_state.copy_and_replace(currently_active=active)
 
             if active:
                 wheel_timer.insert(
                     now=now,
                     obj=user_id,
-                    then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
+                    then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
                 )
 
         if new_state.state != PresenceState.OFFLINE:
@@ -1211,29 +1224,25 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
             wheel_timer.insert(
                 now=now,
                 obj=user_id,
-                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
             )
 
             last_federate = new_state.last_federation_update_ts
             if now - last_federate > FEDERATION_PING_INTERVAL:
                 # Been a while since we've poked remote servers
-                new_state = new_state.copy_and_replace(
-                    last_federation_update_ts=now,
-                )
+                new_state = new_state.copy_and_replace(last_federation_update_ts=now)
                 federation_ping = True
 
     else:
         wheel_timer.insert(
             now=now,
             obj=user_id,
-            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
+            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
         )
 
     # Check whether the change was something worth notifying about
     if should_notify(prev_state, new_state):
-        new_state = new_state.copy_and_replace(
-            last_federation_update_ts=now,
-        )
+        new_state = new_state.copy_and_replace(last_federation_update_ts=now)
         persist_and_notify = True
 
     return new_state, persist_and_notify, federation_ping