diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fb85b19770..b6a9ce4f38 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -849,6 +849,9 @@ class PresenceHandler(BasePresenceHandler):
"""Process current state deltas to find new joins that need to be
handled.
"""
+ # A map of destination to a set of user state that they should receive
+ presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]
+
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
@@ -858,6 +861,7 @@ class PresenceHandler(BasePresenceHandler):
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+ # Drop any event that isn't a membership join
if typ != EventTypes.Member:
continue
@@ -880,13 +884,38 @@ class PresenceHandler(BasePresenceHandler):
# Ignore changes to join events.
continue
- await self._on_user_joined_room(room_id, state_key)
+ # Retrieve any user presence state updates that need to be sent as a result,
+ # and the destinations that need to receive it
+ destinations, user_presence_states = await self._on_user_joined_room(
+ room_id, state_key
+ )
+
+ # Insert the destinations and respective updates into our destinations dict
+ for destination in destinations:
+ presence_destinations.setdefault(destination, set()).update(
+ user_presence_states
+ )
+
+ # Send out user presence updates for each destination
+ for destination, user_state_set in presence_destinations.items():
+ self.federation.send_presence_to_destinations(
+ destinations=[destination], states=user_state_set
+ )
- async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
+ async def _on_user_joined_room(
+ self, room_id: str, user_id: str
+ ) -> Tuple[List[str], List[UserPresenceState]]:
"""Called when we detect a user joining the room via the current state
- delta stream.
- """
+ delta stream. Returns the destinations that need to be updated and the
+ presence updates to send to them.
+
+ Args:
+ room_id: The ID of the room that the user has joined.
+ user_id: The ID of the user that has joined the room.
+ Returns:
+ A tuple of destinations and presence updates to send to them.
+ """
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)
@@ -894,15 +923,15 @@ class PresenceHandler(BasePresenceHandler):
# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user
- state = await self.current_state_for_user(user_id)
- hosts = await self.state.get_current_hosts_in_room(room_id)
+ remote_hosts = await self.state.get_current_hosts_in_room(room_id)
# Filter out ourselves.
- hosts = {host for host in hosts if host != self.server_name}
+ filtered_remote_hosts = [
+ host for host in remote_hosts if host != self.server_name
+ ]
- self.federation.send_presence_to_destinations(
- states=[state], destinations=hosts
- )
+ state = await self.current_state_for_user(user_id)
+ return filtered_remote_hosts, [state]
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
@@ -915,6 +944,8 @@ class PresenceHandler(BasePresenceHandler):
# TODO: Check that this is actually a new server joining the
# room.
+ remote_host = get_domain_from_id(user_id)
+
users = await self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))
@@ -934,10 +965,7 @@ class PresenceHandler(BasePresenceHandler):
or state.status_msg is not None
]
- if states:
- self.federation.send_presence_to_destinations(
- states=states, destinations=[get_domain_from_id(user_id)]
- )
+ return [remote_host], states
def should_notify(old_state, new_state):
|