diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d0c8f1328b..1b89dc6274 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
import synapse.metrics
-from ._base import BaseHandler
-
import logging
@@ -52,6 +50,13 @@ timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
+get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+
+notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
+state_transition_counter = metrics.register_counter(
+ "state_transition", labels=["from", "to"]
+)
+
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
@@ -70,38 +75,45 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
# How often to resend presence to remote servers
FEDERATION_PING_INTERVAL = 25 * 60 * 1000
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
-class PresenceHandler(BaseHandler):
+class PresenceHandler(object):
def __init__(self, hs):
- super(PresenceHandler, self).__init__(hs)
- self.hs = hs
+ self.is_mine = hs.is_mine
+ self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.federation = hs.get_replication_layer()
+ self.replication = hs.get_replication_layer()
+ self.federation = hs.get_federation_sender()
+
+ self.state = hs.get_state_handler()
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence", self.incoming_presence
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
@@ -138,7 +150,7 @@ class PresenceHandler(BaseHandler):
obj=state.user_id,
then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
- if self.hs.is_mine_id(state.user_id):
+ if self.is_mine_id(state.user_id):
self.wheel_timer.insert(
now=now,
obj=state.user_id,
@@ -160,20 +172,38 @@ class PresenceHandler(BaseHandler):
self.serial_to_user = {}
self._next_serial = 1
- # Keeps track of the number of *ongoing* syncs. While this is non zero
- # a user will never go offline.
+ # Keeps track of the number of *ongoing* syncs on this process. While
+ # this is non zero a user will never go offline.
self.user_to_num_current_syncs = {}
+ # Keeps track of the number of *ongoing* syncs on other processes.
+ # While any sync is ongoing on another process the user will never
+ # go offline.
+ # Each process has a unique identifier and an update frequency. If
+ # no update is received from that process within the update period then
+ # we assume that all the sync requests on that process have stopped.
+ # Stored as a dict from process_id to set of user_id, and a dict of
+ # process_id to millisecond timestamp last updated.
+ self.external_process_to_current_syncs = {}
+ self.external_process_last_updated_ms = {}
+
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
- 0 * 1000,
+ 30,
self.clock.looping_call,
self._handle_timeouts,
5000,
)
+ self.clock.call_later(
+ 60,
+ self.clock.looping_call,
+ self._persist_unpersisted_changes,
+ 60 * 1000,
+ )
+
metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
@defer.inlineCallbacks
@@ -188,7 +218,7 @@ class PresenceHandler(BaseHandler):
is some spurious presence changes that will self-correct.
"""
logger.info(
- "Performing _on_shutdown. Persiting %d unpersisted changes",
+ "Performing _on_shutdown. Persisting %d unpersisted changes",
len(self.user_to_current_state)
)
@@ -200,6 +230,27 @@ class PresenceHandler(BaseHandler):
logger.info("Finished _on_shutdown")
@defer.inlineCallbacks
+ def _persist_unpersisted_changes(self):
+ """We periodically persist the unpersisted changes, as otherwise they
+ may stack up and slow down shutdown times.
+ """
+ logger.info(
+ "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
+ len(self.unpersisted_users_changes)
+ )
+
+ unpersisted = self.unpersisted_users_changes
+ self.unpersisted_users_changes = set()
+
+ if unpersisted:
+ yield self.store.update_presence([
+ self.user_to_current_state[user_id]
+ for user_id in unpersisted
+ ])
+
+ logger.info("Finished _persist_unpersisted_changes")
+
+ @defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
@@ -215,6 +266,12 @@ class PresenceHandler(BaseHandler):
to_notify = {} # Changes we want to notify everyone about
to_federation_ping = {} # These need sending keep-alives
+ # Only bother handling the last presence change for each user
+ new_states_dict = {}
+ for new_state in new_states:
+ new_states_dict[new_state.user_id] = new_state
+ new_state = new_states_dict.values()
+
for new_state in new_states:
user_id = new_state.user_id
@@ -228,7 +285,7 @@ class PresenceHandler(BaseHandler):
new_state, should_notify, should_ping = handle_update(
prev_state, new_state,
- is_mine=self.hs.is_mine_id(user_id),
+ is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer,
now=now
)
@@ -268,31 +325,48 @@ class PresenceHandler(BaseHandler):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
+ logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = self.wheel_timer.fetch(now)
+ try:
+ with Measure(self.clock, "presence_handle_timeouts"):
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in set(users_to_check)
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc_by(len(states))
+ timers_fired_counter.inc_by(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.hs.is_mine_id,
- user_to_num_current_syncs=self.user_to_num_current_syncs,
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- preserve_fn(self._update_states)(changes)
+ preserve_fn(self._update_states)(changes)
+ except:
+ logger.exception("Exception in _handle_timeouts loop")
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
@@ -365,6 +439,74 @@ class PresenceHandler(BaseHandler):
defer.returnValue(_user_syncing())
+ def get_currently_syncing_users(self):
+ """Get the set of user ids that are currently syncing on this HS.
+ Returns:
+ set(str): A set of user_id strings.
+ """
+ syncing_user_ids = {
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count
+ }
+ for user_ids in self.external_process_to_current_syncs.values():
+ syncing_user_ids.update(user_ids)
+ return syncing_user_ids
+
+ @defer.inlineCallbacks
+ def update_external_syncs(self, process_id, syncing_user_ids):
+ """Update the syncing users for an external process
+
+ Args:
+ process_id(str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as user start and stop syncing against a given process.
+ syncing_user_ids(set(str)): The set of user_ids that are
+ currently syncing on that server.
+ """
+
+ # Grab the previous list of user_ids that were syncing on that process
+ prev_syncing_user_ids = (
+ self.external_process_to_current_syncs.get(process_id, set())
+ )
+ # Grab the current presence state for both the users that are syncing
+ # now and the users that were syncing before this update.
+ prev_states = yield self.current_state_for_users(
+ syncing_user_ids | prev_syncing_user_ids
+ )
+ updates = []
+ time_now_ms = self.clock.time_msec()
+
+ # For each new user that is syncing check if we need to mark them as
+ # being online.
+ for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+ prev_state = prev_states[new_user_id]
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=time_now_ms,
+ last_user_sync_ts=time_now_ms,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ # For each user that is still syncing or stopped syncing update the
+ # last sync time so that we will correctly apply the grace period when
+ # they stop syncing.
+ for old_user_id in prev_syncing_user_ids:
+ prev_state = prev_states[old_user_id]
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ ))
+
+ yield self._update_states(updates)
+
+ # Update the last updated time for the process. We expire the entries
+ # if we don't receive an update in the given timeframe.
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+ self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
@defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
@@ -403,7 +545,7 @@ class PresenceHandler(BaseHandler):
defer.returnValue(states)
@defer.inlineCallbacks
- def _get_interested_parties(self, states):
+ def _get_interested_parties(self, states, calculate_remote_hosts=True):
"""Given a list of states return which entities (rooms, users, servers)
are interested in the given states.
@@ -426,21 +568,24 @@ class PresenceHandler(BaseHandler):
users_to_states.setdefault(state.user_id, []).append(state)
hosts_to_states = {}
- for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
- if not local_states:
- continue
+ if calculate_remote_hosts:
+ for room_id, states in room_ids_to_states.items():
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+ if not local_states:
+ continue
+
+ users = yield self.state.get_current_user_in_room(room_id)
+ hosts = set(get_domain_from_id(u) for u in users)
- hosts = yield self.store.get_joined_hosts_for_room(room_id)
- for host in hosts:
- hosts_to_states.setdefault(host, []).extend(local_states)
+ for host in hosts:
+ hosts_to_states.setdefault(host, []).extend(local_states)
for user_id, states in users_to_states.items():
- local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+ local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states:
continue
- host = UserID.from_string(user_id).domain
+ host = get_domain_from_id(user_id)
hosts_to_states.setdefault(host, []).extend(local_states)
# TODO: de-dup hosts_to_states, as a single host might have multiple
@@ -465,24 +610,24 @@ class PresenceHandler(BaseHandler):
self._push_to_remotes(hosts_to_states)
+ @defer.inlineCallbacks
+ def notify_for_states(self, state, stream_id):
+ parties = yield self._get_interested_parties([state])
+ room_ids_to_states, users_to_states, hosts_to_states = parties
+
+ self.notifier.on_new_event(
+ "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+ users=[UserID.from_string(u) for u in users_to_states.keys()]
+ )
+
def _push_to_remotes(self, hosts_to_states):
"""Sends state updates to remote servers.
Args:
hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
"""
- now = self.clock.time_msec()
for host, states in hosts_to_states.items():
- self.federation.send_edu(
- destination=host,
- edu_type="m.presence",
- content={
- "push": [
- _format_user_presence_state(state, now)
- for state in states
- ]
- }
- )
+ self.federation.send_presence(host, states)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
@@ -503,6 +648,13 @@ class PresenceHandler(BaseHandler):
)
continue
+ if get_domain_from_id(user_id) != origin:
+ logger.info(
+ "Got presence update from %r with bad 'user_id': %r",
+ origin, user_id,
+ )
+ continue
+
presence_state = push.get("presence", None)
if not presence_state:
logger.info(
@@ -562,17 +714,17 @@ class PresenceHandler(BaseHandler):
defer.returnValue([
{
"type": "m.presence",
- "content": _format_user_presence_state(state, now),
+ "content": format_user_presence_state(state, now),
}
for state in updates
])
else:
defer.returnValue([
- _format_user_presence_state(state, now) for state in updates
+ format_user_presence_state(state, now) for state in updates
])
@defer.inlineCallbacks
- def set_state(self, target_user, state):
+ def set_state(self, target_user, state, ignore_status_msg=False):
"""Set the presence state of the user.
"""
status_msg = state.get("status_msg", None)
@@ -589,10 +741,13 @@ class PresenceHandler(BaseHandler):
prev_state = yield self.current_state_for_user(user_id)
new_fields = {
- "state": presence,
- "status_msg": status_msg if presence != PresenceState.OFFLINE else None
+ "state": presence
}
+ if not ignore_status_msg:
+ msg = status_msg if presence != PresenceState.OFFLINE else None
+ new_fields["status_msg"] = msg
+
if presence == PresenceState.ONLINE:
new_fields["last_active_ts"] = self.clock.time_msec()
@@ -611,14 +766,14 @@ class PresenceHandler(BaseHandler):
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
- if self.hs.is_mine(user):
+ user_ids = yield self.state.get_current_user_in_room(room_id)
+ if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
- hosts = yield self.store.get_joined_hosts_for_room(room_id)
+ hosts = set(get_domain_from_id(u) for u in user_ids)
self._push_to_remotes({host: (state,) for host in hosts})
else:
- user_ids = yield self.store.get_users_in_room(room_id)
- user_ids = filter(self.hs.is_mine_id, user_ids)
+ user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
@@ -628,7 +783,7 @@ class PresenceHandler(BaseHandler):
def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list.
"""
- if not self.hs.is_mine(observer_user):
+ if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
presence_list = yield self.store.get_presence_list(
@@ -659,7 +814,7 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string()
)
- if self.hs.is_mine(observed_user):
+ if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
yield self.federation.send_edu(
@@ -675,11 +830,11 @@ class PresenceHandler(BaseHandler):
def invite_presence(self, observed_user, observer_user):
"""Handles new presence invites.
"""
- if not self.hs.is_mine(observed_user):
+ if not self.is_mine(observed_user):
raise SynapseError(400, "User is not hosted on this Home Server")
# TODO: Don't auto accept
- if self.hs.is_mine(observer_user):
+ if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
self.federation.send_edu(
@@ -742,7 +897,7 @@ class PresenceHandler(BaseHandler):
Returns:
A Deferred.
"""
- if not self.hs.is_mine(observer_user):
+ if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server")
yield self.store.del_presence_list(
@@ -793,28 +948,38 @@ class PresenceHandler(BaseHandler):
def should_notify(old_state, new_state):
"""Decides if a presence state change should be sent to interested parties.
"""
+ if old_state == new_state:
+ return False
+
if old_state.status_msg != new_state.status_msg:
+ notify_reason_counter.inc("status_msg_change")
return True
- if old_state.state == PresenceState.ONLINE:
- if new_state.state != PresenceState.ONLINE:
- # Always notify for online -> anything
- return True
+ if old_state.state != new_state.state:
+ notify_reason_counter.inc("state_change")
+ state_transition_counter.inc(old_state.state, new_state.state)
+ return True
+ if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
+ notify_reason_counter.inc("current_active_change")
return True
- if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
- # Always notify for a transition where last active gets bumped.
- return True
+ if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+ # Only notify about last active bumps if we're not currently acive
+ if not new_state.currently_active:
+ notify_reason_counter.inc("last_active_change_online")
+ return True
- if old_state.state != new_state.state:
+ elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+ # Always notify for a transition where last active gets bumped.
+ notify_reason_counter.inc("last_active_change_not_online")
return True
return False
-def _format_user_presence_state(state, now):
+def format_user_presence_state(state, now):
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
"""
@@ -834,9 +999,14 @@ def _format_user_presence_state(state, now):
class PresenceEventSource(object):
def __init__(self, hs):
- self.hs = hs
+ # We can't call get_presence_handler here because there's a cycle:
+ #
+ # Presence -> Notifier -> PresenceEventSource -> Presence
+ #
+ self.get_presence_handler = hs.get_presence_handler
self.clock = hs.get_clock()
self.store = hs.get_datastore()
+ self.state = hs.get_state_handler()
@defer.inlineCallbacks
@log_function
@@ -860,7 +1030,7 @@ class PresenceEventSource(object):
from_key = int(from_key)
room_ids = room_ids or []
- presence = self.hs.get_handlers().presence_handler
+ presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
if not room_ids:
@@ -877,13 +1047,13 @@ class PresenceEventSource(object):
user_ids_changed = set()
changed = None
- if from_key and max_token - from_key < 100:
- # For small deltas, its quicker to get all changes and then
- # work out if we share a room or they're in our presence list
+ if from_key:
changed = stream_change_cache.get_all_entities_changed(from_key)
- # get_all_entities_changed can return None
- if changed is not None:
+ if changed is not None and len(changed) < 500:
+ # For small deltas, its quicker to get all changes and then
+ # work out if we share a room or they're in our presence list
+ get_updates_counter.inc("stream")
for other_user_id in changed:
if other_user_id in friends:
user_ids_changed.add(other_user_id)
@@ -895,9 +1065,11 @@ class PresenceEventSource(object):
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
+ get_updates_counter.inc("full")
+
user_ids_to_check = set()
for room_id in room_ids:
- users = yield self.store.get_users_in_room(room_id)
+ users = yield self.state.get_current_user_in_room(room_id)
user_ids_to_check.update(users)
user_ids_to_check.update(friends)
@@ -920,7 +1092,7 @@ class PresenceEventSource(object):
defer.returnValue(([
{
"type": "m.presence",
- "content": _format_user_presence_state(s, now),
+ "content": format_user_presence_state(s, now),
}
for s in updates.values()
if include_offline or s.state != PresenceState.OFFLINE
@@ -933,15 +1105,14 @@ class PresenceEventSource(object):
return self.get_new_events(user, from_key=None, include_offline=False)
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
appropriate.
Args:
user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -952,21 +1123,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
for state in user_states:
is_mine = is_mine_fn(state.user_id)
- new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+ new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
if new_state:
changes[state.user_id] = new_state
return changes.values()
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state (UserPresenceState)
is_mine (bool): Whether the user is ours
- user_to_num_current_syncs (dict): Mapping of user_id to number of currently
- active syncs.
+ syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
Returns:
@@ -1000,7 +1170,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
- if not user_to_num_current_syncs.get(user_id, 0):
+ if user_id not in syncing_user_ids:
if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
|