diff options
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r-- | synapse/handlers/sync.py | 189 |
1 files changed, 104 insertions, 85 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 4abb9b6127..0f684857ca 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,7 +40,7 @@ from synapse.handlers.relations import BundledAggregations from synapse.logging.context import current_context from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user -from synapse.storage.databases.main.event_push_actions import NotifCounts +from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( @@ -128,6 +128,7 @@ class JoinedSyncResult: ephemeral: List[JsonDict] account_data: List[JsonDict] unread_notifications: JsonDict + unread_thread_notifications: JsonDict summary: Optional[JsonDict] unread_count: int @@ -278,6 +279,8 @@ class SyncHandler: self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync + self._msc3773_enabled = hs.config.experimental.msc3773_enabled + async def wait_for_sync_for_user( self, requester: Requester, @@ -1288,7 +1291,7 @@ class SyncHandler: async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig - ) -> NotifCounts: + ) -> RoomNotifCounts: with Measure(self.clock, "unread_notifs_for_room_id"): return await self.store.get_unread_event_push_actions_by_room_for_user( @@ -1314,6 +1317,19 @@ class SyncHandler: At the end, we transfer data from the `sync_result_builder` to a new `SyncResult` instance to signify that the sync calculation is complete. """ + + user_id = sync_config.user.to_string() + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() + + # Note: we get the users room list *before* we get the current token, this + # avoids checking back in history if rooms are joined after the token is fetched. + token_before_rooms = self.event_sources.get_current_token() + mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) + # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. @@ -1321,6 +1337,57 @@ class SyncHandler: now_token = self.event_sources.get_current_token() log_kv({"now_token": now_token}) + # Since we fetched the users room list before the token, there's a small window + # during which membership events may have been persisted, so we fetch these now + # and modify the joined room list for any changes between the get_rooms_for_user + # call and the get_current_token call. + membership_change_events = [] + if since_token: + membership_change_events = await self.store.get_membership_changes_for_user( + user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude + ) + + mem_last_change_by_room_id: Dict[str, EventBase] = {} + for event in membership_change_events: + mem_last_change_by_room_id[event.room_id] = event + + # For the latest membership event in each room found, add/remove the room ID + # from the joined room list accordingly. In this case we only care if the + # latest change is JOIN. + + for room_id, event in mem_last_change_by_room_id.items(): + assert event.internal_metadata.stream_ordering + if ( + event.internal_metadata.stream_ordering + < token_before_rooms.room_key.stream + ): + continue + + logger.info( + "User membership change between getting rooms and current token: %s %s %s", + user_id, + event.membership, + room_id, + ) + # User joined a room - we have to then check the room state to ensure we + # respect any bans if there's a race between the join and ban events. + if event.membership == Membership.JOIN: + user_ids_in_room = await self.store.get_users_in_room(room_id) + if user_id in user_ids_in_room: + mutable_joined_room_ids.add(room_id) + # The user left the room, or left and was re-invited but not joined yet + else: + mutable_joined_room_ids.discard(room_id) + + # Now we have our list of joined room IDs, exclude as configured and freeze + joined_room_ids = frozenset( + ( + room_id + for room_id in mutable_joined_room_ids + if room_id not in self.rooms_to_exclude + ) + ) + logger.debug( "Calculating sync response for %r between %s and %s", sync_config.user, @@ -1328,22 +1395,13 @@ class SyncHandler: now_token, ) - user_id = sync_config.user.to_string() - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - # We no longer support AS users using /sync directly. - # See https://github.com/matrix-org/matrix-doc/issues/1144 - raise NotImplementedError() - else: - joined_room_ids = await self.get_rooms_for_user_at( - user_id, now_token.room_key - ) sync_result_builder = SyncResultBuilder( sync_config, full_state, since_token=since_token, now_token=now_token, joined_room_ids=joined_room_ids, + membership_change_events=membership_change_events, ) logger.debug("Fetching account data") @@ -1824,19 +1882,12 @@ class SyncHandler: Does not modify the `sync_result_builder`. """ - user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token - now_token = sync_result_builder.now_token + membership_change_events = sync_result_builder.membership_change_events assert since_token - # Get a list of membership change events that have happened to the user - # requesting the sync. - membership_changes = await self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key - ) - - if membership_changes: + if membership_change_events: return True stream_id = since_token.room_key.stream @@ -1875,16 +1926,10 @@ class SyncHandler: since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config + membership_change_events = sync_result_builder.membership_change_events assert since_token - # TODO: we've already called this function and ran this query in - # _have_rooms_changed. We could keep the results in memory to avoid a - # second query, at the cost of more complicated source code. - membership_change_events = await self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude - ) - mem_change_events_by_room_id: Dict[str, List[EventBase]] = {} for event in membership_change_events: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) @@ -2353,6 +2398,7 @@ class SyncHandler: ephemeral=ephemeral, account_data=account_data_events, unread_notifications=unread_notifications, + unread_thread_notifications={}, summary=summary, unread_count=0, ) @@ -2360,10 +2406,36 @@ class SyncHandler: if room_sync or always_include: notifs = await self.unread_notifs_for_room_id(room_id, sync_config) - unread_notifications["notification_count"] = notifs.notify_count - unread_notifications["highlight_count"] = notifs.highlight_count + # Notifications for the main timeline. + notify_count = notifs.main_timeline.notify_count + highlight_count = notifs.main_timeline.highlight_count + unread_count = notifs.main_timeline.unread_count - room_sync.unread_count = notifs.unread_count + # Check the sync configuration. + if ( + self._msc3773_enabled + and sync_config.filter_collection.unread_thread_notifications() + ): + # And add info for each thread. + room_sync.unread_thread_notifications = { + thread_id: { + "notification_count": thread_notifs.notify_count, + "highlight_count": thread_notifs.highlight_count, + } + for thread_id, thread_notifs in notifs.threads.items() + if thread_id is not None + } + + else: + # Combine the unread counts for all threads and main timeline. + for thread_notifs in notifs.threads.values(): + notify_count += thread_notifs.notify_count + highlight_count += thread_notifs.highlight_count + unread_count += thread_notifs.unread_count + + unread_notifications["notification_count"] = notify_count + unread_notifications["highlight_count"] = highlight_count + room_sync.unread_count = unread_count sync_result_builder.joined.append(room_sync) @@ -2385,60 +2457,6 @@ class SyncHandler: else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) - async def get_rooms_for_user_at( - self, - user_id: str, - room_key: RoomStreamToken, - ) -> FrozenSet[str]: - """Get set of joined rooms for a user at the given stream ordering. - - The stream ordering *must* be recent, otherwise this may throw an - exception if older than a month. (This function is called with the - current token, which should be perfectly fine). - - Args: - user_id - stream_ordering - - ReturnValue: - Set of room_ids the user is in at given stream_ordering. - """ - joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) - - joined_room_ids = set() - - # We need to check that the stream ordering of the join for each room - # is before the stream_ordering asked for. This might not be the case - # if the user joins a room between us getting the current token and - # calling `get_rooms_for_user_with_stream_ordering`. - # If the membership's stream ordering is after the given stream - # ordering, we need to go and work out if the user was in the room - # before. - # We also need to check whether the room should be excluded from sync - # responses as per the homeserver config. - for joined_room in joined_rooms: - if joined_room.room_id in self.rooms_to_exclude: - continue - - if not joined_room.event_pos.persisted_after(room_key): - joined_room_ids.add(joined_room.room_id) - continue - - logger.info("User joined room after current token: %s", joined_room.room_id) - - extrems = ( - await self.store.get_forward_extremities_for_room_at_stream_ordering( - joined_room.room_id, joined_room.event_pos.stream - ) - ) - user_ids_in_room = await self.state.get_current_user_ids_in_room( - joined_room.room_id, extrems - ) - if user_id in user_ids_in_room: - joined_room_ids.add(joined_room.room_id) - - return frozenset(joined_room_ids) - def _action_has_highlight(actions: List[JsonDict]) -> bool: for action in actions: @@ -2535,6 +2553,7 @@ class SyncResultBuilder: since_token: Optional[StreamToken] now_token: StreamToken joined_room_ids: FrozenSet[str] + membership_change_events: List[EventBase] presence: List[UserPresenceState] = attr.Factory(list) account_data: List[JsonDict] = attr.Factory(list) |