"""
Orchestration Node - LangGraph Compatible
Creates execution plans from active capabilities and task requirements.
Convention-based implementation with native LangGraph interrupt support.
"""
from __future__ import annotations
import time
import json
import datetime
from typing import Optional, Dict, Any, List, TYPE_CHECKING
from pathlib import Path
import asyncio
from framework.base.decorators import infrastructure_node
from framework.base.errors import ErrorClassification, ErrorSeverity
from framework.base.nodes import BaseInfrastructureNode
from framework.base.planning import ExecutionPlan, PlannedStep
from framework.base import BaseCapability
from framework.state import AgentState
from framework.state.state import create_status_update
from framework.registry import get_registry
from framework.context.context_manager import ContextManager
# Factory code consolidated inline as helper function
from configs.logger import get_logger
from configs.streaming import get_streamer
from configs.config import get_model_config, get_agent_dir
from framework.models import get_chat_completion
from framework.prompts.loader import get_framework_prompts
from langgraph.types import interrupt
from framework.state.state_manager import StateManager
from framework.approval.approval_system import (
create_plan_approval_interrupt,
get_approval_resume_data,
clear_approval_state,
create_approval_type
)
if TYPE_CHECKING:
from framework.observability import ExecutionObserver
from framework.base.errors import ErrorClassification
logger = get_logger("framework", "orchestrator")
registry = get_registry()
# =============================================================================
# EXECUTION PLAN VALIDATION
# =============================================================================
def _validate_and_fix_execution_plan(execution_plan: ExecutionPlan, current_task: str, logger) -> ExecutionPlan:
"""Validate and fix execution plan to ensure all capabilities exist and it ends with respond or clarify step.
This is the primary validation mechanism to:
1. Check that all capabilities in the plan exist in the registry
2. Ensure users always get a response by ending with respond/clarify
If hallucinated capabilities are found, raises ValueError for re-planning.
If the execution plan doesn't end with a respond or clarify step, we append a respond step.
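
    Illustrative sketch (hypothetical capability and task names; assumes the listed
    capability exists in the registry)::

        plan = {"steps": [{"context_key": "retrieved_data",
                           "capability": "data_retrieval",
                           "task_objective": "Fetch requested values",
                           "expected_output": "retrieved_data",
                           "success_criteria": "Values retrieved",
                           "inputs": []}]}
        fixed = _validate_and_fix_execution_plan(plan, "Fetch requested values", logger)
        assert fixed["steps"][-1]["capability"] == "respond"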
:param execution_plan: The execution plan to validate
:param current_task: The current task for context
:param logger: Logger instance
:return: Fixed execution plan that ends with respond or clarify
:raises ValueError: If hallucinated capabilities are found requiring re-planning
"""
steps = execution_plan.get('steps', [])
# Create generic respond step (used in multiple cases)
generic_response = PlannedStep(
context_key="user_response",
capability="respond",
task_objective=f"Respond to user request: {current_task}",
expected_output="user_response",
success_criteria="Provide helpful response to user query",
inputs=[]
)
if not steps:
logger.warning("Empty execution plan - adding default respond step")
return {"steps": [generic_response]}
# =====================================================================
# STEP 1: VALIDATE ALL CAPABILITIES EXIST IN REGISTRY
# =====================================================================
hallucinated_capabilities = []
for i, step in enumerate(steps):
capability_name = step.get('capability', '')
if not capability_name:
logger.warning(f"Step {i+1} has no capability specified")
continue
# Check if capability exists in registry
if not registry.get_node(capability_name):
hallucinated_capabilities.append(capability_name)
logger.error(f"Step {i+1}: Capability '{capability_name}' not found in registry")
# If hallucinated capabilities found, trigger re-planning
if hallucinated_capabilities:
error_msg = f"Orchestrator hallucinated non-existent capabilities: {hallucinated_capabilities}. Available capabilities: {registry.get_stats()['capability_names']}"
logger.error(error_msg)
raise ValueError(error_msg)
logger.debug("✅ All capabilities in execution plan exist in registry")
# =====================================================================
# STEP 2: ENSURE PLAN ENDS WITH RESPOND OR CLARIFY
# =====================================================================
# Check if last step is respond or clarify
last_step = steps[-1]
last_capability = last_step.get('capability', '').lower()
if last_capability in ['respond', 'clarify']:
logger.debug(f"Execution plan correctly ends with {last_capability} step")
return execution_plan
# Plan doesn't end with respond/clarify - add respond step
logger.info(f"Execution plan ends with '{last_capability}' instead of respond/clarify - adding respond step")
# Append the respond step
fixed_steps = steps + [generic_response]
logger.success(f"Added respond step to execution plan (now {len(fixed_steps)} steps total)")
return {"steps": fixed_steps}
# =============================================================================
# CONVENTION-BASED ORCHESTRATION NODE
# =============================================================================
@infrastructure_node
class OrchestrationNode(BaseInfrastructureNode):
"""Convention-based orchestration node with sophisticated execution planning logic.
Creates detailed execution plans from task requirements and available capabilities.
Handles both initial planning and replanning scenarios with approval workflows.
Features:
- Configuration-driven error classification and retry policies
- LLM-based execution planning with fallback mechanisms
- Approval workflow integration for execution plans
- Context-aware planning with capability selection
- Sophisticated error handling for LLM operations
"""
name = "orchestrator"
description = "Execution Planning and Orchestration"
@staticmethod
def classify_error(exc: Exception, context: dict):
"""Built-in error classification for orchestration operations."""
# Retry LLM timeouts (orchestration uses LLM heavily)
        if 'timeout' in type(exc).__name__.lower():
return ErrorClassification(
severity=ErrorSeverity.RETRIABLE,
user_message="LLM timeout during execution planning, retrying...",
metadata={"technical_details": str(exc)}
)
# Retry network/connection errors
if isinstance(exc, (ConnectionError, TimeoutError)):
return ErrorClassification(
severity=ErrorSeverity.RETRIABLE,
user_message="Network timeout during execution planning, retrying...",
metadata={"technical_details": str(exc)}
)
# Don't retry planning/validation errors (logic issues)
if isinstance(exc, (ValueError, TypeError)):
return ErrorClassification(
severity=ErrorSeverity.CRITICAL,
user_message="Execution planning configuration error",
metadata={"technical_details": str(exc)}
)
# Don't retry import/module errors (infrastructure issues)
if isinstance(exc, (ImportError, ModuleNotFoundError)):
return ErrorClassification(
severity=ErrorSeverity.CRITICAL,
user_message=f"Infrastructure dependency error: {str(exc)}",
metadata={"technical_details": str(exc)}
)
# Default: CRITICAL for unknown errors (fail safe principle)
# Only explicitly known errors should be RETRIABLE
return ErrorClassification(
severity=ErrorSeverity.CRITICAL,
user_message=f"Unknown execution planning error: {str(exc)}",
metadata={"technical_details": f"Error type: {type(exc).__name__}, Details: {str(exc)}"}
)
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
"""Custom retry policy for LLM-based orchestration operations.
Orchestration uses LLM calls heavily and can be flaky due to:
- Network timeouts to LLM services
- LLM provider rate limiting
- Temporary LLM service unavailability
Use longer delays and more attempts than default infrastructure policy.
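
        Illustrative timing, assuming delays grow as delay_seconds * backoff_factor ** attempt:
        roughly 2s, 4s, and 8s between the four attempts.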
"""
return {
"max_attempts": 4, # More attempts for LLM operations
"delay_seconds": 2.0, # Longer initial delay for LLM services
"backoff_factor": 2.0 # Aggressive backoff for rate limiting
}
@staticmethod
async def execute(
state: AgentState,
**kwargs
) -> Dict[str, Any]:
"""Create execution plans with LangGraph native interrupt support.
This implementation creates execution plans from task requirements and
handles planning mode with native LangGraph interrupts for approval workflows.
:param state: Current agent state
:type state: AgentState
:param kwargs: Additional LangGraph parameters
:return: Dictionary of state updates for LangGraph
:rtype: Dict[str, Any]
"""
# Explicit logger retrieval - professional practice
logger = get_logger("framework", "orchestrator")
# Define streaming helper here for step awareness
streamer = get_streamer("framework", "orchestrator", state)
# =====================================================================
# STEP 1: CHECK FOR APPROVED PLAN IN AGENT STATE (HIGHEST PRIORITY)
# =====================================================================
# Check for approved execution plan using centralized function
has_approval_resume, approved_payload = get_approval_resume_data(state, create_approval_type("orchestrator", "plan"))
if has_approval_resume and approved_payload:
# Try to load execution plan from file first
file_load_result = _load_execution_plan_from_file(logger=logger)
if file_load_result["success"]:
approved_plan = file_load_result["execution_plan"]
plan_source = file_load_result["source"]
logger.success(f"Using approved execution plan from file ({plan_source})")
streamer.status(f"Using approved execution plan from file ({plan_source})")
# Clean up processed plan files
_cleanup_processed_plan_files(logger=logger)
# Create state updates with explicit cleanup of approval and error state
return {
**_create_state_updates(state, approved_plan, f"approved_from_file_{plan_source}"),
**clear_approval_state()
}
else:
# Fallback to old in-memory approach if file loading fails
approved_plan = approved_payload.get("execution_plan")
if approved_plan:
logger.warning(f"File loading failed ({file_load_result.get('error')}), using in-memory plan")
streamer.status("Using approved execution plan from memory")
# Create state updates with explicit cleanup of approval and error state
return {
**_create_state_updates(state, approved_plan, "approved_from_memory_fallback"),
**clear_approval_state()
}
else:
logger.warning("Both file loading and in-memory plan failed")
elif has_approval_resume:
# Plan was rejected - clean up any pending files
_cleanup_processed_plan_files(logger=logger)
logger.info("Execution plan was rejected by user")
# =====================================================================
# STEP 2: EXTRACT CURRENT TASK AND ACTIVE CAPABILITIES
# =====================================================================
current_task = StateManager.get_current_task(state)
if not current_task:
raise ValueError("No current task available for orchestration")
# Get active capabilities from state
active_capability_names = state.get('planning_active_capabilities')
if not active_capability_names:
logger.error("No active capabilities found in state")
return {
"control_needs_reclassification": True,
"control_reclassification_reason": "No active capabilities found"
}
# Get capability instances from registry using capability names
active_capabilities = []
for cap_name in active_capability_names:
capability = registry.get_capability(cap_name)
if capability:
active_capabilities.append(capability)
else:
logger.warning(f"Capability '{cap_name}' not found in registry")
if not active_capabilities:
raise ValueError("No valid capability instances found for orchestration")
logger.info(f"Planning for task: {current_task}")
logger.info(f"Available capabilities: {[cap.name for cap in active_capabilities]}")
# =====================================================================
# STEP 3: CREATE NEW EXECUTION PLAN
# =====================================================================
# =====================================================================
# HELPER FUNCTION: CREATE SYSTEM PROMPT
# =====================================================================
async def create_system_prompt() -> str:
"""Create orchestrator system prompt from capabilities and context."""
logger.info(f"Creating orchestrator prompt for task: \"{current_task[:100]}...\"")
# Extract capability names from active capabilities
active_capability_names = [cap.name for cap in active_capabilities]
logger.info(f"Active capabilities: {active_capability_names}")
# Create ContextManager from state data
context_manager = ContextManager(state)
# Format error context for replanning if available
error_info = state.get('control_error_info')
error_context = None
if error_info and isinstance(error_info, dict):
classification = error_info.get('classification')
if classification and hasattr(classification, 'format_for_llm'):
error_context = classification.format_for_llm()
if error_context:
logger.info("Error context detected - enabling replanning mode with failure analysis")
logger.debug(f"Error context for replanning: {error_context}")
# Use the prompt system to build the complete orchestrator prompt
prompt_provider = get_framework_prompts()
orchestrator_builder = prompt_provider.get_orchestrator_prompt_builder()
system_instructions = orchestrator_builder.get_system_instructions(
active_capabilities=active_capabilities,
context_manager=context_manager,
task_depends_on_chat_history=state.get('task_depends_on_chat_history', False),
task_depends_on_user_memory=state.get('task_depends_on_user_memory', False),
error_context=error_context
)
if not system_instructions:
logger.error("No prompt text generated. The instructions will be empty.")
raise ValueError("No prompt text generated. The instructions will be empty.")
# Count total examples across all capabilities
total_examples = sum(len(cap.orchestrator_guide.examples) for cap in active_capabilities
if cap.orchestrator_guide and hasattr(cap.orchestrator_guide, 'examples'))
# Get context data from ContextManager
raw_data = context_manager.get_raw_data()
context_types = len(raw_data) if raw_data else 0
logger.info(f"Constructed orchestrator instructions using:")
logger.info(f" - {len(active_capabilities)} capabilities")
logger.info(f" - {total_examples} structured examples")
logger.info(f" - {context_types} context types")
if error_context:
logger.info(f" - Error context for replanning (previous failure analysis)")
logger.debug(f"\n\n\n------------Orchestrator System Prompt:\n{system_instructions}\n------------\n\n\n")
return system_instructions
# =====================================================================
# GENERATE EXECUTION PLAN
# =====================================================================
# Create system prompt
system_prompt = await create_system_prompt()
streamer.status("Generating execution plan...")
# Call LLM directly for execution planning
logger.key_info("Creating execution plan with orchestrator LLM")
plan_start_time = time.time()
# Get model configuration and call LLM
model_config = get_model_config("framework", "orchestrator")
message = f"{system_prompt}\n\nTASK TO PLAN: {current_task}"
# Run sync LLM call in thread pool to avoid blocking event loop for streaming
execution_plan = await asyncio.to_thread(
get_chat_completion,
message=message,
model_config=model_config,
output_model=ExecutionPlan
)
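        # The structured-output call is expected to return a dict-shaped ExecutionPlan
        # ({"steps": [...]}), which the validation below relies on.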
execution_time = time.time() - plan_start_time
logger.info(f"Orchestrator LLM execution time: {execution_time:.2f} seconds")
# =====================================================================
# STEP 3.5: VALIDATE AND FIX EXECUTION PLAN
# =====================================================================
try:
# Validate that all capabilities exist and plan ends with respond/clarify
execution_plan = _validate_and_fix_execution_plan(execution_plan, current_task, logger)
# Log final validated execution plan (after any modifications)
_log_execution_plan(execution_plan, logger)
logger.success(f"Final execution plan ready with {len(execution_plan.get('steps', []))} steps")
except ValueError as e:
# Orchestrator hallucinated non-existent capabilities - trigger re-planning
logger.error(f"Execution plan validation failed: {e}")
logger.warning("Triggering re-classification due to hallucinated capabilities")
streamer.status("Re-planning due to invalid capabilities...")
# Return state that triggers re-classification with re-planning severity
return {
"control_needs_reclassification": True,
"control_reclassification_reason": f"Orchestrator validation failed: {e}",
"control_reclassification_severity": "re_planning" # This triggers full re-planning
}
# =====================================================================
# STEP 4: HANDLE RESULTING EXECUTION PLAN
# =====================================================================
if _is_planning_mode_enabled(state):
logger.info("PLANNING MODE DETECTED - entering approval workflow")
# LangGraph handles caching automatically - no manual caching needed
await _handle_planning_mode(execution_plan, current_task, state, logger, streamer)
else:
logger.info("Planning mode not enabled - proceeding with normal execution")
streamer.status("Execution plan created")
logger.key_info("Orchestration processing completed")
return _create_state_updates(state, execution_plan, "llm_based")
# =============================================================================
# BUSINESS LOGIC HELPERS
# =============================================================================
def _clear_error_state() -> Dict[str, Any]:
"""Clear error state to prevent router from staying in retry handling mode.
When orchestrator creates a new plan, we need to clear previous error state
so the router can execute the new plan instead of continuing to handle old errors.
Returns:
Dictionary with error state fields cleared
"""
return {
"control_has_error": False,
"control_error_info": None,
"control_last_error": None,
"control_retry_count": 0,
"control_current_step_retry_count": 0
}
def _log_execution_plan(execution_plan: ExecutionPlan, logger):
"""Log execution plan with clean formatting."""
logger.key_info("="*50)
for index, step in enumerate(execution_plan.get('steps', [])):
logger.key_info(f" << Step {index + 1}")
logger.info(f" << ├───── id: '{step.get('context_key', 'unknown')}'")
logger.info(f" << ├─── node: '{step.get('capability', 'unknown')}'")
logger.info(f" << ├─── task: '{step.get('task_objective', 'unknown')}'")
logger.info(f" << └─ inputs: '{step.get('inputs', [])}'")
logger.key_info("="*50)
def _save_execution_plan_to_file(
execution_plan: ExecutionPlan,
current_task: str,
state: AgentState,
logger = None
) -> Dict[str, Any]:
"""Save execution plan to JSON file for human approval workflow.
Args:
execution_plan: The execution plan to save
current_task: The extracted task from task extraction node
state: Agent state containing original user message
logger: Logger instance for logging
Returns:
Dictionary with success status and file path
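
    Illustrative layout of the saved file (hypothetical values)::

        {
          "__metadata__": {
            "current_task": "Fetch requested values",
            "original_query": "Can you fetch those values for me?",
            "created_at": "2024-01-01T12:00:00",
            "serialization_type": "pending_execution_plan"
          },
          "steps": []
        }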
"""
try:
# Get execution plans directory
execution_plans_dir = get_agent_dir("execution_plans")
pending_plans_dir = Path(execution_plans_dir) / "pending_plans"
pending_plans_dir.mkdir(parents=True, exist_ok=True)
# Extract original user query
original_query = StateManager.get_user_query(state)
if not original_query:
original_query = current_task # Fallback if no messages available
# Create plan data with metadata including both original query and extracted task
plan_data = {
"__metadata__": {
"current_task": current_task, # Extracted task from task extraction
"original_query": original_query, # Original user input
"created_at": datetime.datetime.now().isoformat(),
"serialization_type": "pending_execution_plan"
},
"steps": execution_plan.get("steps", [])
}
# Save to pending plan file (used by editor)
pending_plan_file = pending_plans_dir / "pending_execution_plan.json"
with open(pending_plan_file, 'w', encoding='utf-8') as f:
json.dump(plan_data, f, indent=2, ensure_ascii=False)
if logger:
logger.info(f"Execution plan saved to {pending_plan_file}")
return {
"success": True,
"file_path": str(pending_plan_file),
"pending_plans_dir": str(pending_plans_dir)
}
except Exception as e:
error_msg = f"Failed to save execution plan to file: {e}"
if logger:
logger.error(error_msg)
return {
"success": False,
"error": error_msg
}
def _load_execution_plan_from_file(logger = None) -> Dict[str, Any]:
"""Load execution plan from JSON file after human approval.
Args:
logger: Logger instance for logging
Returns:
Dictionary with success status and execution plan data
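
    Illustrative return value (hypothetical values)::

        {"success": True,
         "execution_plan": {"steps": []},
         "metadata": {"current_task": "Fetch requested values"},
         "source": "modified_plan"}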
"""
try:
# Get execution plans directory using config
execution_plans_dir = get_agent_dir("execution_plans")
pending_plans_dir = Path(execution_plans_dir) / "pending_plans"
# Try to load modified plan first (if user modified the plan)
modified_plan_file = pending_plans_dir / "modified_execution_plan.json"
if modified_plan_file.exists():
with open(modified_plan_file, 'r', encoding='utf-8') as f:
plan_data = json.load(f)
if logger:
logger.info(f"Loaded modified execution plan from {modified_plan_file}")
return {
"success": True,
"execution_plan": {"steps": plan_data.get("steps", [])},
"metadata": plan_data.get("__metadata__", {}),
"source": "modified_plan"
}
# Fall back to original pending plan
pending_plan_file = pending_plans_dir / "pending_execution_plan.json"
if pending_plan_file.exists():
with open(pending_plan_file, 'r', encoding='utf-8') as f:
plan_data = json.load(f)
if logger:
logger.info(f"Loaded original execution plan from {pending_plan_file}")
return {
"success": True,
"execution_plan": {"steps": plan_data.get("steps", [])},
"metadata": plan_data.get("__metadata__", {}),
"source": "original_plan"
}
error_msg = "No execution plan file found for loading"
if logger:
logger.warning(error_msg)
return {
"success": False,
"error": error_msg
}
except Exception as e:
error_msg = f"Failed to load execution plan from file: {e}"
if logger:
logger.error(error_msg)
return {
"success": False,
"error": error_msg
}
def _cleanup_processed_plan_files(logger = None):
"""Clean up processed execution plan files after successful execution.
Args:
logger: Logger instance for logging
"""
try:
# Get execution plans directory using config
execution_plans_dir = get_agent_dir("execution_plans")
pending_plans_dir = Path(execution_plans_dir) / "pending_plans"
files_to_remove = []
# Remove pending plan file
pending_plan_file = pending_plans_dir / "pending_execution_plan.json"
if pending_plan_file.exists():
files_to_remove.append(pending_plan_file)
# Remove modified plan file if it exists
modified_plan_file = pending_plans_dir / "modified_execution_plan.json"
if modified_plan_file.exists():
files_to_remove.append(modified_plan_file)
# Remove the files
for file_path in files_to_remove:
file_path.unlink()
if logger:
logger.debug(f"Cleaned up plan file: {file_path}")
if logger and files_to_remove:
logger.info(f"Cleaned up {len(files_to_remove)} processed plan files")
except Exception as e:
if logger:
logger.warning(f"Failed to cleanup plan files: {e}")
async def _handle_planning_mode(execution_plan: ExecutionPlan, current_task: str, state: AgentState, logger, streamer):
"""Handle planning mode using structured approval system with file-based plan storage."""
logger.approval("Planning mode enabled - requesting plan approval")
streamer.status("Saving execution plan and requesting approval...")
# Save execution plan to file for human approval workflow
save_result = _save_execution_plan_to_file(
execution_plan=execution_plan,
current_task=current_task,
state=state,
logger=logger
)
if not save_result["success"]:
logger.warning(f"Failed to save execution plan: {save_result.get('error')}")
# Fallback to in-memory approach if file saving fails
interrupt_data = create_plan_approval_interrupt(
execution_plan=execution_plan
)
else:
logger.approval(f"Execution plan saved to {save_result['file_path']}")
# Create enhanced interrupt data with file path references
interrupt_data = create_plan_approval_interrupt(
execution_plan=execution_plan,
plan_file_path=save_result["file_path"],
pending_plans_dir=save_result["pending_plans_dir"]
)
logger.approval("Interrupting execution for plan approval")
logger.debug(f"Interrupt data created with {len(execution_plan.get('steps', []))} steps")
# LangGraph interrupt - execution stops here until user responds
interrupt(interrupt_data)
def _is_planning_mode_enabled(state: AgentState) -> bool:
"""Check if planning mode is enabled in agent control state."""
agent_control = state.get('agent_control', {})
return agent_control.get('planning_mode_enabled', False)
def _create_state_updates(state: AgentState, execution_plan: ExecutionPlan, approach: str) -> Dict[str, Any]:
"""Create state updates based on orchestration results using proper LangGraph merging."""
# Direct planning state update
planning_update = {
"planning_execution_plan": execution_plan,
"planning_current_step_index": 0
}
# Increment plans created counter - orchestrator owns this responsibility
current_plans_count = state.get('control_plans_created_count', 0)
planning_control_update = {
"control_plans_created_count": current_plans_count + 1
}
# CRITICAL: Clear error state so router can execute new plan instead of staying in retry mode
error_state_cleanup = _clear_error_state()
# Add status event using LangGraph's add reducer
status_event = create_status_update(
message=f"Execution plan created using {approach} (plan #{current_plans_count + 1})",
progress=1.0,
complete=True,
node="orchestration",
approach=approach,
total_steps=len(execution_plan.get('steps', []))
)
# Merge the updates - LangGraph will handle this properly
return {**planning_update, **planning_control_update, **error_state_cleanup, **status_event}
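
# Illustrative shape of the merged update returned above (status-event keys come from
# create_status_update and are not spelled out here):
#   {"planning_execution_plan": {...}, "planning_current_step_index": 0,
#    "control_plans_created_count": n + 1, "control_has_error": False, ...}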