Source code for framework.base.capability

"""
Base Capability Class - LangGraph Migration

Convention-based base class for all capabilities in the Alpha Berkeley Agent framework.
Implements the LangGraph-native architecture with configuration-driven patterns.
"""

import logging
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, TYPE_CHECKING

from framework.base.errors import ErrorClassification, ErrorSeverity

# Import types for type hints
if TYPE_CHECKING:
    from framework.state import AgentState
    from framework.base.planning import PlannedStep

# Direct imports - no circular dependencies in practice
from .examples import OrchestratorGuide
from .examples import TaskClassifierGuide


[docs] class BaseCapability(ABC): """Base class for framework capabilities using convention-based configuration. This class provides the foundation for all capabilities in the ALS Expert framework. Capabilities are self-contained business logic components that perform specific tasks and integrate seamlessly with the LangGraph execution model through convention-based patterns and automatic discovery. The BaseCapability class enforces a strict contract through reflection-based validation: capabilities must define required components and can optionally implement guidance systems for orchestration and classification. The @capability_node decorator provides complete LangGraph integration including error handling, retry policies, and execution tracking. Required Components (enforced at initialization): - name: Unique capability identifier used for registration and routing - description: Human-readable description for documentation and logging - execute(): Async static method containing the main business logic Optional Components (with defaults provided): - provides: List of data types this capability generates (default: []) - requires: List of data types this capability depends on (default: []) - classify_error(): Domain-specific error classification (default: all CRITICAL) - get_retry_policy(): Retry configuration for failure recovery (default: 3 attempts) - _create_orchestrator_guide(): Integration guidance for execution planning - _create_classifier_guide(): Task classification guidance for capability selection Architecture Integration: The capability integrates with multiple framework systems: 1. **Execution System**: Via @capability_node decorator for LangGraph nodes 2. **Planning System**: Via orchestrator guides for step planning 3. **Classification System**: Via classifier guides for capability selection 4. **Error Handling**: Via error classification for recovery strategies 5. **Registry System**: Via convention-based configuration :param name: Unique capability identifier for registry and routing :type name: str :param description: Human-readable description for documentation :type description: str :param provides: Data types generated by this capability :type provides: List[str] :param requires: Data types required by this capability :type requires: List[str] :raises NotImplementedError: If required class attributes or methods are missing .. note:: Use the @capability_node decorator to enable LangGraph integration with automatic error handling, retry policies, and execution tracking. .. warning:: The execute method must be implemented as a static method and should return a dictionary of state updates for LangGraph to merge. Examples: Basic capability implementation:: @capability_node class WeatherCapability(BaseCapability): name = "weather_data" description = "Retrieve current weather conditions" provides = ["WEATHER_DATA"] requires = ["LOCATION"] @staticmethod async def execute(state: AgentState, **kwargs) -> Dict[str, Any]: location = state.get("location") weather_data = await fetch_weather(location) return { "weather_current_conditions": weather_data, "weather_last_updated": datetime.now().isoformat() } Capability with custom error handling:: @capability_node class DatabaseCapability(BaseCapability): name = "database_query" description = "Execute database queries with connection handling" @staticmethod def classify_error(exc: Exception, context: dict) -> ErrorClassification: if isinstance(exc, ConnectionError): return ErrorClassification( severity=ErrorSeverity.RETRIABLE, user_message="Database connection lost, retrying...", metadata={"technical_details": str(exc)} ) return ErrorClassification( severity=ErrorSeverity.CRITICAL, user_message=f"Database error: {exc}", metadata={"technical_details": str(exc)} ) @staticmethod async def execute(state: AgentState, **kwargs) -> Dict[str, Any]: # Implementation with database operations pass .. seealso:: :func:`capability_node` : Decorator for LangGraph integration :class:`BaseInfrastructureNode` : Infrastructure components base class :class:`ErrorClassification` : Error classification system """ # Required class attributes - must be overridden in subclasses name: str = None description: str = None # Optional class attributes - defaults provided provides: List[str] = [] requires: List[str] = []
[docs] def __init__(self): """Initialize the capability and validate required components. Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures. The validation process checks: 1. Required class attributes (name, description) are defined and non-None 2. The execute method is implemented as a static method 3. Optional attributes are properly initialized with defaults if missing :raises NotImplementedError: If name or description class attributes are missing :raises NotImplementedError: If execute static method is not implemented .. note:: This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator. .. warning:: Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead. """ # Validate that subclass has defined required attributes if self.name is None: raise NotImplementedError(f"{self.__class__.__name__} must define 'name' class attribute") if self.description is None: raise NotImplementedError(f"{self.__class__.__name__} must define 'description' class attribute") # Validate that execute method is implemented if not hasattr(self.__class__, 'execute'): raise NotImplementedError(f"{self.__class__.__name__} must implement 'execute' static method") # Set defaults for optional attributes if not defined if not hasattr(self.__class__, 'provides') or self.provides is None: self.__class__.provides = [] if not hasattr(self.__class__, 'requires') or self.requires is None: self.__class__.requires = []
[docs] @staticmethod @abstractmethod async def execute( state: 'AgentState', **kwargs ) -> Dict[str, Any]: """Execute the main capability logic with comprehensive state management. This is the core method that all capabilities must implement. It contains the primary business logic and integrates with the framework's state management system. The method should be implemented as a static method to support LangGraph's execution model and enable proper serialization. The execute method receives the complete agent state and should return state updates that LangGraph will automatically merge. The framework handles timing, error classification, retry logic, and execution tracking through the @capability_node decorator. State Management Patterns: 1. **Read from state**: Access required data using state.get() with defaults 2. **Process data**: Perform the capability's core business logic 3. **Return updates**: Return dictionary with state updates for merging 4. **Use structured keys**: Follow naming conventions for state organization :param state: Current agent state containing all execution context and data :type state: AgentState :param kwargs: Additional parameters passed from the execution system :type kwargs: dict :return: Dictionary of state updates for LangGraph to merge into agent state :rtype: Dict[str, Any] :raises NotImplementedError: This is an abstract method that must be implemented :raises ValidationError: If required state data is missing or invalid :raises CapabilityError: For capability-specific execution failures .. note:: The @capability_node decorator provides automatic error handling, retry policies, timing, and execution tracking. Focus on the core business logic in this method. .. warning:: This method must be implemented as a static method. Instance methods will not work with the LangGraph execution model. Examples: Simple data retrieval capability:: @staticmethod async def execute(state: AgentState, **kwargs) -> Dict[str, Any]: query = state.get("user_query", "") if not query: raise ValidationError("No query provided") results = await search_database(query) return { "search_results": results, "search_timestamp": datetime.now().isoformat(), "search_result_count": len(results) } Capability with data transformation:: @staticmethod async def execute(state: AgentState, **kwargs) -> Dict[str, Any]: raw_data = state.get("raw_sensor_data", []) processing_config = state.get("processing_config", {}) processed_data = await transform_sensor_data( raw_data, config=processing_config ) return { "processed_sensor_data": processed_data, "processing_metadata": { "processed_at": datetime.now().isoformat(), "record_count": len(processed_data), "config_used": processing_config } } .. seealso:: :func:`capability_node` : Decorator that provides execution infrastructure :class:`AgentState` : State management and structure documentation """ logger = logging.getLogger(__name__) logger.warning("⚠️ Capability is using the empty base execute() - consider implementing execute() for proper functionality.") pass
# Optional methods for registry configuration - implement as needed
[docs] @staticmethod def classify_error(exc: Exception, context: dict) -> Optional['ErrorClassification']: """Classify errors for capability-specific error handling and recovery. This method provides domain-specific error classification to determine appropriate recovery strategies. The default implementation treats all errors as critical, but capabilities should override this method to provide sophisticated error handling based on their specific failure modes. The error classification determines how the framework responds to failures: - CRITICAL: End execution immediately - RETRIABLE: Retry with same parameters - REPLANNING: Create new execution plan - RECLASSIFICATION: Reclassify task capabilities - FATAL: System-level failure, terminate execution :param exc: The exception that occurred during capability execution :type exc: Exception :param context: Error context including capability info and execution state :type context: dict :return: Error classification with recovery strategy, or None to use default :rtype: Optional[ErrorClassification] .. note:: The context dictionary contains useful information including: - 'capability': capability name - 'current_step_index': step being executed - 'execution_time': time spent before failure - 'current_state': agent state at time of error Examples: Network-aware error classification:: @staticmethod def classify_error(exc: Exception, context: dict) -> ErrorClassification: # Retry network timeouts and connection errors if isinstance(exc, (ConnectionError, TimeoutError)): return ErrorClassification( severity=ErrorSeverity.RETRIABLE, user_message="Network issue detected, retrying...", metadata={"technical_details": str(exc)} ) # Default to critical for unexpected errors return ErrorClassification( severity=ErrorSeverity.CRITICAL, user_message=f"Unexpected error: {exc}", metadata={"technical_details": str(exc)} ) Missing input data requiring replanning:: @staticmethod def classify_error(exc: Exception, context: dict) -> ErrorClassification: if isinstance(exc, KeyError) and "context" in str(exc): return ErrorClassification( severity=ErrorSeverity.REPLANNING, user_message="Required data not available, trying different approach", metadata={"technical_details": f"Missing context data: {str(exc)}"} ) return BaseCapability.classify_error(exc, context) .. seealso:: :class:`ErrorClassification` : Error classification result structure :class:`ErrorSeverity` : Available severity levels and their meanings """ capability_name = context.get('capability', 'unknown_capability') return ErrorClassification( severity=ErrorSeverity.CRITICAL, user_message=f"Unhandled error in {capability_name}: {exc}", metadata={"technical_details": str(exc)} )
[docs] @staticmethod def get_retry_policy() -> Dict[str, Any]: """Get retry policy configuration for failure recovery strategies. This method provides retry configuration that the framework uses for manual retry handling when capabilities fail with RETRIABLE errors. The default policy provides reasonable defaults for most capabilities, but should be overridden for capabilities with specific timing or retry requirements. The retry policy controls: - Maximum number of retry attempts before giving up - Initial delay between retry attempts - Backoff factor for exponential delay increase :return: Dictionary containing retry configuration parameters :rtype: Dict[str, Any] .. note:: The framework uses manual retry handling rather than LangGraph's native retry policies to ensure consistent behavior across all components and to enable sophisticated error classification. Examples: Aggressive retry for network-dependent capability:: @staticmethod def get_retry_policy() -> Dict[str, Any]: return { "max_attempts": 5, # More attempts for network issues "delay_seconds": 2.0, # Longer delay for external services "backoff_factor": 2.0 # Exponential backoff } Conservative retry for expensive operations:: @staticmethod def get_retry_policy() -> Dict[str, Any]: return { "max_attempts": 2, # Minimal retries for expensive ops "delay_seconds": 0.1, # Quick retry for transient issues "backoff_factor": 1.0 # No backoff for fast operations } .. seealso:: :func:`classify_error` : Error classification that determines when to retry :class:`ErrorSeverity` : RETRIABLE severity triggers retry policy usage """ return { "max_attempts": 3, "delay_seconds": 0.5, "backoff_factor": 1.5 }
[docs] def _create_orchestrator_guide(self) -> Optional[Any]: """Template Method: Create orchestrator guide for planning integration. IMPLEMENTATION APPROACHES (choose based on your needs): 1. **Production (Recommended)**: Use prompt builders through registry:: def _create_orchestrator_guide(self): prompt_provider = get_framework_prompts() builder = prompt_provider.get_my_capability_prompt_builder() return builder.get_orchestrator_guide() 2. **R&D/Experimentation**: Direct implementation for quick prototyping:: def _create_orchestrator_guide(self): return OrchestratorGuide( instructions="Use when user mentions X, Y, Z...", examples=[...], priority=10 ) :return: Orchestrator snippet for planning integration, or None if not needed :rtype: Optional[OrchestratorGuide] Example:: def _create_orchestrator_guide(self) -> Optional[OrchestratorGuide]: return OrchestratorGuide( capability_name="time_range_parsing", description="Parse time references into structured datetime ranges", when_to_use="When user mentions time periods, dates, or relative time references", provides_context="TIME_RANGE with start_date and end_date datetime objects", example_usage="For 'show me data from last week' or 'yesterday's performance'" ) """ logger = logging.getLogger(__name__) logger.warning(f"⚠️ Capability '{self.name}' is using base _create_orchestrator_guide() - " "this may cause orchestrator hallucination. Consider implementing " "_create_orchestrator_guide() for proper integration.") return None
[docs] def _create_classifier_guide(self) -> Optional[Any]: """Template Method: Create classifier guide for capability activation. IMPLEMENTATION APPROACHES (choose based on your needs): 1. **Production (Recommended)**: Use prompt builders through registry:: def _create_classifier_guide(self): prompt_provider = get_framework_prompts() builder = prompt_provider.get_my_capability_prompt_builder() return builder.get_classifier_guide() 2. **R&D/Experimentation**: Direct implementation for quick testing:: def _create_classifier_guide(self): return TaskClassifierGuide( instructions="Activate when user mentions time-related data requests", examples=[ ClassifierExample( query="Show me data from last week", result=True, reason="Contains time range requiring parsing" ), ClassifierExample( query="What is the current status?", result=False, reason="Current status request, no time parsing needed" ) ] ) :return: Classifier guide for capability selection, or None if not needed :rtype: Optional[TaskClassifierGuide] """ logger = logging.getLogger(__name__) logger.warning(f"⚠️ Capability '{self.name}' is using base _create_classifier_guide() - " "this may cause classification issues. Consider implementing " "_create_classifier_guide() for proper task classification.") return None
# Properties for compatibility and introspection @property def orchestrator_guide(self) -> Optional[Any]: """Get the orchestrator guide for this capability (lazy-loaded). Standardized interface used by the framework. Automatically calls _create_orchestrator_guide() on first access and caches the result. :return: Orchestrator guide for execution planning integration, or None if not needed :rtype: Optional[OrchestratorGuide] """ if not hasattr(self, '_orchestrator_guide'): self._orchestrator_guide = self._create_orchestrator_guide() return self._orchestrator_guide @property def classifier_guide(self) -> Optional[Any]: """Get the classifier guide for this capability (lazy-loaded). Standardized interface used by the framework. Automatically calls _create_classifier_guide() on first access and caches the result. :return: Classifier guide for capability activation, or None if not needed :rtype: Optional[TaskClassifierGuide] """ if not hasattr(self, '_classifier_guide'): self._classifier_guide = self._create_classifier_guide() return self._classifier_guide
[docs] def __repr__(self) -> str: """Return a string representation of the capability. :return: String representation including class name and capability name :rtype: str """ return f"<{self.__class__.__name__}: {self.name}>"