summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/presence.py311
1 files changed, 158 insertions, 153 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 557fb5f83d..5204073a38 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -50,16 +50,20 @@ logger = logging.getLogger(__name__)
 
 notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
 federation_presence_out_counter = Counter(
-    "synapse_handler_presence_federation_presence_out", "")
+    "synapse_handler_presence_federation_presence_out", ""
+)
 presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
 timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
-federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
+federation_presence_counter = Counter(
+    "synapse_handler_presence_federation_presence", ""
+)
 bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
 
 get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
 
 notify_reason_counter = Counter(
-    "synapse_handler_presence_notify_reason", "", ["reason"])
+    "synapse_handler_presence_notify_reason", "", ["reason"]
+)
 state_transition_counter = Counter(
     "synapse_handler_presence_state_transition", "", ["from", "to"]
 )
@@ -90,7 +94,6 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
 class PresenceHandler(object):
-
     def __init__(self, hs):
         """
 
@@ -110,31 +113,26 @@ class PresenceHandler(object):
 
         federation_registry = hs.get_federation_registry()
 
-        federation_registry.register_edu_handler(
-            "m.presence", self.incoming_presence
-        )
+        federation_registry.register_edu_handler("m.presence", self.incoming_presence)
 
         active_presence = self.store.take_presence_startup_info()
 
         # A dictionary of the current state of users. This is prefilled with
         # non-offline presence from the DB. We should fetch from the DB if
         # we can't find a users presence in here.
-        self.user_to_current_state = {
-            state.user_id: state
-            for state in active_presence
-        }
+        self.user_to_current_state = {state.user_id: state for state in active_presence}
 
         LaterGauge(
-            "synapse_handlers_presence_user_to_current_state_size", "", [],
-            lambda: len(self.user_to_current_state)
+            "synapse_handlers_presence_user_to_current_state_size",
+            "",
+            [],
+            lambda: len(self.user_to_current_state),
         )
 
         now = self.clock.time_msec()
         for state in active_presence:
             self.wheel_timer.insert(
-                now=now,
-                obj=state.user_id,
-                then=state.last_active_ts + IDLE_TIMER,
+                now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
             )
             self.wheel_timer.insert(
                 now=now,
@@ -193,27 +191,21 @@ class PresenceHandler(object):
                 "handle_presence_timeouts", self._handle_timeouts
             )
 
-        self.clock.call_later(
-            30,
-            self.clock.looping_call,
-            run_timeout_handler,
-            5000,
-        )
+        self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
 
         def run_persister():
             return run_as_background_process(
                 "persist_presence_changes", self._persist_unpersisted_changes
             )
 
-        self.clock.call_later(
-            60,
-            self.clock.looping_call,
-            run_persister,
-            60 * 1000,
-        )
+        self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
 
-        LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
-                   lambda: len(self.wheel_timer))
+        LaterGauge(
+            "synapse_handlers_presence_wheel_timer_size",
+            "",
+            [],
+            lambda: len(self.wheel_timer),
+        )
 
         # Used to handle sending of presence to newly joined users/servers
         if hs.config.use_presence:
@@ -241,15 +233,17 @@ class PresenceHandler(object):
 
         logger.info(
             "Performing _on_shutdown. Persisting %d unpersisted changes",
-            len(self.user_to_current_state)
+            len(self.user_to_current_state),
         )
 
         if self.unpersisted_users_changes:
 
-            yield self.store.update_presence([
-                self.user_to_current_state[user_id]
-                for user_id in self.unpersisted_users_changes
-            ])
+            yield self.store.update_presence(
+                [
+                    self.user_to_current_state[user_id]
+                    for user_id in self.unpersisted_users_changes
+                ]
+            )
         logger.info("Finished _on_shutdown")
 
     @defer.inlineCallbacks
@@ -261,13 +255,10 @@ class PresenceHandler(object):
         self.unpersisted_users_changes = set()
 
         if unpersisted:
-            logger.info(
-                "Persisting %d upersisted presence updates", len(unpersisted)
+            logger.info("Persisting %d upersisted presence updates", len(unpersisted))
+            yield self.store.update_presence(
+                [self.user_to_current_state[user_id] for user_id in unpersisted]
             )
-            yield self.store.update_presence([
-                self.user_to_current_state[user_id]
-                for user_id in unpersisted
-            ])
 
     @defer.inlineCallbacks
     def _update_states(self, new_states):
@@ -303,10 +294,11 @@ class PresenceHandler(object):
                 )
 
                 new_state, should_notify, should_ping = handle_update(
-                    prev_state, new_state,
+                    prev_state,
+                    new_state,
                     is_mine=self.is_mine_id(user_id),
                     wheel_timer=self.wheel_timer,
-                    now=now
+                    now=now,
                 )
 
                 self.user_to_current_state[user_id] = new_state
@@ -328,7 +320,8 @@ class PresenceHandler(object):
             self.unpersisted_users_changes -= set(to_notify.keys())
 
             to_federation_ping = {
-                user_id: state for user_id, state in to_federation_ping.items()
+                user_id: state
+                for user_id, state in to_federation_ping.items()
                 if user_id not in to_notify
             }
             if to_federation_ping:
@@ -351,8 +344,8 @@ class PresenceHandler(object):
         # Check whether the lists of syncing processes from an external
         # process have expired.
         expired_process_ids = [
-            process_id for process_id, last_update
-            in self.external_process_last_updated_ms.items()
+            process_id
+            for process_id, last_update in self.external_process_last_updated_ms.items()
             if now - last_update > EXTERNAL_PROCESS_EXPIRY
         ]
         for process_id in expired_process_ids:
@@ -362,9 +355,7 @@ class PresenceHandler(object):
             self.external_process_last_update.pop(process_id)
 
         states = [
-            self.user_to_current_state.get(
-                user_id, UserPresenceState.default(user_id)
-            )
+            self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
             for user_id in users_to_check
         ]
 
@@ -394,9 +385,7 @@ class PresenceHandler(object):
 
         prev_state = yield self.current_state_for_user(user_id)
 
-        new_fields = {
-            "last_active_ts": self.clock.time_msec(),
-        }
+        new_fields = {"last_active_ts": self.clock.time_msec()}
         if prev_state.state == PresenceState.UNAVAILABLE:
             new_fields["state"] = PresenceState.ONLINE
 
@@ -430,15 +419,23 @@ class PresenceHandler(object):
             if prev_state.state == PresenceState.OFFLINE:
                 # If they're currently offline then bring them online, otherwise
                 # just update the last sync times.
-                yield self._update_states([prev_state.copy_and_replace(
-                    state=PresenceState.ONLINE,
-                    last_active_ts=self.clock.time_msec(),
-                    last_user_sync_ts=self.clock.time_msec(),
-                )])
+                yield self._update_states(
+                    [
+                        prev_state.copy_and_replace(
+                            state=PresenceState.ONLINE,
+                            last_active_ts=self.clock.time_msec(),
+                            last_user_sync_ts=self.clock.time_msec(),
+                        )
+                    ]
+                )
             else:
-                yield self._update_states([prev_state.copy_and_replace(
-                    last_user_sync_ts=self.clock.time_msec(),
-                )])
+                yield self._update_states(
+                    [
+                        prev_state.copy_and_replace(
+                            last_user_sync_ts=self.clock.time_msec()
+                        )
+                    ]
+                )
 
         @defer.inlineCallbacks
         def _end():
@@ -446,9 +443,13 @@ class PresenceHandler(object):
                 self.user_to_num_current_syncs[user_id] -= 1
 
                 prev_state = yield self.current_state_for_user(user_id)
-                yield self._update_states([prev_state.copy_and_replace(
-                    last_user_sync_ts=self.clock.time_msec(),
-                )])
+                yield self._update_states(
+                    [
+                        prev_state.copy_and_replace(
+                            last_user_sync_ts=self.clock.time_msec()
+                        )
+                    ]
+                )
             except Exception:
                 logger.exception("Error updating presence after sync")
 
@@ -469,7 +470,8 @@ class PresenceHandler(object):
         """
         if self.hs.config.use_presence:
             syncing_user_ids = {
-                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                user_id
+                for user_id, count in self.user_to_num_current_syncs.items()
                 if count
             }
             for user_ids in self.external_process_to_current_syncs.values():
@@ -479,7 +481,9 @@ class PresenceHandler(object):
             return set()
 
     @defer.inlineCallbacks
-    def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+    def update_external_syncs_row(
+        self, process_id, user_id, is_syncing, sync_time_msec
+    ):
         """Update the syncing users for an external process as a delta.
 
         Args:
@@ -500,20 +504,22 @@ class PresenceHandler(object):
             updates = []
             if is_syncing and user_id not in process_presence:
                 if prev_state.state == PresenceState.OFFLINE:
-                    updates.append(prev_state.copy_and_replace(
-                        state=PresenceState.ONLINE,
-                        last_active_ts=sync_time_msec,
-                        last_user_sync_ts=sync_time_msec,
-                    ))
+                    updates.append(
+                        prev_state.copy_and_replace(
+                            state=PresenceState.ONLINE,
+                            last_active_ts=sync_time_msec,
+                            last_user_sync_ts=sync_time_msec,
+                        )
+                    )
                 else:
-                    updates.append(prev_state.copy_and_replace(
-                        last_user_sync_ts=sync_time_msec,
-                    ))
+                    updates.append(
+                        prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+                    )
                 process_presence.add(user_id)
             elif user_id in process_presence:
-                updates.append(prev_state.copy_and_replace(
-                    last_user_sync_ts=sync_time_msec,
-                ))
+                updates.append(
+                    prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+                )
 
             if not is_syncing:
                 process_presence.discard(user_id)
@@ -537,12 +543,12 @@ class PresenceHandler(object):
             prev_states = yield self.current_state_for_users(process_presence)
             time_now_ms = self.clock.time_msec()
 
-            yield self._update_states([
-                prev_state.copy_and_replace(
-                    last_user_sync_ts=time_now_ms,
-                )
-                for prev_state in itervalues(prev_states)
-            ])
+            yield self._update_states(
+                [
+                    prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
+                    for prev_state in itervalues(prev_states)
+                ]
+            )
             self.external_process_last_updated_ms.pop(process_id, None)
 
     @defer.inlineCallbacks
@@ -574,8 +580,7 @@ class PresenceHandler(object):
             missing = [user_id for user_id, state in iteritems(states) if not state]
             if missing:
                 new = {
-                    user_id: UserPresenceState.default(user_id)
-                    for user_id in missing
+                    user_id: UserPresenceState.default(user_id) for user_id in missing
                 }
                 states.update(new)
                 self.user_to_current_state.update(new)
@@ -593,8 +598,10 @@ class PresenceHandler(object):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states]
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states],
         )
 
         self._push_to_remotes(states)
@@ -605,8 +612,10 @@ class PresenceHandler(object):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=[UserID.from_string(u) for u in users_to_states]
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states],
         )
 
     def _push_to_remotes(self, states):
@@ -631,15 +640,15 @@ class PresenceHandler(object):
             user_id = push.get("user_id", None)
             if not user_id:
                 logger.info(
-                    "Got presence update from %r with no 'user_id': %r",
-                    origin, push,
+                    "Got presence update from %r with no 'user_id': %r", origin, push
                 )
                 continue
 
             if get_domain_from_id(user_id) != origin:
                 logger.info(
                     "Got presence update from %r with bad 'user_id': %r",
-                    origin, user_id,
+                    origin,
+                    user_id,
                 )
                 continue
 
@@ -647,14 +656,12 @@ class PresenceHandler(object):
             if not presence_state:
                 logger.info(
                     "Got presence update from %r with no 'presence_state': %r",
-                    origin, push,
+                    origin,
+                    push,
                 )
                 continue
 
-            new_fields = {
-                "state": presence_state,
-                "last_federation_update_ts": now,
-            }
+            new_fields = {"state": presence_state, "last_federation_update_ts": now}
 
             last_active_ago = push.get("last_active_ago", None)
             if last_active_ago is not None:
@@ -672,10 +679,7 @@ class PresenceHandler(object):
 
     @defer.inlineCallbacks
     def get_state(self, target_user, as_event=False):
-        results = yield self.get_states(
-            [target_user.to_string()],
-            as_event=as_event,
-        )
+        results = yield self.get_states([target_user.to_string()], as_event=as_event)
 
         defer.returnValue(results[0])
 
@@ -699,13 +703,15 @@ class PresenceHandler(object):
 
         now = self.clock.time_msec()
         if as_event:
-            defer.returnValue([
-                {
-                    "type": "m.presence",
-                    "content": format_user_presence_state(state, now),
-                }
-                for state in updates
-            ])
+            defer.returnValue(
+                [
+                    {
+                        "type": "m.presence",
+                        "content": format_user_presence_state(state, now),
+                    }
+                    for state in updates
+                ]
+            )
         else:
             defer.returnValue(updates)
 
@@ -717,7 +723,9 @@ class PresenceHandler(object):
         presence = state["presence"]
 
         valid_presence = (
-            PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE
+            PresenceState.ONLINE,
+            PresenceState.UNAVAILABLE,
+            PresenceState.OFFLINE,
         )
         if presence not in valid_presence:
             raise SynapseError(400, "Invalid presence state")
@@ -726,9 +734,7 @@ class PresenceHandler(object):
 
         prev_state = yield self.current_state_for_user(user_id)
 
-        new_fields = {
-            "state": presence
-        }
+        new_fields = {"state": presence}
 
         if not ignore_status_msg:
             msg = status_msg if presence != PresenceState.OFFLINE else None
@@ -877,8 +883,7 @@ class PresenceHandler(object):
             hosts = set(host for host in hosts if host != self.server_name)
 
             self.federation.send_presence_to_destinations(
-                states=[state],
-                destinations=hosts,
+                states=[state], destinations=hosts
             )
         else:
             # A remote user has joined the room, so we need to:
@@ -904,7 +909,8 @@ class PresenceHandler(object):
             # default state.
             now = self.clock.time_msec()
             states = [
-                state for state in states.values()
+                state
+                for state in states.values()
                 if state.state != PresenceState.OFFLINE
                 or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
                 or state.status_msg is not None
@@ -912,8 +918,7 @@ class PresenceHandler(object):
 
             if states:
                 self.federation.send_presence_to_destinations(
-                    states=states,
-                    destinations=[get_domain_from_id(user_id)],
+                    states=states, destinations=[get_domain_from_id(user_id)]
                 )
 
 
@@ -937,7 +942,10 @@ def should_notify(old_state, new_state):
             notify_reason_counter.labels("current_active_change").inc()
             return True
 
-        if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+        if (
+            new_state.last_active_ts - old_state.last_active_ts
+            > LAST_ACTIVE_GRANULARITY
+        ):
             # Only notify about last active bumps if we're not currently acive
             if not new_state.currently_active:
                 notify_reason_counter.labels("last_active_change_online").inc()
@@ -958,9 +966,7 @@ def format_user_presence_state(state, now, include_user_id=True):
     The "user_id" is optional so that this function can be used to format presence
     updates for client /sync responses and for federation /send requests.
     """
-    content = {
-        "presence": state.state,
-    }
+    content = {"presence": state.state}
     if include_user_id:
         content["user_id"] = state.user_id
     if state.last_active_ts:
@@ -986,8 +992,15 @@ class PresenceEventSource(object):
 
     @defer.inlineCallbacks
     @log_function
-    def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
-                       explicit_room_id=None, **kwargs):
+    def get_new_events(
+        self,
+        user,
+        from_key,
+        room_ids=None,
+        include_offline=True,
+        explicit_room_id=None,
+        **kwargs
+    ):
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1030,7 +1043,7 @@ class PresenceEventSource(object):
 
                 if from_key:
                     user_ids_changed = stream_change_cache.get_entities_changed(
-                        users_interested_in, from_key,
+                        users_interested_in, from_key
                     )
                 else:
                     user_ids_changed = users_interested_in
@@ -1040,10 +1053,16 @@ class PresenceEventSource(object):
         if include_offline:
             defer.returnValue((list(updates.values()), max_token))
         else:
-            defer.returnValue(([
-                s for s in itervalues(updates)
-                if s.state != PresenceState.OFFLINE
-            ], max_token))
+            defer.returnValue(
+                (
+                    [
+                        s
+                        for s in itervalues(updates)
+                        if s.state != PresenceState.OFFLINE
+                    ],
+                    max_token,
+                )
+            )
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
@@ -1061,13 +1080,13 @@ class PresenceEventSource(object):
         users_interested_in.add(user_id)  # So that we receive our own presence
 
         users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-            user_id, on_invalidate=cache_context.invalidate,
+            user_id, on_invalidate=cache_context.invalidate
         )
         users_interested_in.update(users_who_share_room)
 
         if explicit_room_id:
             user_ids = yield self.store.get_users_in_room(
-                explicit_room_id, on_invalidate=cache_context.invalidate,
+                explicit_room_id, on_invalidate=cache_context.invalidate
             )
             users_interested_in.update(user_ids)
 
@@ -1123,9 +1142,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
             if now - state.last_active_ts > IDLE_TIMER:
                 # Currently online, but last activity ages ago so auto
                 # idle
-                state = state.copy_and_replace(
-                    state=PresenceState.UNAVAILABLE,
-                )
+                state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
                 changed = True
             elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
                 # So that we send down a notification that we've
@@ -1145,8 +1162,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
             sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
             if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
-                    state=PresenceState.OFFLINE,
-                    status_msg=None,
+                    state=PresenceState.OFFLINE, status_msg=None
                 )
                 changed = True
     else:
@@ -1155,10 +1171,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
         # no one gets stuck online forever.
         if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
             # The other side seems to have disappeared.
-            state = state.copy_and_replace(
-                state=PresenceState.OFFLINE,
-                status_msg=None,
-            )
+            state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None)
             changed = True
 
     return state if changed else None
@@ -1193,21 +1206,17 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
         if new_state.state == PresenceState.ONLINE:
             # Idle timer
             wheel_timer.insert(
-                now=now,
-                obj=user_id,
-                then=new_state.last_active_ts + IDLE_TIMER
+                now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
             )
 
             active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
-            new_state = new_state.copy_and_replace(
-                currently_active=active,
-            )
+            new_state = new_state.copy_and_replace(currently_active=active)
 
             if active:
                 wheel_timer.insert(
                     now=now,
                     obj=user_id,
-                    then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
+                    then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
                 )
 
         if new_state.state != PresenceState.OFFLINE:
@@ -1215,29 +1224,25 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
             wheel_timer.insert(
                 now=now,
                 obj=user_id,
-                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
             )
 
             last_federate = new_state.last_federation_update_ts
             if now - last_federate > FEDERATION_PING_INTERVAL:
                 # Been a while since we've poked remote servers
-                new_state = new_state.copy_and_replace(
-                    last_federation_update_ts=now,
-                )
+                new_state = new_state.copy_and_replace(last_federation_update_ts=now)
                 federation_ping = True
 
     else:
         wheel_timer.insert(
             now=now,
             obj=user_id,
-            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
+            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
         )
 
     # Check whether the change was something worth notifying about
     if should_notify(prev_state, new_state):
-        new_state = new_state.copy_and_replace(
-            last_federation_update_ts=now,
-        )
+        new_state = new_state.copy_and_replace(last_federation_update_ts=now)
         persist_and_notify = True
 
     return new_state, persist_and_notify, federation_ping