summary refs log tree commit diff
diff options
context:
space:
mode:
authorHugh Nimmo-Smith <hughns@element.io>2024-07-16 17:58:18 +0100
committerHugh Nimmo-Smith <hughns@element.io>2024-07-17 10:47:16 +0100
commit8dea89837a58639350d43e3b5da54973061ad4dd (patch)
tree1db61793b8adb9926e7784e5b0a962e36eb71140
parent1.110.0 (diff)
downloadsynapse-hughns/msc3401-typing.tar.xz
Prototype of using m.typing to clean up MSC3401 call memberships hughns/msc3401-typing
Only apply logic where io.element.type=org.matrix.msc3401.call.member on the typing PUT
-rw-r--r--synapse/handlers/typing.py117
-rw-r--r--synapse/rest/client/room.py8
2 files changed, 110 insertions, 15 deletions
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 4c87718337..e031cdf3b3 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -40,7 +40,9 @@ from synapse.types import (
     StrCollection,
     StreamKeyType,
     UserID,
+    create_requester,
 )
+from synapse.types.state import StateFilter
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
 from synapse.util.retryutils import filter_destinations_by_retry_limiter
@@ -58,6 +60,8 @@ logger = logging.getLogger(__name__)
 class RoomMember:
     room_id: str
     user_id: str
+    type: str
+    device_id: Optional[str] = None
 
 
 # How often we expect remote servers to resend us presence.
@@ -108,6 +112,8 @@ class FollowerTypingHandler:
 
         self.clock.looping_call(self._handle_timeouts, 5000)
         self.clock.looping_call(self._prune_old_typing, FORGET_TIMEOUT)
+        self.state = hs.get_state_handler()
+        self.event_creation_handler = hs.get_event_creation_handler()
 
     def _reset(self) -> None:
         """Reset the typing handler's data caches."""
@@ -130,9 +136,9 @@ class FollowerTypingHandler:
         members = set(self.wheel_timer.fetch(now))
 
         for member in members:
-            self._handle_timeout_for_member(now, member)
+            await self._handle_timeout_for_member(now, member)
 
-    def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
+    async def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
         if not self.is_typing(member):
             # Nothing to do if they're no longer typing
             return
@@ -283,8 +289,8 @@ class TypingWriterHandler(FollowerTypingHandler):
             "TypingStreamChangeCache", self._latest_room_serial
         )
 
-    def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
-        super()._handle_timeout_for_member(now, member)
+    async def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
+        await super()._handle_timeout_for_member(now, member)
 
         if not self.is_typing(member):
             # Nothing to do if they're no longer typing
@@ -293,11 +299,16 @@ class TypingWriterHandler(FollowerTypingHandler):
         until = self._member_typing_until.get(member, None)
         if not until or until <= now:
             logger.info("Timing out typing for: %s", member.user_id)
-            self._stopped_typing(member)
+            await self._stopped_typing(member)
             return
 
     async def started_typing(
-        self, target_user: UserID, requester: Requester, room_id: str, timeout: int
+        self,
+        target_user: UserID,
+        requester: Requester,
+        room_id: str,
+        timeout: int,
+        type: str,
     ) -> None:
         target_user_id = target_user.to_string()
 
@@ -316,7 +327,12 @@ class TypingWriterHandler(FollowerTypingHandler):
 
         logger.debug("%s has started typing in %s", target_user_id, room_id)
 
-        member = RoomMember(room_id=room_id, user_id=target_user_id)
+        member = RoomMember(
+            room_id=room_id,
+            user_id=target_user_id,
+            device_id=requester.device_id,
+            type=type,
+        )
 
         was_present = member.user_id in self._room_typing.get(room_id, set())
 
@@ -332,7 +348,7 @@ class TypingWriterHandler(FollowerTypingHandler):
         self._push_update(member=member, typing=True)
 
     async def stopped_typing(
-        self, target_user: UserID, requester: Requester, room_id: str
+        self, target_user: UserID, requester: Requester, room_id: str, type: str
     ) -> None:
         target_user_id = target_user.to_string()
 
@@ -351,17 +367,90 @@ class TypingWriterHandler(FollowerTypingHandler):
 
         logger.debug("%s has stopped typing in %s", target_user_id, room_id)
 
-        member = RoomMember(room_id=room_id, user_id=target_user_id)
+        member = RoomMember(
+            room_id=room_id,
+            user_id=target_user_id,
+            device_id=requester.device_id,
+            type=type,
+        )
 
-        self._stopped_typing(member)
+        await self._stopped_typing(member)
 
-    def user_left_room(self, user: UserID, room_id: str) -> None:
+    async def user_left_room(self, user: UserID, room_id: str) -> None:
         user_id = user.to_string()
         if self.is_mine_id(user_id):
-            member = RoomMember(room_id=room_id, user_id=user_id)
-            self._stopped_typing(member)
+            # We use a device_id of "*" to indicate that all devices will have left the room
+            member = RoomMember(room_id=room_id, user_id=user_id, device_id="*")
+            await self._stopped_typing(member)
+
+    async def _stopped_typing(self, member: RoomMember) -> None:
+        logger.debug(
+            "User %s has stopped typing (type=%s) in %s on device %s",
+            member.user_id,
+            member.type,
+            member.room_id,
+            member.device_id,
+        )
+
+        if (
+            member.type == "org.matrix.msc3401.call.member"
+            and member.device_id is not None
+        ):
+            # Check room state to see if any MSC3401 member events needing removal
+            event_filter = [
+                ("org.matrix.msc3401.call.member", member.user_id),
+            ]
+
+            state_filter = StateFilter.from_types(event_filter)
+
+            state_ids = await self._storage_controllers.state.get_current_state_ids(
+                member.room_id,
+                state_filter,
+            )
+
+            state_events = await self.store.get_events(state_ids.values())
+
+            for event in state_events.values():
+                if "memberships" in event.content:
+                    # n.b. Simplified type
+                    memberships: List[Dict[str, str]] = event.content["memberships"]
+                    new_memberships = []
+                    modified = False
+                    if member.device_id == "*":
+                        # user has left room so remove all memberships if the were any
+                        modified = len(memberships) > 0
+                    else:
+                        # find a membership for this device_id
+                        for membership in memberships:
+                            if membership["device_id"] == member.device_id:
+                                modified = True
+                            else:
+                                new_memberships.append(membership)
+
+                    if modified:
+                        # send state event with new memberships
+                        logger.info(
+                            "Found matching MSC3401 call member event to cleanup. New memberships is %s",
+                            new_memberships,
+                        )
+                        requester = create_requester(
+                            member.user_id, authenticated_entity=member.user_id
+                        )
+
+                        await self.event_creation_handler.create_and_send_nonmember_event(
+                            requester,
+                            {
+                                "type": "org.matrix.msc3401.call.member",
+                                "state_key": member.user_id,
+                                "room_id": member.room_id,
+                                "sender": member.user_id,
+                                "content": {
+                                    "memberships": new_memberships,
+                                },
+                            },
+                            ratelimit=False,
+                        )
 
-    def _stopped_typing(self, member: RoomMember) -> None:
         if member.user_id not in self._room_typing.get(member.room_id, set()):
             # No point
             return
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 903c74f6d8..33d6f05f69 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -1256,6 +1256,8 @@ class RoomTypingRestServlet(RestServlet):
         # Limit timeout to stop people from setting silly typing timeouts.
         timeout = min(content.get("timeout", 30000), 120000)
 
+        type = content.get("io.element.type", "m.typing")
+
         # Defer getting the typing handler since it will raise on WORKER_PATTERNS.
         typing_handler = self.hs.get_typing_writer_handler()
 
@@ -1266,10 +1268,14 @@ class RoomTypingRestServlet(RestServlet):
                     requester=requester,
                     room_id=room_id,
                     timeout=timeout,
+                    type=type,
                 )
             else:
                 await typing_handler.stopped_typing(
-                    target_user=target_user, requester=requester, room_id=room_id
+                    target_user=target_user,
+                    requester=requester,
+                    room_id=room_id,
+                    type=type,
                 )
         except ShadowBanError:
             # Pretend this worked without error.