diff options
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r-- | synapse/handlers/presence.py | 116 |
1 files changed, 85 insertions, 31 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aed640450f..f6cf343174 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -46,6 +46,7 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) notified_presence_counter = metrics.register_counter("notified_presence") +federation_presence_out_counter = metrics.register_counter("federation_presence_out") presence_updates_counter = metrics.register_counter("presence_updates") timers_fired_counter = metrics.register_counter("timers_fired") federation_presence_counter = metrics.register_counter("federation_presence") @@ -129,6 +130,10 @@ class PresenceHandler(BaseHandler): for state in active_presence } + metrics.register_callback( + "user_to_current_state_size", lambda: len(self.user_to_current_state) + ) + now = self.clock.time_msec() for state in active_presence: self.wheel_timer.insert( @@ -259,6 +264,8 @@ class PresenceHandler(BaseHandler): if user_id not in to_notify } if to_federation_ping: + federation_presence_out_counter.inc_by(len(to_federation_ping)) + _, _, hosts_to_states = yield self._get_interested_parties( to_federation_ping.values() ) @@ -522,6 +529,7 @@ class PresenceHandler(BaseHandler): new_fields["last_active_ts"] = now - last_active_ago new_fields["status_msg"] = push.get("status_msg", None) + new_fields["currently_active"] = push.get("currently_active", False) prev_state = yield self.current_state_for_user(user_id) updates.append(prev_state.copy_and_replace(**new_fields)) @@ -770,6 +778,25 @@ class PresenceHandler(BaseHandler): defer.returnValue(observer_user.to_string() in accepted_observers) + @defer.inlineCallbacks + def get_all_presence_updates(self, last_id, current_id): + """ + Gets a list of presence update rows from between the given stream ids. + Each row has: + - stream_id(str) + - user_id(str) + - state(str) + - last_active_ts(int) + - last_federation_update_ts(int) + - last_user_sync_ts(int) + - status_msg(int) + - currently_active(int) + """ + # TODO(markjh): replicate the unpersisted changes. + # This could use the in-memory stores for recent changes. + rows = yield self.store.get_all_presence_updates(last_id, current_id) + defer.returnValue(rows) + def should_notify(old_state, new_state): """Decides if a presence state change should be sent to interested parties. @@ -835,39 +862,66 @@ class PresenceEventSource(object): # We don't try and limit the presence updates by the current token, as # sending down the rare duplicate is not a concern. - user_id = user.to_string() - if from_key is not None: - from_key = int(from_key) - room_ids = room_ids or [] - - presence = self.hs.get_handlers().presence_handler + with Measure(self.clock, "presence.get_new_events"): + user_id = user.to_string() + if from_key is not None: + from_key = int(from_key) + room_ids = room_ids or [] - if not room_ids: - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(e.room_id for e in rooms) + presence = self.hs.get_handlers().presence_handler + stream_change_cache = self.store.presence_stream_cache - user_ids_to_check = set() - for room_id in room_ids: - users = yield self.store.get_users_in_room(room_id) - user_ids_to_check.update(users) - - plist = yield self.store.get_presence_list_accepted(user.localpart) - user_ids_to_check.update([row["observed_user_id"] for row in plist]) - - # Always include yourself. Only really matters for when the user is - # not in any rooms, but still. - user_ids_to_check.add(user_id) - - max_token = self.store.get_current_presence_token() - - if from_key: - user_ids_changed = self.store.presence_stream_cache.get_entities_changed( - user_ids_to_check, from_key, - ) - else: - user_ids_changed = user_ids_to_check - - updates = yield presence.current_state_for_users(user_ids_changed) + if not room_ids: + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(e.room_id for e in rooms) + else: + room_ids = set(room_ids) + + max_token = self.store.get_current_presence_token() + + plist = yield self.store.get_presence_list_accepted(user.localpart) + friends = set(row["observed_user_id"] for row in plist) + friends.add(user_id) # So that we receive our own presence + + user_ids_changed = set() + changed = None + if from_key and max_token - from_key < 100: + # For small deltas, its quicker to get all changes and then + # work out if we share a room or they're in our presence list + changed = stream_change_cache.get_all_entities_changed(from_key) + + # get_all_entities_changed can return None + if changed is not None: + for other_user_id in changed: + if other_user_id in friends: + user_ids_changed.add(other_user_id) + continue + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + continue + else: + # Too many possible updates. Find all users we can see and check + # if any of them have changed. + user_ids_to_check = set() + for room_id in room_ids: + users = yield self.store.get_users_in_room(room_id) + user_ids_to_check.update(users) + + user_ids_to_check.update(friends) + + # Always include yourself. Only really matters for when the user is + # not in any rooms, but still. + user_ids_to_check.add(user_id) + + if from_key: + user_ids_changed = stream_change_cache.get_entities_changed( + user_ids_to_check, from_key, + ) + else: + user_ids_changed = user_ids_to_check + + updates = yield presence.current_state_for_users(user_ids_changed) now = self.clock.time_msec() |