diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..bd1285b15c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
self.hs = hs
self.is_mine = hs.is_mine
self.is_mine_id = hs.is_mine_id
+ self.server_name = hs.hostname
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
@@ -110,30 +113,6 @@ class PresenceHandler(object):
federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
- federation_registry.register_edu_handler(
- "m.presence_invite",
- lambda origin, content: self.invite_presence(
- observed_user=UserID.from_string(content["observed_user"]),
- observer_user=UserID.from_string(content["observer_user"]),
- )
- )
- federation_registry.register_edu_handler(
- "m.presence_accept",
- lambda origin, content: self.accept_presence(
- observed_user=UserID.from_string(content["observed_user"]),
- observer_user=UserID.from_string(content["observer_user"]),
- )
- )
- federation_registry.register_edu_handler(
- "m.presence_deny",
- lambda origin, content: self.deny_presence(
- observed_user=UserID.from_string(content["observed_user"]),
- observer_user=UserID.from_string(content["observer_user"]),
- )
- )
-
- distributor = hs.get_distributor()
- distributor.observe("user_joined_room", self.user_joined_room)
active_presence = self.store.take_presence_startup_info()
@@ -220,6 +199,15 @@ class PresenceHandler(object):
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
+ # Used to handle sending of presence to newly joined users/servers
+ if hs.config.use_presence:
+ self.notifier.add_replication_callback(self.notify_new_event)
+
+ # Presence is best effort and quickly heals itself, so lets just always
+ # stream from the current state when we restart.
+ self._event_pos = self.store.get_current_events_token()
+ self._event_processing = False
+
@defer.inlineCallbacks
def _on_shutdown(self):
"""Gets called when shutting down. This lets us persist any updates that
@@ -751,199 +739,173 @@ class PresenceHandler(object):
yield self._update_states([prev_state.copy_and_replace(**new_fields)])
@defer.inlineCallbacks
- def user_joined_room(self, user, room_id):
- """Called (via the distributor) when a user joins a room. This funciton
- sends presence updates to servers, either:
- 1. the joining user is a local user and we send their presence to
- all servers in the room.
- 2. the joining user is a remote user and so we send presence for all
- local users in the room.
+ def is_visible(self, observed_user, observer_user):
+ """Returns whether a user can see another user's presence.
"""
- # We only need to send presence to servers that don't have it yet. We
- # don't need to send to local clients here, as that is done as part
- # of the event stream/sync.
- # TODO: Only send to servers not already in the room.
- if self.is_mine(user):
- state = yield self.current_state_for_user(user.to_string())
-
- self._push_to_remotes([state])
- else:
- user_ids = yield self.store.get_users_in_room(room_id)
- user_ids = list(filter(self.is_mine_id, user_ids))
+ observer_room_ids = yield self.store.get_rooms_for_user(
+ observer_user.to_string()
+ )
+ observed_room_ids = yield self.store.get_rooms_for_user(
+ observed_user.to_string()
+ )
- states = yield self.current_state_for_users(user_ids)
+ if observer_room_ids & observed_room_ids:
+ defer.returnValue(True)
- self._push_to_remotes(list(states.values()))
+ defer.returnValue(False)
@defer.inlineCallbacks
- def get_presence_list(self, observer_user, accepted=None):
- """Returns the presence for all users in their presence list.
+ def get_all_presence_updates(self, last_id, current_id):
"""
- if not self.is_mine(observer_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
-
- presence_list = yield self.store.get_presence_list(
- observer_user.localpart, accepted=accepted
- )
+ Gets a list of presence update rows from between the given stream ids.
+ Each row has:
+ - stream_id(str)
+ - user_id(str)
+ - state(str)
+ - last_active_ts(int)
+ - last_federation_update_ts(int)
+ - last_user_sync_ts(int)
+ - status_msg(int)
+ - currently_active(int)
+ """
+ # TODO(markjh): replicate the unpersisted changes.
+ # This could use the in-memory stores for recent changes.
+ rows = yield self.store.get_all_presence_updates(last_id, current_id)
+ defer.returnValue(rows)
- results = yield self.get_states(
- target_user_ids=[row["observed_user_id"] for row in presence_list],
- as_event=False,
- )
+ def notify_new_event(self):
+ """Called when new events have happened. Handles users and servers
+ joining rooms and require being sent presence.
+ """
- now = self.clock.time_msec()
- results[:] = [format_user_presence_state(r, now) for r in results]
+ if self._event_processing:
+ return
- is_accepted = {
- row["observed_user_id"]: row["accepted"] for row in presence_list
- }
+ @defer.inlineCallbacks
+ def _process_presence():
+ assert not self._event_processing
- for result in results:
- result.update({
- "accepted": is_accepted,
- })
+ self._event_processing = True
+ try:
+ yield self._unsafe_process()
+ finally:
+ self._event_processing = False
- defer.returnValue(results)
+ run_as_background_process("presence.notify_new_event", _process_presence)
@defer.inlineCallbacks
- def send_presence_invite(self, observer_user, observed_user):
- """Sends a presence invite.
- """
- yield self.store.add_presence_list_pending(
- observer_user.localpart, observed_user.to_string()
- )
+ def _unsafe_process(self):
+ # Loop round handling deltas until we're up to date
+ while True:
+ with Measure(self.clock, "presence_delta"):
+ deltas = yield self.store.get_current_state_deltas(self._event_pos)
+ if not deltas:
+ return
- if self.is_mine(observed_user):
- yield self.invite_presence(observed_user, observer_user)
- else:
- yield self.federation.build_and_send_edu(
- destination=observed_user.domain,
- edu_type="m.presence_invite",
- content={
- "observed_user": observed_user.to_string(),
- "observer_user": observer_user.to_string(),
- }
- )
+ yield self._handle_state_delta(deltas)
+
+ self._event_pos = deltas[-1]["stream_id"]
+
+ # Expose current event processing position to prometheus
+ synapse.metrics.event_processing_positions.labels("presence").set(
+ self._event_pos
+ )
@defer.inlineCallbacks
- def invite_presence(self, observed_user, observer_user):
- """Handles new presence invites.
+ def _handle_state_delta(self, deltas):
+ """Process current state deltas to find new joins that need to be
+ handled.
"""
- if not self.is_mine(observed_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ for delta in deltas:
+ typ = delta["type"]
+ state_key = delta["state_key"]
+ room_id = delta["room_id"]
+ event_id = delta["event_id"]
+ prev_event_id = delta["prev_event_id"]
- # TODO: Don't auto accept
- if self.is_mine(observer_user):
- yield self.accept_presence(observed_user, observer_user)
- else:
- self.federation.build_and_send_edu(
- destination=observer_user.domain,
- edu_type="m.presence_accept",
- content={
- "observed_user": observed_user.to_string(),
- "observer_user": observer_user.to_string(),
- }
- )
+ logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
- state_dict = yield self.get_state(observed_user, as_event=False)
- state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
+ if typ != EventTypes.Member:
+ continue
- self.federation.build_and_send_edu(
- destination=observer_user.domain,
- edu_type="m.presence",
- content={
- "push": [state_dict]
- }
- )
+ event = yield self.store.get_event(event_id)
+ if event.content.get("membership") != Membership.JOIN:
+ # We only care about joins
+ continue
- @defer.inlineCallbacks
- def accept_presence(self, observed_user, observer_user):
- """Handles a m.presence_accept EDU. Mark a presence invite from a
- local or remote user as accepted in a local user's presence list.
- Starts polling for presence updates from the local or remote user.
- Args:
- observed_user(UserID): The user to update in the presence list.
- observer_user(UserID): The owner of the presence list to update.
- """
- yield self.store.set_presence_list_accepted(
- observer_user.localpart, observed_user.to_string()
- )
+ if prev_event_id:
+ prev_event = yield self.store.get_event(prev_event_id)
+ if prev_event.content.get("membership") == Membership.JOIN:
+ # Ignore changes to join events.
+ continue
+
+ yield self._on_user_joined_room(room_id, state_key)
@defer.inlineCallbacks
- def deny_presence(self, observed_user, observer_user):
- """Handle a m.presence_deny EDU. Removes a local or remote user from a
- local user's presence list.
+ def _on_user_joined_room(self, room_id, user_id):
+ """Called when we detect a user joining the room via the current state
+ delta stream.
+
Args:
- observed_user(UserID): The local or remote user to remove from the
- list.
- observer_user(UserID): The local owner of the presence list.
+ room_id (str)
+ user_id (str)
+
Returns:
- A Deferred.
+ Deferred
"""
- yield self.store.del_presence_list(
- observer_user.localpart, observed_user.to_string()
- )
- # TODO(paul): Inform the user somehow?
+ if self.is_mine_id(user_id):
+ # If this is a local user then we need to send their presence
+ # out to hosts in the room (who don't already have it)
- @defer.inlineCallbacks
- def drop(self, observed_user, observer_user):
- """Remove a local or remote user from a local user's presence list and
- unsubscribe the local user from updates that user.
- Args:
- observed_user(UserId): The local or remote user to remove from the
- list.
- observer_user(UserId): The local owner of the presence list.
- Returns:
- A Deferred.
- """
- if not self.is_mine(observer_user):
- raise SynapseError(400, "User is not hosted on this Home Server")
+ # TODO: We should be able to filter the hosts down to those that
+ # haven't previously seen the user
- yield self.store.del_presence_list(
- observer_user.localpart, observed_user.to_string()
- )
+ state = yield self.current_state_for_user(user_id)
+ hosts = yield self.state.get_current_hosts_in_room(room_id)
- # TODO: Inform the remote that we've dropped the presence list.
+ # Filter out ourselves.
+ hosts = set(host for host in hosts if host != self.server_name)
- @defer.inlineCallbacks
- def is_visible(self, observed_user, observer_user):
- """Returns whether a user can see another user's presence.
- """
- observer_room_ids = yield self.store.get_rooms_for_user(
- observer_user.to_string()
- )
- observed_room_ids = yield self.store.get_rooms_for_user(
- observed_user.to_string()
- )
+ self.federation.send_presence_to_destinations(
+ states=[state],
+ destinations=hosts,
+ )
+ else:
+ # A remote user has joined the room, so we need to:
+ # 1. Check if this is a new server in the room
+ # 2. If so send any presence they don't already have for
+ # local users in the room.
- if observer_room_ids & observed_room_ids:
- defer.returnValue(True)
+ # TODO: We should be able to filter the users down to those that
+ # the server hasn't previously seen
- accepted_observers = yield self.store.get_presence_list_observers_accepted(
- observed_user.to_string()
- )
+ # TODO: Check that this is actually a new server joining the
+ # room.
- defer.returnValue(observer_user.to_string() in accepted_observers)
+ user_ids = yield self.state.get_current_users_in_room(room_id)
+ user_ids = list(filter(self.is_mine_id, user_ids))
- @defer.inlineCallbacks
- def get_all_presence_updates(self, last_id, current_id):
- """
- Gets a list of presence update rows from between the given stream ids.
- Each row has:
- - stream_id(str)
- - user_id(str)
- - state(str)
- - last_active_ts(int)
- - last_federation_update_ts(int)
- - last_user_sync_ts(int)
- - status_msg(int)
- - currently_active(int)
- """
- # TODO(markjh): replicate the unpersisted changes.
- # This could use the in-memory stores for recent changes.
- rows = yield self.store.get_all_presence_updates(last_id, current_id)
- defer.returnValue(rows)
+ states = yield self.current_state_for_users(user_ids)
+
+ # Filter out old presence, i.e. offline presence states where
+ # the user hasn't been active for a week. We can change this
+ # depending on what we want the UX to be, but at the least we
+ # should filter out offline presence where the state is just the
+ # default state.
+ now = self.clock.time_msec()
+ states = [
+ state for state in states.values()
+ if state.state != PresenceState.OFFLINE
+ or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+ or state.status_msg is not None
+ ]
+
+ if states:
+ self.federation.send_presence_to_destinations(
+ states=states,
+ destinations=[get_domain_from_id(user_id)],
+ )
def should_notify(old_state, new_state):
@@ -1086,10 +1048,7 @@ class PresenceEventSource(object):
updates for
"""
user_id = user.to_string()
- plist = yield self.store.get_presence_list_accepted(
- user.localpart, on_invalidate=cache_context.invalidate,
- )
- users_interested_in = set(row["observed_user_id"] for row in plist)
+ users_interested_in = set()
users_interested_in.add(user_id) # So that we receive our own presence
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
@@ -1294,10 +1253,6 @@ def get_interested_parties(store, states):
for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state)
- plist = yield store.get_presence_list_observers_accepted(state.user_id)
- for u in plist:
- users_to_states.setdefault(u, []).append(state)
-
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
|