diff options
-rw-r--r-- | changelog.d/9916.feature | 1 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 154 | ||||
-rw-r--r-- | tests/handlers/test_presence.py | 14 |
3 files changed, 87 insertions, 82 deletions
diff --git a/changelog.d/9916.feature b/changelog.d/9916.feature new file mode 100644 index 0000000000..54165cce18 --- /dev/null +++ b/changelog.d/9916.feature @@ -0,0 +1 @@ +Improve performance after joining a large room when presence is enabled. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 8e085dfbec..6fd1f34289 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler): max_pos, deltas = await self.store.get_current_state_deltas( self._event_pos, room_max_stream_ordering ) - await self._handle_state_delta(deltas) + + # We may get multiple deltas for different rooms, but we want to + # handle them on a room by room basis, so we batch them up by + # room. + deltas_by_room: Dict[str, List[JsonDict]] = {} + for delta in deltas: + deltas_by_room.setdefault(delta["room_id"], []).append(delta) + + for room_id, deltas_for_room in deltas_by_room.items(): + await self._handle_state_delta(room_id, deltas_for_room) self._event_pos = max_pos @@ -1192,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler): max_pos ) - async def _handle_state_delta(self, deltas: List[JsonDict]) -> None: - """Process current state deltas to find new joins that need to be - handled. + async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None: + """Process current state deltas for the room to find new joins that need + to be handled. """ - # A map of destination to a set of user state that they should receive - presence_destinations = {} # type: Dict[str, Set[UserPresenceState]] + + # Sets of newly joined users. Note that if the local server is + # joining a remote room for the first time we'll see both the joining + # user and all remote users as newly joined. + newly_joined_users = set() for delta in deltas: + assert room_id == delta["room_id"] + typ = delta["type"] state_key = delta["state_key"] - room_id = delta["room_id"] event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] @@ -1231,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler): # Ignore changes to join events. continue - # Retrieve any user presence state updates that need to be sent as a result, - # and the destinations that need to receive it - destinations, user_presence_states = await self._on_user_joined_room( - room_id, state_key - ) - - # Insert the destinations and respective updates into our destinations dict - for destination in destinations: - presence_destinations.setdefault(destination, set()).update( - user_presence_states - ) - - # Send out user presence updates for each destination - for destination, user_state_set in presence_destinations.items(): - self._federation_queue.send_presence_to_destinations( - destinations=[destination], states=user_state_set - ) - - async def _on_user_joined_room( - self, room_id: str, user_id: str - ) -> Tuple[List[str], List[UserPresenceState]]: - """Called when we detect a user joining the room via the current state - delta stream. Returns the destinations that need to be updated and the - presence updates to send to them. - - Args: - room_id: The ID of the room that the user has joined. - user_id: The ID of the user that has joined the room. - - Returns: - A tuple of destinations and presence updates to send to them. - """ - if self.is_mine_id(user_id): - # If this is a local user then we need to send their presence - # out to hosts in the room (who don't already have it) - - # TODO: We should be able to filter the hosts down to those that - # haven't previously seen the user - - remote_hosts = await self.state.get_current_hosts_in_room(room_id) + newly_joined_users.add(state_key) - # Filter out ourselves. - filtered_remote_hosts = [ - host for host in remote_hosts if host != self.server_name - ] - - state = await self.current_state_for_user(user_id) - return filtered_remote_hosts, [state] - else: - # A remote user has joined the room, so we need to: - # 1. Check if this is a new server in the room - # 2. If so send any presence they don't already have for - # local users in the room. - - # TODO: We should be able to filter the users down to those that - # the server hasn't previously seen - - # TODO: Check that this is actually a new server joining the - # room. - - remote_host = get_domain_from_id(user_id) + if not newly_joined_users: + # If nobody has joined then there's nothing to do. + return - users = await self.store.get_users_in_room(room_id) - user_ids = list(filter(self.is_mine_id, users)) + # We want to send: + # 1. presence states of all local users in the room to newly joined + # remote servers + # 2. presence states of newly joined users to all remote servers in + # the room. + # + # TODO: Only send presence states to remote hosts that don't already + # have them (because they already share rooms). + + # Get all the users who were already in the room, by fetching the + # current users in the room and removing the newly joined users. + users = await self.store.get_users_in_room(room_id) + prev_users = set(users) - newly_joined_users + + # Construct sets for all the local users and remote hosts that were + # already in the room + prev_local_users = [] + prev_remote_hosts = set() + for user_id in prev_users: + if self.is_mine_id(user_id): + prev_local_users.append(user_id) + else: + prev_remote_hosts.add(get_domain_from_id(user_id)) + + # Similarly, construct sets for all the local users and remote hosts + # that were *not* already in the room. Care needs to be taken with the + # calculating the remote hosts, as a host may have already been in the + # room even if there is a newly joined user from that host. + newly_joined_local_users = [] + newly_joined_remote_hosts = set() + for user_id in newly_joined_users: + if self.is_mine_id(user_id): + newly_joined_local_users.append(user_id) + else: + host = get_domain_from_id(user_id) + if host not in prev_remote_hosts: + newly_joined_remote_hosts.add(host) - states_d = await self.current_state_for_users(user_ids) + # Send presence states of all local users in the room to newly joined + # remote servers. (We actually only send states for local users already + # in the room, as we'll send states for newly joined local users below.) + if prev_local_users and newly_joined_remote_hosts: + local_states = await self.current_state_for_users(prev_local_users) # Filter out old presence, i.e. offline presence states where # the user hasn't been active for a week. We can change this @@ -1306,13 +1302,27 @@ class PresenceHandler(BasePresenceHandler): now = self.clock.time_msec() states = [ state - for state in states_d.values() + for state in local_states.values() if state.state != PresenceState.OFFLINE or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 or state.status_msg is not None ] - return [remote_host], states + self._federation_queue.send_presence_to_destinations( + destinations=newly_joined_remote_hosts, + states=states, + ) + + # Send presence states of newly joined users to all remote servers in + # the room + if newly_joined_local_users and ( + prev_remote_hosts or newly_joined_remote_hosts + ): + local_states = await self.current_state_for_users(newly_joined_local_users) + self._federation_queue.send_presence_to_destinations( + destinations=prev_remote_hosts | newly_joined_remote_hosts, + states=list(local_states.values()), + ) def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index ce330e79cc..1ffab709fc 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -729,7 +729,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): ) self.assertEqual(expected_state.state, PresenceState.ONLINE) self.federation_sender.send_presence_to_destinations.assert_called_once_with( - destinations=["server2"], states={expected_state} + destinations={"server2"}, states=[expected_state] ) # @@ -740,7 +740,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self._add_new_user(room_id, "@bob:server3") self.federation_sender.send_presence_to_destinations.assert_called_once_with( - destinations=["server3"], states={expected_state} + destinations={"server3"}, states=[expected_state] ) def test_remote_gets_presence_when_local_user_joins(self): @@ -788,14 +788,8 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self.presence_handler.current_state_for_user("@test2:server") ) self.assertEqual(expected_state.state, PresenceState.ONLINE) - self.assertEqual( - self.federation_sender.send_presence_to_destinations.call_count, 2 - ) - self.federation_sender.send_presence_to_destinations.assert_any_call( - destinations=["server3"], states={expected_state} - ) - self.federation_sender.send_presence_to_destinations.assert_any_call( - destinations=["server2"], states={expected_state} + self.federation_sender.send_presence_to_destinations.assert_called_once_with( + destinations={"server2", "server3"}, states=[expected_state] ) def _add_new_user(self, room_id, user_id): |