summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py342
1 files changed, 151 insertions, 191 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..59d53f1050 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
         self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -110,30 +113,6 @@ class PresenceHandler(object):
         federation_registry.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        federation_registry.register_edu_handler(
-            "m.presence_invite",
-            lambda origin, content: self.invite_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-        federation_registry.register_edu_handler(
-            "m.presence_accept",
-            lambda origin, content: self.accept_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-        federation_registry.register_edu_handler(
-            "m.presence_deny",
-            lambda origin, content: self.deny_presence(
-                observed_user=UserID.from_string(content["observed_user"]),
-                observer_user=UserID.from_string(content["observer_user"]),
-            )
-        )
-
-        distributor = hs.get_distributor()
-        distributor.observe("user_joined_room", self.user_joined_room)
 
         active_presence = self.store.take_presence_startup_info()
 
@@ -220,6 +199,15 @@ class PresenceHandler(object):
         LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
                    lambda: len(self.wheel_timer))
 
+        # Used to handle sending of presence to newly joined users/servers
+        if hs.config.use_presence:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+        # Presence is best effort and quickly heals itself, so lets just always
+        # stream from the current state when we restart.
+        self._event_pos = self.store.get_current_events_token()
+        self._event_processing = False
+
     @defer.inlineCallbacks
     def _on_shutdown(self):
         """Gets called when shutting down. This lets us persist any updates that
@@ -751,199 +739,178 @@ class PresenceHandler(object):
         yield self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     @defer.inlineCallbacks
-    def user_joined_room(self, user, room_id):
-        """Called (via the distributor) when a user joins a room. This funciton
-        sends presence updates to servers, either:
-            1. the joining user is a local user and we send their presence to
-               all servers in the room.
-            2. the joining user is a remote user and so we send presence for all
-               local users in the room.
+    def is_visible(self, observed_user, observer_user):
+        """Returns whether a user can see another user's presence.
         """
-        # We only need to send presence to servers that don't have it yet. We
-        # don't need to send to local clients here, as that is done as part
-        # of the event stream/sync.
-        # TODO: Only send to servers not already in the room.
-        if self.is_mine(user):
-            state = yield self.current_state_for_user(user.to_string())
-
-            self._push_to_remotes([state])
-        else:
-            user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, user_ids))
+        observer_room_ids = yield self.store.get_rooms_for_user(
+            observer_user.to_string()
+        )
+        observed_room_ids = yield self.store.get_rooms_for_user(
+            observed_user.to_string()
+        )
 
-            states = yield self.current_state_for_users(user_ids)
+        if observer_room_ids & observed_room_ids:
+            defer.returnValue(True)
 
-            self._push_to_remotes(list(states.values()))
+        defer.returnValue(False)
 
     @defer.inlineCallbacks
-    def get_presence_list(self, observer_user, accepted=None):
-        """Returns the presence for all users in their presence list.
+    def get_all_presence_updates(self, last_id, current_id):
         """
-        if not self.is_mine(observer_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
-
-        presence_list = yield self.store.get_presence_list(
-            observer_user.localpart, accepted=accepted
-        )
+        Gets a list of presence update rows from between the given stream ids.
+        Each row has:
+        - stream_id(str)
+        - user_id(str)
+        - state(str)
+        - last_active_ts(int)
+        - last_federation_update_ts(int)
+        - last_user_sync_ts(int)
+        - status_msg(int)
+        - currently_active(int)
+        """
+        # TODO(markjh): replicate the unpersisted changes.
+        # This could use the in-memory stores for recent changes.
+        rows = yield self.store.get_all_presence_updates(last_id, current_id)
+        defer.returnValue(rows)
 
-        results = yield self.get_states(
-            target_user_ids=[row["observed_user_id"] for row in presence_list],
-            as_event=False,
-        )
+    def notify_new_event(self):
+        """Called when new events have happened. Handles users and servers
+        joining rooms and require being sent presence.
+        """
 
-        now = self.clock.time_msec()
-        results[:] = [format_user_presence_state(r, now) for r in results]
+        if self._event_processing:
+            return
 
-        is_accepted = {
-            row["observed_user_id"]: row["accepted"] for row in presence_list
-        }
+        @defer.inlineCallbacks
+        def _process_presence():
+            assert not self._event_processing
 
-        for result in results:
-            result.update({
-                "accepted": is_accepted,
-            })
+            self._event_processing = True
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._event_processing = False
 
-        defer.returnValue(results)
+        run_as_background_process("presence.notify_new_event", _process_presence)
 
     @defer.inlineCallbacks
-    def send_presence_invite(self, observer_user, observed_user):
-        """Sends a presence invite.
-        """
-        yield self.store.add_presence_list_pending(
-            observer_user.localpart, observed_user.to_string()
-        )
+    def _unsafe_process(self):
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "presence_delta"):
+                deltas = yield self.store.get_current_state_deltas(self._event_pos)
+                if not deltas:
+                    return
 
-        if self.is_mine(observed_user):
-            yield self.invite_presence(observed_user, observer_user)
-        else:
-            yield self.federation.build_and_send_edu(
-                destination=observed_user.domain,
-                edu_type="m.presence_invite",
-                content={
-                    "observed_user": observed_user.to_string(),
-                    "observer_user": observer_user.to_string(),
-                }
-            )
+                yield self._handle_state_delta(deltas)
+
+                self._event_pos = deltas[-1]["stream_id"]
+
+                # Expose current event processing position to prometheus
+                synapse.metrics.event_processing_positions.labels("presence").set(
+                    self._event_pos
+                )
 
     @defer.inlineCallbacks
-    def invite_presence(self, observed_user, observer_user):
-        """Handles new presence invites.
+    def _handle_state_delta(self, deltas):
+        """Process current state deltas to find new joins that need to be
+        handled.
         """
-        if not self.is_mine(observed_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
 
-        # TODO: Don't auto accept
-        if self.is_mine(observer_user):
-            yield self.accept_presence(observed_user, observer_user)
-        else:
-            self.federation.build_and_send_edu(
-                destination=observer_user.domain,
-                edu_type="m.presence_accept",
-                content={
-                    "observed_user": observed_user.to_string(),
-                    "observer_user": observer_user.to_string(),
-                }
-            )
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
-            state_dict = yield self.get_state(observed_user, as_event=False)
-            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
+            if typ != EventTypes.Member:
+                continue
 
-            self.federation.build_and_send_edu(
-                destination=observer_user.domain,
-                edu_type="m.presence",
-                content={
-                    "push": [state_dict]
-                }
-            )
+            if event_id is None:
+                # state has been deleted, so this is not a join. We only care about
+                # joins.
+                continue
 
-    @defer.inlineCallbacks
-    def accept_presence(self, observed_user, observer_user):
-        """Handles a m.presence_accept EDU. Mark a presence invite from a
-        local or remote user as accepted in a local user's presence list.
-        Starts polling for presence updates from the local or remote user.
-        Args:
-            observed_user(UserID): The user to update in the presence list.
-            observer_user(UserID): The owner of the presence list to update.
-        """
-        yield self.store.set_presence_list_accepted(
-            observer_user.localpart, observed_user.to_string()
-        )
+            event = yield self.store.get_event(event_id)
+            if event.content.get("membership") != Membership.JOIN:
+                # We only care about joins
+                continue
 
-    @defer.inlineCallbacks
-    def deny_presence(self, observed_user, observer_user):
-        """Handle a m.presence_deny EDU. Removes a local or remote user from a
-        local user's presence list.
-        Args:
-            observed_user(UserID): The local or remote user to remove from the
-                list.
-            observer_user(UserID): The local owner of the presence list.
-        Returns:
-            A Deferred.
-        """
-        yield self.store.del_presence_list(
-            observer_user.localpart, observed_user.to_string()
-        )
+            if prev_event_id:
+                prev_event = yield self.store.get_event(prev_event_id)
+                if prev_event.content.get("membership") == Membership.JOIN:
+                    # Ignore changes to join events.
+                    continue
 
-        # TODO(paul): Inform the user somehow?
+            yield self._on_user_joined_room(room_id, state_key)
 
     @defer.inlineCallbacks
-    def drop(self, observed_user, observer_user):
-        """Remove a local or remote user from a local user's presence list and
-        unsubscribe the local user from updates that user.
+    def _on_user_joined_room(self, room_id, user_id):
+        """Called when we detect a user joining the room via the current state
+        delta stream.
+
         Args:
-            observed_user(UserId): The local or remote user to remove from the
-                list.
-            observer_user(UserId): The local owner of the presence list.
+            room_id (str)
+            user_id (str)
+
         Returns:
-            A Deferred.
+            Deferred
         """
-        if not self.is_mine(observer_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
 
-        yield self.store.del_presence_list(
-            observer_user.localpart, observed_user.to_string()
-        )
+        if self.is_mine_id(user_id):
+            # If this is a local user then we need to send their presence
+            # out to hosts in the room (who don't already have it)
 
-        # TODO: Inform the remote that we've dropped the presence list.
+            # TODO: We should be able to filter the hosts down to those that
+            # haven't previously seen the user
 
-    @defer.inlineCallbacks
-    def is_visible(self, observed_user, observer_user):
-        """Returns whether a user can see another user's presence.
-        """
-        observer_room_ids = yield self.store.get_rooms_for_user(
-            observer_user.to_string()
-        )
-        observed_room_ids = yield self.store.get_rooms_for_user(
-            observed_user.to_string()
-        )
+            state = yield self.current_state_for_user(user_id)
+            hosts = yield self.state.get_current_hosts_in_room(room_id)
 
-        if observer_room_ids & observed_room_ids:
-            defer.returnValue(True)
+            # Filter out ourselves.
+            hosts = set(host for host in hosts if host != self.server_name)
 
-        accepted_observers = yield self.store.get_presence_list_observers_accepted(
-            observed_user.to_string()
-        )
+            self.federation.send_presence_to_destinations(
+                states=[state],
+                destinations=hosts,
+            )
+        else:
+            # A remote user has joined the room, so we need to:
+            #   1. Check if this is a new server in the room
+            #   2. If so send any presence they don't already have for
+            #      local users in the room.
 
-        defer.returnValue(observer_user.to_string() in accepted_observers)
+            # TODO: We should be able to filter the users down to those that
+            # the server hasn't previously seen
 
-    @defer.inlineCallbacks
-    def get_all_presence_updates(self, last_id, current_id):
-        """
-        Gets a list of presence update rows from between the given stream ids.
-        Each row has:
-        - stream_id(str)
-        - user_id(str)
-        - state(str)
-        - last_active_ts(int)
-        - last_federation_update_ts(int)
-        - last_user_sync_ts(int)
-        - status_msg(int)
-        - currently_active(int)
-        """
-        # TODO(markjh): replicate the unpersisted changes.
-        # This could use the in-memory stores for recent changes.
-        rows = yield self.store.get_all_presence_updates(last_id, current_id)
-        defer.returnValue(rows)
+            # TODO: Check that this is actually a new server joining the
+            # room.
+
+            user_ids = yield self.state.get_current_users_in_room(room_id)
+            user_ids = list(filter(self.is_mine_id, user_ids))
+
+            states = yield self.current_state_for_users(user_ids)
+
+            # Filter out old presence, i.e. offline presence states where
+            # the user hasn't been active for a week. We can change this
+            # depending on what we want the UX to be, but at the least we
+            # should filter out offline presence where the state is just the
+            # default state.
+            now = self.clock.time_msec()
+            states = [
+                state for state in states.values()
+                if state.state != PresenceState.OFFLINE
+                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+                or state.status_msg is not None
+            ]
+
+            if states:
+                self.federation.send_presence_to_destinations(
+                    states=states,
+                    destinations=[get_domain_from_id(user_id)],
+                )
 
 
 def should_notify(old_state, new_state):
@@ -1086,10 +1053,7 @@ class PresenceEventSource(object):
         updates for
         """
         user_id = user.to_string()
-        plist = yield self.store.get_presence_list_accepted(
-            user.localpart, on_invalidate=cache_context.invalidate,
-        )
-        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in = set()
         users_interested_in.add(user_id)  # So that we receive our own presence
 
         users_who_share_room = yield self.store.get_users_who_share_room_with_user(
@@ -1294,10 +1258,6 @@ def get_interested_parties(store, states):
         for room_id in room_ids:
             room_ids_to_states.setdefault(room_id, []).append(state)
 
-        plist = yield store.get_presence_list_observers_accepted(state.user_id)
-        for u in plist:
-            users_to_states.setdefault(u, []).append(state)
-
         # Always notify self
         users_to_states.setdefault(state.user_id, []).append(state)