Orchestration#
Infrastructure node that creates execution plans from active capabilities and task requirements with native LangGraph interrupt support for human-in-the-loop approval.
OrchestrationNode#
- class framework.infrastructure.orchestration_node.OrchestrationNode[source]#
Bases:
BaseInfrastructureNode
Convention-based orchestration node with sophisticated execution planning logic.
Creates detailed execution plans from task requirements and available capabilities. Handles both initial planning and replanning scenarios with approval workflows.
Features: - Configuration-driven error classification and retry policies - LLM-based execution planning with fallback mechanisms - Approval workflow integration for execution plans - Context-aware planning with capability selection - Sophisticated error handling for LLM operations
- name: str = 'orchestrator'#
- description: str = 'Execution Planning and Orchestration'#
- static classify_error(exc, context)[source]#
Built-in error classification for orchestration operations.
- static get_retry_policy()[source]#
Custom retry policy for LLM-based orchestration operations.
Orchestration uses LLM calls heavily and can be flaky due to: - Network timeouts to LLM services - LLM provider rate limiting - Temporary LLM service unavailability
Use longer delays and more attempts than default infrastructure policy.
- Return type:
Dict[str, Any]
- async static execute(state, **kwargs)[source]#
Create execution plans with LangGraph native interrupt support.
This implementation creates execution plans from task requirements and handles planning mode with native LangGraph interrupts for approval workflows.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional LangGraph parameters
- Returns:
Dictionary of state updates for LangGraph
- Return type:
Dict[str, Any]
- __repr__()#
Return a string representation of the infrastructure node for debugging.
Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.
- Returns:
String representation including class name and node name
- Return type:
str
Example
>>> node = TaskExtractionNode() >>> repr(node) '<TaskExtractionNode: task_extraction>'
Note
The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual error handling.
This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional parameters from LangGraph
- Returns:
State updates dictionary
- Return type:
Dict[str, Any]
Core Models#
Orchestration uses models defined in the core framework:
See also
ExecutionPlan
Structured execution plan model
PlannedStep
Individual step within execution plans
BaseInfrastructureNode
Base class for infrastructure components
Approval System Integration#
- framework.approval.approval_system.create_plan_approval_interrupt(execution_plan, plan_file_path=None, pending_plans_dir=None)[source]#
Create structured interrupt data for execution plan approval with file-based storage support.
Generates LangGraph-compatible interrupt data that presents execution plans to users for approval. The interrupt includes formatted step details, clear approval instructions, and structured payload data for seamless resume operations after user approval.
The function supports file-based execution plan storage for enhanced human-in-the-loop workflows, particularly for Open WebUI integration.
The generated user message provides a comprehensive view of planned operations with step-by-step breakdown, making it easy for users to understand and evaluate the proposed execution plan.
- Parameters:
execution_plan (ExecutionPlan) – Execution plan object containing steps and configuration
plan_file_path (str, optional) – Optional file path where the execution plan was saved
pending_plans_dir (str, optional) – Optional directory path for pending plan files
- Returns:
Dictionary containing user_message and resume_payload for LangGraph
- Return type:
Dict[str, Any]
Examples
Basic plan approval:
>>> from framework.base.planning import ExecutionPlan >>> plan = ExecutionPlan(steps=[ ... {'task_objective': 'Load data', 'capability': 'data_loader'}, ... {'task_objective': 'Analyze trends', 'capability': 'data_analysis'} ... ]) >>> interrupt_data = create_plan_approval_interrupt(plan) >>> print(interrupt_data['user_message']) # Contains formatted approval request
Note
The interrupt data follows LangGraph’s standard structure with user_message for display and resume_payload for execution continuation.
See also
framework.base.planning.ExecutionPlan
: Input structure for this functioncreate_approval_type()
: Approval type generation used by this functionget_approval_resume_data()
: Function that processes the resume payloadclear_approval_state()
: State cleanup after approval processing
- framework.approval.approval_system.clear_approval_state()[source]#
Clear approval state to prevent contamination between operations.
Provides centralized cleanup of approval state fields to maintain clean state hygiene between operations. This prevents approval data from previous interrupts from interfering with subsequent operations, ensuring each approval request is handled independently.
This function is typically called after processing approval results or when initializing new operations that should not inherit approval state.
- Returns:
Dictionary containing approval state fields reset to None
- Return type:
Dict[str, Any]
Examples
Clean state after processing approval:
>>> # After handling approved operation >>> state_updates = clear_approval_state() >>> # Apply to current state >>> current_state.update(state_updates)
Initialize clean operation:
>>> # Before starting new capability that might need approval >>> clean_state = clear_approval_state() >>> new_state = {**current_state, **clean_state}
Note
This function only returns the state updates - callers must apply them to the actual state object.
- framework.approval.approval_system.create_approval_type(capability_name, operation_type=None)[source]#
Generate dynamic approval type identifier from capability and operation.
Creates unique approval type identifiers that replace the hard-coded ApprovalType enum with a flexible system. This enables any capability to request approval without requiring framework modifications, while maintaining clear identification and supporting operation-level granularity within capabilities.
The generated identifiers follow a consistent naming pattern that ensures uniqueness and readability for logging, debugging, and user interfaces.
- Parameters:
capability_name (str) – Name of the capability requesting approval
operation_type (str, optional) – Optional specific operation type for granular control
- Returns:
Unique string identifier for the approval type
- Return type:
str
Examples
Basic capability approval:
>>> approval_type = create_approval_type("python") >>> print(approval_type) "python"
Operation-specific approval:
>>> approval_type = create_approval_type("memory", "save") >>> print(approval_type) "memory_save"
Complex capability with operation:
>>> approval_type = create_approval_type("data_analysis", "execute_query") >>> print(approval_type) "data_analysis_execute_query"
See also
create_code_approval_interrupt()
: Uses this function for approval type creationcreate_memory_approval_interrupt()
: Uses this function for approval type creationcreate_plan_approval_interrupt()
: Uses this function for approval type creationget_approval_resume_data()
: Uses approval types for state management
Registration#
Automatically registered as:
NodeRegistration(
name="orchestrator",
module_path="framework.infrastructure.orchestration_node",
function_name="OrchestrationNode",
description="Execution planning and orchestration"
)
See also
- Prompt System
Prompt customization system
- Human Approval
Complete approval system architecture
- Orchestrator Planning
Implementation details and usage patterns