# Source code for framework.base.decorators

"""LangGraph Integration Decorators - Infrastructure Injection System

This module provides the core decorator system that enables seamless LangGraph
integration for capabilities and infrastructure nodes. The decorators implement
reflection-based validation, automatic infrastructure injection, and standardized
execution patterns that eliminate boilerplate while ensuring consistent behavior
across all framework components.

The decorator system serves as the bridge between the framework's convention-based
architecture and LangGraph's execution model. It provides comprehensive error
handling, state management, execution tracking, and streaming support while
maintaining type safety and development-time validation.

Key Features:
    - **Reflection-based validation**: Ensures required components are implemented
    - **Automatic infrastructure injection**: Provides timing, logging, and error handling
    - **LangGraph-native integration**: Full streaming, configuration, and checkpoint support
    - **Manual retry coordination**: Consistent error classification and retry policies
    - **Development mode support**: Raw error re-raising for debugging
    - **Execution tracking**: Comprehensive performance and state monitoring

Decorator Architecture:
    1. **@capability_node**: Business logic components with comprehensive execution tracking
    2. **@infrastructure_node**: System components with fast failure detection
    3. **Validation patterns**: Reflection-based requirement checking at decoration time
    4. **Error coordination**: Manual retry system via router for consistent behavior
    5. **State management**: Pure dictionary operations for LangGraph compatibility

.. note::
   These decorators create LangGraph-compatible node functions while preserving
   the original classes for introspection and testing. The framework uses manual
   retry handling rather than LangGraph's native retry policies for consistency.

.. warning::
   Decorated classes must implement required components (name, description, execute)
   or decoration will fail with clear error messages during development.

.. seealso::
   :class:`BaseCapability` : Capability base class with decorator integration
   :class:`BaseInfrastructureNode` : Infrastructure node base class
   :mod:`framework.state` : State management and execution tracking
"""

import time
import asyncio
from functools import wraps
from typing import Optional, Dict, Any, TYPE_CHECKING
from datetime import datetime

from configs.logger import get_logger
from framework.base.errors import ErrorSeverity


try:
    from langgraph.config import get_stream_writer, get_config
except ImportError:
    get_stream_writer = None
    get_config = None

# Import types for type hints
if TYPE_CHECKING:
    from framework.state import AgentState
    from framework.base.planning import PlannedStep
    from framework.base.errors import ErrorClassification

# Lazy imports to avoid circular dependencies
def _import_error_classification():
    try:
        from framework.base.errors import ErrorClassification, ErrorSeverity
        return ErrorClassification, ErrorSeverity
    except ImportError:
        return None, None

def _is_graph_interrupt(exc: Exception) -> bool:
    """Check if exception is a LangGraph GraphInterrupt."""
    # Check class name to avoid import issues
    return exc.__class__.__name__ == 'GraphInterrupt'


# =============================================================================
# LANGGRAPH INTEGRATION DECORATORS
# =============================================================================




def capability_node(cls):
    """Validate capability conventions and inject LangGraph infrastructure.

    Performs reflection-based validation that the decorated class implements
    the required components, then attaches a LangGraph-compatible
    ``langgraph_node`` coroutine to the class. The injected node provides
    timing, logging, streaming status updates, error classification, and
    state updates for step progression. All errors are routed through the
    framework's manual retry system (via the router) rather than LangGraph's
    native retry policies, for consistent behavior across components.

    Required components (validated through reflection):
        - ``name``: Unique capability identifier for registry and routing
        - ``description``: Human-readable description for docs and logging
        - ``execute()``: Async static method containing the business logic
        - ``classify_error()``: Error classification (inherited or custom)
        - ``get_retry_policy()``: Retry configuration (inherited or custom)

    :param cls: The capability class to decorate
    :type cls: type
    :return: Original class enhanced with a ``langgraph_node`` attribute
        containing the LangGraph-compatible function
    :rtype: type
    :raises ValueError: If required attributes (name, description) or methods
        (execute, classify_error, get_retry_policy) are missing

    .. note::
       The original class remains unchanged for introspection and testing;
       only the ``langgraph_node`` attribute is added.
    """
    # Extract required components using reflection
    capability_name = getattr(cls, 'name', None)
    description = getattr(cls, 'description', None)
    execute_func = getattr(cls, 'execute', None)
    error_classifier = getattr(cls, 'classify_error', None)
    retry_policy_func = getattr(cls, 'get_retry_policy', None)

    logger = get_logger("framework", capability_name)

    # Validate required components - fail fast at decoration time
    if not capability_name:
        raise ValueError(f"Capability {cls.__name__} must define 'name' class attribute")
    if not description:
        raise ValueError(f"Capability class {cls.__name__} must define 'description' attribute")
    if not execute_func:
        raise ValueError(f"Capability class {cls.__name__} must define 'execute' static method")
    if not error_classifier:
        raise ValueError(f"Capability class {cls.__name__} must have 'classify_error' method (inherit from BaseCapability or define manually)")
    if not retry_policy_func:
        raise ValueError(f"Capability class {cls.__name__} must have 'get_retry_policy' method (inherit from BaseCapability or define manually)")

    # Create LangGraph-compatible node function
    async def langgraph_node(
        state: 'AgentState',
        **kwargs
    ) -> Dict[str, Any]:
        """LangGraph-native node function with manual retry handling via router."""
        # Get streaming capability for status updates
        streaming = get_stream_writer() if get_stream_writer else None

        # Extract current step information using StateManager (lazy import to avoid circular imports)
        from framework.state import StateManager
        step = StateManager.get_current_step(state)

        start_time = time.time()

        try:
            if streaming:
                streaming({"event_type": "status", "message": f"Executing {capability_name}...", "progress": 0.1})

            logger.info(f"Executing capability: {capability_name}")

            # Execute capability with state
            result = await execute_func(state)

            execution_time = time.time() - start_time

            # Handle state updates for step progression
            state_updates = _handle_capability_state_updates(
                state, result, step, capability_name, start_time, execution_time, logger
            )

            return state_updates

        except Exception as exc:
            execution_time = time.time() - start_time

            # Check for development mode - re-raise original exception for debugging.
            # BUGFIX: read the flag inside the try, but re-raise OUTSIDE it.
            # Previously `raise exc` lived inside this try block, so when the
            # original exception was a RuntimeError/KeyError/AttributeError/
            # ImportError it was caught by the fallback `except` below and
            # silently swallowed instead of propagating to the developer.
            raise_raw_errors = False
            try:
                if get_config:
                    config = get_config()
                    configurable = config.get("configurable", {})
                    raise_raw_errors = configurable.get('development', {}).get('raise_raw_errors', False)
            except (RuntimeError, ImportError, AttributeError, KeyError):
                # If config access fails (outside runnable context), continue with normal error handling
                pass
            if raise_raw_errors:
                logger.error(f"Development mode: Re-raising original exception directly for debugging")
                raise exc

            # Re-raise GraphInterrupt immediately - it's not an error!
            if _is_graph_interrupt(exc):
                logger.info(f"GraphInterrupt detected in {capability_name} - re-raising for LangGraph to handle")
                raise exc

            # Classify the error using domain-specific or default logic
            error_context = {
                "capability": capability_name,
                "current_step_index": StateManager.get_current_step_index(state),
                "execution_time": execution_time,
                "current_state": state
            }
            error_classification = error_classifier(exc, error_context)

            # Get retry policy for this capability
            retry_policy = retry_policy_func()

            logger.error(f"Error in {capability_name} after {execution_time:.2f}s: {str(exc)}")
            logger.error(f"Classification: {error_classification.user_message or str(exc)}")

            # Always use manual retry system via router - NO LangGraph retries.
            # Track failure in execution_step_results for consistent tracking.
            step = StateManager.get_current_step(state)
            step_results = state.get("execution_step_results", {}).copy()
            current_step_index = StateManager.get_current_step_index(state)
            step_key = step.get("context_key", f"{current_step_index}_{capability_name}")
            step_results[step_key] = {
                "step_index": current_step_index,  # For explicit ordering
                "capability": capability_name,
                "task_objective": step.get("task_objective", f"Execute {capability_name}"),  # Add step objective
                "success": False,
                "execution_time": execution_time,
                "timestamp": datetime.now().isoformat()
            }

            return {
                "control_has_error": True,
                "control_error_info": {
                    "capability_name": capability_name,
                    "classification": error_classification,
                    "retry_policy": retry_policy,
                    "original_error": str(exc),
                    "user_message": error_classification.user_message or str(exc),
                    "execution_time": execution_time,
                    "timestamp": datetime.now().isoformat()
                },
                "control_retry_count": state.get("control_retry_count", 0) + 1,
                "execution_step_results": step_results,
                "execution_last_result": {
                    "capability": capability_name,
                    "success": False,
                    "error": str(exc),
                    "classification": error_classification.severity.value if hasattr(error_classification.severity, 'value') else str(error_classification.severity),
                    "timestamp": datetime.now().isoformat()
                }
            }

    # Attach metadata to the function for LangGraph integration
    langgraph_node.name = capability_name
    langgraph_node.capability_name = capability_name
    langgraph_node.description = description
    langgraph_node.error_classifier = error_classifier
    langgraph_node.original_class = cls

    # Set the LangGraph-native function on the class for registry discovery
    cls.langgraph_node = langgraph_node

    return cls
def _handle_capability_state_updates(
    state: Dict[str, Any],
    result: Dict[str, Any],
    step: Dict[str, Any],
    capability_name: str,
    start_time: float,
    execution_time: float,
    logger
) -> Dict[str, Any]:
    """Build the post-success state-update dictionary for a capability.

    Starting from the capability's own result dict, this advances the
    planned step index, clears every error/retry control flag, records the
    completed step in ``execution_step_results``, and refreshes
    ``execution_last_result`` for router decision-making.
    """
    # Lazy import to avoid circular imports
    from framework.state import StateManager

    # Begin with the capability's result (includes capability_context_data
    # rather than execution_context); tolerate non-dict results defensively.
    updates = dict(result) if isinstance(result, dict) else {}

    step_index = StateManager.get_current_step_index(state)

    # Step progression - advance to next step after successful execution
    updates["planning_current_step_index"] = step_index + 1

    # Reset per-step retry counter and clear retry/error state on success.
    updates["control_current_step_retry_count"] = 0
    updates["control_has_error"] = False
    updates["control_retry_count"] = 0
    updates["control_error_info"] = None

    # Record this step's outcome under its context key (or a derived one).
    recorded = state.get("execution_step_results", {}).copy()
    key = step.get("context_key", f"{step_index}_{capability_name}")
    recorded[key] = {
        "step_index": step_index,  # For explicit ordering
        "capability": capability_name,
        "task_objective": step.get("task_objective", f"Execute {capability_name}"),
        "success": True,
        "execution_time": execution_time,
        "timestamp": datetime.now().isoformat()
    }
    updates["execution_step_results"] = recorded

    # Summary of the most recent execution for router decision-making.
    updates["execution_last_result"] = {
        "capability": capability_name,
        "success": True,
        "execution_time": execution_time,
        "timestamp": datetime.now().isoformat()
    }

    logger.info(f"State updates: step {step_index + 1}")
    return updates
def infrastructure_node(cls=None, *, quiet=False):
    """Validate infrastructure node conventions and inject LangGraph infrastructure.

    Counterpart of :func:`capability_node` for system components such as
    orchestration, routing, classification, and monitoring nodes. It
    validates required components via reflection and attaches a
    LangGraph-compatible ``langgraph_node`` function to the class, with
    conservative error handling and performance tracking (delegated to the
    internal ``_create_infrastructure_node`` factory).

    Required components (validated through reflection):
        - ``name``: Infrastructure node identifier for routing and logging
        - ``description``: Human-readable description for docs and monitoring
        - ``execute()``: Async static method with orchestration/routing logic
        - ``classify_error()``: Error classification (inherited or custom)
        - ``get_retry_policy()``: Retry configuration (inherited or custom)

    :param cls: Infrastructure node class to decorate, or ``None`` when the
        decorator is invoked with parameters
    :type cls: Optional[type]
    :param quiet: If True, suppress start/completion logging (useful for
        high-frequency routing nodes)
    :type quiet: bool
    :return: Enhanced infrastructure class with a ``langgraph_node`` attribute
    :rtype: type
    :raises ValueError: If required components are missing

    .. note::
       Supports both ``@infrastructure_node`` and
       ``@infrastructure_node(quiet=True)`` syntax. Infrastructure nodes use
       manual error handling for consistency with capability nodes; no
       automatic LangGraph retry policies are created.

    .. warning::
       FATAL errors immediately terminate execution to prevent system-level
       issues.

    Example::

        @infrastructure_node(quiet=True)
        class RouterNode(BaseInfrastructureNode):
            name = "router"
            description = "Dynamic routing based on agent state"

            @staticmethod
            async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
                return {"control_next_node": next_node}
    """
    def decorator(target):
        return _create_infrastructure_node(target, quiet=quiet)

    # Bare usage (@infrastructure_node) passes the class directly;
    # parameterized usage (@infrastructure_node(quiet=True)) passes None
    # and expects the decorator back.
    return decorator if cls is None else decorator(cls)
def _create_infrastructure_node(cls, quiet=False):
    """Validate an infrastructure node class and attach its LangGraph node.

    Implementation backing :func:`infrastructure_node`. Validates required
    components via reflection, then builds a LangGraph-compatible coroutine
    with timing, optional quiet logging, development-mode error re-raising,
    GraphInterrupt passthrough, FATAL-error termination, and manual retry
    tracking, and sets it as ``cls.langgraph_node``.

    :param cls: Infrastructure node class to enhance
    :param quiet: If True, suppress start/completion logging
    :return: The same class with a ``langgraph_node`` attribute added
    :raises ValueError: If required attributes or methods are missing
    """
    # Extract required components using reflection
    node_name = getattr(cls, 'name', None)
    description = getattr(cls, 'description', None)
    execute_func = getattr(cls, 'execute', None)
    error_classifier = getattr(cls, 'classify_error', None)
    retry_policy_func = getattr(cls, 'get_retry_policy', None)

    logger = get_logger("framework", node_name)

    # Validate required components
    if not node_name:
        raise ValueError(f"Infrastructure node {cls.__name__} must define 'name' class attribute")
    if not description:
        raise ValueError(f"Infrastructure node {cls.__name__} must define 'description' class attribute")
    if not execute_func:
        raise ValueError(f"Infrastructure node {cls.__name__} must implement 'execute' static method")
    if not error_classifier:
        raise ValueError(f"Infrastructure node {cls.__name__} must have 'classify_error' method (inherit from BaseInfrastructureNode or define manually)")
    if not retry_policy_func:
        raise ValueError(f"Infrastructure node {cls.__name__} must have 'get_retry_policy' method (inherit from BaseInfrastructureNode or define manually)")

    # Create LangGraph-compatible node function
    async def langgraph_node(
        state: 'AgentState',
        **kwargs
    ) -> Dict[str, Any]:
        """LangGraph-native node function with manual error handling.

        Called by LangGraph during execution. Infrastructure nodes use
        get_stream_writer() and get_config() directly for pure LangGraph
        integration.

        :param state: Current agent state
        :type state: AgentState
        :param kwargs: Additional parameters from LangGraph
        :return: State updates dictionary
        :rtype: Dict[str, Any]
        """
        # Lazy import to avoid circular imports
        from framework.state import StateManager

        # Execution timing
        start_time = time.time()

        try:
            # Only log start message if not quiet
            if not quiet:
                logger.info(f"Starting {description}")

            # Execute main infrastructure logic - nodes use get_config() internally
            result = await execute_func(
                state,
                logger=logger,
                **kwargs
            )

            execution_time = time.time() - start_time

            # Only log completion message if not quiet
            if not quiet:
                logger.success(f"Completed {description} in {execution_time:.2f}s")

            # Add execution tracking to result
            if isinstance(result, dict):
                if "control_flow" not in result:
                    result["control_flow"] = {}
                result["control_flow"]["last_execution_time"] = execution_time
                result["control_flow"]["last_infrastructure_node"] = node_name

            return result

        except Exception as exc:
            execution_time = time.time() - start_time

            # Check for development mode - re-raise original exception for debugging.
            # BUGFIX: read the flag inside the try, but re-raise OUTSIDE it.
            # Previously `raise exc` lived inside this try block, so when the
            # original exception was a RuntimeError/KeyError/AttributeError/
            # ImportError it was caught by the fallback `except` below and
            # silently swallowed instead of propagating to the developer.
            raise_raw_errors = False
            try:
                if get_config:
                    config = get_config()
                    configurable = config.get("configurable", {})
                    raise_raw_errors = configurable.get('development', {}).get('raise_raw_errors', False)
            except (RuntimeError, ImportError, AttributeError, KeyError):
                # If config access fails (outside runnable context), continue with normal error handling
                pass
            if raise_raw_errors:
                logger.error(f"Development mode: Re-raising original exception for debugging")
                raise exc

            # Re-raise GraphInterrupt immediately - it's not an error!
            if _is_graph_interrupt(exc):
                logger.info(f"GraphInterrupt detected in {node_name} - re-raising for LangGraph to handle")
                raise exc

            # Handle actual errors
            context = {
                "infrastructure_node": node_name,
                "execution_time": execution_time,
                "current_state": state
            }
            classification = error_classifier(exc, context)

            # Check for FATAL severity - raise exception to stop execution entirely
            if classification.severity == ErrorSeverity.FATAL:
                logger.error(f"FATAL error in {node_name} - Terminating execution to prevent system issues")
                technical_details = ""
                if classification.metadata and 'technical_details' in classification.metadata:
                    technical_details = f"Technical details: {classification.metadata['technical_details']}. "
                raise RuntimeError(
                    f"Fatal error in {node_name}: {classification.user_message or str(exc)}. "
                    f"{technical_details}"
                    f"Execution terminated due to system-level failure."
                )

            # Get retry policy for this infrastructure node
            retry_policy = retry_policy_func()

            logger.error(f"Error in {node_name} after {execution_time:.2f}s: {str(exc)}")
            logger.error(f"Classification: {classification.user_message or str(exc)}")

            # Use manual error handling for infrastructure nodes too.
            # Track infrastructure failures in execution_step_results for consistent tracking.
            step_results = state.get("execution_step_results", {}).copy()
            current_step_index = StateManager.get_current_step_index(state)
            step_key = f"infra_{current_step_index}_{node_name}"
            step_results[step_key] = {
                "step_index": current_step_index,  # For explicit ordering
                "capability": node_name,
                "task_objective": f"Infrastructure: {node_name}",  # Infrastructure nodes don't have step objectives
                "success": False,
                "execution_time": execution_time,
                "timestamp": datetime.now().isoformat()
            }

            return {
                "control_has_error": True,
                "control_error_info": {
                    "node_name": node_name,
                    "classification": classification,
                    "retry_policy": retry_policy,
                    "original_error": str(exc),
                    "user_message": classification.user_message or str(exc),
                    "execution_time": execution_time,
                    "timestamp": datetime.now().isoformat()
                },
                "execution_step_results": step_results,
                "execution_last_result": {
                    "infrastructure_node": node_name,
                    "success": False,
                    "error": str(exc),
                    "classification": classification.severity.value if hasattr(classification.severity, 'value') else str(classification.severity),
                    "timestamp": datetime.now().isoformat()
                }
            }

    # Attach metadata to the function for LangGraph integration
    langgraph_node.name = node_name
    langgraph_node.node_name = node_name
    langgraph_node.description = description
    langgraph_node.error_classifier = error_classifier
    langgraph_node.original_class = cls

    # Set the LangGraph-native function on the class for registry discovery
    cls.langgraph_node = langgraph_node

    return cls