summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/presence.py154
1 files changed, 82 insertions, 72 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8e085dfbec..6fd1f34289 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
                 max_pos, deltas = await self.store.get_current_state_deltas(
                     self._event_pos, room_max_stream_ordering
                 )
-                await self._handle_state_delta(deltas)
+
+                # We may get multiple deltas for different rooms, but we want to
+                # handle them on a room by room basis, so we batch them up by
+                # room.
+                deltas_by_room: Dict[str, List[JsonDict]] = {}
+                for delta in deltas:
+                    deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+
+                for room_id, deltas_for_room in deltas_by_room.items():
+                    await self._handle_state_delta(room_id, deltas_for_room)
 
                 self._event_pos = max_pos
 
@@ -1192,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler):
                     max_pos
                 )
 
-    async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
-        """Process current state deltas to find new joins that need to be
-        handled.
+    async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+        """Process current state deltas for the room to find new joins that need
+        to be handled.
         """
-        # A map of destination to a set of user state that they should receive
-        presence_destinations = {}  # type: Dict[str, Set[UserPresenceState]]
+
+        # Sets of newly joined users. Note that if the local server is
+        # joining a remote room for the first time we'll see both the joining
+        # user and all remote users as newly joined.
+        newly_joined_users = set()
 
         for delta in deltas:
+            assert room_id == delta["room_id"]
+
             typ = delta["type"]
             state_key = delta["state_key"]
-            room_id = delta["room_id"]
             event_id = delta["event_id"]
             prev_event_id = delta["prev_event_id"]
 
@@ -1231,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler):
                     # Ignore changes to join events.
                     continue
 
-            # Retrieve any user presence state updates that need to be sent as a result,
-            # and the destinations that need to receive it
-            destinations, user_presence_states = await self._on_user_joined_room(
-                room_id, state_key
-            )
-
-            # Insert the destinations and respective updates into our destinations dict
-            for destination in destinations:
-                presence_destinations.setdefault(destination, set()).update(
-                    user_presence_states
-                )
-
-        # Send out user presence updates for each destination
-        for destination, user_state_set in presence_destinations.items():
-            self._federation_queue.send_presence_to_destinations(
-                destinations=[destination], states=user_state_set
-            )
-
-    async def _on_user_joined_room(
-        self, room_id: str, user_id: str
-    ) -> Tuple[List[str], List[UserPresenceState]]:
-        """Called when we detect a user joining the room via the current state
-        delta stream. Returns the destinations that need to be updated and the
-        presence updates to send to them.
-
-        Args:
-            room_id: The ID of the room that the user has joined.
-            user_id: The ID of the user that has joined the room.
-
-        Returns:
-            A tuple of destinations and presence updates to send to them.
-        """
-        if self.is_mine_id(user_id):
-            # If this is a local user then we need to send their presence
-            # out to hosts in the room (who don't already have it)
-
-            # TODO: We should be able to filter the hosts down to those that
-            # haven't previously seen the user
-
-            remote_hosts = await self.state.get_current_hosts_in_room(room_id)
+            newly_joined_users.add(state_key)
 
-            # Filter out ourselves.
-            filtered_remote_hosts = [
-                host for host in remote_hosts if host != self.server_name
-            ]
-
-            state = await self.current_state_for_user(user_id)
-            return filtered_remote_hosts, [state]
-        else:
-            # A remote user has joined the room, so we need to:
-            #   1. Check if this is a new server in the room
-            #   2. If so send any presence they don't already have for
-            #      local users in the room.
-
-            # TODO: We should be able to filter the users down to those that
-            # the server hasn't previously seen
-
-            # TODO: Check that this is actually a new server joining the
-            # room.
-
-            remote_host = get_domain_from_id(user_id)
+        if not newly_joined_users:
+            # If nobody has joined then there's nothing to do.
+            return
 
-            users = await self.store.get_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, users))
+        # We want to send:
+        #   1. presence states of all local users in the room to newly joined
+        #      remote servers
+        #   2. presence states of newly joined users to all remote servers in
+        #      the room.
+        #
+        # TODO: Only send presence states to remote hosts that don't already
+        # have them (because they already share rooms).
+
+        # Get all the users who were already in the room, by fetching the
+        # current users in the room and removing the newly joined users.
+        users = await self.store.get_users_in_room(room_id)
+        prev_users = set(users) - newly_joined_users
+
+        # Construct sets for all the local users and remote hosts that were
+        # already in the room
+        prev_local_users = []
+        prev_remote_hosts = set()
+        for user_id in prev_users:
+            if self.is_mine_id(user_id):
+                prev_local_users.append(user_id)
+            else:
+                prev_remote_hosts.add(get_domain_from_id(user_id))
+
+        # Similarly, construct sets for all the local users and remote hosts
+        # that were *not* already in the room. Care needs to be taken with the
+        # calculating the remote hosts, as a host may have already been in the
+        # room even if there is a newly joined user from that host.
+        newly_joined_local_users = []
+        newly_joined_remote_hosts = set()
+        for user_id in newly_joined_users:
+            if self.is_mine_id(user_id):
+                newly_joined_local_users.append(user_id)
+            else:
+                host = get_domain_from_id(user_id)
+                if host not in prev_remote_hosts:
+                    newly_joined_remote_hosts.add(host)
 
-            states_d = await self.current_state_for_users(user_ids)
+        # Send presence states of all local users in the room to newly joined
+        # remote servers. (We actually only send states for local users already
+        # in the room, as we'll send states for newly joined local users below.)
+        if prev_local_users and newly_joined_remote_hosts:
+            local_states = await self.current_state_for_users(prev_local_users)
 
             # Filter out old presence, i.e. offline presence states where
             # the user hasn't been active for a week. We can change this
@@ -1306,13 +1302,27 @@ class PresenceHandler(BasePresenceHandler):
             now = self.clock.time_msec()
             states = [
                 state
-                for state in states_d.values()
+                for state in local_states.values()
                 if state.state != PresenceState.OFFLINE
                 or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
                 or state.status_msg is not None
             ]
 
-            return [remote_host], states
+            self._federation_queue.send_presence_to_destinations(
+                destinations=newly_joined_remote_hosts,
+                states=states,
+            )
+
+        # Send presence states of newly joined users to all remote servers in
+        # the room
+        if newly_joined_local_users and (
+            prev_remote_hosts or newly_joined_remote_hosts
+        ):
+            local_states = await self.current_state_for_users(newly_joined_local_users)
+            self._federation_queue.send_presence_to_destinations(
+                destinations=prev_remote_hosts | newly_joined_remote_hosts,
+                states=list(local_states.values()),
+            )
 
 
 def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: