Process state deltas in presence by room
1 files changed, 15 insertions, 5 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e9e0f1338f..1cbe65f72d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1183,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
max_pos, deltas = await self.store.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)
- await self._handle_state_delta(deltas)
+
+ # We may get multiple deltas for different rooms, but we want to
+ # handle them on a room by room basis, so we batch them up by
+ # room.
+ deltas_by_room: Dict[str, List[JsonDict]] = {}
+ for delta in deltas:
+ deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+
+ for room_id, deltas_for_room in deltas_by_room.items():
+ await self._handle_state_delta(room_id, deltas_for_room)
self._event_pos = max_pos
@@ -1192,9 +1201,9 @@ class PresenceHandler(BasePresenceHandler):
max_pos
)
- async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
- """Process current state deltas to find new joins that need to be
- handled.
+ async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+ """Process current state deltas for the room to find new joins that need
+ to be handled.
"""
# Sets of newly joined users. Note that if the local server is
@@ -1203,9 +1212,10 @@ class PresenceHandler(BasePresenceHandler):
newly_joined_users = set()
for delta in deltas:
+ assert room_id == delta["room_id"]
+
typ = delta["type"]
state_key = delta["state_key"]
- room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]
|