Python Execution#
What you’ll build: Python execution system with LangGraph workflows, human approval integration, and flexible container/local deployment
📚 What You’ll Learn
Key Concepts:
Using registry-based PythonExecutorService access for code execution
Creating PythonExecutionRequest with structured capability_prompts
Implementing approval handling with handle_service_with_interrupts()
Managing approval resume/clear state with get_approval_resume_data() and clear_approval_state()
Proper service configuration with thread_id and checkpoint_ns
Container vs local execution through config.yml settings
Prerequisites: Understanding of Human Approval and Message and Execution Flow
Overview#
The Python Execution Service provides a LangGraph-based system for Python code generation, static analysis, human approval, and secure execution. It supports both containerized and local execution environments with seamless switching through configuration.
Key Features:
Flexible Execution Environments: Switch between container and local execution with configuration
Jupyter Notebook Generation: Automatic creation of interactive notebooks for evaluation
Human-in-the-Loop Approval: LangGraph-native interrupts with rich context and safety assessments
Exception-Based Flow Control: Clean error handling with categorized errors for retry strategies
Multi-Stage Pipeline: Code generation → analysis → approval → execution → result processing
Execution Pipeline:
Code Generation: LLM-based Python code generation with context awareness
Static Analysis: Security and policy analysis with configurable rules
Approval Workflows: Human oversight system with rich context and safety assessments
Flexible Execution: Container or local execution with unified result collection
Notebook Generation: Comprehensive Jupyter notebook creation for evaluation
Result Processing: Structured result handling with artifact management
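In capability code, this pipeline is driven through a small surface: fetch the service from the registry, build a PythonExecutionRequest, and invoke it through the approval-aware helper. A minimal sketch follows (field values are illustrative; the full pattern, including approval resume handling, appears under Integration Patterns below):

```python
# Minimal sketch; runs inside an async capability. Field values are illustrative.
import logging

from framework.registry import get_registry
from framework.services.python_executor import PythonExecutionRequest
from framework.approval import handle_service_with_interrupts
from configs.config import get_full_configuration

logger = logging.getLogger(__name__)


async def quick_analysis_sketch() -> dict:
    # Fetch the executor service from the framework registry
    python_service = get_registry().get_service("python_executor")

    # Describe what the generated code should do and return
    request = PythonExecutionRequest(
        user_query="Summarize the archived beam current data",
        task_objective="Compute basic statistics",
        capability_prompts=["**REQUIREMENTS:**", "- Produce a statistics dict"],
        expected_results={"statistics": "dict"},
        execution_folder_name="quickstart_example",
        capability_context_data={},
    )

    # Service configuration: merged framework config plus LangGraph checkpointing keys
    config = {
        "configurable": {
            **get_full_configuration(),
            "thread_id": "quickstart_example",
            "checkpoint_ns": "python_executor",
        }
    }

    # Approval-aware invocation (raises/propagates interrupts as needed)
    result = await handle_service_with_interrupts(
        service=python_service,
        request=request,
        config=config,
        logger=logger,
        capability_name="QuickstartSketch",
    )
    return result.execution_result.results
```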
Configuration#
Configure your Python execution system with environment settings and approval policies:
```yaml
# config.yml - Python Execution Configuration
framework:
  # Python Executor Service Configuration
  python_executor:
    max_generation_retries: 3        # Maximum retries for code generation failures
    max_execution_retries: 3         # Maximum retries for execution failures
    execution_timeout_seconds: 600   # Execution timeout in seconds (default: 10 minutes)

  execution:
    execution_method: "container"    # or "local"
    modes:
      read_only:
        kernel_name: "python3-epics-readonly"
        allows_writes: false
        requires_approval: false
      write_access:
        kernel_name: "python3-epics-write"
        allows_writes: true
        requires_approval: true

    # Container execution settings
    container:
      jupyter_host: "localhost"
      jupyter_port: 8888

    # Local execution settings
    local:
      python_env_path: "${LOCAL_PYTHON_VENV}"

  # Approval configuration for Python execution
  approval:
    global_mode: "selective"
    capabilities:
      python_execution:
        enabled: true
        mode: "epics_writes"   # disabled, all_code, epics_writes
```
Configuration Options:
python_executor: Service-level configuration for retry behavior and timeouts
max_generation_retries: Maximum attempts for code generation failures (default: 3)
max_execution_retries: Maximum attempts for code execution failures (default: 3)
execution_timeout_seconds: Maximum time allowed for code execution (default: 600 seconds)
execution_method: “container” for secure isolation, “local” for direct host execution
modes: Different execution environments with specific approval requirements
Container settings: Jupyter endpoint configuration for containerized execution
Local settings: Python environment path for direct execution
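Capabilities normally do not read these settings directly; get_full_configuration() merges them into the configurable dictionary passed to the service, as shown in the examples below. If you do need to inspect a value, here is a hedged sketch, assuming the merged dictionary exposes the keys flat (mirroring the execution_method override used in the dynamic-environment example later in this guide):

```python
from configs.config import get_full_configuration

cfg = get_full_configuration()

# Assumed flat key names, mirroring the execution_method override shown in the
# dynamic-environment example; adjust if your merged configuration is nested.
execution_method = cfg.get("execution_method", "container")
timeout_seconds = cfg.get("execution_timeout_seconds", 600)
print(f"Python execution via {execution_method!r}, timeout {timeout_seconds}s")
```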
Integration Patterns#
Using Python Execution in Capabilities#
Use the Python execution service directly in your capabilities with proper approval handling:
```python
import logging

from framework.base import BaseCapability, capability_node
from framework.state import AgentState, StateManager
from framework.registry import get_registry
from framework.services.python_executor import PythonExecutionRequest
from framework.approval import (
    create_approval_type,
    get_approval_resume_data,
    clear_approval_state,
    handle_service_with_interrupts
)
from configs.config import get_full_configuration
from langgraph.types import Command

logger = logging.getLogger(__name__)


@capability_node
class DataAnalysisCapability(BaseCapability):
    """Data analysis capability using Python execution service."""

    async def execute(state: AgentState, **kwargs) -> dict:
        # Get current step and registry
        step = StateManager.get_current_step(state)
        registry = get_registry()

        # Get Python executor service from registry
        python_service = registry.get_service("python_executor")
        if not python_service:
            raise RuntimeError("Python executor service not available")

        # Create service configuration
        main_configurable = get_full_configuration()
        service_config = {
            "configurable": {
                **main_configurable,
                "thread_id": f"data_analysis_{step.get('context_key', 'default')}",
                "checkpoint_ns": "python_executor"
            }
        }

        # Check for approval resume first
        has_approval_resume, approved_payload = get_approval_resume_data(
            state, create_approval_type("data_analysis")
        )

        if has_approval_resume:
            # Handle approval resume
            if approved_payload:
                resume_response = {"approved": True, **approved_payload}
            else:
                resume_response = {"approved": False}

            service_result = await python_service.ainvoke(
                Command(resume=resume_response), config=service_config
            )
            approval_cleanup = clear_approval_state()
        else:
            # Normal execution flow
            # Create structured prompts for Python generation
            capability_prompts = [
                "**ANALYSIS REQUIREMENTS:**",
                "- Generate statistical summary of the data",
                "- Create visualizations to identify trends",
                "- Identify patterns and anomalies",
                "**EXPECTED OUTPUT:**",
                "Create a results dictionary with:",
                "- statistics: Statistical summary metrics",
                "- trends: Identified trends and patterns",
                "- visualizations: List of generated plots"
            ]

            # Create execution request
            execution_request = PythonExecutionRequest(
                user_query=state.get("input_output", {}).get("user_query", ""),
                task_objective=step.get("task_objective", "Analyze data"),
                capability_prompts=capability_prompts,
                expected_results={
                    "statistics": "dict",
                    "trends": "list",
                    "visualizations": "list"
                },
                execution_folder_name="data_analysis",
                capability_context_data=state.get('capability_context_data', {}),
                retries=3
            )

            # Use centralized interrupt handler
            service_result = await handle_service_with_interrupts(
                service=python_service,
                request=execution_request,
                config=service_config,
                logger=logger,
                capability_name="DataAnalysis"
            )
            approval_cleanup = None

        # Process results (both paths converge here)
        execution_result = service_result.execution_result

        # Build the context payload to store (adapt the fields to match your
        # ANALYSIS_RESULTS context type; only the results dict is assumed here)
        analysis_context = {"results": execution_result.results}

        # Store context using StateManager
        context_updates = StateManager.store_context(
            state,
            registry.context_types.ANALYSIS_RESULTS,
            step.get("context_key"),
            analysis_context
        )

        # Return with optional approval cleanup
        if approval_cleanup:
            return {**context_updates, **approval_cleanup}
        else:
            return context_updates
```
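For orientation, expected_results names the entries the generated code is asked to place in its results dictionary, and the service hands them back through execution_result.results. An illustrative (not normative) example of what the generated code for the request above might produce:

```python
# Illustrative only: a results dictionary matching the expected_results keys
# declared in the request above. Actual values depend on the analyzed data.
results = {
    "statistics": {"row_count": 1440, "mean": 42.0, "std": 3.1},
    "trends": ["gradual upward drift after 12:00"],
    "visualizations": ["trend_plot.png", "histogram.png"],
}
```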
Execution Environment Management#
Container vs Local Execution#
The Python execution service supports both container and local execution environments. The execution method is primarily configured through the config system, but you can also implement dynamic selection logic if needed.
Configuration-Based Execution Method:
The execution method is typically set in your config.yml:

```yaml
framework:
  execution:
    execution_method: "container"  # or "local"

    container:
      jupyter_host: "localhost"
      jupyter_port: 8888

    local:
      python_env_path: "${LOCAL_PYTHON_VENV}"
```
Example: Dynamic Environment Selection (Advanced Use Case):
For advanced scenarios where you need to dynamically choose execution environments based on request characteristics, here’s an example pattern:
```python
class FlexiblePythonExecution:
    """Example: Dynamic execution environment selection.

    Note: This is an advanced pattern. Most use cases should rely on
    the standard config.yml execution_method setting.
    """

    def _select_execution_environment(self, code_request: dict) -> str:
        """Example: Select execution environment based on request characteristics.

        This would be used to override the default config.yml setting
        for specific requests that have special requirements.
        """
        requires_isolation = code_request.get("requires_isolation", False)
        has_dependencies = code_request.get("has_special_dependencies", False)
        is_long_running = code_request.get("estimated_time", 0) > 300
        security_level = code_request.get("security_level", "medium")

        # Example decision logic for environment selection
        if security_level == "high" or requires_isolation:
            return "container"
        elif has_dependencies or is_long_running:
            return "container"
        else:
            return "local"  # Faster for simple operations

    async def execute_with_dynamic_environment(self, state, request_data):
        """Example: Override execution method in service config."""
        # Get the dynamic execution method
        execution_method = self._select_execution_environment(request_data)

        # Override the config setting for this specific request
        main_configurable = get_full_configuration()
        service_config = {
            "configurable": {
                **main_configurable,
                "execution_method": execution_method,  # Override config.yml setting
                "thread_id": f"dynamic_{execution_method}",
                "checkpoint_ns": "python_executor"
            }
        }

        # Use the service with the dynamic configuration
        # ... rest of service call ...
```
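A quick check of the selection logic above, using illustrative request dictionaries:

```python
selector = FlexiblePythonExecution()

# High-security requests are routed to the container environment
assert selector._select_execution_environment({"security_level": "high"}) == "container"

# Long-running work (estimated over 300 seconds) also prefers the container
assert selector._select_execution_environment({"estimated_time": 900}) == "container"

# Simple, low-risk requests fall through to faster local execution
assert selector._select_execution_environment({"security_level": "medium"}) == "local"
```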
Advanced Patterns#
Multi-Stage Analysis Pipeline#
Chain multiple Python executions for complex analysis workflows with proper approval handling:
```python
async def multi_stage_analysis(self, state: AgentState, data_context: dict) -> dict:
    """Execute multi-stage analysis pipeline with approval handling."""
    # Uses the same imports as the capability example above
    registry = get_registry()
    python_service = registry.get_service("python_executor")
    main_configurable = get_full_configuration()
    logger = logging.getLogger(__name__)

    # Stage 1: Data preprocessing
    stage1_config = {
        "configurable": {
            **main_configurable,
            "thread_id": "stage1_preprocessing",
            "checkpoint_ns": "python_executor"
        }
    }

    preprocessing_prompts = [
        "**PREPROCESSING STAGE:**",
        "- Clean and validate the input data",
        "- Handle missing values and outliers",
        "- Prepare data for statistical analysis"
    ]

    preprocessing_request = PythonExecutionRequest(
        user_query="Data preprocessing stage",
        task_objective="Clean and prepare data for analysis",
        capability_prompts=preprocessing_prompts,
        expected_results={"cleaned_data": "pandas.DataFrame", "summary": "dict"},
        execution_folder_name="stage1_preprocessing",
        capability_context_data=data_context
    )

    stage1_result = await handle_service_with_interrupts(
        service=python_service,
        request=preprocessing_request,
        config=stage1_config,
        logger=logger,
        capability_name="PreprocessingStage"
    )

    # Stage 2: Statistical analysis (using results from stage 1)
    stage2_config = {
        "configurable": {
            **main_configurable,
            "thread_id": "stage2_analysis",
            "checkpoint_ns": "python_executor"
        }
    }

    analysis_prompts = [
        "**STATISTICAL ANALYSIS STAGE:**",
        "- Use the cleaned data from preprocessing stage",
        "- Perform comprehensive statistical analysis",
        "- Generate summary statistics and insights"
    ]

    # Combine original context with preprocessing results
    stage2_context = {
        **data_context,
        "preprocessing_results": stage1_result.execution_result.results
    }

    analysis_request = PythonExecutionRequest(
        user_query="Statistical analysis stage",
        task_objective="Perform statistical analysis on preprocessed data",
        capability_prompts=analysis_prompts,
        expected_results={"statistics": "dict", "insights": "list"},
        execution_folder_name="stage2_analysis",
        capability_context_data=stage2_context
    )

    stage2_result = await handle_service_with_interrupts(
        service=python_service,
        request=analysis_request,
        config=stage2_config,
        logger=logger,
        capability_name="AnalysisStage"
    )

    return {
        "pipeline_completed": True,
        "stages": {
            "preprocessing": stage1_result,
            "analysis": stage2_result
        }
    }
```
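A hedged sketch of a call site for this pipeline, picking results out of the returned structure (the execution_result.results access mirrors the stage-1 handoff above):

```python
# Hypothetical call site inside another method of the same class.
pipeline = await self.multi_stage_analysis(
    state,
    data_context=state.get("capability_context_data", {}),
)

if pipeline["pipeline_completed"]:
    analysis_stage = pipeline["stages"]["analysis"]
    statistics = analysis_stage.execution_result.results.get("statistics", {})
    insights = analysis_stage.execution_result.results.get("insights", [])
```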
Troubleshooting#
Common Issues:
- Issue: Python execution service not available
Cause: Service not registered in framework registry
Solution: Verify PythonExecutorService is registered in the registry configuration and retrievable via registry.get_service("python_executor")
- Issue: GraphInterrupt not being handled properly
Cause: Using direct service.ainvoke() instead of handle_service_with_interrupts()
Solution: Always use handle_service_with_interrupts() for service calls that may require approval
- Issue: Approval resume not working
Cause: Missing approval resume check or incorrect Command usage
Solution: Check for approval resume with get_approval_resume_data() and use Command(resume=response) for resumption
- Issue: Service configuration errors
Cause: Missing thread_id, checkpoint_ns, or incorrect configurable structure
Solution: Use get_full_configuration() and include proper thread_id and checkpoint_ns in service config
- Issue: Container execution failing with connection errors
Cause: Jupyter container not accessible or misconfigured
Solution: Check container endpoints and ensure Jupyter is running (see the connectivity check sketch after this list)
- Issue: Generated notebooks not accessible
Cause: File path or URL generation issues
Solution: Check execution folder configuration and notebook link generation
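For the container connection issue, a quick connectivity probe against the configured Jupyter endpoint helps separate networking problems from service configuration problems. The host and port below are assumptions meant to match the container settings in config.yml:

```python
import requests

# Assumed to match the container settings in config.yml (jupyter_host / jupyter_port)
jupyter_host, jupyter_port = "localhost", 8888

try:
    # The Jupyter Server REST API answers on /api with its version info
    response = requests.get(f"http://{jupyter_host}:{jupyter_port}/api", timeout=5)
    response.raise_for_status()
    print(f"Jupyter reachable: {response.json()}")
except requests.RequestException as exc:
    print(f"Jupyter endpoint not reachable: {exc}")
```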
See also
- Memory Storage
Integrate memory storage with Python execution
- Container Deployment
Advanced container orchestration
- Human Approval
Understanding the approval system integration
- Python Execution
Complete Python execution API