Source code for framework.infrastructure.clarify_node
"""
Clarify Capability
This capability asks users specific questions when their queries are ambiguous
or missing critical information needed for execution. It helps refine user
intent before proceeding with technical operations.
"""
import asyncio
import logging
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from pydantic import BaseModel, Field
import textwrap
from pydantic_ai import Agent
from framework.base import BaseCapability
from framework.base.decorators import capability_node
from framework.base.errors import ErrorClassification, ErrorSeverity
from framework.context.context_manager import ContextManager
from framework.state import AgentState, ChatHistoryFormatter, StateManager
from framework.base.planning import PlannedStep
from framework.models import get_chat_completion
from framework.prompts.loader import get_framework_prompts
from configs.logger import get_logger
from configs.config import get_full_configuration, get_model_config
from langchain_core.messages import AIMessage
from langgraph.config import get_stream_writer
logger = logging.getLogger(__name__)
# --- Pydantic Model for Clarifying Questions ---
class ClarifyingQuestionsResponse(BaseModel):
"""Structured response for clarifying questions.
Contains the reason for clarification and specific questions to ask
the user to gather missing information.
:param reason: Brief explanation of why clarification is needed
:type reason: str
:param questions: List of specific, targeted questions to clarify the user's request
:type questions: List[str]
:param missing_info: List of types of missing information
:type missing_info: List[str]
"""
reason: str = Field(description="Brief explanation of why clarification is needed")
questions: List[str] = Field(
description="List of specific, targeted questions to clarify the user's request",
min_items=1,
max_items=4
)
missing_info: List[str] = Field(
description="List of types of missing information (e.g., 'time_range', 'system_specification')",
default=[]
)
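# Illustrative example (hypothetical values): a response that satisfies the
# schema above - `questions` must contain between 1 and 4 entries.
#
#   ClarifyingQuestionsResponse(
#       reason="The request does not specify which system or time window is meant.",
#       questions=[
#           "Which system should the data come from?",
#           "What time range are you interested in?",
#       ],
#       missing_info=["system_specification", "time_range"],
#   )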
# --- Convention-Based Capability Definition ---
@capability_node
class ClarifyCapability(BaseCapability):
"""Ask user for clarification when queries are ambiguous.
Communication capability that generates targeted questions to clarify
user intent when requests lack sufficient detail or context.
"""
name = "clarify"
description = "Ask specific questions when user queries are ambiguous or missing critical details"
provides = [] # Communication capability - no context output (questions go to user via chat history)
requires = [] # Can work with any execution context
    @staticmethod
    async def execute(state: AgentState, **kwargs):
        """Generate specific questions to ask user based on missing information.

        :param state: Current agent state
        :type state: AgentState
        :param kwargs: Additional parameters (may include step)
        :return: State update with clarification response
        :rtype: Dict[str, Any]
        """
        # Explicit logger retrieval - professional practice
        logger = get_logger("framework", "clarify")

        # Extract step if provided
        step = kwargs.get('step', {})

        try:
            logger.info("Starting clarification generation")

            # Use get_stream_writer() for pure LangGraph streaming
            streaming = get_stream_writer()
            if streaming:
                streaming({"event_type": "status", "message": "Analyzing query for clarification...", "progress": 0.2})

            # Generate clarifying questions using PydanticAI
            # Run sync function in thread pool to avoid blocking event loop for streaming
            questions_response = await asyncio.to_thread(
                _generate_clarifying_questions, state, step.get('task_objective', 'unknown')
            )

            if streaming:
                streaming({"event_type": "status", "message": "Generating clarification questions...", "progress": 0.6})

            # Format questions for user interaction
            formatted_questions = _format_questions_for_user(questions_response)

            if streaming:
                streaming({"event_type": "status", "message": "Clarification ready", "progress": 1.0, "complete": True})

            logger.info(f"Generated {len(questions_response.questions)} clarifying questions")

            # Return clarifying questions using native LangGraph pattern
            return {
                "messages": [AIMessage(content=formatted_questions)]
            }

        except Exception as e:
            logger.error(f"Error generating clarifying questions: {e}")
            # Let the decorator handle error classification
            raise

    # Optional: Add error classification if needed
    @staticmethod
    def classify_error(exc: Exception, context: dict):
        """Error classification for the clarify capability."""
        from framework.base.errors import ErrorClassification, ErrorSeverity
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message=f"Failed to generate clarifying questions: {str(exc)}",
            metadata={"technical_details": str(exc)}
        )

    def _create_orchestrator_guide(self):
        """Get orchestrator snippet from prompt builder.

        :return: Orchestrator prompt snippet for this capability
        """
        prompt_provider = get_framework_prompts()  # Registry will determine the right provider
        clarification_builder = prompt_provider.get_clarification_prompt_builder()
        return clarification_builder.get_orchestrator_guide()

    def _create_classifier_guide(self):
        """Get classifier config from prompt builder.

        :return: Classifier configuration for this capability
        """
        prompt_provider = get_framework_prompts()  # Registry will determine the right provider
        clarification_builder = prompt_provider.get_clarification_prompt_builder()
        return clarification_builder.get_classifier_guide()
# --- Helper Functions ---
def _generate_clarifying_questions(state: AgentState, task_objective: str) -> ClarifyingQuestionsResponse:
"""Generate specific clarifying questions using PydanticAI.
:param state: Current agent state
:param task_objective: The task objective to clarify
:type task_objective: str
:return: Structured clarifying questions response
:rtype: ClarifyingQuestionsResponse
"""
# Format entire chat history for context using native message types
messages = state.get("input_output", {}).get("messages", [])
chat_history_str = ChatHistoryFormatter.format_for_llm(messages)
# Get relevant context using ContextManager's proper method
context_manager = ContextManager(state)
current_step = StateManager.get_current_step(state)
relevant_context = context_manager.get_human_summaries(current_step)
# Get question generation prompt from framework prompt builder
prompt_provider = get_framework_prompts()
clarification_builder = prompt_provider.get_clarification_prompt_builder()
clarification_query = clarification_builder.build_clarification_query(chat_history_str, task_objective)
# Use structured LLM generation for clarifying questions
system_instructions = clarification_builder.get_system_instructions()
# Include available context in the prompt so clarification can be context-aware
context_info = ""
if relevant_context:
context_items = []
for context_key, context_data in relevant_context.items():
context_items.append(f"- {context_key}: {context_data}")
context_info = f"\n\nAvailable context data:\n" + "\n".join(context_items)
message = f"{system_instructions}\n\n{clarification_query}{context_info}"
response_config = get_model_config("framework", "response")
result = get_chat_completion(
message=message,
model_config=response_config,
output_model=ClarifyingQuestionsResponse
)
return result
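# Illustrative prompt layout (placeholder values): the `message` assembled above
# consists of the system instructions, the clarification query, and, when context
# summaries exist, the context block, separated by blank lines, e.g.
#
#   <system instructions>
#
#   <clarification query built from chat history and task objective>
#
#   Available context data:
#   - <context_key>: <human-readable context summary>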
def _format_questions_for_user(questions_response: ClarifyingQuestionsResponse) -> str:
"""Format clarifying questions for direct user interaction.
:param questions_response: The structured questions response
:type questions_response: ClarifyingQuestionsResponse
:return: Formatted questions string for user
:rtype: str
"""
questions_text = "I need some clarification:\n\n"
for i, question in enumerate(questions_response.questions, 1):
questions_text += f"{i}. {question}\n"
return questions_text
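# Illustrative output (hypothetical questions): for a response whose questions are
# ["Which system should the data come from?", "What time range are you interested in?"],
# _format_questions_for_user() returns the numbered list that is sent back to the user:
#
#   I need some clarification:
#
#   1. Which system should the data come from?
#   2. What time range are you interested in?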