# Source code for framework.services.python_executor.service

"""
Python Executor Service - LangGraph Architecture

Main service class that orchestrates Python code generation, analysis, and execution
using LangGraph with human approval integration through interrupts.
"""

from typing import Dict, Any, Optional
from langgraph.graph import StateGraph
from langgraph.types import Command

from .models import PythonExecutionState, PythonExecutionRequest, PythonServiceResult
from .generator_node import create_generator_node
from .analyzer_node import create_analyzer_node
from .executor_node import create_executor_node
from .approval_node import create_approval_node
from .exceptions import CodeRuntimeError
from framework.graph.graph_builder import create_memory_checkpointer, create_async_postgres_checkpointer
from configs.config import get_full_configuration
from configs.logger import get_logger

logger = get_logger("framework", "python")

class PythonExecutorService:
    """Advanced Python execution service with flexible deployment and human oversight.

    A production-ready, LangGraph-based workflow for Python code generation,
    static analysis, human approval, and execution, designed for high-stakes
    scientific and industrial environments.

    Key capabilities:

    - **Flexible execution environments**: switch between container execution
      (secure, isolated Jupyter environments with full dependency management)
      and local execution (direct host Python) with a single configuration
      change — same interface, same results, different isolation levels.
    - **Comprehensive Jupyter notebook generation**: rich notebooks are
      produced at the code-generation, analysis, and execution stages, with
      complete execution metadata, analysis results, error diagnostics, and
      click-to-open URLs for immediate review — a full audit trail of
      execution attempts.
    - **Production-ready human-in-the-loop approval**: LangGraph-native
      interrupts suspend the workflow for human oversight with rich approval
      context (safety assessments, code analysis, execution plans), automatic
      detection of potentially dangerous operations, checkpoint-based
      resumption after approval, and configurable, domain-specific approval
      policies.

    Execution pipeline:

    1. **Code generation** — LLM-based, context-aware, iteratively improved
    2. **Static analysis** — security/policy analysis with configurable
       domain-specific rules
    3. **Approval workflow** — human oversight with rich context and safety
       assessments
    4. **Flexible execution** — container or local, with unified result
       collection
    5. **Notebook generation** — comprehensive notebooks for human evaluation
    6. **Result processing** — structured results with artifact management
       and audit trails

    The service maintains complete compatibility with the existing capability
    interface while providing enhanced functionality through its internal
    LangGraph-based architecture. It supports both fresh execution requests
    and resumption of interrupted workflows (such as approval processes).

    Key architectural features:

    - **Exception-based flow control**: categorized errors determine the
      appropriate retry strategy
    - **Checkpoint support**: full LangGraph checkpoint integration for
      workflow resumption and debugging
    - **Type-safe interfaces**: Pydantic request/response models with
      comprehensive validation
    - **Service isolation**: self-contained service graph, separate from the
      main agent workflow
    - **Comprehensive logging**: detailed execution tracking and debugging

    The service integrates with the framework's configuration system,
    approval management, and context handling for seamless operation within
    the broader agent framework.

    .. note::
       This service is designed to be invoked through the PythonCapability
       class rather than directly. Direct invocation is supported for
       advanced use cases.

    .. warning::
       The service can execute arbitrary Python code within configured
       security constraints. Ensure proper approval policies are configured
       for production use.

    .. seealso::
       :class:`framework.capabilities.python.PythonCapability` : Main capability interface
       :class:`PythonExecutionRequest` : Request model for service invocation
       :class:`PythonServiceResult` : Structured response from successful execution
       :class:`PythonExecutionState` : Internal LangGraph state management

    Examples:
        **Execution with automatic notebook generation**::

            >>> service = PythonExecutorService()
            >>> request = PythonExecutionRequest(
            ...     user_query="Analyze EPICS PV data and create trend plots",
            ...     task_objective="Generate comprehensive data analysis report",
            ...     execution_folder_name="epics_analysis"
            ... )
            >>> result = await service.ainvoke(request, config=service_config)
            >>>
            >>> # Rich results with notebook access
            >>> print(f"Generated code: {result.generated_code}")
            >>> print(f"Execution time: {result.execution_result.execution_time}s")
            >>> print(f"Review notebook: {result.execution_result.notebook_link}")
            >>> print(f"Generated figures: {len(result.execution_result.figure_paths)}")

        **Container vs local execution** (same interface, different isolation)::

            >>> # Container execution (config: execution_method: "container")
            >>> result_container = await service.ainvoke(request, config=container_config)
            >>> # Local execution (config: execution_method: "local")
            >>> result_local = await service.ainvoke(request, config=local_config)

        **Human-in-the-loop approval workflow**::

            >>> # A request requiring approval automatically triggers an interrupt;
            >>> # the user receives rich approval context (reviewable notebook,
            >>> # safety analysis, execution environment details).
            >>> # After approval, execution resumes from the checkpoint:
            >>> resume_command = Command(resume={"approved": True})
            >>> result = await service.ainvoke(resume_command, config=service_config)
            >>> print(f"Approved operation completed: {result.execution_result.results}")
    """
[docs] def __init__(self): self.config = self._load_config() self._compiled_graph = None
[docs] def get_compiled_graph(self): """Get the compiled LangGraph for this service.""" if self._compiled_graph is None: self._compiled_graph = self._build_and_compile_graph() return self._compiled_graph
[docs] async def ainvoke(self, input_data, config): """Main service entry point handling execution requests and workflow resumption. This method serves as the primary interface for the Python executor service, accepting both fresh execution requests and workflow resumption commands. It implements comprehensive input validation, workflow orchestration, and structured result processing. The method handles two primary input types: 1. **PythonExecutionRequest**: Fresh execution requests containing user queries, task objectives, and execution parameters. These trigger the complete code generation, analysis, and execution workflow. 2. **Command**: Workflow resumption commands, typically containing approval responses from interrupted workflows. These resume execution from the appropriate checkpoint. The service automatically determines the appropriate workflow path based on the input type and manages the complete execution lifecycle including error handling, result processing, and exception propagation. :param input_data: Execution request or resumption command :type input_data: Union[PythonExecutionRequest, Command] :param config: LangGraph configuration including thread_id and service settings :type config: Dict[str, Any] :return: Structured execution results for successful completion :rtype: PythonServiceResult :raises CodeRuntimeError: If Python code execution fails :raises TypeError: If input_data is not a supported type :raises ValueError: If Command contains invalid resume data .. note:: The service automatically raises appropriate exceptions for execution failures rather than returning error states, enabling clean error handling in calling code. .. warning:: This method can execute arbitrary Python code. Ensure proper approval policies are configured and input validation is performed. Examples: Processing a fresh execution request:: >>> service = PythonExecutorService() >>> request = PythonExecutionRequest( ... user_query="Calculate data statistics", ... 
task_objective="Generate summary statistics", ... execution_folder_name="stats_analysis" ... ) >>> config = {"thread_id": "session_123"} >>> result = await service.ainvoke(request, config) >>> print(f"Success: {result.execution_result.results}") Resuming after approval:: >>> resume_cmd = Command(resume={"approved": True}) >>> result = await service.ainvoke(resume_cmd, config) """ if isinstance(input_data, Command): logger.debug(f"Service ainvoke received input_data type: {type(input_data)}") logger.debug(f"Service ainvoke input_data isinstance PythonExecutionRequest: {isinstance(input_data, PythonExecutionRequest)}") # This is a resume command (approval response) if hasattr(input_data, 'resume') and input_data.resume: logger.info("Resuming Python service execution after approval") approval_result = input_data.resume.get('approved', False) logger.info(f"Approval result: {approval_result}") logger.info(f"Full resume payload keys: {list(input_data.resume.keys())}") # Pass Command directly to let LangGraph handle checkpoint resume # This preserves the entire approval payload and resumes from the correct checkpoint compiled_graph = self.get_compiled_graph() result = await compiled_graph.ainvoke(input_data, config) # Check for execution failure and raise exception if not result.get("is_successful", False): failure_reason = result.get("failure_reason") or result.get("execution_error", "Code execution failed") logger.error(f"Python execution failed: {failure_reason}") # Raise appropriate exception based on failure type raise CodeRuntimeError( message=f"Python code execution failed: {failure_reason}", traceback_info=result.get("execution_error", ""), execution_attempt=result.get("generation_attempt", 1) ) return result else: raise ValueError("Invalid Command received by service - missing or invalid resume data") elif isinstance(input_data, PythonExecutionRequest): logger.debug(f"Service ainvoke received input_data type: {type(input_data)}") logger.debug(f"Service ainvoke 
input_data isinstance PythonExecutionRequest: {isinstance(input_data, PythonExecutionRequest)}") logger.debug(f"Converting PythonExecutionRequest to internal state") internal_state = self._create_internal_state(input_data) logger.debug(f"Created internal_state type: {type(internal_state)}") logger.debug(f"Internal state keys: {list(internal_state.keys())}") logger.debug(f"Internal state has request: {internal_state.get('request') is not None}") compiled_graph = self.get_compiled_graph() result = await compiled_graph.ainvoke(internal_state, config) # **CRITICAL FIX**: Check for execution failure and raise exception if not result.get("is_successful", False): failure_reason = result.get("failure_reason") or result.get("execution_error", "Code execution failed") logger.error(f"Python execution failed: {failure_reason}") # Raise appropriate exception based on failure type raise CodeRuntimeError( message=f"Python code execution failed: {failure_reason}", traceback_info=result.get("execution_error", ""), execution_attempt=result.get("generation_attempt", 1) ) # Transform to structured result - no more dict validation needed by capabilities! return PythonServiceResult( execution_result=result["execution_result"], generated_code=result.get("generated_code", ""), generation_attempt=result.get("generation_attempt", 1), analysis_warnings=result.get("analysis_warnings", []) ) else: # Clean API: Only accept defined input types supported_types = [PythonExecutionRequest.__name__, "Command"] raise TypeError( f"Python executor service received unsupported input type: {type(input_data).__name__}. " f"Supported types: {', '.join(supported_types)}" )
    def _build_and_compile_graph(self):
        """Build and compile the Python executor LangGraph."""
        # Create state graph with PythonExecutionState
        workflow = StateGraph(PythonExecutionState)

        # Add service nodes (no @capability_node decorator!)
        workflow.add_node("python_code_generator", create_generator_node())
        workflow.add_node("python_code_analyzer", create_analyzer_node())
        workflow.add_node("python_approval_node", create_approval_node())
        workflow.add_node("python_code_executor", create_executor_node())

        # Set up internal flow: generate -> analyze -> (approve | retry | execute | end)
        workflow.set_entry_point("python_code_generator")
        workflow.add_edge("python_code_generator", "python_code_analyzer")
        workflow.add_conditional_edges(
            "python_code_analyzer",
            self._analyzer_conditional_edge,
            {
                "approve": "python_approval_node",
                "retry": "python_code_generator",
                "execute": "python_code_executor",
                "__end__": "__end__"  # Handle permanent failures
            }
        )
        workflow.add_conditional_edges(
            "python_approval_node",
            self._approval_conditional_edge,
            {
                "approved": "python_code_executor",
                "rejected": "__end__",
                "retry": "python_code_generator"
            }
        )
        workflow.add_edge("python_code_executor", "__end__")

        # Compile with checkpointer for interrupt support - use same pattern as main graph
        checkpointer = self._create_checkpointer()
        compiled = workflow.compile(checkpointer=checkpointer)
        logger.info("Python executor service graph compiled successfully")
        return compiled

    def _create_internal_state(self, request: PythonExecutionRequest) -> PythonExecutionState:
        """Convert PythonExecutionRequest to internal service state.

        This preserves the existing request interface while enabling internal
        LangGraph state management for the service nodes.

        :param request: Validated execution request from the caller
        :return: Fully initialized state dict for a fresh workflow run
        """
        return PythonExecutionState(
            request=request,  # Store serializable request data
            # Extract capability context data to top level for ContextManager compatibility
            capability_context_data=request.capability_context_data,
            # Initialize execution state
            generation_attempt=0,
            error_chain=[],
            current_stage="generation",
            # Approval state (None = not yet determined by the analyzer/approval nodes)
            requires_approval=None,
            approval_interrupt_data=None,
            approval_result=None,
            approved=None,
            # Runtime state (populated by generator/analyzer/executor nodes)
            generated_code=None,
            analysis_result=None,
            analysis_failed=None,
            execution_result=None,
            execution_folder=None,
            # Control flags
            is_successful=False,
            is_failed=False,
            failure_reason=None,
        )

    def _analyzer_conditional_edge(self, state: PythonExecutionState) -> str:
        """Route after static analysis.

        Returns one of "__end__", "retry", "approve", or "execute" — the keys
        of the conditional-edge mapping wired in _build_and_compile_graph.
        """
        if state.get("is_failed", False):
            return "__end__"  # Permanently failed - don't retry
        elif state.get("analysis_failed", False):
            # Check retry limit to prevent infinite loops
            generation_attempt = state.get("generation_attempt", 0)
            max_retries = 3  # Configurable limit
            if generation_attempt >= max_retries:
                logger.error(f"Max retries ({max_retries}) exceeded for code generation")
                # Force permanent failure instead of infinite retries.
                # NOTE(review): mutating state inside a conditional-edge function
                # may not persist to the checkpointed state — confirm against
                # LangGraph semantics.
                state["is_failed"] = True
                state["failure_reason"] = f"Code generation failed after {max_retries} attempts"
                return "__end__"
            else:
                logger.warning(f"⚠️ Retrying code generation (attempt {generation_attempt + 1}/{max_retries})")
                return "retry"
        elif state.get("requires_approval", False):
            return "approve"
        else:
            return "execute"

    def _approval_conditional_edge(self, state: PythonExecutionState) -> str:
        """Route after approval process: "approved" to execute, otherwise "rejected"."""
        if state.get("approved", False):
            return "approved"
        else:
            return "rejected"

    def _create_checkpointer(self):
        """Create checkpointer using same logic as main graph.

        Prefers async PostgreSQL when configured (production mode) and falls
        back to the in-memory checkpointer on failure or by default (R&D mode).
        """
        # Check if we should use PostgreSQL (production mode)
        use_postgres = self.config.get("langgraph", {}).get("use_postgres", False)
        if use_postgres:
            try:
                # Try PostgreSQL when explicitly requested
                checkpointer = create_async_postgres_checkpointer()
                logger.info("Python executor service using async PostgreSQL checkpointer")
                return checkpointer
            except Exception as e:
                # Fall back to memory saver if PostgreSQL fails
                logger.warning(f"PostgreSQL checkpointer failed for Python executor service: {e}")
                logger.info("Python executor service falling back to in-memory checkpointer")
                return create_memory_checkpointer()
        else:
            # Default to memory saver for R&D mode
            logger.info("Python executor service using in-memory checkpointer")
            return create_memory_checkpointer()

    def _load_config(self) -> Dict[str, Any]:
        """Load service configuration from the framework's full configuration."""
        return get_full_configuration()