diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 4fc6fcd7ae..ce26c91a7b 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -20,10 +20,20 @@
#
#
import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ AbstractSet,
+ Dict,
+ Iterable,
+ List,
+ Mapping,
+ Optional,
+ Set,
+ Tuple,
+)
from synapse.api import errors
-from synapse.api.constants import EduTypes, EventTypes
+from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import (
Codes,
FederationDeniedError,
@@ -38,6 +48,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
+from synapse.storage.databases.main.state_deltas import StateDelta
from synapse.types import (
DeviceListUpdates,
JsonDict,
@@ -222,129 +233,115 @@ class DeviceWorkerHandler:
set_tag("user_id", user_id)
set_tag("from_token", str(from_token))
- now_room_key = self.store.get_room_max_token()
- room_ids = await self.store.get_rooms_for_user(user_id)
+ now_token = self._event_sources.get_current_token()
- changed = await self.get_device_changes_in_shared_rooms(
- user_id, room_ids, from_token
- )
+ # We need to work out all the different membership changes for the user
+ # and user they share a room with, to pass to
+ # `generate_sync_entry_for_device_list`. See its docstring for details
+ # on the data required.
- # Then work out if any users have since joined
- rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+ joined_room_ids = await self.store.get_rooms_for_user(user_id)
- member_events = await self.store.get_membership_changes_for_user(
- user_id, from_token.room_key, now_room_key
+ # Get the set of rooms that the user has joined/left
+ membership_changes = (
+ await self.store.get_current_state_delta_membership_changes_for_user(
+ user_id, from_key=from_token.room_key, to_key=now_token.room_key
+ )
)
- rooms_changed.update(event.room_id for event in member_events)
- stream_ordering = from_token.room_key.stream
+ # Check for newly joined or left rooms. We need to make sure that we add
+ # to newly joined in the case membership goes from join -> leave -> join
+ # again.
+ newly_joined_rooms: Set[str] = set()
+ newly_left_rooms: Set[str] = set()
+ for change in membership_changes:
+ # We check for changes in "joinedness", i.e. if the membership has
+ # changed to or from JOIN.
+ if change.membership == Membership.JOIN:
+ if change.prev_membership != Membership.JOIN:
+ newly_joined_rooms.add(change.room_id)
+ newly_left_rooms.discard(change.room_id)
+ elif change.prev_membership == Membership.JOIN:
+ newly_joined_rooms.discard(change.room_id)
+ newly_left_rooms.add(change.room_id)
+
+ # We now work out if any other users have since joined or left the rooms
+ # the user is currently in. First we filter out rooms that we know
+ # haven't changed recently.
+ rooms_changed = self.store.get_rooms_that_changed(
+ joined_room_ids, from_token.room_key
+ )
- possibly_changed = set(changed)
- possibly_left = set()
+ # List of membership changes per room
+ room_to_deltas: Dict[str, List[StateDelta]] = {}
+ # The set of event IDs of membership events (so we can fetch their
+ # associated membership).
+ memberships_to_fetch: Set[str] = set()
for room_id in rooms_changed:
- # Check if the forward extremities have changed. If not then we know
- # the current state won't have changed, and so we can skip this room.
- try:
- if not await self.store.have_room_forward_extremities_changed_since(
- room_id, stream_ordering
- ):
- continue
- except errors.StoreError:
- pass
-
- current_state_ids = await self._state_storage.get_current_state_ids(
- room_id, await_full_state=False
+ # TODO: Only pull out membership events?
+ state_changes = await self.store.get_current_state_deltas_for_room(
+ room_id, from_token=from_token.room_key, to_token=now_token.room_key
)
-
- # The user may have left the room
- # TODO: Check if they actually did or if we were just invited.
- if room_id not in room_ids:
- for etype, state_key in current_state_ids.keys():
- if etype != EventTypes.Member:
- continue
- possibly_left.add(state_key)
- continue
-
- # Fetch the current state at the time.
- try:
- event_ids = await self.store.get_forward_extremities_for_room_at_stream_ordering(
- room_id, stream_ordering=stream_ordering
- )
- except errors.StoreError:
- # we have purged the stream_ordering index since the stream
- # ordering: treat it the same as a new room
- event_ids = []
-
- # special-case for an empty prev state: include all members
- # in the changed list
- if not event_ids:
- log_kv(
- {"event": "encountered empty previous state", "room_id": room_id}
- )
- for etype, state_key in current_state_ids.keys():
- if etype != EventTypes.Member:
- continue
- possibly_changed.add(state_key)
- continue
-
- current_member_id = current_state_ids.get((EventTypes.Member, user_id))
- if not current_member_id:
- continue
-
- # mapping from event_id -> state_dict
- prev_state_ids = await self._state_storage.get_state_ids_for_events(
- event_ids,
- await_full_state=False,
- )
-
- # Check if we've joined the room? If so we just blindly add all the users to
- # the "possibly changed" users.
- for state_dict in prev_state_ids.values():
- member_event = state_dict.get((EventTypes.Member, user_id), None)
- if not member_event or member_event != current_member_id:
- for etype, state_key in current_state_ids.keys():
- if etype != EventTypes.Member:
- continue
- possibly_changed.add(state_key)
- break
-
- # If there has been any change in membership, include them in the
- # possibly changed list. We'll check if they are joined below,
- # and we're not toooo worried about spuriously adding users.
- for key, event_id in current_state_ids.items():
- etype, state_key = key
- if etype != EventTypes.Member:
+ for delta in state_changes:
+ if delta.event_type != EventTypes.Member:
continue
- # check if this member has changed since any of the extremities
- # at the stream_ordering, and add them to the list if so.
- for state_dict in prev_state_ids.values():
- prev_event_id = state_dict.get(key, None)
- if not prev_event_id or prev_event_id != event_id:
- if state_key != user_id:
- possibly_changed.add(state_key)
- break
-
- if possibly_changed or possibly_left:
- possibly_joined = possibly_changed
- possibly_left = possibly_changed | possibly_left
-
- # Double check if we still share rooms with the given user.
- users_rooms = await self.store.get_rooms_for_users(possibly_left)
- for changed_user_id, entries in users_rooms.items():
- if any(rid in room_ids for rid in entries):
- possibly_left.discard(changed_user_id)
- else:
- possibly_joined.discard(changed_user_id)
+ room_to_deltas.setdefault(room_id, []).append(delta)
+ if delta.event_id:
+ memberships_to_fetch.add(delta.event_id)
+ if delta.prev_event_id:
+ memberships_to_fetch.add(delta.prev_event_id)
- else:
- possibly_joined = set()
- possibly_left = set()
+ # Fetch all the memberships for the membership events
+ event_id_to_memberships = await self.store.get_membership_from_event_ids(
+ memberships_to_fetch
+ )
+
+ joined_invited_knocked = (
+ Membership.JOIN,
+ Membership.INVITE,
+ Membership.KNOCK,
+ )
- device_list_updates = DeviceListUpdates(
- changed=possibly_joined,
- left=possibly_left,
+ # We now want to find any user that have newly joined/invited/knocked,
+ # or newly left, similarly to above.
+ newly_joined_or_invited_or_knocked_users: Set[str] = set()
+ newly_left_users: Set[str] = set()
+ for _, deltas in room_to_deltas.items():
+ for delta in deltas:
+ # Get the prev/new memberships for the delta
+ new_membership = None
+ prev_membership = None
+ if delta.event_id:
+ m = event_id_to_memberships.get(delta.event_id)
+ if m is not None:
+ new_membership = m.membership
+ if delta.prev_event_id:
+ m = event_id_to_memberships.get(delta.prev_event_id)
+ if m is not None:
+ prev_membership = m.membership
+
+ # Check if a user has newly joined/invited/knocked, or left.
+ if new_membership in joined_invited_knocked:
+ if prev_membership not in joined_invited_knocked:
+ newly_joined_or_invited_or_knocked_users.add(delta.state_key)
+ newly_left_users.discard(delta.state_key)
+ elif prev_membership in joined_invited_knocked:
+ newly_joined_or_invited_or_knocked_users.discard(delta.state_key)
+ newly_left_users.add(delta.state_key)
+
+ # Now we actually calculate the device list entry with the information
+ # calculated above.
+ device_list_updates = await self.generate_sync_entry_for_device_list(
+ user_id=user_id,
+ since_token=from_token,
+ now_token=now_token,
+ joined_room_ids=joined_room_ids,
+ newly_joined_rooms=newly_joined_rooms,
+ newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+ newly_left_rooms=newly_left_rooms,
+ newly_left_users=newly_left_users,
)
log_kv(
@@ -356,6 +353,88 @@ class DeviceWorkerHandler:
return device_list_updates
+ @measure_func("_generate_sync_entry_for_device_list")
+ async def generate_sync_entry_for_device_list(
+ self,
+ user_id: str,
+ since_token: StreamToken,
+ now_token: StreamToken,
+ joined_room_ids: AbstractSet[str],
+ newly_joined_rooms: AbstractSet[str],
+ newly_joined_or_invited_or_knocked_users: AbstractSet[str],
+ newly_left_rooms: AbstractSet[str],
+ newly_left_users: AbstractSet[str],
+ ) -> DeviceListUpdates:
+ """Generate the DeviceListUpdates section of sync
+
+ Args:
+ sync_result_builder
+ newly_joined_rooms: Set of rooms user has joined since previous sync
+ newly_joined_or_invited_or_knocked_users: Set of users that have joined,
+ been invited to a room or are knocking on a room since
+ previous sync.
+ newly_left_rooms: Set of rooms user has left since previous sync
+ newly_left_users: Set of users that have left a room we're in since
+ previous sync
+ """
+ # Take a copy since these fields will be mutated later.
+ newly_joined_or_invited_or_knocked_users = set(
+ newly_joined_or_invited_or_knocked_users
+ )
+ newly_left_users = set(newly_left_users)
+
+ # We want to figure out what user IDs the client should refetch
+ # device keys for, and which users we aren't going to track changes
+ # for anymore.
+ #
+ # For the first step we check:
+ # a. if any users we share a room with have updated their devices,
+ # and
+ # b. we also check if we've joined any new rooms, or if a user has
+ # joined a room we're in.
+ #
+ # For the second step we just find any users we no longer share a
+ # room with by looking at all users that have left a room plus users
+ # that were in a room we've left.
+
+ users_that_have_changed = set()
+
+ # Step 1a, check for changes in devices of users we share a room
+ # with
+ users_that_have_changed = await self.get_device_changes_in_shared_rooms(
+ user_id,
+ joined_room_ids,
+ from_token=since_token,
+ now_token=now_token,
+ )
+
+ # Step 1b, check for newly joined rooms
+ for room_id in newly_joined_rooms:
+ joined_users = await self.store.get_users_in_room(room_id)
+ newly_joined_or_invited_or_knocked_users.update(joined_users)
+
+ # TODO: Check that these users are actually new, i.e. either they
+ # weren't in the previous sync *or* they left and rejoined.
+ users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
+
+ user_signatures_changed = await self.store.get_users_whose_signatures_changed(
+ user_id, since_token.device_list_key
+ )
+ users_that_have_changed.update(user_signatures_changed)
+
+ # Now find users that we no longer track
+ for room_id in newly_left_rooms:
+ left_users = await self.store.get_users_in_room(room_id)
+ newly_left_users.update(left_users)
+
+ # Remove any users that we still share a room with.
+ left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
+ for user_id, entries in left_users_rooms.items():
+ if any(rid in joined_room_ids for rid in entries):
+ newly_left_users.discard(user_id)
+
+ return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
+
async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
if not self.hs.is_mine(UserID.from_string(user_id)):
raise SynapseError(400, "User is not hosted on this homeserver")
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6af2eeb75f..c44baa7042 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -86,7 +86,7 @@ from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
-from synapse.util.metrics import Measure, measure_func
+from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -1779,8 +1779,15 @@ class SyncHandler:
)
if include_device_list_updates:
- device_lists = await self._generate_sync_entry_for_device_list(
- sync_result_builder,
+ # include_device_list_updates can only be True if we have a
+ # since token.
+ assert since_token is not None
+
+ device_lists = await self._device_handler.generate_sync_entry_for_device_list(
+ user_id=user_id,
+ since_token=since_token,
+ now_token=sync_result_builder.now_token,
+ joined_room_ids=sync_result_builder.joined_room_ids,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
@@ -1892,8 +1899,14 @@ class SyncHandler:
newly_left_users,
) = sync_result_builder.calculate_user_changes()
- device_lists = await self._generate_sync_entry_for_device_list(
- sync_result_builder,
+ # include_device_list_updates can only be True if we have a
+ # since token.
+ assert since_token is not None
+ device_lists = await self._device_handler.generate_sync_entry_for_device_list(
+ user_id=user_id,
+ since_token=since_token,
+ now_token=sync_result_builder.now_token,
+ joined_room_ids=sync_result_builder.joined_room_ids,
newly_joined_rooms=newly_joined_rooms,
newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
newly_left_rooms=newly_left_rooms,
@@ -2070,94 +2083,6 @@ class SyncHandler:
return sync_result_builder
- @measure_func("_generate_sync_entry_for_device_list")
- async def _generate_sync_entry_for_device_list(
- self,
- sync_result_builder: "SyncResultBuilder",
- newly_joined_rooms: AbstractSet[str],
- newly_joined_or_invited_or_knocked_users: AbstractSet[str],
- newly_left_rooms: AbstractSet[str],
- newly_left_users: AbstractSet[str],
- ) -> DeviceListUpdates:
- """Generate the DeviceListUpdates section of sync
-
- Args:
- sync_result_builder
- newly_joined_rooms: Set of rooms user has joined since previous sync
- newly_joined_or_invited_or_knocked_users: Set of users that have joined,
- been invited to a room or are knocking on a room since
- previous sync.
- newly_left_rooms: Set of rooms user has left since previous sync
- newly_left_users: Set of users that have left a room we're in since
- previous sync
- """
-
- user_id = sync_result_builder.sync_config.user.to_string()
- since_token = sync_result_builder.since_token
- assert since_token is not None
-
- # Take a copy since these fields will be mutated later.
- newly_joined_or_invited_or_knocked_users = set(
- newly_joined_or_invited_or_knocked_users
- )
- newly_left_users = set(newly_left_users)
-
- # We want to figure out what user IDs the client should refetch
- # device keys for, and which users we aren't going to track changes
- # for anymore.
- #
- # For the first step we check:
- # a. if any users we share a room with have updated their devices,
- # and
- # b. we also check if we've joined any new rooms, or if a user has
- # joined a room we're in.
- #
- # For the second step we just find any users we no longer share a
- # room with by looking at all users that have left a room plus users
- # that were in a room we've left.
-
- users_that_have_changed = set()
-
- joined_room_ids = sync_result_builder.joined_room_ids
-
- # Step 1a, check for changes in devices of users we share a room
- # with
- users_that_have_changed = (
- await self._device_handler.get_device_changes_in_shared_rooms(
- user_id,
- joined_room_ids,
- from_token=since_token,
- now_token=sync_result_builder.now_token,
- )
- )
-
- # Step 1b, check for newly joined rooms
- for room_id in newly_joined_rooms:
- joined_users = await self.store.get_users_in_room(room_id)
- newly_joined_or_invited_or_knocked_users.update(joined_users)
-
- # TODO: Check that these users are actually new, i.e. either they
- # weren't in the previous sync *or* they left and rejoined.
- users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)
-
- user_signatures_changed = await self.store.get_users_whose_signatures_changed(
- user_id, since_token.device_list_key
- )
- users_that_have_changed.update(user_signatures_changed)
-
- # Now find users that we no longer track
- for room_id in newly_left_rooms:
- left_users = await self.store.get_users_in_room(room_id)
- newly_left_users.update(left_users)
-
- # Remove any users that we still share a room with.
- left_users_rooms = await self.store.get_rooms_for_users(newly_left_users)
- for user_id, entries in left_users_rooms.items():
- if any(rid in joined_room_ids for rid in entries):
- newly_left_users.discard(user_id)
-
- return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users)
-
@trace
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 9ed39e688a..7d491d1728 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -166,6 +166,11 @@ class StateDeltasStore(SQLBaseStore):
) -> List[StateDelta]:
"""Get the state deltas between two tokens."""
+ if not self._curr_state_delta_stream_cache.has_entity_changed(
+ room_id, from_token.stream
+ ):
+ return []
+
def get_current_state_deltas_for_room_txn(
txn: LoggingTransaction,
) -> List[StateDelta]:
|