Python Execution#

What you’ll build: Python execution system with LangGraph workflows, human approval integration, and flexible container/local deployment

📚 What You’ll Learn

Key Concepts:

  • Using registry-based PythonExecutorService access for code execution

  • Creating PythonExecutionRequest with structured capability_prompts

  • Implementing approval handling with handle_service_with_interrupts()

  • Managing approval resume/clear state with get_approval_resume_data() and clear_approval_state()

  • Proper service configuration with thread_id and checkpoint_ns

  • Container vs local execution through config.yml settings

Prerequisites: Understanding of Human Approval and Message and Execution Flow

Overview#

The Python Execution Service provides a LangGraph-based system for Python code generation, static analysis, human approval, and secure execution. It supports both containerized and local execution environments with seamless switching through configuration.

Key Features:

  • Flexible Execution Environments: Switch between container and local execution with configuration

  • Jupyter Notebook Generation: Automatic creation of interactive notebooks for evaluation

  • Human-in-the-Loop Approval: LangGraph-native interrupts with rich context and safety assessments

  • Exception-Based Flow Control: Clean error handling with categorized errors for retry strategies

  • Multi-Stage Pipeline: Code generation → analysis → approval → execution → result processing

Execution Pipeline:

  1. Code Generation: LLM-based Python code generation with context awareness

  2. Static Analysis: Security and policy analysis with configurable rules

  3. Approval Workflows: Human oversight system with rich context and safety assessments

  4. Flexible Execution: Container or local execution with unified result collection

  5. Notebook Generation: Comprehensive Jupyter notebook creation for evaluation

  6. Result Processing: Structured result handling with artifact management

Configuration#

Configure your Python execution system with environment settings and approval policies:

# config.yml - Python Execution Configuration
framework:
  # Python Executor Service Configuration
  python_executor:
    max_generation_retries: 3      # Maximum retries for code generation failures
    max_execution_retries: 3       # Maximum retries for execution failures
    execution_timeout_seconds: 600 # Execution timeout in seconds (default: 10 minutes)

  execution:
    execution_method: "container"  # or "local"
    modes:
      read_only:
        kernel_name: "python3-epics-readonly"
        allows_writes: false
        requires_approval: false
      write_access:
        kernel_name: "python3-epics-write"
        allows_writes: true
        requires_approval: true

    # Container execution settings
    container:
      jupyter_host: "localhost"
      jupyter_port: 8888

    # Local execution settings
    local:
      python_env_path: "${LOCAL_PYTHON_VENV}"

# Approval configuration for Python execution
approval:
  global_mode: "selective"
  capabilities:
    python_execution:
      enabled: true
      mode: "epics_writes"  # disabled, all_code, epics_writes

Configuration Options:

  • python_executor: Service-level configuration for retry behavior and timeouts

    • max_generation_retries: Maximum attempts for code generation failures (default: 3)

    • max_execution_retries: Maximum attempts for code execution failures (default: 3)

    • execution_timeout_seconds: Maximum time allowed for code execution (default: 600 seconds)

  • execution_method: “container” for secure isolation, “local” for direct host execution

  • modes: Different execution environments with specific approval requirements

  • Container settings: Jupyter endpoint configuration for containerized execution

  • Local settings: Python environment path for direct execution

Integration Patterns#

Using Python Execution in Capabilities#

Use the Python execution service directly in your capabilities with proper approval handling:

from framework.base import BaseCapability, capability_node
from framework.state import AgentState, StateManager
from framework.registry import get_registry
from framework.services.python_executor import PythonExecutionRequest
from framework.approval import (
    create_approval_type,
    get_approval_resume_data,
    clear_approval_state,
    handle_service_with_interrupts
)
from configs.config import get_full_configuration
from langgraph.types import Command

@capability_node
class DataAnalysisCapability(BaseCapability):
    """Data analysis capability using Python execution service."""

    async def execute(state: AgentState, **kwargs) -> dict:
        # Get current step and registry
        step = StateManager.get_current_step(state)
        registry = get_registry()

        # Get Python executor service from registry
        python_service = registry.get_service("python_executor")
        if not python_service:
            raise RuntimeError("Python executor service not available")

        # Create service configuration
        main_configurable = get_full_configuration()
        service_config = {
            "configurable": {
                **main_configurable,
                "thread_id": f"data_analysis_{step.get('context_key', 'default')}",
                "checkpoint_ns": "python_executor"
            }
        }

        # Check for approval resume first
        has_approval_resume, approved_payload = get_approval_resume_data(
            state, create_approval_type("data_analysis")
        )

        if has_approval_resume:
            # Handle approval resume
            if approved_payload:
                resume_response = {"approved": True, **approved_payload}
            else:
                resume_response = {"approved": False}

            service_result = await python_service.ainvoke(
                Command(resume=resume_response), config=service_config
            )
            approval_cleanup = clear_approval_state()
        else:
            # Normal execution flow
            # Create structured prompts for Python generation
            capability_prompts = [
                "**ANALYSIS REQUIREMENTS:**",
                "- Generate statistical summary of the data",
                "- Create visualizations to identify trends",
                "- Identify patterns and anomalies",

                "**EXPECTED OUTPUT:**",
                "Create a results dictionary with:",
                "- statistics: Statistical summary metrics",
                "- trends: Identified trends and patterns",
                "- visualizations: List of generated plots"
            ]

            # Create execution request
            execution_request = PythonExecutionRequest(
                user_query=state.get("input_output", {}).get("user_query", ""),
                task_objective=step.get("task_objective", "Analyze data"),
                capability_prompts=capability_prompts,
                expected_results={
                    "statistics": "dict",
                    "trends": "list",
                    "visualizations": "list"
                },
                execution_folder_name="data_analysis",
                capability_context_data=state.get('capability_context_data', {}),
                retries=3
            )

            # Use centralized interrupt handler
            service_result = await handle_service_with_interrupts(
                service=python_service,
                request=execution_request,
                config=service_config,
                logger=logger,
                capability_name="DataAnalysis"
            )
            approval_cleanup = None

        # Process results (both paths converge here)
        execution_result = service_result.execution_result

        # Store context using StateManager
        context_updates = StateManager.store_context(
            state,
            registry.context_types.ANALYSIS_RESULTS,
            step.get("context_key"),
            analysis_context
        )

        # Return with optional approval cleanup
        if approval_cleanup:
            return {**context_updates, **approval_cleanup}
        else:
            return context_updates

Execution Environment Management#

Container vs Local Execution#

The Python execution service supports both container and local execution environments. The execution method is primarily configured through the config system, but you can also implement dynamic selection logic if needed.

Configuration-Based Execution Method:

The execution method is typically set in your config.yml:

framework:
  execution:
    execution_method: "container"  # or "local"
    container:
      jupyter_host: "localhost"
      jupyter_port: 8888
    local:
      python_env_path: "${LOCAL_PYTHON_VENV}"

Example: Dynamic Environment Selection (Advanced Use Case):

For advanced scenarios where you need to dynamically choose execution environments based on request characteristics, here’s an example pattern:

class FlexiblePythonExecution:
    """Example: Dynamic execution environment selection.

    Note: This is an advanced pattern. Most use cases should rely on
    the standard config.yml execution_method setting.
    """

    def _select_execution_environment(self, code_request: dict) -> str:
        """Example: Select execution environment based on request characteristics.

        This would be used to override the default config.yml setting
        for specific requests that have special requirements.
        """

        requires_isolation = code_request.get("requires_isolation", False)
        has_dependencies = code_request.get("has_special_dependencies", False)
        is_long_running = code_request.get("estimated_time", 0) > 300
        security_level = code_request.get("security_level", "medium")

        # Example decision logic for environment selection
        if security_level == "high" or requires_isolation:
            return "container"
        elif has_dependencies or is_long_running:
            return "container"
        else:
            return "local"  # Faster for simple operations

    async def execute_with_dynamic_environment(self, state, request_data):
        """Example: Override execution method in service config."""

        # Get the dynamic execution method
        execution_method = self._select_execution_environment(request_data)

        # Override the config setting for this specific request
        main_configurable = get_full_configuration()
        service_config = {
            "configurable": {
                **main_configurable,
                "execution_method": execution_method,  # Override config.yml setting
                "thread_id": f"dynamic_{execution_method}",
                "checkpoint_ns": "python_executor"
            }
        }

        # Use the service with the dynamic configuration
        # ... rest of service call ...

Advanced Patterns#

Multi-Stage Analysis Pipeline#

Chain multiple Python executions for complex analysis workflows with proper approval handling:

async def multi_stage_analysis(self, state: AgentState, data_context: dict) -> dict:
    """Execute multi-stage analysis pipeline with approval handling."""

    registry = get_registry()
    python_service = registry.get_service("python_executor")
    main_configurable = get_full_configuration()
    logger = logging.getLogger(__name__)

    # Stage 1: Data preprocessing
    stage1_config = {
        "configurable": {
            **main_configurable,
            "thread_id": "stage1_preprocessing",
            "checkpoint_ns": "python_executor"
        }
    }

    preprocessing_prompts = [
        "**PREPROCESSING STAGE:**",
        "- Clean and validate the input data",
        "- Handle missing values and outliers",
        "- Prepare data for statistical analysis"
    ]

    preprocessing_request = PythonExecutionRequest(
        user_query="Data preprocessing stage",
        task_objective="Clean and prepare data for analysis",
        capability_prompts=preprocessing_prompts,
        expected_results={"cleaned_data": "pandas.DataFrame", "summary": "dict"},
        execution_folder_name="stage1_preprocessing",
        capability_context_data=data_context
    )

    stage1_result = await handle_service_with_interrupts(
        service=python_service,
        request=preprocessing_request,
        config=stage1_config,
        logger=logger,
        capability_name="PreprocessingStage"
    )

    # Stage 2: Statistical analysis (using results from stage 1)
    stage2_config = {
        "configurable": {
            **main_configurable,
            "thread_id": "stage2_analysis",
            "checkpoint_ns": "python_executor"
        }
    }

    analysis_prompts = [
        "**STATISTICAL ANALYSIS STAGE:**",
        "- Use the cleaned data from preprocessing stage",
        "- Perform comprehensive statistical analysis",
        "- Generate summary statistics and insights"
    ]

    # Combine original context with preprocessing results
    stage2_context = {
        **data_context,
        "preprocessing_results": stage1_result.execution_result.results
    }

    analysis_request = PythonExecutionRequest(
        user_query="Statistical analysis stage",
        task_objective="Perform statistical analysis on preprocessed data",
        capability_prompts=analysis_prompts,
        expected_results={"statistics": "dict", "insights": "list"},
        execution_folder_name="stage2_analysis",
        capability_context_data=stage2_context
    )

    stage2_result = await handle_service_with_interrupts(
        service=python_service,
        request=analysis_request,
        config=stage2_config,
        logger=logger,
        capability_name="AnalysisStage"
    )

    return {
        "pipeline_completed": True,
        "stages": {
            "preprocessing": stage1_result,
            "analysis": stage2_result
        }
    }

Troubleshooting#

Common Issues:

Issue: Python execution service not available
  • Cause: Service not registered in framework registry

  • Solution: Verify PythonExecutorService is registered in registry configuration using registry.get_service("python_executor")

Issue: GraphInterrupt not being handled properly
  • Cause: Using direct service.ainvoke() instead of handle_service_with_interrupts()

  • Solution: Always use handle_service_with_interrupts() for service calls that may require approval

Issue: Approval resume not working
  • Cause: Missing approval resume check or incorrect Command usage

  • Solution: Check for approval resume with get_approval_resume_data() and use Command(resume=response) for resumption

Issue: Service configuration errors
  • Cause: Missing thread_id, checkpoint_ns, or incorrect configurable structure

  • Solution: Use get_full_configuration() and include proper thread_id and checkpoint_ns in service config

Issue: Container execution failing with connection errors
  • Cause: Jupyter container not accessible or misconfigured

  • Solution: Check container endpoints and ensure Jupyter is running

Issue: Generated notebooks not accessible
  • Cause: File path or URL generation issues

  • Solution: Check execution folder configuration and notebook link generation

See also

Memory Storage

Integrate memory storage with Python execution

Container Deployment

Advanced container orchestration

Human Approval

Understanding the approval system integration

Python Execution

Complete Python execution API