diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 15 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 65 |
2 files changed, 60 insertions, 20 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3217127865..233f8c113d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1868,22 +1868,17 @@ class FederationHandler: async with self._is_partial_state_room_linearizer.queue(room_id): logger.info("Clearing partial-state flag for %s", room_id) - success = await self.store.clear_partial_state_room(room_id) + new_stream_id = await self.store.clear_partial_state_room(room_id) - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() - - if success: + if new_stream_id is not None: logger.info("State resync complete for %s", room_id) self._storage_controllers.state.notify_room_un_partial_stated( room_id ) - # Poke the notifier so that other workers see the write to - # the un-partial-stated rooms stream. - self._notifier.notify_replication() - + await self._notifier.on_un_partial_stated_room( + room_id, new_stream_id + ) return # we raced against more events arriving with partial state. Go round diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 78d488f2b1..ee11764567 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -290,7 +290,7 @@ class SyncHandler: expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, ) - self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync + self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync async def wait_for_sync_for_user( self, @@ -1340,7 +1340,10 @@ class SyncHandler: membership_change_events = [] if since_token: membership_change_events = await self.store.get_membership_changes_for_user( - user_id, since_token.room_key, now_token.room_key, self.rooms_to_exclude + user_id, + since_token.room_key, + now_token.room_key, + self.rooms_to_exclude_globally, ) mem_last_change_by_room_id: Dict[str, EventBase] = {} @@ -1375,12 +1378,39 @@ class SyncHandler: else: mutable_joined_room_ids.discard(room_id) + # Tweak the set of rooms to return to the client for eager (non-lazy) syncs. + mutable_rooms_to_exclude = set(self.rooms_to_exclude_globally) + if not sync_config.filter_collection.lazy_load_members(): + # Non-lazy syncs should never include partially stated rooms. + # Exclude all partially stated rooms from this sync. + for room_id in mutable_joined_room_ids: + if await self.store.is_partial_state_room(room_id): + mutable_rooms_to_exclude.add(room_id) + + # Incremental eager syncs should additionally include rooms that + # - we are joined to + # - are full-stated + # - became fully-stated at some point during the sync period + # (These rooms will have been omitted during a previous eager sync.) + forced_newly_joined_room_ids = set() + if since_token and not sync_config.filter_collection.lazy_load_members(): + un_partial_stated_rooms = ( + await self.store.get_un_partial_stated_rooms_between( + since_token.un_partial_stated_rooms_key, + now_token.un_partial_stated_rooms_key, + mutable_joined_room_ids, + ) + ) + for room_id in un_partial_stated_rooms: + if not await self.store.is_partial_state_room(room_id): + forced_newly_joined_room_ids.add(room_id) + # Now we have our list of joined room IDs, exclude as configured and freeze joined_room_ids = frozenset( ( room_id for room_id in mutable_joined_room_ids - if room_id not in self.rooms_to_exclude + if room_id not in mutable_rooms_to_exclude ) ) @@ -1397,6 +1427,8 @@ class SyncHandler: since_token=since_token, now_token=now_token, joined_room_ids=joined_room_ids, + excluded_room_ids=frozenset(mutable_rooms_to_exclude), + forced_newly_joined_room_ids=frozenset(forced_newly_joined_room_ids), membership_change_events=membership_change_events, ) @@ -1834,14 +1866,16 @@ class SyncHandler: # 3. Work out which rooms need reporting in the sync response. ignored_users = await self.store.ignored_users(user_id) if since_token: - room_changes = await self._get_rooms_changed( + room_changes = await self._get_room_changes_for_incremental_sync( sync_result_builder, ignored_users ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) else: - room_changes = await self._get_all_rooms(sync_result_builder, ignored_users) + room_changes = await self._get_room_changes_for_initial_sync( + sync_result_builder, ignored_users + ) tags_by_room = await self.store.get_tags_for_user(user_id) log_kv({"rooms_changed": len(room_changes.room_entries)}) @@ -1900,7 +1934,7 @@ class SyncHandler: assert since_token - if membership_change_events: + if membership_change_events or sync_result_builder.forced_newly_joined_room_ids: return True stream_id = since_token.room_key.stream @@ -1909,7 +1943,7 @@ class SyncHandler: return True return False - async def _get_rooms_changed( + async def _get_room_changes_for_incremental_sync( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], @@ -1947,7 +1981,9 @@ class SyncHandler: for event in membership_change_events: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) - newly_joined_rooms: List[str] = [] + newly_joined_rooms: List[str] = list( + sync_result_builder.forced_newly_joined_room_ids + ) newly_left_rooms: List[str] = [] room_entries: List[RoomSyncResultBuilder] = [] invited: List[InvitedSyncResult] = [] @@ -2153,7 +2189,7 @@ class SyncHandler: newly_left_rooms, ) - async def _get_all_rooms( + async def _get_room_changes_for_initial_sync( self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str], @@ -2178,7 +2214,7 @@ class SyncHandler: room_list = await self.store.get_rooms_for_local_user_where_membership_is( user_id=user_id, membership_list=Membership.LIST, - excluded_rooms=self.rooms_to_exclude, + excluded_rooms=sync_result_builder.excluded_room_ids, ) room_entries = [] @@ -2549,6 +2585,13 @@ class SyncResultBuilder: since_token: The token supplied by user, or None. now_token: The token to sync up to. joined_room_ids: List of rooms the user is joined to + excluded_room_ids: Set of room ids we should omit from the /sync response. + forced_newly_joined_room_ids: + Rooms that should be presented in the /sync response as if they were + newly joined during the sync period, even if that's not the case. + (This is useful if the room was previously excluded from a /sync response, + and now the client should be made aware of it.) + Only used by incremental syncs. # The following mirror the fields in a sync response presence @@ -2565,6 +2608,8 @@ class SyncResultBuilder: since_token: Optional[StreamToken] now_token: StreamToken joined_room_ids: FrozenSet[str] + excluded_room_ids: FrozenSet[str] + forced_newly_joined_room_ids: FrozenSet[str] membership_change_events: List[EventBase] presence: List[UserPresenceState] = attr.Factory(list) |