"""File and Notebook Management Services for Python Executor.
This module provides the service layer for all file system operations and Jupyter
notebook management within the Python executor service. It implements clean
separation of concerns between domain logic and file I/O operations, providing
robust file management with proper error handling and permission management.
The module is organized around two primary service classes:
**FileManager**: Handles execution folder creation, file operations, and context
management. It provides the foundation for organizing execution artifacts and
ensuring proper file system permissions for container-based execution.
**NotebookManager**: Specializes in Jupyter notebook creation and management,
providing comprehensive notebook generation for different stages of the execution
workflow including debugging, audit trails, and result presentation.
Key Features:
- **Container-Aware Permissions**: Automatic permission management to ensure
container users can write to execution folders
- **Structured Organization**: Hierarchical folder organization with date-based
organization and unique execution identifiers
- **Jupyter Integration**: Seamless URL generation for accessing notebooks
and execution artifacts through Jupyter interfaces
- **Context Management**: Comprehensive context serialization and loading
for cross-process communication and debugging
- **Audit Trails**: Complete tracking of execution attempts and artifacts
The services integrate with the framework's configuration system to access
Jupyter container settings and provide appropriate URL generation for external
access to execution artifacts.
.. note::
These services handle the physical file system operations while the domain
models in the models module provide the logical structure and data management.
.. seealso::
:class:`framework.services.python_executor.models.PythonExecutionContext` : Execution context model
:class:`framework.services.python_executor.models.NotebookAttempt` : Notebook tracking model
:class:`framework.context.ContextManager` : Framework context management
Examples:
Creating and managing execution folders::
>>> file_manager = FileManager(configurable)
>>> context = file_manager.create_execution_folder("data_analysis")
>>> print(f"Execution folder: {context.folder_path}")
>>> print(f"Jupyter URL: {context.folder_url}")
Creating notebooks for execution tracking::
>>> notebook_manager = NotebookManager(configurable)
>>> notebook_path = notebook_manager.create_attempt_notebook(
... context=execution_context,
... code="import pandas as pd\ndf.describe()",
... stage="execution"
... )
>>> print(f"Notebook created: {notebook_path}")
"""
import json
import uuid
import nbformat
import os
from pathlib import Path
from datetime import datetime
import textwrap
from typing import Dict, Any, Optional, List
from configs.logger import get_logger
from framework.context import load_context
from .models import PythonExecutionContext, PythonExecutionState, NotebookAttempt, NotebookType
logger = get_logger("framework", "python_services")
# =============================================================================
# FILE MANAGEMENT
# =============================================================================
class FileManager:
    """File system management service for Python execution workflows.

    Creates and organizes execution folders (date-based hierarchy with unique
    execution identifiers), relaxes permissions so container users can write
    to them, saves execution results, and generates Jupyter-accessible URLs
    for execution artifacts.

    :param configurable: LangGraph configurable dictionary containing service configuration
    :type configurable: Dict[str, Any]

    .. note::
        Folders receive very permissive modes for container compatibility;
        ensure the base directory itself is properly secured.

    .. seealso::
        :class:`PythonExecutionContext` : Execution context produced by this service
        :class:`NotebookManager` : Notebook-specific file operations
    """

    def __init__(self, configurable):
        """Initialize the manager and resolve the base artifacts directory.

        :param configurable: LangGraph configurable dictionary containing service settings
        :type configurable: Dict[str, Any]

        .. note::
            The base directory defaults to ``_agent_data/executed_scripts``
            when ``agent_data_dir`` is absent from the configurable.
        """
        self.configurable = configurable
        # Get agent data directory from configurable, fallback to _agent_data
        agent_data_dir = self.configurable.get('agent_data_dir', '_agent_data')
        # Resolve to an absolute path so relative_to checks in URL generation
        # behave consistently regardless of the process working directory.
        self.base_dir = (Path(agent_data_dir) / 'executed_scripts').resolve()

    def create_execution_folder(self, name: str = "python_executor") -> PythonExecutionContext:
        """Create a structured execution folder with container-friendly permissions.

        The layout is ``base_dir/YYYY-MM/execution_YYYYMMDD_HHMMSS_{name}_{uuid}/``
        with an ``attempts/`` subfolder for individual execution attempts.

        :param name: Descriptive name to include in the folder identifier
        :type name: str
        :return: Execution context with folder paths and Jupyter URL configured
        :rtype: PythonExecutionContext
        :raises OSError: If folder creation fails due to permission or disk space issues

        .. warning::
            Very permissive modes (0o777 plus setgid on the main folder) are
            applied so the container user (e.g. ``jovyan``) can write despite
            host/container UID mapping differences.
        """
        # Group executions by year-month to keep directory sizes manageable.
        now = datetime.now()
        month_dir = self.base_dir / now.strftime("%Y-%m")
        month_dir.mkdir(parents=True, exist_ok=True)

        # Timestamp plus a short UUID suffix keeps folder names unique even
        # for executions started within the same second.
        timestamp = now.strftime("%Y%m%d_%H%M%S")
        execution_id = uuid.uuid4().hex[:8]
        folder_name = f"execution_{timestamp}_{name}_{execution_id}"
        folder_path = month_dir / folder_name
        folder_path.mkdir(parents=True, exist_ok=True)

        attempts_folder = folder_path / "attempts"
        attempts_folder.mkdir(exist_ok=True)

        # Fix permissions so container user (jovyan) can write to folders.
        # This solves the UID mapping issue between host and container.
        try:
            # 0o2777 = full rwx for everyone plus the setgid bit (0o2000),
            # so files created inside inherit the folder's group. (Note:
            # 0o2000 is setgid, not the sticky bit, which would be 0o1000.)
            os.chmod(folder_path, 0o2777)
            os.chmod(attempts_folder, 0o777)  # Full permissions for attempts folder
        except Exception as e:
            # Don't fail folder creation just for permission issues.
            logger.warning(f"Failed to fix folder permissions: {e}")

        # Create folder URL for Jupyter
        folder_url = self._create_jupyter_url(folder_path)

        context = PythonExecutionContext(
            folder_path=folder_path,
            folder_url=folder_url,
            attempts_folder=attempts_folder
        )

        logger.info(f"Created execution folder: {folder_path}")
        return context

    def save_results(self, results: Dict[str, Any], folder_path: Path) -> Path:
        """Save a results dictionary to ``results.json`` in the given folder.

        Preferred for internal service operations where simple exception-based
        error handling is wanted.

        :param results: Results dictionary to save
        :type results: Dict[str, Any]
        :param folder_path: Folder in which to save the results
        :type folder_path: Path
        :return: Path to the saved results file
        :rtype: Path
        :raises RuntimeError: If serialization or file writing fails
        """
        results_file = folder_path / "results.json"

        # Delegate to the robust utility function, which reports failures
        # through its metadata dict instead of raising.
        metadata = serialize_results_to_file(results, str(results_file))
        if not metadata["success"]:
            error_msg = f"Failed to save results: {metadata['error']}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        logger.info(f"Saved results to: {results_file}")
        return results_file

    def _create_jupyter_url(self, folder_path: Path) -> str:
        """Create a Jupyter Lab URL for the given folder path.

        Paths under ``base_dir`` map to the Jupyter read container's tree URL;
        other paths fall back to a ``file://`` URI, and any failure falls back
        to the plain string path.
        """
        try:
            if folder_path.is_relative_to(self.base_dir):
                relative_path = folder_path.relative_to(self.base_dir)

                # Get Jupyter configuration from configurable
                # TODO: cleanup
                jupyter_config = (
                    self.configurable.get('service_configs', {})
                    .get('framework', {})
                    .get('jupyter', {})
                )
                port = (
                    jupyter_config.get('containers', {})
                    .get('read', {})
                    .get('port_host', 8088)
                )

                # Always use localhost for external access, not container hostname
                hostname = 'localhost'
                return f"http://{hostname}:{port}/lab/tree/executed_scripts/{relative_path.as_posix()}"
            else:
                return folder_path.as_uri()
        except Exception as e:
            logger.warning(f"Failed to create Jupyter URL: {e}")
            return str(folder_path)
def make_json_serializable(obj: Any) -> Any:
    """Convert complex objects to a JSON-serializable format.

    Standalone function usable by execution wrappers and other components
    that need robust JSON serialization of scientific-computing objects
    (numpy arrays/scalars, matplotlib figures, pandas frames, paths, sets,
    complex numbers). Anything unrecognized is converted to a structured
    string representation rather than failing.

    :param obj: Any Python object to make JSON-serializable
    :return: JSON-serializable representation of the object

    Examples:
        >>> import numpy as np
        >>> make_json_serializable(np.array([1, 2, 3]))
        [1, 2, 3]
    """
    import json
    from datetime import datetime
    from pathlib import Path

    def enhanced_json_serializer(obj):
        """Enhanced serializer supporting scientific computing objects.

        Uses an if-chain (not elif) so that a handler that fails, e.g. a
        ``tolist()`` that raises, falls through to later handlers and
        ultimately the string fallback — a bare ``pass`` inside an elif
        chain would implicitly return None and serialize the object as null.
        """
        # Handle datetime objects
        if isinstance(obj, datetime):
            return obj.isoformat()
        # Handle numpy arrays first (they also have an 'item' method)
        if hasattr(obj, 'tolist'):
            try:
                return obj.tolist()
            except (ValueError, AttributeError):
                pass  # fall through to remaining handlers
        # Handle numpy scalars (single-element arrays)
        if hasattr(obj, 'item'):
            try:
                return obj.item()
            except ValueError:
                pass  # multi-element arrays are handled by tolist above
        # Handle matplotlib figures
        if _is_matplotlib_figure(obj):
            return _serialize_matplotlib_figure(obj)
        # Handle pandas DataFrames and Series
        if hasattr(obj, 'to_dict') and hasattr(obj, 'index'):
            return obj.to_dict()
        # Handle pathlib objects
        if isinstance(obj, Path):
            return str(obj)
        # Handle sets and frozensets
        if isinstance(obj, (set, frozenset)):
            return {"items": list(obj), "_type": type(obj).__name__}
        # Handle complex numbers
        if isinstance(obj, complex):
            return {"real": obj.real, "imag": obj.imag, "_type": "complex"}
        # Handle custom objects with a to_dict method
        if hasattr(obj, 'to_dict') and callable(getattr(obj, 'to_dict')):
            return obj.to_dict()
        # Default: string representation with type info
        return {
            "value": str(obj),
            "type": type(obj).__name__,
            "module": getattr(type(obj), '__module__', 'unknown'),
            "_serialization_note": "converted_to_string"
        }

    # json.loads(json.dumps(...)) leverages the built-in encoder to walk
    # nested structures, applying the custom serializer only to leaves it
    # cannot handle natively.
    try:
        return json.loads(json.dumps(obj, default=enhanced_json_serializer, ensure_ascii=False))
    except (TypeError, ValueError) as e:
        # If JSON serialization completely fails, return a structured fallback
        return {
            "_serialization_failed": True,
            "_original_type": type(obj).__name__,
            "_error": str(e),
            "_string_representation": str(obj)
        }


def _is_matplotlib_figure(obj) -> bool:
    """Duck-type check for a matplotlib Figure without importing matplotlib."""
    return (hasattr(obj, 'savefig') and
            hasattr(obj, 'get_axes') and
            type(obj).__name__ == 'Figure')


def _serialize_matplotlib_figure(fig) -> dict:
    """Serialize a matplotlib figure to a JSON-compatible dict.

    Extracts figure size, DPI, and per-axes metadata (titles, labels,
    limits, and line data). Any failure yields a fallback dict with the
    error message instead of raising.
    """
    try:
        axes_info = []
        for ax in fig.get_axes():
            ax_info = {
                "title": ax.get_title(),
                "xlabel": ax.get_xlabel(),
                "ylabel": ax.get_ylabel(),
                "xlim": ax.get_xlim(),
                "ylim": ax.get_ylim(),
            }
            # Extract line data if present (xdata/ydata may be numpy arrays)
            lines_data = []
            for line in ax.get_lines():
                lines_data.append({
                    "xdata": line.get_xdata().tolist() if hasattr(line.get_xdata(), 'tolist') else list(line.get_xdata()),
                    "ydata": line.get_ydata().tolist() if hasattr(line.get_ydata(), 'tolist') else list(line.get_ydata()),
                    "label": line.get_label(),
                })
            ax_info["lines"] = lines_data
            axes_info.append(ax_info)

        return {
            "_type": "matplotlib_figure",
            "figure_size": fig.get_size_inches().tolist(),
            "dpi": fig.get_dpi(),
            "axes": axes_info,
            "_note": "Matplotlib figure serialized with basic plot data"
        }
    except Exception as e:
        return {
            "_type": "matplotlib_figure",
            "_error": f"Failed to serialize figure: {str(e)}",
            "_fallback": str(fig)
        }
def serialize_results_to_file(results: Any, file_path: str) -> dict:
    """Serialize results and save to a JSON file with comprehensive error handling.

    Designed to be called from execution wrappers; never raises. On failure,
    a minimal fallback representation is written instead and the error is
    reported through the returned metadata dict.

    :param results: The results object to serialize
    :param file_path: Path where to save the JSON file
    :return: Metadata about the serialization operation (``success``,
        ``file_path``, ``error``, and on failure ``error_type`` plus
        ``fallback_saved``/``fallback_error``)

    Examples:
        >>> metadata = serialize_results_to_file({"x": 1}, 'results.json')
        >>> if metadata['success']:
        ...     print(f"Results saved successfully to {metadata['file_path']}")
        ... else:
        ...     print(f"Serialization failed: {metadata['error']}")
    """
    import json

    metadata = {
        "success": False,
        "file_path": file_path,
        "error": None,
        "serialization_warnings": []
    }

    try:
        # Convert to JSON-serializable format, then write.
        serializable_results = make_json_serializable(results)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(serializable_results, f, indent=2, ensure_ascii=False)
        metadata["success"] = True
        return metadata
    except Exception as e:
        metadata["error"] = str(e)
        metadata["error_type"] = type(e).__name__
        # Best effort: persist a minimal fallback so the file is never empty
        # or stale after a failed serialization.
        try:
            fallback_data = {
                "_serialization_failed": True,
                "_original_error": str(e),
                "_fallback_representation": str(results)
            }
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(fallback_data, f, indent=2, ensure_ascii=False)
            metadata["fallback_saved"] = True
        except Exception as fallback_error:
            metadata["fallback_error"] = str(fallback_error)
        return metadata
# =============================================================================
# NOTEBOOK MANAGEMENT
# =============================================================================
class NotebookManager:
    """Jupyter notebook management service for Python execution workflows.

    Creates and organizes notebooks across the execution lifecycle:
    per-attempt debugging notebooks (with optional error/approval context)
    and a final notebook presenting the executed code, results, and
    generated figures. Reuses :class:`FileManager` for Jupyter URL
    generation so links stay consistent across the service layer.

    :param configurable: LangGraph configurable dictionary containing service configuration
    :type configurable: Dict[str, Any]

    .. seealso::
        :class:`FileManager` : File system operations and URL generation
        :class:`NotebookType` : Enumeration of supported notebook types
        :class:`NotebookAttempt` : Notebook attempt tracking model
    """

    def __init__(self, configurable):
        """Store configuration and create a FileManager for URL generation.

        :param configurable: LangGraph configurable dictionary containing service settings
        :type configurable: Dict[str, Any]
        """
        self.configurable = configurable
        # Reuse FileManager's Jupyter URL generation rather than duplicating it.
        self._file_manager = FileManager(configurable)

    def create_attempt_notebook(
        self,
        context: PythonExecutionContext,
        code: str,
        stage: str = "execution",
        error_context: Optional[str] = None,
        approval_context: Optional[str] = None,
        silent: bool = False
    ) -> Path:
        """Create a debugging notebook for a single execution attempt.

        Also records the attempt (with its Jupyter link) on the execution
        context for audit purposes.

        :param context: Python execution context
        :param code: The Python code for this attempt
        :param stage: Current stage (e.g. "code_generation", "execution")
        :param error_context: Optional error context for failed attempts
        :param approval_context: Optional approval context for approval-required attempts
        :param silent: If True, don't log creation (for cleaner logs when the
            parent node handles logging)
        :return: Path to the created notebook
        """
        # context.json should have been written earlier in the workflow; warn
        # rather than fail if it is missing.
        if not context.context_file_path:
            logger.warning("Context file not found when creating notebook - this should not happen")

        # Filename encodes attempt order, stage and time for easy sorting.
        attempt_number = context.get_next_attempt_number()
        timestamp = datetime.now().strftime("%H%M%S")
        filename = f"{attempt_number:02d}_{stage}_{timestamp}.ipynb"
        notebook_path = context.attempts_folder / filename

        notebook = self._create_attempt_notebook_content(
            attempt_number=attempt_number,
            stage=stage,
            code=code,
            error_context=error_context,
            approval_context=approval_context,
            context_file_path=context.context_file_path
        )

        # Notebook JSON is UTF-8 by specification.
        with open(notebook_path, 'w', encoding='utf-8') as f:
            nbformat.write(notebook, f)

        # Track the attempt - use FileManager's URL generation
        attempt = NotebookAttempt(
            notebook_type=NotebookType.EXECUTION_ATTEMPT,
            attempt_number=attempt_number,
            stage=stage,
            notebook_path=notebook_path,
            notebook_link=self._file_manager._create_jupyter_url(notebook_path),
            error_context=error_context,
            created_at=datetime.now().isoformat()
        )
        context.add_notebook_attempt(attempt)

        if not silent:
            logger.info(f"Created attempt notebook: {notebook_path}")
        return notebook_path

    def create_final_notebook(
        self,
        context: PythonExecutionContext,
        code: str,
        results: Optional[Dict[str, Any]] = None,
        error_context: Optional[str] = None,
        figure_paths: Optional[List[Path]] = None
    ) -> Path:
        """Create the final notebook in the execution folder.

        :param context: Python execution context
        :param code: The final Python code
        :param results: Optional execution results
        :param error_context: Optional error context for failures
        :param figure_paths: List of figure paths to include
        :return: Path to the created notebook
        """
        # context.json should have been written earlier in the workflow.
        if not context.context_file_path:
            logger.warning("Context file not found when creating final notebook - this should not happen")

        notebook_path = context.folder_path / "notebook.ipynb"

        notebook = self._create_final_notebook_content(
            code=code,
            results=results,
            error_context=error_context,
            context_file_path=context.context_file_path,
            figure_paths=figure_paths or [],
            execution_folder=context.folder_path
        )

        # Notebook JSON is UTF-8 by specification.
        with open(notebook_path, 'w', encoding='utf-8') as f:
            nbformat.write(notebook, f)

        logger.info(f"Created final notebook: {notebook_path}")
        return notebook_path

    def _create_attempt_notebook_content(
        self,
        attempt_number: int,
        stage: str,
        code: str,
        error_context: Optional[str] = None,
        approval_context: Optional[str] = None,
        context_file_path: Optional[Path] = None
    ) -> nbformat.NotebookNode:
        """Build the cell list for an attempt notebook: markdown header,
        optional context-loading cell, then the code cell."""
        cells = []

        # Header with attempt metadata plus optional approval/error sections.
        header = f"# Python Executor - Attempt #{attempt_number}\n\n"
        header += f"**Stage:** {stage}\n"
        header += f"**Created:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
        if approval_context:
            header += f"{approval_context}\n\n"
        if error_context:
            header += f"## Error Context\n{error_context}\n\n"
        cells.append(nbformat.v4.new_markdown_cell(header))

        # Context loading if available. Attempt notebooks live one level down
        # (in attempts/), hence the '../' relative path.
        if context_file_path:
            context_code = textwrap.dedent("""
                # Load execution context
                from framework.context import load_context
                context = load_context('../context.json')
            """)
            cells.append(nbformat.v4.new_code_cell(context_code))

        # Main code
        cells.append(nbformat.v4.new_code_cell(code))

        notebook = nbformat.v4.new_notebook()
        notebook.cells = cells
        return notebook

    def _create_final_notebook_content(
        self,
        code: str,
        results: Optional[Dict[str, Any]] = None,
        error_context: Optional[str] = None,
        context_file_path: Optional[Path] = None,
        figure_paths: Optional[List[Path]] = None,
        execution_folder: Optional[Path] = None
    ) -> nbformat.NotebookNode:
        """Build the cell list for the final notebook: status header, optional
        context-loading cell, the code cell, and optional results/figures
        sections."""
        cells = []

        # Header with detailed error information when the run failed.
        if error_context:
            header = f"# Python Executor - Failed Execution\n\n## Error Context\n{error_context}\n\n"
        else:
            header = "# Python Executor - Successful Execution\n\nExecution completed successfully.\n\n"
        cells.append(nbformat.v4.new_markdown_cell(header))

        # Context loading if available (final notebook sits next to context.json).
        if context_file_path:
            context_code = textwrap.dedent("""
                # Load execution context
                from framework.context import load_context
                context = load_context('context.json')
            """)
            cells.append(nbformat.v4.new_code_cell(context_code))

        # Main code
        cells.append(nbformat.v4.new_code_cell(code))

        # Results section: pointer markdown plus an executable cell that
        # reloads results.json so the reader can inspect it interactively.
        if results:
            results_md = "## Execution Results\n\nResults saved to `results.json`"
            cells.append(nbformat.v4.new_markdown_cell(results_md))

            results_code = textwrap.dedent("""\
                import json
                with open('results.json', 'r') as f:
                    results = json.load(f)
                print(results)""")
            cells.append(nbformat.v4.new_code_cell(results_code))

        # Figures section: embed each figure as a markdown image with a path
        # relative to the execution folder so it renders inside Jupyter.
        if figure_paths and execution_folder:
            figures_md = "## Generated Figures\n\n"
            for i, fig_path in enumerate(figure_paths, 1):
                try:
                    # Normalize inputs that may arrive as strings or other types.
                    if not isinstance(fig_path, Path):
                        fig_path = Path(str(fig_path))
                    if isinstance(execution_folder, str):
                        execution_folder = Path(execution_folder)

                    relative_path = fig_path.relative_to(execution_folder)
                    figures_md += f"### Figure {i}\n![Figure {i}]({relative_path.as_posix()})\n\n"
                except (ValueError, AttributeError):
                    # relative_to fails for paths outside the execution folder;
                    # fall back to the bare filename.
                    fig_name = fig_path.name if hasattr(fig_path, 'name') else str(fig_path).split('/')[-1]
                    figures_md += f"### Figure {i}\n![Figure {i}]({fig_name})\n\n"
            cells.append(nbformat.v4.new_markdown_cell(figures_md))

        notebook = nbformat.v4.new_notebook()
        notebook.cells = cells
        return notebook