diff options
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 242 |
1 files changed, 126 insertions, 116 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f7fd6d7933..c00f30518a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -45,7 +45,8 @@ from synapse.logging.tracing import ( start_active_span, ) from synapse.push.clientformat import format_push_rules_for_user -from synapse.storage.databases.main.event_push_actions import NotifCounts +from synapse.storage.databases.main.event_push_actions import RoomNotifCounts +from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( @@ -133,6 +134,7 @@ class JoinedSyncResult: ephemeral: List[JsonDict] account_data: List[JsonDict] unread_notifications: JsonDict + unread_thread_notifications: JsonDict summary: Optional[JsonDict] unread_count: int @@ -809,18 +811,6 @@ class SyncHandler: if canonical_alias and canonical_alias.content.get("alias"): return summary - me = sync_config.user.to_string() - - joined_user_ids = [ - r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me - ] - invited_user_ids = [ - r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me - ] - gone_user_ids = [ - r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me - ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me] - # FIXME: only build up a member_ids list for our heroes member_ids = {} for membership in ( @@ -832,11 +822,8 @@ class SyncHandler: for user_id, event_id in details.get(membership, empty_ms).members: member_ids[user_id] = event_id - # FIXME: order by stream ordering rather than as returned by SQL - if joined_user_ids or invited_user_ids: - summary["m.heroes"] = sorted(joined_user_ids + invited_user_ids)[0:5] - else: - summary["m.heroes"] = sorted(gone_user_ids)[0:5] + me = sync_config.user.to_string() + summary["m.heroes"] = extract_heroes_from_room_summary(details, me) if not sync_config.filter_collection.lazy_load_members(): return summary @@ -1196,7 +1183,9 @@ class SyncHandler: room_id: The partial state room to find the remaining memberships for. members_to_fetch: The memberships to find. events_with_membership_auth: A mapping from user IDs to events whose auth - events are known to contain their membership. + events would contain their prior membership, if one exists. + Note that join events will not cite a prior membership if a user has + never been in a room before. found_state_ids: A dict from (type, state_key) -> state_event_id, containing memberships that have been previously found. Entries in `members_to_fetch` that have a membership in `found_state_ids` are @@ -1206,6 +1195,10 @@ class SyncHandler: A dict from ("m.room.member", state_key) -> state_event_id, containing the memberships missing from `found_state_ids`. + When `events_with_membership_auth` contains a join event for a given user + which does not cite a prior membership, no membership is returned for that + user. + Raises: KeyError: if `events_with_membership_auth` does not have an entry for a missing membership. Memberships in `found_state_ids` do not need an @@ -1223,8 +1216,18 @@ class SyncHandler: if (EventTypes.Member, member) in found_state_ids: continue - missing_members.add(member) event_with_membership_auth = events_with_membership_auth[member] + is_join = ( + event_with_membership_auth.is_state() + and event_with_membership_auth.type == EventTypes.Member + and event_with_membership_auth.state_key == member + and event_with_membership_auth.content.get("membership") + == Membership.JOIN + ) + if not is_join: + # The event must include the desired membership as an auth event, unless + # it's the first join event for a given user. + missing_members.add(member) auth_event_ids.update(event_with_membership_auth.auth_event_ids()) auth_events = await self.store.get_events(auth_event_ids) @@ -1248,7 +1251,7 @@ class SyncHandler: auth_event.type == EventTypes.Member and auth_event.state_key == member ): - missing_members.remove(member) + missing_members.discard(member) additional_state_ids[ (EventTypes.Member, member) ] = auth_event.event_id @@ -1277,7 +1280,7 @@ class SyncHandler: async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig - ) -> NotifCounts: + ) -> RoomNotifCounts: with Measure(self.clock, "unread_notifs_for_room_id"): return await self.store.get_unread_event_push_actions_by_room_for_user( @@ -1303,6 +1306,19 @@ class SyncHandler: At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` instance to signify that the sync calculation is complete. """ + + user_id = sync_config.user.to_string() + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() + + # Note: we get the users room list *before* we get the current token, this + # avoids checking back in history if rooms are joined after the token is fetched. + token_before_rooms = self.event_sources.get_current_token() + mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) + # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. @@ -1310,6 +1326,57 @@ class SyncHandler: now_token = self.event_sources.get_current_token() log_kv({"now_token": str(now_token)}) + # Since we fetched the users room list before the token, there's a small window + # during which membership events may have been persisted, so we fetch these now + # and modify the joined room list for any changes between the get_rooms_for_user + # call and the get_current_token call. + membership_change_events = [] + if since_token: + membership_change_events = await self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude + ) + + mem_last_change_by_room_id: Dict[str, EventBase] = {} + for event in membership_change_events: + mem_last_change_by_room_id[event.room_id] = event + + # For the latest membership event in each room found, add/remove the room ID + # from the joined room list accordingly. In this case we only care if the + # latest change is JOIN. + + for room_id, event in mem_last_change_by_room_id.items(): + assert event.internal_metadata.stream_ordering + if ( + event.internal_metadata.stream_ordering + < token_before_rooms.room_key.stream + ): + continue + + logger.info( + "User membership change between getting rooms and current token: %s %s %s", + user_id, + event.membership, + room_id, + ) + # User joined a room - we have to then check the room state to ensure we + # respect any bans if there's a race between the join and ban events. + if event.membership == Membership.JOIN: + user_ids_in_room = await self.store.get_users_in_room(room_id) + if user_id in user_ids_in_room: + mutable_joined_room_ids.add(room_id) + # The user left the room, or left and was re-invited but not joined yet + else: + mutable_joined_room_ids.discard(room_id) + + # Now we have our list of joined room IDs, exclude as configured and freeze + joined_room_ids = frozenset( + ( + room_id + for room_id in mutable_joined_room_ids + if room_id not in self.rooms_to_exclude + ) + ) + logger.debug( "Calculating sync response for %r between %s and %s", sync_config.user, @@ -1317,22 +1384,13 @@ class SyncHandler: now_token, ) - user_id = sync_config.user.to_string() - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - # We no longer support AS users using /sync directly. - # See https://github.com/matrix-org/matrix-doc/issues/1144 - raise NotImplementedError() - else: - joined_room_ids = await self.get_rooms_for_user_at( - user_id, now_token.room_key - ) sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, joined_room_ids=joined_room_ids, + membership_change_events=membership_change_events, ) logger.debug("Fetching account data") @@ -1479,16 +1537,14 @@ class SyncHandler: since_token.device_list_key ) if changed_users is not None: - result = await self.store.get_rooms_for_users_with_stream_ordering( - changed_users - ) + result = await self.store.get_rooms_for_users(changed_users) for changed_user_id, entries in result.items(): # Check if the changed user shares any rooms with the user, # or if the changed user is the syncing user (as we always # want to include device list updates of their own devices). if user_id == changed_user_id or any( - e.room_id in joined_rooms for e in entries + rid in joined_rooms for rid in entries ): users_that_have_changed.add(changed_user_id) else: @@ -1522,13 +1578,9 @@ class SyncHandler: newly_left_users.update(left_users) # Remove any users that we still share a room with. - left_users_rooms = ( - await self.store.get_rooms_for_users_with_stream_ordering( - newly_left_users - ) - ) + left_users_rooms = await self.store.get_rooms_for_users(newly_left_users) for user_id, entries in left_users_rooms.items(): - if any(e.room_id in joined_rooms for e in entries): + if any(rid in joined_rooms for rid in entries): newly_left_users.discard(user_id) return DeviceListUpdates( @@ -1819,19 +1871,12 @@ class SyncHandler: Does not modify the `sync_result_builder`. """ - user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token - now_token = sync_result_builder.now_token + membership_change_events = sync_result_builder.membership_change_events assert since_token - # Get a list of membership change events that have happened to the user - # requesting the sync. - membership_changes = await self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key - ) - - if membership_changes: + if membership_change_events: return True stream_id = since_token.room_key.stream @@ -1870,16 +1915,10 @@ class SyncHandler: since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config + membership_change_events = sync_result_builder.membership_change_events assert since_token - # TODO: we've already called this function and ran this query in - # _have_rooms_changed. We could keep the results in memory to avoid a - # second query, at the cost of more complicated source code. - membership_change_events = await self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude - ) - mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} for event in membership_change_events: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) @@ -2348,6 +2387,7 @@ class SyncHandler: ephemeral=ephemeral, account_data=account_data_events, unread_notifications=unread_notifications, + unread_thread_notifications={}, summary=summary, unread_count=0, ) @@ -2355,10 +2395,33 @@ class SyncHandler: if room_sync or always_include: notifs = await self.unread_notifs_for_room_id(room_id, sync_config) - unread_notifications["notification_count"] = notifs.notify_count - unread_notifications["highlight_count"] = notifs.highlight_count - - room_sync.unread_count = notifs.unread_count + # Notifications for the main timeline. + notify_count = notifs.main_timeline.notify_count + highlight_count = notifs.main_timeline.highlight_count + unread_count = notifs.main_timeline.unread_count + + # Check the sync configuration. + if sync_config.filter_collection.unread_thread_notifications(): + # And add info for each thread. + room_sync.unread_thread_notifications = { + thread_id: { + "notification_count": thread_notifs.notify_count, + "highlight_count": thread_notifs.highlight_count, + } + for thread_id, thread_notifs in notifs.threads.items() + if thread_id is not None + } + + else: + # Combine the unread counts for all threads and main timeline. + for thread_notifs in notifs.threads.values(): + notify_count += thread_notifs.notify_count + highlight_count += thread_notifs.highlight_count + unread_count += thread_notifs.unread_count + + unread_notifications["notification_count"] = notify_count + unread_notifications["highlight_count"] = highlight_count + room_sync.unread_count = unread_count sync_result_builder.joined.append(room_sync) @@ -2380,60 +2443,6 @@ class SyncHandler: else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) - async def get_rooms_for_user_at( - self, - user_id: str, - room_key: RoomStreamToken, - ) -> FrozenSet[str]: - """Get set of joined rooms for a user at the given stream ordering. - - The stream ordering *must* be recent, otherwise this may throw an - exception if older than a month. (This function is called with the - current token, which should be perfectly fine). - - Args: - user_id - stream_ordering - - ReturnValue: - Set of room_ids the user is in at given stream_ordering. - """ - joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) - - joined_room_ids = set() - - # We need to check that the stream ordering of the join for each room - # is before the stream_ordering asked for. This might not be the case - # if the user joins a room between us getting the current token and - # calling `get_rooms_for_user_with_stream_ordering`. - # If the membership's stream ordering is after the given stream - # ordering, we need to go and work out if the user was in the room - # before. - # We also need to check whether the room should be excluded from sync - # responses as per the homeserver config. - for joined_room in joined_rooms: - if joined_room.room_id in self.rooms_to_exclude: - continue - - if not joined_room.event_pos.persisted_after(room_key): - joined_room_ids.add(joined_room.room_id) - continue - - logger.info("User joined room after current token: %s", joined_room.room_id) - - extrems = ( - await self.store.get_forward_extremities_for_room_at_stream_ordering( - joined_room.room_id, joined_room.event_pos.stream - ) - ) - user_ids_in_room = await self.state.get_current_user_ids_in_room( - joined_room.room_id, extrems - ) - if user_id in user_ids_in_room: - joined_room_ids.add(joined_room.room_id) - - return frozenset(joined_room_ids) - def _action_has_highlight(actions: List[JsonDict]) -> bool: for action in actions: @@ -2530,6 +2539,7 @@ class SyncResultBuilder: since_token: Optional[StreamToken] now_token: StreamToken joined_room_ids: FrozenSet[str] + membership_change_events: List[EventBase] presence: List[UserPresenceState] = attr.Factory(list) account_data: List[JsonDict] = attr.Factory(list) |