Orchestration#

Infrastructure node that creates execution plans from active capabilities and task requirements with native LangGraph interrupt support for human-in-the-loop approval.

OrchestrationNode#

class framework.infrastructure.orchestration_node.OrchestrationNode[source]#

Bases: BaseInfrastructureNode

Convention-based orchestration node with sophisticated execution planning logic.

Creates detailed execution plans from task requirements and available capabilities. Handles both initial planning and replanning scenarios with approval workflows.

Features: - Configuration-driven error classification and retry policies - LLM-based execution planning with fallback mechanisms - Approval workflow integration for execution plans - Context-aware planning with capability selection - Sophisticated error handling for LLM operations

name: str = 'orchestrator'#
description: str = 'Execution Planning and Orchestration'#
static classify_error(exc, context)[source]#

Built-in error classification for orchestration operations.

static get_retry_policy()[source]#

Custom retry policy for LLM-based orchestration operations.

Orchestration uses LLM calls heavily and can be flaky due to: - Network timeouts to LLM services - LLM provider rate limiting - Temporary LLM service unavailability

Use longer delays and more attempts than default infrastructure policy.

Return type:

Dict[str, Any]

async static execute(state, **kwargs)[source]#

Create execution plans with LangGraph native interrupt support.

This implementation creates execution plans from task requirements and handles planning mode with native LangGraph interrupts for approval workflows.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional LangGraph parameters

Returns:

Dictionary of state updates for LangGraph

Return type:

Dict[str, Any]

__repr__()#

Return a string representation of the infrastructure node for debugging.

Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.

Returns:

String representation including class name and node name

Return type:

str

Example

>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'

Note

The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.

async langgraph_node(**kwargs)#

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]

Core Models#

Orchestration uses models defined in the core framework:

See also

ExecutionPlan

Structured execution plan model

PlannedStep

Individual step within execution plans

BaseInfrastructureNode

Base class for infrastructure components

Approval System Integration#

framework.approval.approval_system.create_plan_approval_interrupt(execution_plan, plan_file_path=None, pending_plans_dir=None)[source]#

Create structured interrupt data for execution plan approval with file-based storage support.

Generates LangGraph-compatible interrupt data that presents execution plans to users for approval. The interrupt includes formatted step details, clear approval instructions, and structured payload data for seamless resume operations after user approval.

The function supports file-based execution plan storage for enhanced human-in-the-loop workflows, particularly for Open WebUI integration.

The generated user message provides a comprehensive view of planned operations with step-by-step breakdown, making it easy for users to understand and evaluate the proposed execution plan.

Parameters:
  • execution_plan (ExecutionPlan) – Execution plan object containing steps and configuration

  • plan_file_path (str, optional) – Optional file path where the execution plan was saved

  • pending_plans_dir (str, optional) – Optional directory path for pending plan files

Returns:

Dictionary containing user_message and resume_payload for LangGraph

Return type:

Dict[str, Any]

Examples

Basic plan approval:

>>> from framework.base.planning import ExecutionPlan
>>> plan = ExecutionPlan(steps=[
...     {'task_objective': 'Load data', 'capability': 'data_loader'},
...     {'task_objective': 'Analyze trends', 'capability': 'data_analysis'}
... ])
>>> interrupt_data = create_plan_approval_interrupt(plan)
>>> print(interrupt_data['user_message'])  # Contains formatted approval request

Note

The interrupt data follows LangGraph’s standard structure with user_message for display and resume_payload for execution continuation.

See also

framework.base.planning.ExecutionPlan : Input structure for this function create_approval_type() : Approval type generation used by this function get_approval_resume_data() : Function that processes the resume payload clear_approval_state() : State cleanup after approval processing

framework.approval.approval_system.clear_approval_state()[source]#

Clear approval state to prevent contamination between operations.

Provides centralized cleanup of approval state fields to maintain clean state hygiene between operations. This prevents approval data from previous interrupts from interfering with subsequent operations, ensuring each approval request is handled independently.

This function is typically called after processing approval results or when initializing new operations that should not inherit approval state.

Returns:

Dictionary containing approval state fields reset to None

Return type:

Dict[str, Any]

Examples

Clean state after processing approval:

>>> # After handling approved operation
>>> state_updates = clear_approval_state()
>>> # Apply to current state
>>> current_state.update(state_updates)

Initialize clean operation:

>>> # Before starting new capability that might need approval
>>> clean_state = clear_approval_state()
>>> new_state = {**current_state, **clean_state}

Note

This function only returns the state updates - callers must apply them to the actual state object.

framework.approval.approval_system.create_approval_type(capability_name, operation_type=None)[source]#

Generate dynamic approval type identifier from capability and operation.

Creates unique approval type identifiers that replace the hard-coded ApprovalType enum with a flexible system. This enables any capability to request approval without requiring framework modifications, while maintaining clear identification and supporting operation-level granularity within capabilities.

The generated identifiers follow a consistent naming pattern that ensures uniqueness and readability for logging, debugging, and user interfaces.

Parameters:
  • capability_name (str) – Name of the capability requesting approval

  • operation_type (str, optional) – Optional specific operation type for granular control

Returns:

Unique string identifier for the approval type

Return type:

str

Examples

Basic capability approval:

>>> approval_type = create_approval_type("python")
>>> print(approval_type)
"python"

Operation-specific approval:

>>> approval_type = create_approval_type("memory", "save")
>>> print(approval_type)
"memory_save"

Complex capability with operation:

>>> approval_type = create_approval_type("data_analysis", "execute_query")
>>> print(approval_type)
"data_analysis_execute_query"

See also

create_code_approval_interrupt() : Uses this function for approval type creation create_memory_approval_interrupt() : Uses this function for approval type creation create_plan_approval_interrupt() : Uses this function for approval type creation get_approval_resume_data() : Uses approval types for state management

Registration#

Automatically registered as:

NodeRegistration(
    name="orchestrator",
    module_path="framework.infrastructure.orchestration_node",
    function_name="OrchestrationNode",
    description="Execution planning and orchestration"
)

See also

Prompt System

Prompt customization system

Human Approval

Complete approval system architecture

Orchestrator Planning

Implementation details and usage patterns