summary refs log tree commit diff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py176
1 files changed, 147 insertions, 29 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..e85c49742d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
         self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -132,9 +135,6 @@ class PresenceHandler(object):
             )
         )
 
-        distributor = hs.get_distributor()
-        distributor.observe("user_joined_room", self.user_joined_room)
-
         active_presence = self.store.take_presence_startup_info()
 
         # A dictionary of the current state of users. This is prefilled with
@@ -220,6 +220,15 @@ class PresenceHandler(object):
         LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
                    lambda: len(self.wheel_timer))
 
+        # Used to handle sending of presence to newly joined users/servers
+        if hs.config.use_presence:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+        # Presence is best effort and quickly heals itself, so lets just always
+        # stream from the current state when we restart.
+        self._event_pos = self.store.get_current_events_token()
+        self._event_processing = False
+
     @defer.inlineCallbacks
     def _on_shutdown(self):
         """Gets called when shutting down. This lets us persist any updates that
@@ -751,31 +760,6 @@ class PresenceHandler(object):
         yield self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     @defer.inlineCallbacks
-    def user_joined_room(self, user, room_id):
-        """Called (via the distributor) when a user joins a room. This funciton
-        sends presence updates to servers, either:
-            1. the joining user is a local user and we send their presence to
-               all servers in the room.
-            2. the joining user is a remote user and so we send presence for all
-               local users in the room.
-        """
-        # We only need to send presence to servers that don't have it yet. We
-        # don't need to send to local clients here, as that is done as part
-        # of the event stream/sync.
-        # TODO: Only send to servers not already in the room.
-        if self.is_mine(user):
-            state = yield self.current_state_for_user(user.to_string())
-
-            self._push_to_remotes([state])
-        else:
-            user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, user_ids))
-
-            states = yield self.current_state_for_users(user_ids)
-
-            self._push_to_remotes(list(states.values()))
-
-    @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
         """Returns the presence for all users in their presence list.
         """
@@ -945,6 +929,140 @@ class PresenceHandler(object):
         rows = yield self.store.get_all_presence_updates(last_id, current_id)
         defer.returnValue(rows)
 
+    def notify_new_event(self):
+        """Called when new events have happened. Handles users and servers
+        joining rooms and require being sent presence.
+        """
+
+        if self._event_processing:
+            return
+
+        @defer.inlineCallbacks
+        def _process_presence():
+            assert not self._event_processing
+
+            self._event_processing = True
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._event_processing = False
+
+        run_as_background_process("presence.notify_new_event", _process_presence)
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "presence_delta"):
+                deltas = yield self.store.get_current_state_deltas(self._event_pos)
+                if not deltas:
+                    return
+
+                yield self._handle_state_delta(deltas)
+
+                self._event_pos = deltas[-1]["stream_id"]
+
+                # Expose current event processing position to prometheus
+                synapse.metrics.event_processing_positions.labels("presence").set(
+                    self._event_pos
+                )
+
+    @defer.inlineCallbacks
+    def _handle_state_delta(self, deltas):
+        """Process current state deltas to find new joins that need to be
+        handled.
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            if typ != EventTypes.Member:
+                continue
+
+            event = yield self.store.get_event(event_id)
+            if event.content.get("membership") != Membership.JOIN:
+                # We only care about joins
+                continue
+
+            if prev_event_id:
+                prev_event = yield self.store.get_event(prev_event_id)
+                if prev_event.content.get("membership") == Membership.JOIN:
+                    # Ignore changes to join events.
+                    continue
+
+            yield self._on_user_joined_room(room_id, state_key)
+
+    @defer.inlineCallbacks
+    def _on_user_joined_room(self, room_id, user_id):
+        """Called when we detect a user joining the room via the current state
+        delta stream.
+
+        Args:
+            room_id (str)
+            user_id (str)
+
+        Returns:
+            Deferred
+        """
+
+        if self.is_mine_id(user_id):
+            # If this is a local user then we need to send their presence
+            # out to hosts in the room (who don't already have it)
+
+            # TODO: We should be able to filter the hosts down to those that
+            # haven't previously seen the user
+
+            state = yield self.current_state_for_user(user_id)
+            hosts = yield self.state.get_current_hosts_in_room(room_id)
+
+            # Filter out ourselves.
+            hosts = set(host for host in hosts if host != self.server_name)
+
+            self.federation.send_presence_to_destinations(
+                states=[state],
+                destinations=hosts,
+            )
+        else:
+            # A remote user has joined the room, so we need to:
+            #   1. Check if this is a new server in the room
+            #   2. If so send any presence they don't already have for
+            #      local users in the room.
+
+            # TODO: We should be able to filter the users down to those that
+            # the server hasn't previously seen
+
+            # TODO: Check that this is actually a new server joining the
+            # room.
+
+            user_ids = yield self.state.get_current_user_in_room(room_id)
+            user_ids = list(filter(self.is_mine_id, user_ids))
+
+            states = yield self.current_state_for_users(user_ids)
+
+            # Filter out old presence, i.e. offline presence states where
+            # the user hasn't been active for a week. We can change this
+            # depending on what we want the UX to be, but at the least we
+            # should filter out offline presence where the state is just the
+            # default state.
+            now = self.clock.time_msec()
+            states = [
+                state for state in states.values()
+                if state.state != PresenceState.OFFLINE
+                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+                or state.status_msg is not None
+            ]
+
+            if states:
+                self.federation.send_presence_to_destinations(
+                    states=states,
+                    destinations=[get_domain_from_id(user_id)],
+                )
+
 
 def should_notify(old_state, new_state):
     """Decides if a presence state change should be sent to interested parties.