summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py116
1 files changed, 85 insertions, 31 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index aed640450f..f6cf343174 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -46,6 +46,7 @@ logger = logging.getLogger(__name__)
 metrics = synapse.metrics.get_metrics_for(__name__)
 
 notified_presence_counter = metrics.register_counter("notified_presence")
+federation_presence_out_counter = metrics.register_counter("federation_presence_out")
 presence_updates_counter = metrics.register_counter("presence_updates")
 timers_fired_counter = metrics.register_counter("timers_fired")
 federation_presence_counter = metrics.register_counter("federation_presence")
@@ -129,6 +130,10 @@ class PresenceHandler(BaseHandler):
             for state in active_presence
         }
 
+        metrics.register_callback(
+            "user_to_current_state_size", lambda: len(self.user_to_current_state)
+        )
+
         now = self.clock.time_msec()
         for state in active_presence:
             self.wheel_timer.insert(
@@ -259,6 +264,8 @@ class PresenceHandler(BaseHandler):
                 if user_id not in to_notify
             }
             if to_federation_ping:
+                federation_presence_out_counter.inc_by(len(to_federation_ping))
+
                 _, _, hosts_to_states = yield self._get_interested_parties(
                     to_federation_ping.values()
                 )
@@ -522,6 +529,7 @@ class PresenceHandler(BaseHandler):
                 new_fields["last_active_ts"] = now - last_active_ago
 
             new_fields["status_msg"] = push.get("status_msg", None)
+            new_fields["currently_active"] = push.get("currently_active", False)
 
             prev_state = yield self.current_state_for_user(user_id)
             updates.append(prev_state.copy_and_replace(**new_fields))
@@ -770,6 +778,25 @@ class PresenceHandler(BaseHandler):
 
         defer.returnValue(observer_user.to_string() in accepted_observers)
 
+    @defer.inlineCallbacks
+    def get_all_presence_updates(self, last_id, current_id):
+        """
+        Gets a list of presence update rows from between the given stream ids.
+        Each row has:
+        - stream_id(str)
+        - user_id(str)
+        - state(str)
+        - last_active_ts(int)
+        - last_federation_update_ts(int)
+        - last_user_sync_ts(int)
+        - status_msg(int)
+        - currently_active(int)
+        """
+        # TODO(markjh): replicate the unpersisted changes.
+        # This could use the in-memory stores for recent changes.
+        rows = yield self.store.get_all_presence_updates(last_id, current_id)
+        defer.returnValue(rows)
+
 
 def should_notify(old_state, new_state):
     """Decides if a presence state change should be sent to interested parties.
@@ -835,39 +862,66 @@ class PresenceEventSource(object):
         # We don't try and limit the presence updates by the current token, as
         # sending down the rare duplicate is not a concern.
 
-        user_id = user.to_string()
-        if from_key is not None:
-            from_key = int(from_key)
-        room_ids = room_ids or []
-
-        presence = self.hs.get_handlers().presence_handler
+        with Measure(self.clock, "presence.get_new_events"):
+            user_id = user.to_string()
+            if from_key is not None:
+                from_key = int(from_key)
+            room_ids = room_ids or []
 
-        if not room_ids:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            room_ids = set(e.room_id for e in rooms)
+            presence = self.hs.get_handlers().presence_handler
+            stream_change_cache = self.store.presence_stream_cache
 
-        user_ids_to_check = set()
-        for room_id in room_ids:
-            users = yield self.store.get_users_in_room(room_id)
-            user_ids_to_check.update(users)
-
-        plist = yield self.store.get_presence_list_accepted(user.localpart)
-        user_ids_to_check.update([row["observed_user_id"] for row in plist])
-
-        # Always include yourself. Only really matters for when the user is
-        # not in any rooms, but still.
-        user_ids_to_check.add(user_id)
-
-        max_token = self.store.get_current_presence_token()
-
-        if from_key:
-            user_ids_changed = self.store.presence_stream_cache.get_entities_changed(
-                user_ids_to_check, from_key,
-            )
-        else:
-            user_ids_changed = user_ids_to_check
-
-        updates = yield presence.current_state_for_users(user_ids_changed)
+            if not room_ids:
+                rooms = yield self.store.get_rooms_for_user(user_id)
+                room_ids = set(e.room_id for e in rooms)
+            else:
+                room_ids = set(room_ids)
+
+            max_token = self.store.get_current_presence_token()
+
+            plist = yield self.store.get_presence_list_accepted(user.localpart)
+            friends = set(row["observed_user_id"] for row in plist)
+            friends.add(user_id)  # So that we receive our own presence
+
+            user_ids_changed = set()
+            changed = None
+            if from_key and max_token - from_key < 100:
+                # For small deltas, its quicker to get all changes and then
+                # work out if we share a room or they're in our presence list
+                changed = stream_change_cache.get_all_entities_changed(from_key)
+
+            # get_all_entities_changed can return None
+            if changed is not None:
+                for other_user_id in changed:
+                    if other_user_id in friends:
+                        user_ids_changed.add(other_user_id)
+                        continue
+                    other_rooms = yield self.store.get_rooms_for_user(other_user_id)
+                    if room_ids.intersection(e.room_id for e in other_rooms):
+                        user_ids_changed.add(other_user_id)
+                        continue
+            else:
+                # Too many possible updates. Find all users we can see and check
+                # if any of them have changed.
+                user_ids_to_check = set()
+                for room_id in room_ids:
+                    users = yield self.store.get_users_in_room(room_id)
+                    user_ids_to_check.update(users)
+
+                user_ids_to_check.update(friends)
+
+                # Always include yourself. Only really matters for when the user is
+                # not in any rooms, but still.
+                user_ids_to_check.add(user_id)
+
+                if from_key:
+                    user_ids_changed = stream_change_cache.get_entities_changed(
+                        user_ids_to_check, from_key,
+                    )
+                else:
+                    user_ids_changed = user_ids_to_check
+
+            updates = yield presence.current_state_for_users(user_ids_changed)
 
         now = self.clock.time_msec()