diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8e085dfbec..6fd1f34289 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
max_pos, deltas = await self.store.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)
- await self._handle_state_delta(deltas)
+
+ # We may get multiple deltas for different rooms, but we want to
+ # handle them on a room by room basis, so we batch them up by
+ # room.
+ deltas_by_room: Dict[str, List[JsonDict]] = {}
+ for delta in deltas:
+ deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+
+ for room_id, deltas_for_room in deltas_by_room.items():
+ await self._handle_state_delta(room_id, deltas_for_room)
self._event_pos = max_pos
@@ -1192,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler):
max_pos
)
- async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
- """Process current state deltas to find new joins that need to be
- handled.
+ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+ """Process current state deltas for the room to find new joins that need
+ to be handled.
"""
- # A map of destination to a set of user state that they should receive
- presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]
+
+ # Sets of newly joined users. Note that if the local server is
+ # joining a remote room for the first time we'll see both the joining
+ # user and all remote users as newly joined.
+ newly_joined_users = set()
for delta in deltas:
+ assert room_id == delta["room_id"]
+
typ = delta["type"]
state_key = delta["state_key"]
- room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]
@@ -1231,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler):
# Ignore changes to join events.
continue
- # Retrieve any user presence state updates that need to be sent as a result,
- # and the destinations that need to receive it
- destinations, user_presence_states = await self._on_user_joined_room(
- room_id, state_key
- )
-
- # Insert the destinations and respective updates into our destinations dict
- for destination in destinations:
- presence_destinations.setdefault(destination, set()).update(
- user_presence_states
- )
-
- # Send out user presence updates for each destination
- for destination, user_state_set in presence_destinations.items():
- self._federation_queue.send_presence_to_destinations(
- destinations=[destination], states=user_state_set
- )
-
- async def _on_user_joined_room(
- self, room_id: str, user_id: str
- ) -> Tuple[List[str], List[UserPresenceState]]:
- """Called when we detect a user joining the room via the current state
- delta stream. Returns the destinations that need to be updated and the
- presence updates to send to them.
-
- Args:
- room_id: The ID of the room that the user has joined.
- user_id: The ID of the user that has joined the room.
-
- Returns:
- A tuple of destinations and presence updates to send to them.
- """
- if self.is_mine_id(user_id):
- # If this is a local user then we need to send their presence
- # out to hosts in the room (who don't already have it)
-
- # TODO: We should be able to filter the hosts down to those that
- # haven't previously seen the user
-
- remote_hosts = await self.state.get_current_hosts_in_room(room_id)
+ newly_joined_users.add(state_key)
- # Filter out ourselves.
- filtered_remote_hosts = [
- host for host in remote_hosts if host != self.server_name
- ]
-
- state = await self.current_state_for_user(user_id)
- return filtered_remote_hosts, [state]
- else:
- # A remote user has joined the room, so we need to:
- # 1. Check if this is a new server in the room
- # 2. If so send any presence they don't already have for
- # local users in the room.
-
- # TODO: We should be able to filter the users down to those that
- # the server hasn't previously seen
-
- # TODO: Check that this is actually a new server joining the
- # room.
-
- remote_host = get_domain_from_id(user_id)
+ if not newly_joined_users:
+ # If nobody has joined then there's nothing to do.
+ return
- users = await self.store.get_users_in_room(room_id)
- user_ids = list(filter(self.is_mine_id, users))
+ # We want to send:
+ # 1. presence states of all local users in the room to newly joined
+ # remote servers
+ # 2. presence states of newly joined users to all remote servers in
+ # the room.
+ #
+ # TODO: Only send presence states to remote hosts that don't already
+ # have them (because they already share rooms).
+
+ # Get all the users who were already in the room, by fetching the
+ # current users in the room and removing the newly joined users.
+ users = await self.store.get_users_in_room(room_id)
+ prev_users = set(users) - newly_joined_users
+
+ # Construct sets for all the local users and remote hosts that were
+ # already in the room
+ prev_local_users = []
+ prev_remote_hosts = set()
+ for user_id in prev_users:
+ if self.is_mine_id(user_id):
+ prev_local_users.append(user_id)
+ else:
+ prev_remote_hosts.add(get_domain_from_id(user_id))
+
+ # Similarly, construct sets for all the local users and remote hosts
+ # that were *not* already in the room. Care needs to be taken with the
+ # calculating the remote hosts, as a host may have already been in the
+ # room even if there is a newly joined user from that host.
+ newly_joined_local_users = []
+ newly_joined_remote_hosts = set()
+ for user_id in newly_joined_users:
+ if self.is_mine_id(user_id):
+ newly_joined_local_users.append(user_id)
+ else:
+ host = get_domain_from_id(user_id)
+ if host not in prev_remote_hosts:
+ newly_joined_remote_hosts.add(host)
- states_d = await self.current_state_for_users(user_ids)
+ # Send presence states of all local users in the room to newly joined
+ # remote servers. (We actually only send states for local users already
+ # in the room, as we'll send states for newly joined local users below.)
+ if prev_local_users and newly_joined_remote_hosts:
+ local_states = await self.current_state_for_users(prev_local_users)
# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
@@ -1306,13 +1302,27 @@ class PresenceHandler(BasePresenceHandler):
now = self.clock.time_msec()
states = [
state
- for state in states_d.values()
+ for state in local_states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]
- return [remote_host], states
+ self._federation_queue.send_presence_to_destinations(
+ destinations=newly_joined_remote_hosts,
+ states=states,
+ )
+
+ # Send presence states of newly joined users to all remote servers in
+ # the room
+ if newly_joined_local_users and (
+ prev_remote_hosts or newly_joined_remote_hosts
+ ):
+ local_states = await self.current_state_for_users(newly_joined_local_users)
+ self._federation_queue.send_presence_to_destinations(
+ destinations=prev_remote_hosts | newly_joined_remote_hosts,
+ states=list(local_states.values()),
+ )
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
|