summary refs log tree commit diff
path: root/synapse/handlers/device.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/device.py')
-rw-r--r--synapse/handlers/device.py303
1 files changed, 191 insertions, 112 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 4fc6fcd7ae..ce26c91a7b 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -20,10 +20,20 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    AbstractSet,
+    Dict,
+    Iterable,
+    List,
+    Mapping,
+    Optional,
+    Set,
+    Tuple,
+)
 
 from synapse.api import errors
-from synapse.api.constants import EduTypes, EventTypes
+from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import (
     Codes,
     FederationDeniedError,
@@ -38,6 +48,7 @@ from synapse.metrics.background_process_metrics import (
     wrap_as_background_process,
 )
 from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
+from synapse.storage.databases.main.state_deltas import StateDelta
 from synapse.types import (
     DeviceListUpdates,
     JsonDict,
@@ -222,129 +233,115 @@ class DeviceWorkerHandler:
 
         set_tag("user_id", user_id)
         set_tag("from_token", str(from_token))
-        now_room_key = self.store.get_room_max_token()
 
-        room_ids = await self.store.get_rooms_for_user(user_id)
+        now_token = self._event_sources.get_current_token()
 
-        changed = await self.get_device_changes_in_shared_rooms(
-            user_id, room_ids, from_token
-        )
+        # We need to work out all the different membership changes for the user
+        # and user they share a room with, to pass to
+        # `generate_sync_entry_for_device_list`. See its docstring for details
+        # on the data required.
 
-        # Then work out if any users have since joined
-        rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+        joined_room_ids = await self.store.get_rooms_for_user(user_id)
 
-        member_events = await self.store.get_membership_changes_for_user(
-            user_id, from_token.room_key, now_room_key
+        # Get the set of rooms that the user has joined/left
+        membership_changes = (
+            await self.store.get_current_state_delta_membership_changes_for_user(
+                user_id, from_key=from_token.room_key, to_key=now_token.room_key
+            )
         )
-        rooms_changed.update(event.room_id for event in member_events)
 
-        stream_ordering = from_token.room_key.stream
+        # Check for newly joined or left rooms. We need to make sure that we add
+        # to newly joined in the case membership goes from join -> leave -> join
+        # again.
+        newly_joined_rooms: Set[str] = set()
+        newly_left_rooms: Set[str] = set()
+        for change in membership_changes:
+            # We check for changes in "joinedness", i.e. if the membership has
+            # changed to or from JOIN.
+            if change.membership == Membership.JOIN:
+                if change.prev_membership != Membership.JOIN:
+                    newly_joined_rooms.add(change.room_id)
+                    newly_left_rooms.discard(change.room_id)
+            elif change.prev_membership == Membership.JOIN:
+                newly_joined_rooms.discard(change.room_id)
+                newly_left_rooms.add(change.room_id)
+
+        # We now work out if any other users have since joined or left the rooms
+        # the user is currently in. First we filter out rooms that we know
+        # haven't changed recently.
+        rooms_changed = self.store.get_rooms_that_changed(
+            joined_room_ids, from_token.room_key
+        )
 
-        possibly_changed = set(changed)
-        possibly_left = set()
+        # List of membership changes per room
+        room_to_deltas: Dict[str, List[StateDelta]] = {}
+        # The set of event IDs of membership events (so we can fetch their
+        # associated membership).
+        memberships_to_fetch: Set[str] = set()
         for room_id in rooms_changed:
-            # Check if the forward extremities have changed. If not then we know
-            # the current state won't have changed, and so we can skip this room.
-            try:
-                if not await self.store.have_room_forward_extremities_changed_since(
-                    room_id, stream_ordering
-                ):
-                    continue
-            except errors.StoreError:
-                pass
-
-            current_state_ids = await self._state_storage.get_current_state_ids(
-                room_id, await_full_state=False
+            # TODO: Only pull out membership events?
+            state_changes = await self.store.get_current_state_deltas_for_room(
+                room_id, from_token=from_token.room_key, to_token=now_token.room_key
             )
-
-            # The user may have left the room
-            # TODO: Check if they actually did or if we were just invited.
-            if room_id not in room_ids:
-                for etype, state_key in current_state_ids.keys():
-                    if etype != EventTypes.Member:
-                        continue
-                    possibly_left.add(state_key)
-                continue
-
-            # Fetch the current state at the time.
-            try:
-                event_ids = await self.store.get_forward_extremities_for_room_at_stream_ordering(
-                    room_id, stream_ordering=stream_ordering
-                )
-            except errors.StoreError:
-                # we have purged the stream_ordering index since the stream
-                # ordering: treat it the same as a new room
-                event_ids = []
-
-            # special-case for an empty prev state: include all members
-            # in the changed list
-            if not event_ids:
-                log_kv(
-                    {"event": "encountered empty previous state", "room_id": room_id}
-                )
-                for etype, state_key in current_state_ids.keys():
-                    if etype != EventTypes.Member:
-                        continue
-                    possibly_changed.add(state_key)
-                continue
-
-            current_member_id = current_state_ids.get((EventTypes.Member, user_id))
-            if not current_member_id:
-                continue
-
-            # mapping from event_id -> state_dict
-            prev_state_ids = await self._state_storage.get_state_ids_for_events(
-                event_ids,
-                await_full_state=False,
-            )
-
-            # Check if we've joined the room? If so we just blindly add all the users to
-            # the "possibly changed" users.
-            for state_dict in prev_state_ids.values():
-                member_event = state_dict.get((EventTypes.Member, user_id), None)
-                if not member_event or member_event != current_member_id:
-                    for etype, state_key in current_state_ids.keys():
-                        if etype != EventTypes.Member:
-                            continue
-                        possibly_changed.add(state_key)
-                    break
-
-            # If there has been any change in membership, include them in the
-            # possibly changed list. We'll check if they are joined below,
-            # and we're not toooo worried about spuriously adding users.
-            for key, event_id in current_state_ids.items():
-                etype, state_key = key
-                if etype != EventTypes.Member:
+            for delta in state_changes:
+                if delta.event_type != EventTypes.Member:
                     continue
 
-                # check if this member has changed since any of the extremities
-                # at the stream_ordering, and add them to the list if so.
-                for state_dict in prev_state_ids.values():
-                    prev_event_id = state_dict.get(key, None)
-                    if not prev_event_id or prev_event_id != event_id:
-                        if state_key != user_id:
-                            possibly_changed.add(state_key)
-                        break
-
-        if possibly_changed or possibly_left:
-            possibly_joined = possibly_changed
-            possibly_left = possibly_changed | possibly_left
-
-            # Double check if we still share rooms with the given user.
-            users_rooms = await self.store.get_rooms_for_users(possibly_left)
-            for changed_user_id, entries in users_rooms.items():
-                if any(rid in room_ids for rid in entries):
-                    possibly_left.discard(changed_user_id)
-                else:
-                    possibly_joined.discard(changed_user_id)
+                room_to_deltas.setdefault(room_id, []).append(delta)
+                if delta.event_id:
+                    memberships_to_fetch.add(delta.event_id)
+                if delta.prev_event_id:
+                    memberships_to_fetch.add(delta.prev_event_id)
 
-        else:
-            possibly_joined = set()
-            possibly_left = set()
+        # Fetch all the memberships for the membership events
+        event_id_to_memberships = await self.store.get_membership_from_event_ids(
+            memberships_to_fetch
+        )
+
+        joined_invited_knocked = (
+            Membership.JOIN,
+            Membership.INVITE,
+            Membership.KNOCK,
+        )
 
-        device_list_updates = DeviceListUpdates(
-            changed=possibly_joined,
-            left=possibly_left,
+        # We now want to find any user that have newly joined/invited/knocked,
+        # or newly left, similarly to above.
+        newly_joined_or_invited_or_knocked_users: Set[str] = set()
+        newly_left_users: Set[str] = set()
+        for _, deltas in room_to_deltas.items():
+            for delta in deltas:
+                # Get the prev/new memberships for the delta
+                new_membership = None
+                prev_membership = None
+                if delta.event_id:
+                    m = event_id_to_memberships.get(delta.event_id)
+                    if m is not None:
+                        new_membership = m.membership
+                if delta.prev_event_id:
+                    m = event_id_to_memberships.get(delta.prev_event_id)
+                    if m is not None:
+                        prev_membership = m.membership
+
+                # Check if a user has newly joined/invited/knocked, or left.
+                if new_membership in joined_invited_knocked:
+                    if prev_membership not in joined_invited_knocked:
+                        newly_joined_or_invited_or_knocked_users.add(delta.state_key)
+                        newly_left_users.discard(delta.state_key)
+                elif prev_membership in joined_invited_knocked:
+                    newly_joined_or_invited_or_knocked_users.discard(delta.state_key)
+                    newly_left_users.add(delta.state_key)
+
+        # Now we actually calculate the device list entry with the information
+        # calculated above.
+        device_list_updates = await self.generate_sync_entry_for_device_list(
+            user_id=user_id,
+            since_token=from_token,
+            now_token=now_token,
+            joined_room_ids=joined_room_ids,
+            newly_joined_rooms=newly_joined_rooms,
+            newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+            newly_left_rooms=newly_left_rooms,
+            newly_left_users=newly_left_users,
         )
 
         log_kv(
@@ -356,6 +353,88 @@ class DeviceWorkerHandler:
 
         return device_list_updates
 
+    @measure_func("_generate_sync_entry_for_device_list")
+    async def generate_sync_entry_for_device_list(
+        self,
+        user_id: str,
+        since_token: StreamToken,
+        now_token: StreamToken,
+        joined_room_ids: AbstractSet[str],
+        newly_joined_rooms: AbstractSet[str],
+        newly_joined_or_invited_or_knocked_users: AbstractSet[str],
+        newly_left_rooms: AbstractSet[str],
+        newly_left_users: AbstractSet[str],
+    ) -> DeviceListUpdates:
+        """Generate the DeviceListUpdates section of sync
+
+        Args:
+            sync_result_builder
+            newly_joined_rooms: Set of rooms user has joined since previous sync
+            newly_joined_or_invited_or_knocked_users: Set of users that have joined,
+                been invited to a room or are knocking on a room since
+                previous sync.
+            newly_left_rooms: Set of rooms user has left since previous sync
+            newly_left_users: Set of users that have left a room we're in since
+                previous sync
+        """
+        # Take a copy since these fields will be mutated later.
+        newly_joined_or_invited_or_knocked_users = set(
+            newly_joined_or_invited_or_knocked_users
+        )
+        newly_left_users = set(newly_left_users)
+
+        # We want to figure out what user IDs the client should refetch
+        # device keys for, and which users we aren't going to track changes
+        # for anymore.
+        #
+        # For the first step we check:
+        #   a. if any users we share a room with have updated their devices,
+        #      and
+        #   b. we also check if we've joined any new rooms, or if a user has
+        #      joined a room we're in.
+        #
+        # For the second step we just find any users we no longer share a
+        # room with by looking at all users that have left a room plus users
+        # that were in a room we've left.
+
+        users_that_have_changed = set()
+
+        # Step 1a, check for changes in devices of users we share a room
+        # with
+        users_that_have_changed = await self.get_device_changes_in_shared_rooms(
+            user_id,
+            joined_room_ids,
+            from_token=since_token,
+            now_token=now_token,
+        )
+
+        # Step 1b, check for newly joined rooms
+        for room_id in newly_joined_rooms:
+            joined_users = await self.store.get_users_in_room(room_id)
+            newly_joined_or_invited_or_knocked_users.update(joined_users)
+
+        # TODO: Check that these users are actually new, i.e. either they
+        # weren't in the previous sync *or* they left and rejoined.
+        users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
+
+        user_signatures_changed = await self.store.get_users_whose_signatures_changed(
+            user_id, since_token.device_list_key
+        )
+        users_that_have_changed.update(user_signatures_changed)
+
+        # Now find users that we no longer track
+        for room_id in newly_left_rooms:
+            left_users = await self.store.get_users_in_room(room_id)
+            newly_left_users.update(left_users)
+
+        # Remove any users that we still share a room with.
+        left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
+        for user_id, entries in left_users_rooms.items():
+            if any(rid in joined_room_ids for rid in entries):
+                newly_left_users.discard(user_id)
+
+        return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
+
     async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
         if not self.hs.is_mine(UserID.from_string(user_id)):
             raise SynapseError(400, "User is not hosted on this homeserver")