diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7c7cda3e95..dfc0b9db07 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -110,6 +110,7 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates
from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
from synapse.storage.databases.main import DataStore
+from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -1499,9 +1500,9 @@ class PresenceHandler(BasePresenceHandler):
# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
- deltas_by_room: Dict[str, List[JsonDict]] = {}
+ deltas_by_room: Dict[str, List[StateDelta]] = {}
for delta in deltas:
- deltas_by_room.setdefault(delta["room_id"], []).append(delta)
+ deltas_by_room.setdefault(delta.room_id, []).append(delta)
for room_id, deltas_for_room in deltas_by_room.items():
await self._handle_state_delta(room_id, deltas_for_room)
@@ -1513,7 +1514,7 @@ class PresenceHandler(BasePresenceHandler):
max_pos
)
- async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
+ async def _handle_state_delta(self, room_id: str, deltas: List[StateDelta]) -> None:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
@@ -1524,31 +1525,30 @@ class PresenceHandler(BasePresenceHandler):
newly_joined_users = set()
for delta in deltas:
- assert room_id == delta["room_id"]
+ assert room_id == delta.room_id
- typ = delta["type"]
- state_key = delta["state_key"]
- event_id = delta["event_id"]
- prev_event_id = delta["prev_event_id"]
-
- logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+ logger.debug(
+ "Handling: %r %r, %s", delta.event_type, delta.state_key, delta.event_id
+ )
# Drop any event that isn't a membership join
- if typ != EventTypes.Member:
+ if delta.event_type != EventTypes.Member:
continue
- if event_id is None:
+ if delta.event_id is None:
# state has been deleted, so this is not a join. We only care about
# joins.
continue
- event = await self.store.get_event(event_id, allow_none=True)
+ event = await self.store.get_event(delta.event_id, allow_none=True)
if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue
- if prev_event_id:
- prev_event = await self.store.get_event(prev_event_id, allow_none=True)
+ if delta.prev_event_id:
+ prev_event = await self.store.get_event(
+ delta.prev_event_id, allow_none=True
+ )
if (
prev_event
and prev_event.content.get("membership") == Membership.JOIN
@@ -1556,7 +1556,7 @@ class PresenceHandler(BasePresenceHandler):
# Ignore changes to join events.
continue
- newly_joined_users.add(state_key)
+ newly_joined_users.add(delta.state_key)
if not newly_joined_users:
# If nobody has joined then there's nothing to do.
|