summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2019-03-28 14:38:31 +0000
committerGitHub <noreply@github.com>2019-03-28 14:38:31 +0000
commit248014379e173b131138d278517394ffe1906cbe (patch)
tree6704350d85a1e498842f8d51314be49516d44f2b /synapse
parentMerge pull request #4953 from matrix-org/rav/refactor_replication_streams (diff)
parentUse an assert (diff)
downloadsynapse-248014379e173b131138d278517394ffe1906cbe.tar.xz
Merge pull request #4942 from matrix-org/erikj/fix_presence
Use event streams to calculate presence
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/send_queue.py73
-rw-r--r--synapse/federation/sender/__init__.py19
-rw-r--r--synapse/handlers/presence.py176
3 files changed, 236 insertions, 32 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 04d04a4457..0240b339b0 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -55,7 +55,12 @@ class FederationRemoteSendQueue(object):
         self.is_mine_id = hs.is_mine_id
 
         self.presence_map = {}  # Pending presence map user_id -> UserPresenceState
-        self.presence_changed = SortedDict()  # Stream position -> user_id
+        self.presence_changed = SortedDict()  # Stream position -> list[user_id]
+
+        # Stores the destinations we need to explicitly send presence to about a
+        # given user.
+        # Stream position -> (user_id, destinations)
+        self.presence_destinations = SortedDict()
 
         self.keyed_edu = {}  # (destination, key) -> EDU
         self.keyed_edu_changed = SortedDict()  # stream position -> (destination, key)
@@ -77,7 +82,7 @@ class FederationRemoteSendQueue(object):
 
         for queue_name in [
             "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
-            "edus", "device_messages", "pos_time",
+            "edus", "device_messages", "pos_time", "presence_destinations",
         ]:
             register(queue_name, getattr(self, queue_name))
 
@@ -121,6 +126,15 @@ class FederationRemoteSendQueue(object):
                 for user_id in uids
             )
 
+            keys = self.presence_destinations.keys()
+            i = self.presence_destinations.bisect_left(position_to_delete)
+            for key in keys[:i]:
+                del self.presence_destinations[key]
+
+            user_ids.update(
+                user_id for user_id, _ in self.presence_destinations.values()
+            )
+
             to_del = [
                 user_id for user_id in self.presence_map if user_id not in user_ids
             ]
@@ -209,6 +223,20 @@ class FederationRemoteSendQueue(object):
 
         self.notifier.on_new_replication_data()
 
+    def send_presence_to_destinations(self, states, destinations):
+        """As per FederationSender
+
+        Args:
+            states (list[UserPresenceState])
+            destinations (list[str])
+        """
+        for state in states:
+            pos = self._next_pos()
+            self.presence_map.update({state.user_id: state for state in states})
+            self.presence_destinations[pos] = (state.user_id, destinations)
+
+        self.notifier.on_new_replication_data()
+
     def send_device_messages(self, destination):
         """As per FederationSender"""
         pos = self._next_pos()
@@ -261,6 +289,16 @@ class FederationRemoteSendQueue(object):
                 state=self.presence_map[user_id],
             )))
 
+        # Fetch presence to send to destinations
+        i = self.presence_destinations.bisect_right(from_token)
+        j = self.presence_destinations.bisect_right(to_token) + 1
+
+        for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
+            rows.append((pos, PresenceDestinationsRow(
+                state=self.presence_map[user_id],
+                destinations=list(dests),
+            )))
+
         # Fetch changes keyed edus
         i = self.keyed_edu_changed.bisect_right(from_token)
         j = self.keyed_edu_changed.bisect_right(to_token) + 1
@@ -357,6 +395,29 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
         buff.presence.append(self.state)
 
 
+class PresenceDestinationsRow(BaseFederationRow, namedtuple("PresenceDestinationsRow", (
+    "state",  # UserPresenceState
+    "destinations",  # list[str]
+))):
+    TypeId = "pd"
+
+    @staticmethod
+    def from_data(data):
+        return PresenceDestinationsRow(
+            state=UserPresenceState.from_dict(data["state"]),
+            destinations=data["dests"],
+        )
+
+    def to_data(self):
+        return {
+            "state": self.state.as_dict(),
+            "dests": self.destinations,
+        }
+
+    def add_to_buffer(self, buff):
+        buff.presence_destinations.append((self.state, self.destinations))
+
+
 class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
     "key",  # tuple(str) - the edu key passed to send_edu
     "edu",  # Edu
@@ -428,6 +489,7 @@ TypeToRow = {
     Row.TypeId: Row
     for Row in (
         PresenceRow,
+        PresenceDestinationsRow,
         KeyedEduRow,
         EduRow,
         DeviceRow,
@@ -437,6 +499,7 @@ TypeToRow = {
 
 ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
     "presence",  # list(UserPresenceState)
+    "presence_destinations",  # list of tuples of UserPresenceState and destinations
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
     "device_destinations",  # set of destinations
@@ -458,6 +521,7 @@ def process_rows_for_federation(transaction_queue, rows):
 
     buff = ParsedFederationStreamData(
         presence=[],
+        presence_destinations=[],
         keyed_edus={},
         edus={},
         device_destinations=set(),
@@ -476,6 +540,11 @@ def process_rows_for_federation(transaction_queue, rows):
     if buff.presence:
         transaction_queue.send_presence(buff.presence)
 
+    for state, destinations in buff.presence_destinations:
+        transaction_queue.send_presence_to_destinations(
+            states=[state], destinations=destinations,
+        )
+
     for destination, edu_map in iteritems(buff.keyed_edus):
         for key, edu in edu_map.items():
             transaction_queue.send_edu(edu, key)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 1dc041752b..4f0f939102 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -371,7 +371,7 @@ class FederationSender(object):
             return
 
         # First we queue up the new presence by user ID, so multiple presence
-        # updates in quick successtion are correctly handled
+        # updates in quick succession are correctly handled.
         # We only want to send presence for our own users, so lets always just
         # filter here just in case.
         self.pending_presence.update({
@@ -402,6 +402,23 @@ class FederationSender(object):
         finally:
             self._processing_pending_presence = False
 
+    def send_presence_to_destinations(self, states, destinations):
+        """Send the given presence states to the given destinations.
+
+        Args:
+            states (list[UserPresenceState])
+            destinations (list[str])
+        """
+
+        if not states or not self.hs.config.use_presence:
+            # No-op if presence is disabled.
+            return
+
+        for destination in destinations:
+            if destination == self.server_name:
+                continue
+            self._get_per_destination_queue(destination).send_presence(states)
+
     @measure_func("txnqueue._process_presence")
     @defer.inlineCallbacks
     def _process_presence_inner(self, states):
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37e87fc054..e85c49742d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -31,9 +31,11 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import PresenceState
+import synapse.metrics
+from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -98,6 +100,7 @@ class PresenceHandler(object):
         self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
+        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -132,9 +135,6 @@ class PresenceHandler(object):
             )
         )
 
-        distributor = hs.get_distributor()
-        distributor.observe("user_joined_room", self.user_joined_room)
-
         active_presence = self.store.take_presence_startup_info()
 
         # A dictionary of the current state of users. This is prefilled with
@@ -220,6 +220,15 @@ class PresenceHandler(object):
         LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
                    lambda: len(self.wheel_timer))
 
+        # Used to handle sending of presence to newly joined users/servers
+        if hs.config.use_presence:
+            self.notifier.add_replication_callback(self.notify_new_event)
+
+        # Presence is best effort and quickly heals itself, so lets just always
+        # stream from the current state when we restart.
+        self._event_pos = self.store.get_current_events_token()
+        self._event_processing = False
+
     @defer.inlineCallbacks
     def _on_shutdown(self):
         """Gets called when shutting down. This lets us persist any updates that
@@ -751,31 +760,6 @@ class PresenceHandler(object):
         yield self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     @defer.inlineCallbacks
-    def user_joined_room(self, user, room_id):
-        """Called (via the distributor) when a user joins a room. This funciton
-        sends presence updates to servers, either:
-            1. the joining user is a local user and we send their presence to
-               all servers in the room.
-            2. the joining user is a remote user and so we send presence for all
-               local users in the room.
-        """
-        # We only need to send presence to servers that don't have it yet. We
-        # don't need to send to local clients here, as that is done as part
-        # of the event stream/sync.
-        # TODO: Only send to servers not already in the room.
-        if self.is_mine(user):
-            state = yield self.current_state_for_user(user.to_string())
-
-            self._push_to_remotes([state])
-        else:
-            user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, user_ids))
-
-            states = yield self.current_state_for_users(user_ids)
-
-            self._push_to_remotes(list(states.values()))
-
-    @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
         """Returns the presence for all users in their presence list.
         """
@@ -945,6 +929,140 @@ class PresenceHandler(object):
         rows = yield self.store.get_all_presence_updates(last_id, current_id)
         defer.returnValue(rows)
 
+    def notify_new_event(self):
+        """Called when new events have happened. Handles users and servers
+        joining rooms and require being sent presence.
+        """
+
+        if self._event_processing:
+            return
+
+        @defer.inlineCallbacks
+        def _process_presence():
+            assert not self._event_processing
+
+            self._event_processing = True
+            try:
+                yield self._unsafe_process()
+            finally:
+                self._event_processing = False
+
+        run_as_background_process("presence.notify_new_event", _process_presence)
+
+    @defer.inlineCallbacks
+    def _unsafe_process(self):
+        # Loop round handling deltas until we're up to date
+        while True:
+            with Measure(self.clock, "presence_delta"):
+                deltas = yield self.store.get_current_state_deltas(self._event_pos)
+                if not deltas:
+                    return
+
+                yield self._handle_state_delta(deltas)
+
+                self._event_pos = deltas[-1]["stream_id"]
+
+                # Expose current event processing position to prometheus
+                synapse.metrics.event_processing_positions.labels("presence").set(
+                    self._event_pos
+                )
+
+    @defer.inlineCallbacks
+    def _handle_state_delta(self, deltas):
+        """Process current state deltas to find new joins that need to be
+        handled.
+        """
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+
+            if typ != EventTypes.Member:
+                continue
+
+            event = yield self.store.get_event(event_id)
+            if event.content.get("membership") != Membership.JOIN:
+                # We only care about joins
+                continue
+
+            if prev_event_id:
+                prev_event = yield self.store.get_event(prev_event_id)
+                if prev_event.content.get("membership") == Membership.JOIN:
+                    # Ignore changes to join events.
+                    continue
+
+            yield self._on_user_joined_room(room_id, state_key)
+
+    @defer.inlineCallbacks
+    def _on_user_joined_room(self, room_id, user_id):
+        """Called when we detect a user joining the room via the current state
+        delta stream.
+
+        Args:
+            room_id (str)
+            user_id (str)
+
+        Returns:
+            Deferred
+        """
+
+        if self.is_mine_id(user_id):
+            # If this is a local user then we need to send their presence
+            # out to hosts in the room (who don't already have it)
+
+            # TODO: We should be able to filter the hosts down to those that
+            # haven't previously seen the user
+
+            state = yield self.current_state_for_user(user_id)
+            hosts = yield self.state.get_current_hosts_in_room(room_id)
+
+            # Filter out ourselves.
+            hosts = set(host for host in hosts if host != self.server_name)
+
+            self.federation.send_presence_to_destinations(
+                states=[state],
+                destinations=hosts,
+            )
+        else:
+            # A remote user has joined the room, so we need to:
+            #   1. Check if this is a new server in the room
+            #   2. If so send any presence they don't already have for
+            #      local users in the room.
+
+            # TODO: We should be able to filter the users down to those that
+            # the server hasn't previously seen
+
+            # TODO: Check that this is actually a new server joining the
+            # room.
+
+            user_ids = yield self.state.get_current_user_in_room(room_id)
+            user_ids = list(filter(self.is_mine_id, user_ids))
+
+            states = yield self.current_state_for_users(user_ids)
+
+            # Filter out old presence, i.e. offline presence states where
+            # the user hasn't been active for a week. We can change this
+            # depending on what we want the UX to be, but at the least we
+            # should filter out offline presence where the state is just the
+            # default state.
+            now = self.clock.time_msec()
+            states = [
+                state for state in states.values()
+                if state.state != PresenceState.OFFLINE
+                or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
+                or state.status_msg is not None
+            ]
+
+            if states:
+                self.federation.send_presence_to_destinations(
+                    states=states,
+                    destinations=[get_domain_from_id(user_id)],
+                )
+
 
 def should_notify(old_state, new_state):
     """Decides if a presence state change should be sent to interested parties.