diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e9e0f1338f..6fd1f34289 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
max_pos, deltas = await self.store.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)
- await self._handle_state_delta(deltas)
+
+ # We may get multiple deltas for different rooms, but we want to
+ # handle them on a room by room basis, so we batch them up by
+ # room.
+ deltas_by_room: Dict[str, List[JsonDict]] = {}
+ for delta in deltas:
+ deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+
+ for room_id, deltas_for_room in deltas_by_room.items():
+ await self._handle_state_delta(room_id, deltas_for_room)
self._event_pos = max_pos
@@ -1192,9 +1201,9 @@ class PresenceHandler(BasePresenceHandler):
max_pos
)
- async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
- """Process current state deltas to find new joins that need to be
- handled.
+ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+ """Process current state deltas for the room to find new joins that need
+ to be handled.
"""
# Sets of newly joined users. Note that if the local server is
@@ -1203,9 +1212,10 @@ class PresenceHandler(BasePresenceHandler):
newly_joined_users = set()
for delta in deltas:
+ assert room_id == delta["room_id"]
+
typ = delta["type"]
state_key = delta["state_key"]
- room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]
@@ -1256,11 +1266,11 @@ class PresenceHandler(BasePresenceHandler):
# Construct sets for all the local users and remote hosts that were
# already in the room
- prev_local_users = set()
+ prev_local_users = []
prev_remote_hosts = set()
for user_id in prev_users:
if self.is_mine_id(user_id):
- prev_local_users.add(user_id)
+ prev_local_users.append(user_id)
else:
prev_remote_hosts.add(get_domain_from_id(user_id))
@@ -1268,11 +1278,11 @@ class PresenceHandler(BasePresenceHandler):
# that were *not* already in the room. Care needs to be taken with the
# calculating the remote hosts, as a host may have already been in the
# room even if there is a newly joined user from that host.
- newly_joined_local_users = set()
+ newly_joined_local_users = []
newly_joined_remote_hosts = set()
for user_id in newly_joined_users:
if self.is_mine_id(user_id):
- newly_joined_local_users.add(user_id)
+ newly_joined_local_users.append(user_id)
else:
host = get_domain_from_id(user_id)
if host not in prev_remote_hosts:
|