diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index aed640450f..f6cf343174 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -46,6 +46,7 @@ logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_presence_counter = metrics.register_counter("notified_presence")
+federation_presence_out_counter = metrics.register_counter("federation_presence_out")
presence_updates_counter = metrics.register_counter("presence_updates")
timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
@@ -129,6 +130,10 @@ class PresenceHandler(BaseHandler):
for state in active_presence
}
+ metrics.register_callback(
+ "user_to_current_state_size", lambda: len(self.user_to_current_state)
+ )
+
now = self.clock.time_msec()
for state in active_presence:
self.wheel_timer.insert(
@@ -259,6 +264,8 @@ class PresenceHandler(BaseHandler):
if user_id not in to_notify
}
if to_federation_ping:
+ federation_presence_out_counter.inc_by(len(to_federation_ping))
+
_, _, hosts_to_states = yield self._get_interested_parties(
to_federation_ping.values()
)
@@ -522,6 +529,7 @@ class PresenceHandler(BaseHandler):
new_fields["last_active_ts"] = now - last_active_ago
new_fields["status_msg"] = push.get("status_msg", None)
+ new_fields["currently_active"] = push.get("currently_active", False)
prev_state = yield self.current_state_for_user(user_id)
updates.append(prev_state.copy_and_replace(**new_fields))
@@ -770,6 +778,25 @@ class PresenceHandler(BaseHandler):
defer.returnValue(observer_user.to_string() in accepted_observers)
+ @defer.inlineCallbacks
+ def get_all_presence_updates(self, last_id, current_id):
+ """
+ Gets a list of presence update rows from between the given stream ids.
+ Each row has:
+ - stream_id(str)
+ - user_id(str)
+ - state(str)
+ - last_active_ts(int)
+ - last_federation_update_ts(int)
+ - last_user_sync_ts(int)
+ - status_msg(int)
+ - currently_active(int)
+ """
+ # TODO(markjh): replicate the unpersisted changes.
+ # This could use the in-memory stores for recent changes.
+ rows = yield self.store.get_all_presence_updates(last_id, current_id)
+ defer.returnValue(rows)
+
def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
@@ -835,39 +862,66 @@ class PresenceEventSource(object):
# We don't try and limit the presence updates by the current token, as
# sending down the rare duplicate is not a concern.
- user_id = user.to_string()
- if from_key is not None:
- from_key = int(from_key)
- room_ids = room_ids or []
-
- presence = self.hs.get_handlers().presence_handler
+ with Measure(self.clock, "presence.get_new_events"):
+ user_id = user.to_string()
+ if from_key is not None:
+ from_key = int(from_key)
+ room_ids = room_ids or []
- if not room_ids:
- rooms = yield self.store.get_rooms_for_user(user_id)
- room_ids = set(e.room_id for e in rooms)
+ presence = self.hs.get_handlers().presence_handler
+ stream_change_cache = self.store.presence_stream_cache
- user_ids_to_check = set()
- for room_id in room_ids:
- users = yield self.store.get_users_in_room(room_id)
- user_ids_to_check.update(users)
-
- plist = yield self.store.get_presence_list_accepted(user.localpart)
- user_ids_to_check.update([row["observed_user_id"] for row in plist])
-
- # Always include yourself. Only really matters for when the user is
- # not in any rooms, but still.
- user_ids_to_check.add(user_id)
-
- max_token = self.store.get_current_presence_token()
-
- if from_key:
- user_ids_changed = self.store.presence_stream_cache.get_entities_changed(
- user_ids_to_check, from_key,
- )
- else:
- user_ids_changed = user_ids_to_check
-
- updates = yield presence.current_state_for_users(user_ids_changed)
+ if not room_ids:
+ rooms = yield self.store.get_rooms_for_user(user_id)
+ room_ids = set(e.room_id for e in rooms)
+ else:
+ room_ids = set(room_ids)
+
+ max_token = self.store.get_current_presence_token()
+
+ plist = yield self.store.get_presence_list_accepted(user.localpart)
+ friends = set(row["observed_user_id"] for row in plist)
+ friends.add(user_id) # So that we receive our own presence
+
+ user_ids_changed = set()
+ changed = None
+ if from_key and max_token - from_key < 100:
+ # For small deltas, its quicker to get all changes and then
+ # work out if we share a room or they're in our presence list
+ changed = stream_change_cache.get_all_entities_changed(from_key)
+
+ # get_all_entities_changed can return None
+ if changed is not None:
+ for other_user_id in changed:
+ if other_user_id in friends:
+ user_ids_changed.add(other_user_id)
+ continue
+ other_rooms = yield self.store.get_rooms_for_user(other_user_id)
+ if room_ids.intersection(e.room_id for e in other_rooms):
+ user_ids_changed.add(other_user_id)
+ continue
+ else:
+ # Too many possible updates. Find all users we can see and check
+ # if any of them have changed.
+ user_ids_to_check = set()
+ for room_id in room_ids:
+ users = yield self.store.get_users_in_room(room_id)
+ user_ids_to_check.update(users)
+
+ user_ids_to_check.update(friends)
+
+ # Always include yourself. Only really matters for when the user is
+ # not in any rooms, but still.
+ user_ids_to_check.add(user_id)
+
+ if from_key:
+ user_ids_changed = stream_change_cache.get_entities_changed(
+ user_ids_to_check, from_key,
+ )
+ else:
+ user_ids_changed = user_ids_to_check
+
+ updates = yield presence.current_state_for_users(user_ids_changed)
now = self.clock.time_msec()
|