diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da610e430f..585f3e4da2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,7 +29,9 @@ from synapse.api.errors import SynapseError
from synapse.api.constants import PresenceState
from synapse.storage.presence import UserPresenceState
-from synapse.util.logcontext import preserve_fn
+from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.async import Linearizer
+from synapse.util.logcontext import run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -91,29 +93,30 @@ class PresenceHandler(object):
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.replication = hs.get_replication_layer()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
- self.replication.register_edu_handler(
+ federation_registry = hs.get_federation_registry()
+
+ federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
@@ -186,6 +189,7 @@ class PresenceHandler(object):
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {}
self.external_process_last_updated_ms = {}
+ self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
@@ -251,6 +255,14 @@ class PresenceHandler(object):
logger.info("Finished _persist_unpersisted_changes")
@defer.inlineCallbacks
+ def _update_states_and_catch_exception(self, new_states):
+ try:
+ res = yield self._update_states(new_states)
+ defer.returnValue(res)
+ except Exception:
+ logger.exception("Error updating presence")
+
+ @defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
the notifier and federation if and only if the changed presence state
@@ -315,11 +327,7 @@ class PresenceHandler(object):
if to_federation_ping:
federation_presence_out_counter.inc_by(len(to_federation_ping))
- _, _, hosts_to_states = yield self._get_interested_parties(
- to_federation_ping.values()
- )
-
- self._push_to_remotes(hosts_to_states)
+ self._push_to_remotes(to_federation_ping.values())
def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
@@ -364,8 +372,8 @@ class PresenceHandler(object):
now=now,
)
- preserve_fn(self._update_states)(changes)
- except:
+ run_in_background(self._update_states_and_catch_exception, changes)
+ except Exception:
logger.exception("Exception in _handle_timeouts loop")
@defer.inlineCallbacks
@@ -422,20 +430,23 @@ class PresenceHandler(object):
@defer.inlineCallbacks
def _end():
- if affect_presence:
+ try:
self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id)
yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(),
)])
+ except Exception:
+ logger.exception("Error updating presence after sync")
@contextmanager
def _user_syncing():
try:
yield
finally:
- preserve_fn(_end)()
+ if affect_presence:
+ run_in_background(_end)
defer.returnValue(_user_syncing())
@@ -508,6 +519,73 @@ class PresenceHandler(object):
self.external_process_to_current_syncs[process_id] = syncing_user_ids
@defer.inlineCallbacks
+ def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+ """Update the syncing users for an external process as a delta.
+
+ Args:
+ process_id (str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+ as user start and stop syncing against a given process.
+ user_id (str): The user who has started or stopped syncing
+ is_syncing (bool): Whether or not the user is now syncing
+ sync_time_msec(int): Time in ms when the user was last syncing
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ prev_state = yield self.current_state_for_user(user_id)
+
+ process_presence = self.external_process_to_current_syncs.setdefault(
+ process_id, set()
+ )
+
+ updates = []
+ if is_syncing and user_id not in process_presence:
+ if prev_state.state == PresenceState.OFFLINE:
+ updates.append(prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=sync_time_msec,
+ last_user_sync_ts=sync_time_msec,
+ ))
+ else:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+ process_presence.add(user_id)
+ elif user_id in process_presence:
+ updates.append(prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec,
+ ))
+
+ if not is_syncing:
+ process_presence.discard(user_id)
+
+ if updates:
+ yield self._update_states(updates)
+
+ self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+
+ @defer.inlineCallbacks
+ def update_external_syncs_clear(self, process_id):
+ """Marks all users that had been marked as syncing by a given process
+ as offline.
+
+ Used when the process has stopped/disappeared.
+ """
+ with (yield self.external_sync_linearizer.queue(process_id)):
+ process_presence = self.external_process_to_current_syncs.pop(
+ process_id, set()
+ )
+ prev_states = yield self.current_state_for_users(process_presence)
+ time_now_ms = self.clock.time_msec()
+
+ yield self._update_states([
+ prev_state.copy_and_replace(
+ last_user_sync_ts=time_now_ms,
+ )
+ for prev_state in prev_states.itervalues()
+ ])
+ self.external_process_last_updated_ms.pop(process_id, None)
+
+ @defer.inlineCallbacks
def current_state_for_user(self, user_id):
"""Get the current presence state for a user.
"""
@@ -526,14 +604,14 @@ class PresenceHandler(object):
for user_id in user_ids
}
- missing = [user_id for user_id, state in states.items() if not state]
+ missing = [user_id for user_id, state in states.iteritems() if not state]
if missing:
# There are things not in our in memory cache. Lets pull them out of
# the database.
res = yield self.store.get_presence_for_users(missing)
states.update(res)
- missing = [user_id for user_id, state in states.items() if not state]
+ missing = [user_id for user_id, state in states.iteritems() if not state]
if missing:
new = {
user_id: UserPresenceState.default(user_id)
@@ -545,89 +623,39 @@ class PresenceHandler(object):
defer.returnValue(states)
@defer.inlineCallbacks
- def _get_interested_parties(self, states, calculate_remote_hosts=True):
- """Given a list of states return which entities (rooms, users, servers)
- are interested in the given states.
-
- Returns:
- 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
- with each item being a dict of `entity_name` -> `[UserPresenceState]`
- """
- room_ids_to_states = {}
- users_to_states = {}
- for state in states:
- events = yield self.store.get_rooms_for_user(state.user_id)
- for e in events:
- room_ids_to_states.setdefault(e.room_id, []).append(state)
-
- plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
- for u in plist:
- users_to_states.setdefault(u, []).append(state)
-
- # Always notify self
- users_to_states.setdefault(state.user_id, []).append(state)
-
- hosts_to_states = {}
- if calculate_remote_hosts:
- for room_id, states in room_ids_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- users = yield self.store.get_users_in_room(room_id)
- hosts = set(get_domain_from_id(u) for u in users)
-
- for host in hosts:
- hosts_to_states.setdefault(host, []).extend(local_states)
-
- for user_id, states in users_to_states.items():
- local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
- if not local_states:
- continue
-
- host = get_domain_from_id(user_id)
- hosts_to_states.setdefault(host, []).extend(local_states)
-
- # TODO: de-dup hosts_to_states, as a single host might have multiple
- # of same presence
-
- defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
-
- @defer.inlineCallbacks
def _persist_and_notify(self, states):
"""Persist states in the database, poke the notifier and send to
interested remote servers
"""
stream_id, max_token = yield self.store.update_presence(states)
- parties = yield self._get_interested_parties(states)
- room_ids_to_states, users_to_states, hosts_to_states = parties
+ parties = yield get_interested_parties(self.store, states)
+ room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states.keys()]
+ users=[UserID.from_string(u) for u in users_to_states]
)
- self._push_to_remotes(hosts_to_states)
+ self._push_to_remotes(states)
@defer.inlineCallbacks
def notify_for_states(self, state, stream_id):
- parties = yield self._get_interested_parties([state])
- room_ids_to_states, users_to_states, hosts_to_states = parties
+ parties = yield get_interested_parties(self.store, [state])
+ room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
"presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states.keys()]
+ users=[UserID.from_string(u) for u in users_to_states]
)
- def _push_to_remotes(self, hosts_to_states):
+ def _push_to_remotes(self, states):
"""Sends state updates to remote servers.
Args:
- hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+ states (list(UserPresenceState))
"""
- for host, states in hosts_to_states.items():
- self.federation.send_presence(host, states)
+ self.federation.send_presence(states)
@defer.inlineCallbacks
def incoming_presence(self, origin, content):
@@ -719,9 +747,7 @@ class PresenceHandler(object):
for state in updates
])
else:
- defer.returnValue([
- format_user_presence_state(state, now) for state in updates
- ])
+ defer.returnValue(updates)
@defer.inlineCallbacks
def set_state(self, target_user, state, ignore_status_msg=False):
@@ -766,18 +792,17 @@ class PresenceHandler(object):
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
- user_ids = yield self.store.get_users_in_room(room_id)
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
- hosts = set(get_domain_from_id(u) for u in user_ids)
- self._push_to_remotes({host: (state,) for host in hosts})
+ self._push_to_remotes([state])
else:
+ user_ids = yield self.store.get_users_in_room(room_id)
user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
- self._push_to_remotes({user.domain: states.values()})
+ self._push_to_remotes(states.values())
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@@ -795,6 +820,9 @@ class PresenceHandler(object):
as_event=False,
)
+ now = self.clock.time_msec()
+ results[:] = [format_user_presence_state(r, now) for r in results]
+
is_accepted = {
row["observed_user_id"]: row["accepted"] for row in presence_list
}
@@ -847,6 +875,7 @@ class PresenceHandler(object):
)
state_dict = yield self.get_state(observed_user, as_event=False)
+ state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
self.federation.send_edu(
destination=observer_user.domain,
@@ -910,11 +939,12 @@ class PresenceHandler(object):
def is_visible(self, observed_user, observer_user):
"""Returns whether a user can see another user's presence.
"""
- observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
- observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
-
- observer_room_ids = set(r.room_id for r in observer_rooms)
- observed_room_ids = set(r.room_id for r in observed_rooms)
+ observer_room_ids = yield self.store.get_rooms_for_user(
+ observer_user.to_string()
+ )
+ observed_room_ids = yield self.store.get_rooms_for_user(
+ observed_user.to_string()
+ )
if observer_room_ids & observed_room_ids:
defer.returnValue(True)
@@ -979,14 +1009,18 @@ def should_notify(old_state, new_state):
return False
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
+
+ The "user_id" is optional so that this function can be used to format presence
+ updates for client /sync responses and for federation /send requests.
"""
content = {
"presence": state.state,
- "user_id": state.user_id,
}
+ if include_user_id:
+ content["user_id"] = state.user_id
if state.last_active_ts:
content["last_active_ago"] = now - state.last_active_ts
if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1025,7 +1059,6 @@ class PresenceEventSource(object):
# sending down the rare duplicate is not a concern.
with Measure(self.clock, "presence.get_new_events"):
- user_id = user.to_string()
if from_key is not None:
from_key = int(from_key)
@@ -1034,18 +1067,7 @@ class PresenceEventSource(object):
max_token = self.store.get_current_presence_token()
- plist = yield self.store.get_presence_list_accepted(user.localpart)
- users_interested_in = set(row["observed_user_id"] for row in plist)
- users_interested_in.add(user_id) # So that we receive our own presence
-
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
- users_interested_in.update(users_who_share_room)
-
- if explicit_room_id:
- user_ids = yield self.store.get_users_in_room(explicit_room_id)
- users_interested_in.update(user_ids)
+ users_interested_in = yield self._get_interested_in(user, explicit_room_id)
user_ids_changed = set()
changed = None
@@ -1073,16 +1095,13 @@ class PresenceEventSource(object):
updates = yield presence.current_state_for_users(user_ids_changed)
- now = self.clock.time_msec()
-
- defer.returnValue(([
- {
- "type": "m.presence",
- "content": format_user_presence_state(s, now),
- }
- for s in updates.values()
- if include_offline or s.state != PresenceState.OFFLINE
- ], max_token))
+ if include_offline:
+ defer.returnValue((updates.values(), max_token))
+ else:
+ defer.returnValue(([
+ s for s in updates.itervalues()
+ if s.state != PresenceState.OFFLINE
+ ], max_token))
def get_current_key(self):
return self.store.get_current_presence_token()
@@ -1090,6 +1109,31 @@ class PresenceEventSource(object):
def get_pagination_rows(self, user, pagination_config, key):
return self.get_new_events(user, from_key=None, include_offline=False)
+ @cachedInlineCallbacks(num_args=2, cache_context=True)
+ def _get_interested_in(self, user, explicit_room_id, cache_context):
+ """Returns the set of users that the given user should see presence
+ updates for
+ """
+ user_id = user.to_string()
+ plist = yield self.store.get_presence_list_accepted(
+ user.localpart, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in = set(row["observed_user_id"] for row in plist)
+ users_interested_in.add(user_id) # So that we receive our own presence
+
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in.update(users_who_share_room)
+
+ if explicit_room_id:
+ user_ids = yield self.store.get_users_in_room(
+ explicit_room_id, on_invalidate=cache_context.invalidate,
+ )
+ users_interested_in.update(user_ids)
+
+ defer.returnValue(users_interested_in)
+
def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
"""Checks the presence of users that have timed out and updates as
@@ -1157,14 +1201,17 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
# If there are have been no sync for a while (and none ongoing),
# set presence to offline
if user_id not in syncing_user_ids:
- if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+ # If the user has done something recently but hasn't synced,
+ # don't set them as offline.
+ sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
+ if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
state=PresenceState.OFFLINE,
status_msg=None,
)
changed = True
else:
- # We expect to be poked occaisonally by the other side.
+ # We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that
# no one gets stuck online forever.
if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
@@ -1255,3 +1302,66 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
persist_and_notify = True
return new_state, persist_and_notify, federation_ping
+
+
+@defer.inlineCallbacks
+def get_interested_parties(store, states):
+ """Given a list of states return which entities (rooms, users)
+ are interested in the given states.
+
+ Args:
+ states (list(UserPresenceState))
+
+ Returns:
+ 2-tuple: `(room_ids_to_states, users_to_states)`,
+ with each item being a dict of `entity_name` -> `[UserPresenceState]`
+ """
+ room_ids_to_states = {}
+ users_to_states = {}
+ for state in states:
+ room_ids = yield store.get_rooms_for_user(state.user_id)
+ for room_id in room_ids:
+ room_ids_to_states.setdefault(room_id, []).append(state)
+
+ plist = yield store.get_presence_list_observers_accepted(state.user_id)
+ for u in plist:
+ users_to_states.setdefault(u, []).append(state)
+
+ # Always notify self
+ users_to_states.setdefault(state.user_id, []).append(state)
+
+ defer.returnValue((room_ids_to_states, users_to_states))
+
+
+@defer.inlineCallbacks
+def get_interested_remotes(store, states, state_handler):
+ """Given a list of presence states figure out which remote servers
+ should be sent which.
+
+ All the presence states should be for local users only.
+
+ Args:
+ store (DataStore)
+ states (list(UserPresenceState))
+
+ Returns:
+ Deferred list of ([destinations], [UserPresenceState]), where for
+ each row the list of UserPresenceState should be sent to each
+ destination
+ """
+ hosts_and_states = []
+
+ # First we look up the rooms each user is in (as well as any explicit
+ # subscriptions), then for each distinct room we look up the remote
+ # hosts in those rooms.
+ room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
+
+ for room_id, states in room_ids_to_states.iteritems():
+ hosts = yield state_handler.get_current_hosts_in_room(room_id)
+ hosts_and_states.append((hosts, states))
+
+ for user_id, states in users_to_states.iteritems():
+ host = get_domain_from_id(user_id)
+ hosts_and_states.append(([host], states))
+
+ defer.returnValue(hosts_and_states)
|