Base Components#

Foundation classes for capabilities and infrastructure nodes, plus decorators that provide seamless LangGraph integration.

Base Classes#

BaseCapability#

class framework.base.BaseCapability[source]#

Bases: ABC

Base class for framework capabilities using convention-based configuration.

This class provides the foundation for all capabilities in the ALS Expert framework. Capabilities are self-contained business logic components that perform specific tasks and integrate seamlessly with the LangGraph execution model through convention-based patterns and automatic discovery.

The BaseCapability class enforces a strict contract through reflection-based validation: capabilities must define required components and can optionally implement guidance systems for orchestration and classification. The @capability_node decorator provides complete LangGraph integration including error handling, retry policies, and execution tracking.

Required Components (enforced at initialization):
  • name: Unique capability identifier used for registration and routing

  • description: Human-readable description for documentation and logging

  • execute(): Async static method containing the main business logic

Optional Components (with defaults provided):
  • provides: List of data types this capability generates (default: [])

  • requires: List of data types this capability depends on (default: [])

  • classify_error(): Domain-specific error classification (default: all CRITICAL)

  • get_retry_policy(): Retry configuration for failure recovery (default: 3 attempts)

  • _create_orchestrator_guide(): Integration guidance for execution planning

  • _create_classifier_guide(): Task classification guidance for capability selection

Architecture Integration:

The capability integrates with multiple framework systems:

  1. Execution System: Via @capability_node decorator for LangGraph nodes

  2. Planning System: Via orchestrator guides for step planning

  3. Classification System: Via classifier guides for capability selection

  4. Error Handling: Via error classification for recovery strategies

  5. Registry System: Via convention-based configuration

Parameters:
  • name (str) – Unique capability identifier for registry and routing

  • description (str) – Human-readable description for documentation

  • provides (List[str]) – Data types generated by this capability

  • requires (List[str]) – Data types required by this capability

Raises:

NotImplementedError – If required class attributes or methods are missing

Note

Use the @capability_node decorator to enable LangGraph integration with automatic error handling, retry policies, and execution tracking.

Warning

The execute method must be implemented as a static method and should return a dictionary of state updates for LangGraph to merge.

Examples

Basic capability implementation:

@capability_node
class WeatherCapability(BaseCapability):
    name = "weather_data"
    description = "Retrieve current weather conditions"
    provides = ["WEATHER_DATA"]
    requires = ["LOCATION"]

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        location = state.get("location")
        weather_data = await fetch_weather(location)
        return {
            "weather_current_conditions": weather_data,
            "weather_last_updated": datetime.now().isoformat()
        }

Capability with custom error handling:

@capability_node
class DatabaseCapability(BaseCapability):
    name = "database_query"
    description = "Execute database queries with connection handling"

    @staticmethod
    def classify_error(exc: Exception, context: dict) -> ErrorClassification:
        if isinstance(exc, ConnectionError):
            return ErrorClassification(
                severity=ErrorSeverity.RETRIABLE,
                user_message="Database connection lost, retrying...",
                metadata={"technical_details": str(exc)}
            )
        return ErrorClassification(
            severity=ErrorSeverity.CRITICAL,
            user_message=f"Database error: {exc}",
            metadata={"technical_details": str(exc)}
        )

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Implementation with database operations
        pass

See also

capability_node() : Decorator for LangGraph integration

BaseInfrastructureNode : Infrastructure components base class

ErrorClassification : Error classification system

Initialize the capability and validate required components.

Performs comprehensive validation of the capability class to ensure all required components are properly defined. This validation happens at initialization time to provide immediate feedback during development rather than waiting for runtime execution failures.

The validation process checks:

  1. Required class attributes (name, description) are defined and non-None

  2. The execute method is implemented as a static method

  3. Optional attributes are properly initialized with defaults if missing

Raises:
  • NotImplementedError – If name or description class attributes are missing

  • NotImplementedError – If execute static method is not implemented

Note

This initialization performs validation only. The actual LangGraph integration happens through the @capability_node decorator.

Warning

Subclasses should not override this method unless they need additional validation. Override _create_orchestrator_guide() or _create_classifier_guide() for customization instead.
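
The validation failure mode is easy to demonstrate. A minimal sketch, assuming only the behavior documented above (the exact error message is framework-defined and may differ):

from framework.base import BaseCapability

class IncompleteCapability(BaseCapability):
    # 'name' is deliberately omitted, so validation should fail at __init__ time.
    description = "Capability missing the required 'name' attribute"

    @staticmethod
    async def execute(state, **kwargs):
        return {}

try:
    IncompleteCapability()
except NotImplementedError as exc:
    print(f"Validation failed as expected: {exc}")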

Required Class Attributes

name: str#

Unique capability identifier for registration and routing (required).

description: str#

Human-readable description for documentation and logging (required).

provides: List[str]#

List of context types this capability generates (default: []).

requires: List[str]#

List of context types this capability depends on (default: []).

Abstract Methods

abstractmethod async static execute(state, **kwargs)[source]#

Execute the main capability logic with comprehensive state management.

This is the core method that all capabilities must implement. It contains the primary business logic and integrates with the framework’s state management system. The method should be implemented as a static method to support LangGraph’s execution model and enable proper serialization.

The execute method receives the complete agent state and should return state updates that LangGraph will automatically merge. The framework handles timing, error classification, retry logic, and execution tracking through the @capability_node decorator.

State Management Patterns:

  1. Read from state: Access required data using state.get() with defaults

  2. Process data: Perform the capability's core business logic

  3. Return updates: Return a dictionary with state updates for merging

  4. Use structured keys: Follow naming conventions for state organization

Parameters:
  • state (AgentState) – Current agent state containing all execution context and data

  • kwargs (dict) – Additional parameters passed from the execution system

Returns:

Dictionary of state updates for LangGraph to merge into agent state

Return type:

Dict[str, Any]

Raises:
  • NotImplementedError – This is an abstract method that must be implemented

  • ValidationError – If required state data is missing or invalid

  • CapabilityError – For capability-specific execution failures

Note

The @capability_node decorator provides automatic error handling, retry policies, timing, and execution tracking. Focus on the core business logic in this method.

Warning

This method must be implemented as a static method. Instance methods will not work with the LangGraph execution model.

Examples

Simple data retrieval capability:

@staticmethod
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
    query = state.get("user_query", "")
    if not query:
        raise ValidationError("No query provided")

    results = await search_database(query)
    return {
        "search_results": results,
        "search_timestamp": datetime.now().isoformat(),
        "search_result_count": len(results)
    }

Capability with data transformation:

@staticmethod
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
    raw_data = state.get("raw_sensor_data", [])
    processing_config = state.get("processing_config", {})

    processed_data = await transform_sensor_data(
        raw_data,
        config=processing_config
    )

    return {
        "processed_sensor_data": processed_data,
        "processing_metadata": {
            "processed_at": datetime.now().isoformat(),
            "record_count": len(processed_data),
            "config_used": processing_config
        }
    }

See also

capability_node() : Decorator that provides execution infrastructure

AgentState : State management and structure documentation

Optional Methods

static classify_error(exc, context)[source]#

Classify errors for capability-specific error handling and recovery.

This method provides domain-specific error classification to determine appropriate recovery strategies. The default implementation treats all errors as critical, but capabilities should override this method to provide sophisticated error handling based on their specific failure modes.

The error classification determines how the framework responds to failures:

  • CRITICAL: End execution immediately

  • RETRIABLE: Retry with same parameters

  • REPLANNING: Create new execution plan

  • RECLASSIFICATION: Reclassify task capabilities

  • FATAL: System-level failure, terminate execution

Parameters:
  • exc (Exception) – The exception that occurred during capability execution

  • context (dict) – Error context including capability info and execution state

Returns:

Error classification with recovery strategy, or None to use default

Return type:

Optional[ErrorClassification]

Note

The context dictionary contains useful information including:

  • 'capability': capability name

  • 'current_step_index': step being executed

  • 'execution_time': time spent before failure

  • 'current_state': agent state at time of error

Examples

Network-aware error classification:

@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    # Retry network timeouts and connection errors
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message="Network issue detected, retrying...",
            metadata={"technical_details": str(exc)}
        )


    # Default to critical for unexpected errors
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"Unexpected error: {exc}",
        metadata={"technical_details": str(exc)}
    )

Missing input data requiring replanning:

@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    if isinstance(exc, KeyError) and "context" in str(exc):
        return ErrorClassification(
            severity=ErrorSeverity.REPLANNING,
            user_message="Required data not available, trying different approach",
            metadata={"technical_details": f"Missing context data: {str(exc)}"}
        )
    return BaseCapability.classify_error(exc, context)
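
A classification that also consults the error context is a natural extension. A sketch, assuming the context keys listed in the note above and a purely illustrative 30-second latency budget:

@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    capability = context.get("capability", "unknown")
    elapsed = context.get("execution_time", 0.0)

    # Hypothetical policy: only retry timeouts while the step is still "cheap".
    if isinstance(exc, TimeoutError) and elapsed < 30.0:
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message=f"{capability} timed out after {elapsed:.1f}s, retrying...",
            metadata={"technical_details": str(exc)}
        )
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"{capability} failed: {exc}",
        metadata={"technical_details": str(exc)}
    )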

See also

ErrorClassification : Error classification result structure

ErrorSeverity : Available severity levels and their meanings

static get_retry_policy()[source]#

Get retry policy configuration for failure recovery strategies.

This method provides retry configuration that the framework uses for manual retry handling when capabilities fail with RETRIABLE errors. The default policy provides reasonable defaults for most capabilities, but should be overridden for capabilities with specific timing or retry requirements.

The retry policy controls:

  • Maximum number of retry attempts before giving up

  • Initial delay between retry attempts

  • Backoff factor for exponential delay increase

Returns:

Dictionary containing retry configuration parameters

Return type:

Dict[str, Any]

Note

The framework uses manual retry handling rather than LangGraph’s native retry policies to ensure consistent behavior across all components and to enable sophisticated error classification.

Examples

Aggressive retry for network-dependent capability:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 5,      # More attempts for network issues
        "delay_seconds": 2.0,   # Longer delay for external services
        "backoff_factor": 2.0   # Exponential backoff
    }

Conservative retry for expensive operations:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 2,      # Minimal retries for expensive ops
        "delay_seconds": 0.1,   # Quick retry for transient issues
        "backoff_factor": 1.0   # No backoff for fast operations
    }

See also

classify_error() : Error classification that determines when to retry

ErrorSeverity : RETRIABLE severity triggers retry policy usage

Template Methods

_create_orchestrator_guide()[source]#

Template Method: Create orchestrator guide for planning integration.

IMPLEMENTATION APPROACHES (choose based on your needs):

  1. Production (Recommended): Use prompt builders through registry:

    def _create_orchestrator_guide(self):
        prompt_provider = get_framework_prompts()
        builder = prompt_provider.get_my_capability_prompt_builder()
        return builder.get_orchestrator_guide()
    
  2. R&D/Experimentation: Direct implementation for quick prototyping:

    def _create_orchestrator_guide(self):
        return OrchestratorGuide(
            instructions="Use when user mentions X, Y, Z...",
            examples=[...], priority=10
        )
    
Returns:

Orchestrator snippet for planning integration, or None if not needed

Return type:

Optional[OrchestratorGuide]

Example:

def _create_orchestrator_guide(self) -> Optional[OrchestratorGuide]:
    return OrchestratorGuide(
        capability_name="time_range_parsing",
        description="Parse time references into structured datetime ranges",
        when_to_use="When user mentions time periods, dates, or relative time references",
        provides_context="TIME_RANGE with start_date and end_date datetime objects",
        example_usage="For 'show me data from last week' or 'yesterday's performance'"
    )

_create_classifier_guide()[source]#

Template Method: Create classifier guide for capability activation.

IMPLEMENTATION APPROACHES (choose based on your needs):

  1. Production (Recommended): Use prompt builders through registry:

    def _create_classifier_guide(self):
        prompt_provider = get_framework_prompts()
        builder = prompt_provider.get_my_capability_prompt_builder()
        return builder.get_classifier_guide()
    
  2. R&D/Experimentation: Direct implementation for quick testing:

    def _create_classifier_guide(self):
        return TaskClassifierGuide(
            instructions="Activate when user mentions time-related data requests",
            examples=[
                ClassifierExample(
                    query="Show me data from last week",
                    result=True,
                    reason="Contains time range requiring parsing"
                ),
                ClassifierExample(
                    query="What is the current status?",
                    result=False,
                    reason="Current status request, no time parsing needed"
                )
            ]
        )
    
Returns:

Classifier guide for capability selection, or None if not needed

Return type:

Optional[TaskClassifierGuide]

Properties

property orchestrator_guide: Any | None#

Get the orchestrator guide for this capability (lazy-loaded).

Standardized interface used by the framework. Automatically calls _create_orchestrator_guide() on first access and caches the result.

Returns:

Orchestrator guide for execution planning integration, or None if not needed

Return type:

Optional[OrchestratorGuide]

property classifier_guide: Any | None#

Get the classifier guide for this capability (lazy-loaded).

Standardized interface used by the framework. Automatically calls _create_classifier_guide() on first access and caches the result.

Returns:

Classifier guide for capability activation, or None if not needed

Return type:

Optional[TaskClassifierGuide]
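
Consumer-side access is a plain attribute read; the guide is built lazily and reused. A sketch using the WeatherCapability example from above:

capability = WeatherCapability()

# First access calls _create_orchestrator_guide() and caches the result
# (None is a valid result, meaning the capability provides no guide).
guide = capability.orchestrator_guide

# Later accesses return the cached object without rebuilding it.
assert capability.orchestrator_guide is guide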


__repr__()[source]#

Return a string representation of the capability.

Returns:

String representation including class name and capability name

Return type:

str

BaseInfrastructureNode#

class framework.base.BaseInfrastructureNode[source]#

Bases: ABC

Base class for infrastructure nodes in the LangGraph-native architecture.

This class provides the foundation for all infrastructure components in the ALS Expert framework. Infrastructure nodes handle system-level operations that orchestrate, route, classify, and monitor agent execution. Unlike capabilities which contain business logic, infrastructure nodes focus on system coordination and management.

The BaseInfrastructureNode class enforces a strict contract through reflection-based validation and provides standardized integration with LangGraph’s execution model. Infrastructure nodes are designed for fast failure detection and minimal retry attempts since they handle system-critical functions.

Infrastructure Node Responsibilities:

  • Task Extraction: Parse and structure user requests into actionable tasks

  • Classification: Determine which capabilities should handle specific requests

  • Orchestration: Plan and coordinate execution sequences across capabilities

  • Routing: Direct execution flow based on state conditions and results

  • Monitoring: Track execution progress and system health

  • Error Coordination: Handle system-level error recovery and routing

Required Components (enforced at decoration time):

  • name: Infrastructure node identifier for routing and logging

  • description: Human-readable description for documentation

  • execute(): Async static method containing orchestration logic

  • classify_error(): Error classification method (inherited or custom)

  • get_retry_policy(): Retry configuration method (inherited or custom)

Architecture Integration:

Infrastructure nodes integrate with the framework through:

  1. LangGraph Integration: Via @infrastructure_node decorator

  2. State Management: Pure dictionary operations for serialization

  3. Error Handling: Conservative policies with fast failure detection

  4. Streaming: Native LangGraph streaming for real-time updates

  5. Configuration: Access to LangGraph’s configuration system

Example:

@infrastructure_node
class TaskExtractionNode(BaseInfrastructureNode):
    name = "task_extraction"
    description = "Task Extraction and Processing"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Explicit logger retrieval - professional practice
        from configs.logger import get_logger
        logger = get_logger("framework", "task_extraction")

        # Use get_stream_writer() for pure LangGraph streaming
        from langgraph.config import get_stream_writer
        streaming = get_stream_writer()

        if streaming:
            streaming({"event_type": "status", "message": "Extracting task", "progress": 0.3})

        logger.info("Starting task extraction")

        # Extract and process task from flat state structure
        task = state.get("task_current_task", "")

        if streaming:
            streaming({"event_type": "status", "message": "Task extraction complete", "progress": 1.0, "complete": True})

        # Return state updates for flat structure
        return {
            "task_current_task": task,
            "task_depends_on_chat_history": True,
            "task_depends_on_user_memory": False
        }

    @staticmethod
    def classify_error(exc: Exception, context: dict) -> ErrorClassification:
        # Optional error classification
        return ErrorClassification(severity=ErrorSeverity.RETRIABLE, ...)

Parameters:
  • name (str) – Infrastructure node identifier (required class attribute)

  • description (str) – Human-readable description (required class attribute)

Note

Infrastructure nodes use the @infrastructure_node decorator which handles all LangGraph integration, parameter injection, and error handling.

Warning

The name and description class attributes are required. The execute method must be implemented as a static method and should return state updates.

Required Class Attributes

name: str

Infrastructure node identifier for routing and logging (required).

description: str

Human-readable description for documentation and monitoring (required).

Abstract Methods

abstractmethod async static execute(state, **kwargs)[source]

Execute the infrastructure operation with comprehensive system coordination.

This is the core method that all infrastructure nodes must implement. It contains the orchestration, routing, or monitoring logic and integrates with the framework’s state management system. The method should be implemented as a static method to support LangGraph’s execution model.

Infrastructure nodes should focus on system coordination rather than business logic. They receive the complete agent state and return updates that LangGraph automatically merges. The @infrastructure_node decorator provides timing, error handling, and execution tracking.

Common Infrastructure Patterns:

  1. Task Extraction: Parse user input into structured task information

  2. Classification: Analyze requests to determine capability routing

  3. Orchestration: Create execution plans with capability sequences

  4. Routing: Direct flow based on state conditions and results

  5. Monitoring: Track progress and system health metrics

Parameters:
  • state (AgentState) – Current agent state containing all execution context and data

  • kwargs (dict) – Additional parameters including logger and configuration

Returns:

Dictionary of state updates for LangGraph to merge into agent state

Return type:

Dict[str, Any]

Raises:
  • NotImplementedError – This is an abstract method that must be implemented

  • ValidationError – If required state data is missing or invalid

  • InfrastructureError – For infrastructure-specific operation failures

Example:

@staticmethod
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
    # Explicit logger retrieval - professional practice
    from configs.logger import get_logger
    logger = get_logger("framework", "orchestrator")

    # Define streaming helper here for step awareness
    from configs.streaming import get_streamer
    streamer = get_streamer("framework", "orchestrator", state)
    streamer.status("Starting orchestration")

    logger.info("Starting execution planning")

    # Infrastructure logic
    plan = create_execution_plan(state)

    streamer.status("Orchestration complete")
    logger.info("Execution plan created")

    # Return state updates
    return {
        "planning_execution_plan": plan,
        "planning_ready_for_execution": True
    }

Note

Infrastructure nodes should focus on orchestration, routing, and state management logic. Retrieve loggers explicitly using get_logger() for professional code. The @infrastructure_node decorator handles timing, error handling, and retry policies. Use get_streamer() for streaming updates.

Error Handling Methods

static classify_error(exc, context)[source]

Classify errors for infrastructure-specific error handling and recovery.

This method provides default error classification for all infrastructure nodes with a conservative approach that treats most errors as critical. Infrastructure nodes handle system-critical functions like orchestration and routing, so failures typically require immediate attention rather than automatic retry attempts.

The default implementation prioritizes system stability by failing fast with clear error messages. Subclasses should override this method only when specific infrastructure components can benefit from retry logic (e.g., LLM-based orchestrators that may encounter temporary API issues).

Parameters:
  • exc (Exception) – The exception that occurred during infrastructure operation

  • context (dict) – Error context including node info, execution state, and timing

Returns:

Error classification with severity and recovery strategy

Return type:

ErrorClassification

Note

The context dictionary includes:

  • infrastructure_node: node name for identification

  • execution_time: time spent before failure

  • current_state: agent state at time of error

Example:

@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    # Retry network timeouts for LLM-based infrastructure
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message="Network timeout, retrying...",
            metadata={"technical_details": str(exc)}
        )
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"Infrastructure error: {exc}",
        metadata={"technical_details": str(exc)}
    )

Note

Infrastructure nodes should generally fail fast, so the default implementation treats most errors as critical. Override this method for infrastructure that can benefit from retries (e.g., LLM-based nodes).

static get_retry_policy()[source]

Get conservative retry policy configuration for infrastructure operations.

This method provides retry configuration optimized for infrastructure nodes that handle system-critical functions. The default policy uses conservative settings with minimal retry attempts and fast failure detection to maintain system stability.

Infrastructure nodes should generally fail fast rather than retry extensively, since failures often indicate system-level issues that require immediate attention. Override this method only for specific infrastructure components that can benefit from retry logic.

Returns:

Dictionary containing conservative retry configuration parameters

Return type:

Dict[str, Any]

Note

Infrastructure default policy: 2 attempts, 0.2s delay, minimal backoff. This prioritizes fast failure detection over retry persistence.

Example:

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 3,  # More retries for LLM-based infrastructure
        "delay_seconds": 1.0,  # Longer delay for external service calls
        "backoff_factor": 2.0  # Exponential backoff
    }

Note

The router uses this configuration to determine retry behavior. Infrastructure default: 2 attempts, 0.2s delay, minimal backoff.


__repr__()[source]#

Return a string representation of the infrastructure node for debugging.

Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.

Returns:

String representation including class name and node name

Return type:

str

Example

>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'

Note

The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.

LangGraph Integration Decorators#

capability_node#

framework.base.capability_node(cls)[source]#

Decorator that validates capability conventions and injects comprehensive LangGraph infrastructure.

This decorator serves as the primary integration point between capability classes and LangGraph’s execution model. It performs reflection-based validation to ensure capability classes implement required components, then creates a LangGraph-compatible node function with complete infrastructure including error handling, retry coordination, execution tracking, and state management.

The decorator implements the framework's convention-based architecture by:

  1. Validation: Ensures all required components are properly implemented

  2. Infrastructure Injection: Provides timing, logging, streaming, and error handling

  3. LangGraph Integration: Creates compatible node functions with state management

  4. Error Coordination: Routes all errors through the manual retry system for consistency

  5. Execution Tracking: Comprehensive performance monitoring and state updates

Required Components (validated through reflection):
  • name: Unique capability identifier for registry and routing

  • description: Human-readable description for documentation and logging

  • execute(): Async static method containing the main business logic

  • classify_error(): Error classification method (inherited from BaseCapability or custom)

  • get_retry_policy(): Retry configuration method (inherited from BaseCapability or custom)

Infrastructure Features:
  • Error Classification: Domain-specific error analysis with recovery strategies

  • Manual Retry System: Consistent retry handling via router (no LangGraph retries)

  • State Management: Automatic state updates and step progression tracking

  • Streaming Support: Real-time status updates through LangGraph’s streaming

  • Development Mode: Raw error re-raising for debugging when configured

  • Execution Tracking: Comprehensive timing and performance monitoring

Parameters:

cls (type) – The capability class to decorate with LangGraph infrastructure

Returns:

Original class enhanced with langgraph_node attribute containing the LangGraph function

Return type:

type

Raises:
  • ValueError – If required class attributes (name, description) are missing

  • ValueError – If required methods (execute, classify_error, get_retry_policy) are missing

Note

The decorator creates a langgraph_node attribute on the class containing the LangGraph-compatible function. The original class remains unchanged for introspection and testing purposes.

Warning

All capability errors are routed through the manual retry system rather than using LangGraph’s native retry policies to ensure consistent behavior across all framework components.

Examples

Basic capability decoration:

@capability_node
class WeatherCapability(BaseCapability):
    name = "weather_data"
    description = "Retrieve current weather conditions"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Business logic implementation
        return {"weather_data": weather_info}

Capability with custom error handling:

@capability_node
class DatabaseCapability(BaseCapability):
    name = "database_query"
    description = "Execute database queries"

    @staticmethod
    def classify_error(exc: Exception, context: dict) -> ErrorClassification:
        if isinstance(exc, ConnectionError):
            return ErrorClassification(severity=ErrorSeverity.RETRIABLE, ...)
        return ErrorClassification(severity=ErrorSeverity.CRITICAL, ...)

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Database operation implementation
        return {"query_results": results}

See also

BaseCapability : Base class with required method implementations

infrastructure_node() : Decorator for infrastructure components

ErrorClassification : Error classification system

Validation Requirements:

The decorator validates that the decorated class implements:

  • name (str): Unique capability identifier

  • description (str): Human-readable description

  • execute() (async static method): Main business logic

  • classify_error() (static method): Error classification (inherited or custom)

  • get_retry_policy() (static method): Retry configuration (inherited or custom)

Infrastructure Features:

  • Error classification with domain-specific recovery strategies

  • Manual retry system via router (no LangGraph retries)

  • State management with automatic state updates and step progression

  • Streaming support through LangGraph’s streaming system

  • Development mode with raw error re-raising for debugging

  • Comprehensive timing and performance monitoring

Usage Example:

@capability_node
class WeatherCapability(BaseCapability):
    name = "weather_data"
    description = "Retrieve current weather conditions"
    provides = ["WEATHER_DATA"]
    requires = ["LOCATION"]

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Business logic implementation
        return {"weather_data": weather_info}

infrastructure_node#

framework.base.infrastructure_node(cls=None, *, quiet=False)[source]#

Decorator that validates infrastructure node conventions and injects comprehensive LangGraph infrastructure.

This decorator serves as the primary integration point between infrastructure node classes and LangGraph’s execution model. It performs reflection-based validation to ensure infrastructure classes implement required components, then creates a LangGraph-compatible node function with complete system infrastructure including error handling, performance monitoring, and state coordination.

Infrastructure nodes handle system-critical operations like orchestration, routing, classification, and monitoring. The decorator emphasizes fast failure detection and conservative error handling since infrastructure failures typically indicate system-level issues requiring immediate attention.

The decorator implements comprehensive system coordination by:

  1. Validation: Ensures all required components are properly implemented

  2. Infrastructure Injection: Provides timing, logging, streaming, and error handling

  3. LangGraph Integration: Creates compatible node functions with native features

  4. Error Coordination: Manual retry system with conservative failure policies

  5. System Monitoring: Performance tracking and infrastructure health monitoring

Required Components (validated through reflection):
  • name: Infrastructure node identifier for routing and logging

  • description: Human-readable description for documentation and monitoring

  • execute(): Async static method containing orchestration/routing logic

  • classify_error(): Error classification method (inherited or custom)

  • get_retry_policy(): Retry configuration method (inherited or custom)

Infrastructure Features:
  • Conservative Error Handling: Fast failure detection with minimal retry attempts

  • System Monitoring: Comprehensive timing and performance tracking

  • LangGraph Native Integration: Full streaming, configuration, and checkpoint support

  • Development Mode Support: Raw error re-raising for debugging when configured

  • Optional Quiet Mode: Suppressed logging for high-frequency routing operations

  • Fatal Error Handling: System-level failure detection with immediate termination

Parameters:
  • cls (Optional[type]) – The infrastructure node class to decorate (None for parameterized usage)

  • quiet (bool) – If True, suppress start/completion logging (useful for routing nodes)

Returns:

Enhanced infrastructure class with langgraph_node attribute

Return type:

type

Raises:
  • ValueError – If required class attributes (name, description) are missing

  • ValueError – If required methods (execute, classify_error, get_retry_policy) are missing

Note

The decorator supports both @infrastructure_node and @infrastructure_node(quiet=True) syntax. The quiet parameter is useful for high-frequency routing operations that would otherwise generate excessive logging.

Warning

Infrastructure nodes use conservative retry policies with fast failure detection. FATAL errors immediately terminate execution to prevent system-level issues.

Examples

Basic infrastructure node:

@infrastructure_node
class TaskExtractionNode(BaseInfrastructureNode):
    name = "task_extraction"
    description = "Extract and structure user tasks"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Task extraction logic
        return {"task_current_task": extracted_task}

Quiet routing node:

@infrastructure_node(quiet=True)
class RouterNode(BaseInfrastructureNode):
    name = "router"
    description = "Dynamic routing based on agent state"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Routing logic without verbose logging
        return {"control_next_node": next_node}

Infrastructure node with custom error handling:

@infrastructure_node
class OrchestratorNode(BaseInfrastructureNode):
    name = "orchestrator"
    description = "Create execution plans"

    @staticmethod
    def classify_error(exc: Exception, context: dict) -> ErrorClassification:
        # Retry LLM timeouts for planning operations
        if isinstance(exc, TimeoutError):
            return ErrorClassification(severity=ErrorSeverity.RETRIABLE, ...)
        return ErrorClassification(severity=ErrorSeverity.CRITICAL, ...)

See also

BaseInfrastructureNode : Base class with required method implementations

capability_node() : Decorator for business logic components

ErrorSeverity : Error severity levels and recovery strategies

Example:

@infrastructure_node  # Validates requirements and injects infrastructure!
class TaskExtractionNode(BaseInfrastructureNode):
    name = "task_extraction"
    description = "Task Extraction and Processing"

    @staticmethod
    async def execute(state: AgentState, **kwargs):
        # Explicit logger retrieval - professional practice
        from configs.logger import get_logger
        logger = get_logger("framework", "task_extraction")

        # Define streaming helper here for step awareness
        from configs.streaming import get_streamer
        streamer = get_streamer("framework", "task_extraction", state)
        streamer.status("Processing...")

        logger.info("Starting task extraction")

        # Main infrastructure logic
        result = await extract_task_from_conversation(state)
        return {"task_current_task": result.task}

    @staticmethod
    def classify_error(exc: Exception, context: dict) -> ErrorClassification:
        # Infrastructure-specific error classification
        return ErrorClassification(severity=ErrorSeverity.RETRIABLE, ...)

Parameters:
  • cls (Type[BaseInfrastructureNode]) – Infrastructure node class to enhance with LangGraph-native execution

  • quiet (bool) – If True, suppress start/completion logging (useful for routing nodes)

Returns:

Enhanced infrastructure class with LangGraph-native execution

Return type:

Type[BaseInfrastructureNode]

Raises:

ValueError – If required components are missing

Note

The decorator creates a langgraph_node attribute containing the LangGraph-compatible function. This is what the registry uses for actual execution.

Infrastructure nodes use manual error handling for consistency with capability nodes. No automatic retry policies are created.

Validation Requirements:

The decorator validates that the decorated class implements:

  • name (str): Infrastructure node identifier

  • description (str): Human-readable description

  • execute() (async static method): Orchestration/routing logic

  • classify_error() (static method): Error classification (inherited or custom)

  • get_retry_policy() (static method): Retry configuration (inherited or custom)

Infrastructure Features:

  • Conservative error handling with fast failure detection

  • System monitoring with comprehensive timing and performance tracking

  • LangGraph native integration with streaming, configuration, and checkpoints

  • Development mode support with raw error re-raising

  • Optional quiet mode for high-frequency routing operations

  • Fatal error handling with immediate termination for system-level failures

Usage Examples:

@infrastructure_node
class TaskExtractionNode(BaseInfrastructureNode):
    name = "task_extraction"
    description = "Extract and structure user tasks"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Task extraction logic
        return {"task_current_task": extracted_task}

@infrastructure_node(quiet=True)
class RouterNode(BaseInfrastructureNode):
    name = "router"
    description = "Dynamic routing based on agent state"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Routing logic without verbose logging
        return {"control_next_node": next_node}

Supporting Types#

Error Classification#

class framework.base.ErrorSeverity(value)[source]#

Bases: Enum

Enumeration of error severity levels with comprehensive recovery strategies.

This enum defines the complete spectrum of error severity classifications and their corresponding recovery strategies used throughout the ALS Expert framework. Each severity level triggers specific recovery behavior designed to maintain robust system operation while enabling intelligent error handling and graceful degradation.

The severity levels form a hierarchy of recovery strategies from simple retries to complete execution termination. The framework’s error handling system uses these classifications to coordinate recovery efforts between capabilities, infrastructure nodes, and the overall execution system.

Recovery Strategy Hierarchy:

  1. Automatic Recovery: RETRIABLE errors with retry mechanisms

  2. Strategy Adjustment: REPLANNING for execution plan adaptation

  3. Capability Adjustment: RECLASSIFICATION for capability selection adaptation

  4. Execution Control: CRITICAL for graceful termination

  5. System Protection: FATAL for immediate termination

Parameters:
  • CRITICAL (str) – End execution immediately - unrecoverable errors requiring termination

  • RETRIABLE (str) – Retry current execution step with same parameters - transient failures

  • REPLANNING (str) – Create new execution plan with different strategy - approach failures

  • RECLASSIFICATION (str) – Reclassify task to select different capabilities - selection failures

  • FATAL (str) – System-level failure requiring immediate termination - corruption prevention

Note

The framework uses manual retry coordination rather than automatic retries to ensure consistent behavior and sophisticated error analysis across all components.

Warning

FATAL errors immediately raise exceptions to terminate execution and prevent system corruption. Use FATAL only for errors that indicate serious system issues that could compromise framework integrity.

Examples

Network error classification:

if isinstance(exc, YourCustomConnectionError):
    return ErrorClassification(severity=ErrorSeverity.RETRIABLE, ...)
elif isinstance(exc, YourCustomAuthenticationError):
    return ErrorClassification(severity=ErrorSeverity.CRITICAL, ...)

Data validation error handling (example exception classes):

if isinstance(exc, ValidationError):
    return ErrorClassification(severity=ErrorSeverity.REPLANNING, ...)
elif isinstance(exc, YourCustomCapabilityMismatchError):
    return ErrorClassification(severity=ErrorSeverity.RECLASSIFICATION, ...)
elif isinstance(exc, YourCustomCorruptionError):
    return ErrorClassification(severity=ErrorSeverity.FATAL, ...)

Note

The exception classes in these examples (YourCustomCapabilityMismatchError, YourCustomCorruptionError) are not provided by the framework - they are examples of domain-specific exceptions you might implement in your capabilities.

See also

ErrorClassification : Structured error analysis with severity
ExecutionError : Comprehensive error information container

CRITICAL = 'critical'#
RETRIABLE = 'retriable'#
REPLANNING = 'replanning'#
RECLASSIFICATION = 'reclassification'#
FATAL = 'fatal'#
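As a quick reference, a small lookup (illustrative only, paraphrasing the recovery hierarchy above) pairing each severity with the behavior it triggers:

from framework.base import ErrorSeverity

RECOVERY_ACTION = {
    ErrorSeverity.RETRIABLE: "retry the current step with the same parameters",
    ErrorSeverity.REPLANNING: "create a new execution plan with a different strategy",
    ErrorSeverity.RECLASSIFICATION: "reclassify the task and select different capabilities",
    ErrorSeverity.CRITICAL: "end execution gracefully",
    ErrorSeverity.FATAL: "terminate immediately to protect the system",
}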
class framework.base.ExecutionError(severity, message, capability_name=None, metadata=None)[source]#

Bases: object

Comprehensive execution error container with recovery coordination support.

This dataclass provides a complete representation of execution errors including severity classification, recovery suggestions, technical debugging information, and context for coordinating recovery strategies. It serves as the primary error data structure used throughout the framework for error handling, logging, and recovery coordination.

ExecutionError enables sophisticated error management by providing:
  1. Error Classification: Severity-based recovery strategy determination

  2. User Communication: Clear, actionable error messages for interfaces

  3. Developer Support: Technical details and debugging context

  4. System Integration: Context for automated recovery systems

The error structure supports both automated error handling workflows and human-guided error resolution processes. It integrates seamlessly with the framework’s classification system and retry mechanisms to provide comprehensive error management.

Parameters:
  • severity (ErrorSeverity) – Error severity classification for recovery strategy selection

  • message (str) – Clear, human-readable description of the error condition

  • capability_name (Optional[str]) – Name of the capability or component that generated this error

  • metadata (Optional[Dict[str, Any]]) – Structured error context including technical details and debugging information

Note

ExecutionError instances are typically created by error classification methods in capabilities and infrastructure nodes. The framework’s decorators automatically handle the creation and routing of these errors.

Warning

The severity field directly impacts system behavior through recovery strategy selection. Ensure appropriate severity classification to avoid ineffective error handling or unnecessary system termination.

Examples

Database connection error:

error = ExecutionError(
    severity=ErrorSeverity.RETRIABLE,
    message="Database connection failed",
    capability_name="database_query",
    metadata={"technical_details": "PostgreSQL connection timeout after 30 seconds"}
)

Data corruption requiring immediate attention:

error = ExecutionError(
    severity=ErrorSeverity.FATAL,
    message="Critical data corruption detected",
    capability_name="data_processor",
    metadata={
        "technical_details": "Checksum validation failed on primary data store",
        "safety_abort_reason": "Data integrity compromised",
        "suggestions": [
            "Initiate emergency backup procedures",
            "Contact system administrator immediately",
            "Do not proceed with further operations"
        ]
    }
)

See also

ErrorSeverity : Severity levels and recovery strategies
ErrorClassification : Error analysis and classification system
ExecutionResult : Result containers with error integration

severity: ErrorSeverity#
message: str#
capability_name: str | None = None#
metadata: Dict[str, Any] | None = None#
__init__(severity, message, capability_name=None, metadata=None)#

Result Types#

class framework.base.ExecutionResult(success, data=None, error=None, execution_time=None, start_time=None, end_time=None)[source]#

Bases: object

Comprehensive result container for capability and infrastructure node executions.

This dataclass provides a complete record of execution outcomes including success status, result data, error information, and comprehensive timing details. It serves as the primary result container throughout the framework for execution tracking, error handling, and performance monitoring.

ExecutionResult enables comprehensive execution analysis by capturing:
  1. Outcome Status: Clear success/failure indication

  2. Result Data: Actual output from successful executions

  3. Error Information: Structured error details for failures

  4. Timing Metrics: Performance data for optimization

  5. Execution Context: Temporal information for debugging

The result structure supports both synchronous analysis and asynchronous processing patterns while maintaining type safety and serialization compatibility for persistence and inter-process communication.

Parameters:
  • success (bool) – Whether the execution completed successfully without errors

  • data (Optional[Any]) – Result data from successful execution, None for failures

  • error (Optional[ExecutionError]) – Structured error information for failed executions, None for success

  • execution_time (Optional[float]) – Total execution duration in seconds for performance tracking

  • start_time (Optional[datetime]) – UTC timestamp when execution began

  • end_time (Optional[datetime]) – UTC timestamp when execution completed

Note

The success field determines which additional fields are meaningful:
  • success=True: data should contain results, error should be None

  • success=False: error should contain details, data should be None

Timing fields are optional but highly recommended for monitoring.

Warning

Avoid setting both data and error fields simultaneously, as this creates ambiguous result states. Use the success field to determine the authoritative execution outcome.

Examples

Successful execution result:

result = ExecutionResult(
    success=True,
    data={"weather_data": weather_info, "location": "San Francisco"},
    execution_time=1.23,
    start_time=datetime.utcnow(),
    end_time=datetime.utcnow()
)

Failed execution result:

result = ExecutionResult(
    success=False,
    error=ExecutionError(
        severity=ErrorSeverity.RETRIABLE,
        message="Connection timeout",
        metadata={"technical_details": "HTTP 408 Request Timeout"}
    ),
    execution_time=5.0,
    start_time=datetime.utcnow(),
    end_time=datetime.utcnow()
)
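A consumer typically branches on success before reading the other fields, in line with the note above; a minimal sketch (the summary wording is illustrative):

from framework.base import ExecutionResult

def summarize_result(result: ExecutionResult) -> str:
    # success is the authoritative outcome flag; data/error are only
    # meaningful on the corresponding branch.
    if result.success:
        return f"succeeded in {result.execution_time or 0.0:.2f}s: {result.data}"
    return f"failed ({result.error.severity.value}): {result.error.message}"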

See also

ExecutionError : Structured error information for failures
ExecutionRecord : Historical execution records with steps

success: bool#
data: Any | None = None#
error: ExecutionError | None = None#
execution_time: float | None = None#
start_time: datetime | None = None#
end_time: datetime | None = None#
__init__(success, data=None, error=None, execution_time=None, start_time=None, end_time=None)#
class framework.base.ExecutionRecord(step, start_time, result, end_time=None)[source]#

Bases: object

Comprehensive historical record of completed execution steps.

This dataclass maintains complete records of execution step history including the original planned step, comprehensive timing information, and detailed execution results. It serves as the primary mechanism for execution history tracking, performance analysis, debugging, and audit trails throughout the framework.

ExecutionRecord enables comprehensive execution analysis by preserving:
  1. Step Context: Original planned step with objectives and requirements

  2. Timing Data: Detailed execution timing for performance analysis

  3. Result Information: Complete outcome data including success/failure details

  4. Historical Tracking: Sequential execution records for audit trails

  5. Debug Information: Context needed for troubleshooting execution issues

The record structure supports both real-time monitoring and historical analysis while maintaining referential integrity between planned steps and their execution outcomes. Records are designed for efficient storage and retrieval in execution history systems.

Parameters:
  • step (PlannedStep) – The planned step that was executed, containing objectives and configuration

  • start_time (datetime) – UTC timestamp when step execution began

  • result (ExecutionResult) – Complete execution result with outcome data and error information

  • end_time (Optional[datetime]) – UTC timestamp when step execution completed (optional)

Note

The end_time field may be None if timing information can be derived from result.end_time. This provides flexibility in record construction while maintaining timing accuracy. Prefer result.end_time when available for consistency.

Warning

ExecutionRecord instances should be treated as immutable once created to maintain execution history integrity. Create new records rather than modifying existing ones.

Examples

Successful step execution record:

record = ExecutionRecord(
    step=PlannedStep(
        context_key="weather_data",
        capability="weather_retrieval",
        task_objective="Get current weather for San Francisco",
        success_criteria="Weather data retrieved with temperature"
    ),
    start_time=datetime.utcnow(),
    result=ExecutionResult(
        success=True,
        data={"temperature": 72, "conditions": "sunny"},
        execution_time=1.2
    )
)

Failed step execution record:

record = ExecutionRecord(
    step=planned_step,
    start_time=start_timestamp,
    result=ExecutionResult(
        success=False,
        error=ExecutionError(
            severity=ErrorSeverity.RETRIABLE,
            message="API rate limit exceeded"
        ),
        execution_time=0.5
    ),
    end_time=end_timestamp
)
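Since end_time may be None, a helper that follows the note above (prefer result.end_time, then fall back to the recorded execution_time) might look like this sketch:

from typing import Optional

from framework.base import ExecutionRecord

def record_duration(record: ExecutionRecord) -> Optional[float]:
    # Prefer explicit end timestamps; fall back to the result's own timing.
    end = record.end_time or record.result.end_time
    if end is not None:
        return (end - record.start_time).total_seconds()
    return record.result.execution_time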

See also

PlannedStep : Execution step planning and configuration
ExecutionResult : Individual execution outcome data
framework.base.planning : Execution planning system

step: PlannedStep#
start_time: datetime#
result: ExecutionResult#
end_time: datetime | None = None#
__init__(step, start_time, result, end_time=None)#
class framework.base.CapabilityMatch(*, is_match)[source]#

Bases: BaseModel

Task classification result for capability matching and selection.

This Pydantic model represents the outcome of task classification analysis to determine whether a user’s request should be handled by a specific capability. It serves as the primary data structure used by the classification system to route requests to appropriate capabilities based on sophisticated task analysis and capability matching algorithms.

CapabilityMatch enables intelligent capability selection by providing:
  1. Binary Classification: Clear match/no-match decision for routing

  2. Type Safety: Pydantic validation ensures data integrity

  3. Serialization: JSON-compatible for inter-process communication

  4. Integration: Seamless integration with classification pipelines

  5. Consistency: Standardized format across all capability matchers

The model is designed for use in classification workflows where multiple capabilities are evaluated against a user request, and the classification system needs to make routing decisions based on the match results.

Parameters:

is_match (bool) – Boolean indicating whether the user’s request matches this capability

Note

This uses Pydantic BaseModel to ensure type safety, validation, and JSON serialization support. The model automatically validates that is_match is a proper boolean value.

Warning

The classification system relies on the accuracy of this match result for proper capability routing. Ensure classification logic is thoroughly tested to avoid routing errors.

Examples

Positive capability match:

match = CapabilityMatch(is_match=True)
# Indicates the capability should handle this request

Negative capability match:

match = CapabilityMatch(is_match=False)
# Indicates the capability should not handle this request

Usage in classification workflow:

matches = []
for capability in available_capabilities:
    classifier_result = classify_request(user_request, capability)
    match = CapabilityMatch(is_match=classifier_result)
    matches.append((capability, match))

# Select capabilities with positive matches
selected_capabilities = [
    cap for cap, match in matches if match.is_match
]

See also

framework.infrastructure.classifier : Task classification system
TaskClassifierGuide : Classification guidance for capabilities
ClassifierExample : Few-shot examples for classification

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

is_match: bool#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

Planning Types#

class framework.base.PlannedStep[source]#

Bases: TypedDict

Individual execution step with comprehensive orchestration context.

This TypedDict represents a single capability execution within an agent’s execution plan. It provides complete context including objectives, success criteria, input requirements, and expected outputs to enable sophisticated orchestration and capability coordination throughout the framework.

PlannedStep serves multiple critical functions:
  1. Orchestration Guidance: Clear objectives and success criteria for execution

  2. Data Flow Management: Input/output specifications for capability chaining

  3. Context Management: Unique keys for result storage and retrieval

  4. Parameter Passing: Flexible configuration for capability customization

  5. Execution Tracking: Complete context for monitoring and debugging

The structure uses total=False to support partial updates in LangGraph’s state management system, enabling incremental plan construction and modification during execution. All fields are optional to provide flexibility in plan creation and evolution.

Field Definitions:
  • context_key: Unique identifier for storing step results in execution context

  • capability: Name of the capability to execute for this step

  • task_objective: Complete, self-sufficient description of step goals

  • success_criteria: Clear criteria for determining successful completion

  • expected_output: Context type key where results will be stored

  • parameters: Optional capability-specific configuration parameters

  • inputs: Step inputs as list of {context_type: context_key} mappings

Default behaviors (when fields not provided):
  • expected_output: None (no specific output context expected)

  • parameters: None (no custom parameters required)

  • inputs: [] (no input dependencies)
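For contrast with the fuller examples below, a minimal step that relies on these defaults (the capability name and wording are illustrative) can be written as:

step = PlannedStep(
    context_key="status_summary",
    capability="status_report",
    task_objective="Summarize the current system status for the operator",
    success_criteria="Concise status summary produced"
)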

Note

The task_objective should be complete and self-sufficient to enable capability execution without additional context. The success_criteria should be specific and measurable for reliable execution validation.

Warning

Context keys must be unique within an execution plan to prevent result collisions. Use descriptive, namespaced keys for clarity.

Examples

Data retrieval step:

step = PlannedStep(
    context_key="weather_data",
    capability="weather_retrieval",
    task_objective="Retrieve current weather conditions for San Francisco",
    success_criteria="Weather data retrieved with temperature and conditions",
    expected_output="WEATHER_DATA",
    parameters={"location": "San Francisco", "units": "metric"}
)

Data processing step with dependencies:

step = PlannedStep(
    context_key="processed_data",
    capability="data_processor",
    task_objective="Process raw sensor data for trend analysis",
    success_criteria="Data processed with statistical summary available",
    expected_output="PROCESSED_DATA",
    inputs=[{"RAW_SENSOR_DATA": "sensor_readings"}],
    parameters={"analysis_type": "trend", "window_size": 24}
)

See also

ExecutionPlan : Complete execution plan containing multiple steps
ExecutionRecord : Historical record of completed step executions

context_key: str#
capability: str#
task_objective: str#
success_criteria: str#
expected_output: str | None#
parameters: Dict[str, str | int | float] | None#
inputs: List[Dict[str, str]] | None#
class framework.base.ExecutionPlan[source]#

Bases: TypedDict

Complete execution plan with ordered capability sequence and orchestration context.

This TypedDict represents the orchestrator’s comprehensive plan for accomplishing a user’s request through a coordinated sequence of capability executions. It provides the complete execution roadmap including step ordering, data flow, and coordination requirements for complex multi-capability tasks.

ExecutionPlan serves as the primary coordination mechanism for:
  1. Multi-Step Execution: Ordered sequence of capability invocations

  2. Data Flow Management: Input/output coordination between capabilities

  3. State Persistence: LangGraph-compatible structure for checkpointing

  4. Execution Tracking: Foundation for monitoring and debugging

  5. Plan Evolution: Support for dynamic plan modification during execution

The structure uses total=False to support incremental plan construction and modification in LangGraph’s state management system. This enables dynamic planning where plans can be built progressively and modified based on execution results and changing requirements.

Plan Structure:
  • steps: Ordered list of PlannedStep objects defining the execution sequence

Default behaviors (when fields not provided):
  • steps: [] (empty plan requiring population)

Note

ExecutionPlan uses pure dictionary format for maximum compatibility with LangGraph’s serialization and checkpointing systems. All plan data can be safely persisted and restored across execution sessions.
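Because the plan is a plain dictionary structure, standard serialization applies directly; a minimal sketch (the step content is illustrative):

import json

from framework.base import ExecutionPlan, PlannedStep

plan = ExecutionPlan(steps=[
    PlannedStep(
        context_key="user_location",
        capability="location_detection",
        task_objective="Determine user's current location",
        success_criteria="Location coordinates available"
    )
])
snapshot = json.dumps(plan)      # persist alongside a checkpoint
restored = json.loads(snapshot)  # plain dict compatible with ExecutionPlan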

Warning

Step ordering is critical for proper execution flow. Ensure dependencies between steps are properly sequenced and context keys are unique to prevent execution conflicts.

Examples

Simple two-step execution plan:

plan = ExecutionPlan(
    steps=[
        PlannedStep(
            context_key="user_location",
            capability="location_detection",
            task_objective="Determine user's current location",
            success_criteria="Location coordinates available",
            expected_output="LOCATION_DATA"
        ),
        PlannedStep(
            context_key="weather_report",
            capability="weather_retrieval",
            task_objective="Get weather for user's location",
            success_criteria="Weather data retrieved successfully",
            expected_output="WEATHER_DATA",
            inputs=[{"LOCATION_DATA": "user_location"}]
        )
    ]
)

Complex data processing pipeline:

plan = ExecutionPlan(
    steps=[
        PlannedStep(
            context_key="raw_data",
            capability="data_ingestion",
            task_objective="Ingest sensor data from last 24 hours",
            success_criteria="Raw data available for processing",
            expected_output="RAW_SENSOR_DATA"
        ),
        PlannedStep(
            context_key="cleaned_data",
            capability="data_cleaning",
            task_objective="Clean and validate sensor data",
            success_criteria="Data cleaned with quality metrics",
            expected_output="CLEANED_DATA",
            inputs=[{"RAW_SENSOR_DATA": "raw_data"}]
        ),
        PlannedStep(
            context_key="analysis_results",
            capability="trend_analysis",
            task_objective="Analyze trends in cleaned sensor data",
            success_criteria="Trend analysis complete with insights",
            expected_output="ANALYSIS_RESULTS",
            inputs=[{"CLEANED_DATA": "cleaned_data"}]
        )
    ]
)

See also

PlannedStep : Individual execution step structure
save_execution_plan_to_file() : Plan persistence utilities
load_execution_plan_from_file() : Plan loading utilities

steps: List[PlannedStep]#

Example System#

class framework.base.BaseExample[source]#

Bases: ABC

Abstract base class for all few-shot examples with consistent formatting interface.

This abstract base class establishes the foundational interface for all example types used in few-shot learning and prompt engineering throughout the ALS Expert framework. It enforces consistent formatting patterns that ensure reliable LLM consumption and predictable behavior across all example implementations.

The BaseExample class serves multiple critical functions:
  1. Interface Standardization: Common format_for_prompt() method across all examples

  2. Type Safety: Clear inheritance hierarchy for example type checking

  3. Consistency Enforcement: Uniform presentation patterns for LLM consumption

  4. Framework Integration: Seamless integration with prompt building systems

  5. Extensibility: Clear extension points for domain-specific example types

All concrete example classes must inherit from this base and implement the format_for_prompt() method to define their specific formatting behavior. This ensures that examples can be used polymorphically throughout the framework while maintaining consistent output formats.

Note

This is an abstract base class that cannot be instantiated directly. Subclasses must implement the format_for_prompt() method to provide their specific formatting logic.

Warning

Example formatting directly impacts LLM performance. Ensure implementations produce consistent, well-structured output that follows established patterns for reliable model consumption.

Example:

from dataclasses import dataclass

@dataclass
class CustomExample(BaseExample):
    content: str
    category: str

    def format_for_prompt(self) -> str:
        return f"Category: {self.category}\nContent: {self.content}"
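A hypothetical instance then formats consistently for prompt inclusion (the content values below are illustrative):

example = CustomExample(content="Retrieve beam current history", category="archiver")
print(example.format_for_prompt())
# Category: archiver
# Content: Retrieve beam current history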

See also

OrchestratorExample : Planning examples for capability orchestration
ClassifierExample : Training examples for task classification

abstractmethod format_for_prompt()[source]#

Format this example for inclusion in LLM prompts with consistent structure.

This is the core formatting method that all example subclasses must implement. It transforms the example data into a string format optimized for LLM consumption and few-shot learning. The method should produce consistent, well-structured output that follows established formatting patterns for reliable model performance.

Implementation guidelines:
  1. Consistency: Use consistent formatting patterns across similar examples

  2. Clarity: Ensure output is clear and unambiguous for LLM interpretation

  3. Completeness: Include all necessary context for effective few-shot learning

  4. Efficiency: Generate concise but comprehensive example representations

Returns:

Formatted string representation optimized for LLM prompt inclusion

Return type:

str

Raises:

NotImplementedError – This is an abstract method that must be implemented

Note

The formatted output will be directly included in LLM prompts, so it should be optimized for model consumption and follow consistent patterns to ensure reliable few-shot learning performance.

Warning

Poorly formatted examples can significantly impact LLM performance. Test formatting output thoroughly to ensure it produces the expected behavior in few-shot learning scenarios.

__init__()#
class framework.base.OrchestratorExample(step, scenario_description, context_requirements=None, notes=None)[source]#

Bases: BaseExample

Structured example for orchestrator prompt showing how to plan steps with this capability.

This class provides rich examples that demonstrate how to plan execution steps with specific capabilities. Each example includes the planned step, scenario context, requirements, and optional notes to guide the orchestrator in creating effective execution plans.

Parameters:
  • step (PlannedStep) – The planned execution step demonstrating capability usage

  • scenario_description (str) – Human-readable description of when/why to use this capability

  • context_requirements (Optional[Dict[str, str]]) – What data needs to be available in execution context

  • notes (Optional[str]) – Additional guidance, caveats, or usage tips

step: PlannedStep#
scenario_description: str#
context_requirements: Dict[str, str] | None = None#
notes: str | None = None#
format_for_prompt()[source]#

Format this orchestrator example for execution planning with comprehensive context.

This method transforms the orchestrator example into a rich, structured format suitable for guiding execution planning in orchestration systems. It provides complete context including scenario descriptions, step specifications, context requirements, and additional notes to enable effective capability planning.

The formatting dynamically adapts to the PlannedStep structure and includes:
  1. Scenario Description: Clear context for when to use this capability

  2. Context Requirements: Prerequisites for successful execution

  3. Step Specification: Complete PlannedStep details with all parameters

  4. Additional Notes: Supplementary guidance and usage tips

Returns:

Formatted string with complete orchestration context for planning guidance

Return type:

str

Note

The formatting dynamically adapts to the PlannedStep structure, only including fields that have values. This ensures clean, focused examples without unnecessary null or empty fields.

Warning

The step formatting accesses PlannedStep fields dynamically. Ensure the step object is properly constructed with valid field values to avoid formatting issues.

Examples

Formatted orchestrator example output:

example = OrchestratorExample(
    step=PlannedStep(
        context_key="weather_data",
        capability="weather_retrieval",
        task_objective="Get current weather",
        success_criteria="Weather data retrieved"
    ),
    scenario_description="When user requests weather information",
    context_requirements={"location": "User location data"}
)
formatted = example.format_for_prompt()
# Returns formatted example with scenario, requirements, and step details

See also

_format_field_value() : Field value formatting helper method
PlannedStep : Execution step structure and field definitions

static format_examples_for_prompt(examples)[source]#

Format multiple orchestrator examples for LLM prompt inclusion with consistent structure.

This method transforms a collection of OrchestratorExample objects into a formatted string suitable for inclusion in LLM prompts. It provides numbered examples with consistent spacing and section headers to ensure reliable consumption by orchestration systems.

The formatting follows established patterns for orchestration guidance:
  1. Section Header: Clear identification of example content

  2. Numbered Examples: Sequential numbering for easy reference

  3. Consistent Spacing: Uniform formatting for reliable parsing

  4. Complete Context: Full example details including scenarios and requirements

Parameters:

examples (List[OrchestratorExample]) – List of orchestrator examples to format for prompt inclusion

Returns:

Formatted string with numbered examples and section headers

Return type:

str

Note

Returns empty string if no examples are provided. The formatting preserves all example details including scenario descriptions, context requirements, and additional notes for comprehensive orchestration guidance.

Examples:

Formatting orchestrator examples:

examples = [example1, example2, example3]
formatted = OrchestratorExample.format_examples_for_prompt(examples)
# Returns a section header followed by numbered examples, e.g.
# "Example Step Planning:\n\n1. Scenario: ..."

See also

format_for_prompt() : Individual example formatting method


__init__(step, scenario_description, context_requirements=None, notes=None)#
class framework.base.OrchestratorGuide(*, instructions, examples=<factory>, priority=0)[source]#

Bases: BaseModel

Comprehensive orchestration guide with examples and priority-based ordering.

This Pydantic model provides complete guidance for orchestration systems on how to effectively plan and execute capabilities. It includes detailed instructions, rich examples, and priority settings that enable sophisticated execution planning and capability coordination throughout the framework.

OrchestratorGuide enables intelligent orchestration by providing:
  1. Planning Instructions: Clear guidance on when and how to use capabilities

  2. Rich Examples: Detailed execution step examples with context requirements

  3. Priority Ordering: Configurable priority for guide concatenation and selection

  4. Context Specification: Clear requirements for successful capability execution

  5. Framework Integration: Seamless integration with orchestration infrastructure

The guide system ensures effective execution planning by providing orchestrators with comprehensive context about capability usage patterns, requirements, and best practices. This enables more accurate execution plan generation and reduces planning errors that could impact system performance.

Parameters:
  • instructions (str) – Detailed orchestration instructions for capability usage

  • examples (List[OrchestratorExample]) – Rich examples demonstrating effective capability execution planning

  • priority (int) – Priority for guide ordering during concatenation (lower values first)

Note

Priority values control the order in which guides are presented when multiple capabilities provide orchestration guidance. Lower values appear first, allowing critical capabilities to provide primary guidance.
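For illustration, the ordering rule amounts to a sort on priority before concatenation; the framework's orchestration infrastructure handles this internally, so the helper below is only a sketch:

from typing import List

from framework.base import OrchestratorGuide

def combine_guides(guides: List[OrchestratorGuide]) -> str:
    # Lower priority values appear first, letting critical capabilities lead.
    ordered = sorted(guides, key=lambda g: g.priority)
    return "\n\n".join(g.instructions for g in ordered)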

Warning

Orchestration guidance directly impacts execution plan quality. Ensure instructions are comprehensive and examples represent realistic usage patterns to maintain effective execution planning.

Examples

Data analysis capability orchestration guide:

guide = OrchestratorGuide(
    instructions="Use for statistical analysis of numerical data sets",
    examples=[
        OrchestratorExample(
            step=PlannedStep(
                context_key="analysis_results",
                capability="statistical_analysis",
                task_objective="Analyze sensor data for trends and anomalies",
                success_criteria="Statistical summary with trend analysis complete",
                expected_output="ANALYSIS_RESULTS",
                inputs=[{"SENSOR_DATA": "sensor_readings"}]
            ),
            scenario_description="When user requests data analysis or trend identification",
            context_requirements={"SENSOR_DATA": "Numerical time series data"}
        )
    ],
    priority=10
)

See also

OrchestratorExample : Rich examples for execution step planning
PlannedStep : Execution step structure and requirements
framework.infrastructure.orchestration : Orchestration system integration

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

instructions: str#
examples: List[OrchestratorExample]#
priority: int#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class framework.base.ClassifierExample(query, result, reason)[source]#

Bases: BaseExample

Example for few-shot learning in classifiers.

This class represents training examples used for few-shot learning in classification tasks. Each example contains a query, expected result, and reasoning to help the classifier learn decision patterns.

Parameters:
  • query (str) – Input query text to be classified

  • result (bool) – Expected boolean classification result

  • reason (str) – Explanation of why this classification is correct

query: str#
result: bool#
reason: str#
format_for_prompt()[source]#

Format this classifier example for few-shot learning with complete context.

This method transforms the classifier example into the standard format used for few-shot learning in classification tasks. It provides the complete query/result/reason triplet that enables LLM-based classifiers to learn effective decision patterns through example-based training.

The formatting follows the established pattern for classifier training:
  • Query: The input text to be classified

  • Expected Output: The correct boolean classification result

  • Reason: The logical justification for the classification decision

Returns:

Formatted string with query, expected result, and reasoning for few-shot learning

Return type:

str

Note

The format is specifically optimized for LLM consumption in few-shot learning scenarios. The consistent structure enables reliable pattern recognition and classification performance.

Examples

Formatted classifier example output:

example = ClassifierExample(
    query="What's the weather like?",
    result=True,
    reason="Direct weather information request"
)
formatted = example.format_for_prompt()
# Returns: 'User Query: "What's the weather like?" -> Expected Output: True -> Reason: Direct weather information request'

See also

format_examples_for_prompt() : Batch formatting with bias prevention

static format_examples_for_prompt(examples)[source]#

Format multiple classifier examples with randomization for bias-free few-shot learning.

This method transforms a collection of ClassifierExample objects into a formatted string optimized for few-shot learning in classification tasks. It includes automatic randomization to prevent positional bias, ensuring that classifiers learn from content rather than example ordering patterns.

The formatting implements bias prevention strategies:
  1. Automatic Randomization: Examples are shuffled to prevent position-based learning

  2. Consistent Format: Uniform query/result/reason structure for reliable learning

  3. Complete Context: Full reasoning provided for each classification decision

  4. Optimal Presentation: Format optimized for LLM few-shot learning performance

Parameters:

examples (List[ClassifierExample]) – List of classifier examples to format with bias prevention

Returns:

Formatted string with randomized examples for reliable classification learning

Return type:

str

Note

The randomization creates a shallow copy of the input list, leaving the original list unchanged. This ensures thread safety and prevents side effects in concurrent usage scenarios.

Warning

The randomization is applied each time this method is called. For deterministic behavior in testing, consider using a fixed seed or pre-shuffled examples.

Example:

examples = [example1, example2, example3]
formatted = ClassifierExample.format_examples_for_prompt(examples)
# Returns randomized examples like:
# '  - User Query: "..." -> Expected Output: True -> Reason: ...'

See also

format_for_prompt() : Individual example formatting method
random : Randomization implementation for bias prevention


__init__(query, result, reason)#
class framework.base.TaskClassifierGuide(*, instructions, examples=<factory>, actions_if_true=<factory>)[source]#

Bases: BaseModel

Comprehensive guide for task classification with few-shot learning support.

This Pydantic model provides complete guidance for task classification systems including classification instructions, training examples, and action specifications. It serves as the primary configuration mechanism for capability-specific classification that enables intelligent routing and task analysis throughout the framework.

TaskClassifierGuide enables sophisticated classification by providing:
  1. Classification Instructions: Clear guidance on when to activate capabilities

  2. Few-Shot Training: Curated examples for reliable classification learning

  3. Action Specification: Automated responses to positive classifications

  4. Bias Prevention: Randomized example presentation to prevent positional bias

  5. Framework Integration: Seamless integration with classification infrastructure

The guide system ensures consistent and accurate capability selection by providing LLM-based classifiers with comprehensive context and training examples. This enables reliable task routing and reduces classification errors that could lead to incorrect capability activation.

Parameters:
  • instructions (str) – Detailed classification instructions specifying when to activate

  • examples (List[ClassifierExample]) – Training examples for few-shot learning with query/result/reason triplets

  • actions_if_true (ClassifierActions) – Action specifications for positive classification results

Note

The examples list is automatically randomized during prompt formatting to prevent positional bias in few-shot learning. This ensures more reliable classification performance.

Warning

Classification accuracy directly impacts system behavior through capability routing. Ensure instructions are clear and examples are representative to maintain reliable task classification.

Examples

Weather capability classification guide:

guide = TaskClassifierGuide(
    instructions="Activate when user requests weather information or forecasts",
    examples=[
        ClassifierExample(
            query="What's the weather like today?",
            result=True,
            reason="Direct weather information request"
        ),
        ClassifierExample(
            query="Should I bring an umbrella?",
            result=True,
            reason="Weather-dependent decision requiring forecast"
        ),
        ClassifierExample(
            query="What time is it?",
            result=False,
            reason="Time request, not weather-related"
        )
    ]
)

See also

ClassifierExample : Individual training examples for few-shot learning
ClassifierActions : Action specifications for positive matches
framework.infrastructure.classifier : Classification system integration

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

instructions: str#
examples: List[ClassifierExample]#
actions_if_true: ClassifierActions#
model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

class framework.base.ClassifierActions[source]#

Bases: BaseModel

Action specification for classifier match responses with extensible design.

This Pydantic model defines actions that should be executed when a task classifier returns a positive match for a capability. It provides an extensible framework for defining automated responses to classification results, enabling sophisticated workflow automation based on task analysis.

The ClassifierActions system enables:
  1. Automated Workflows: Define actions triggered by positive classifications

  2. Response Coordination: Specify how the system should respond to matches

  3. Future Extensibility: Placeholder for advanced action specifications

  4. Integration Points: Clear interfaces for action execution systems

Currently serves as a foundational placeholder that can be extended with specific action types as the classification system evolves. Future implementations may include routing specifications, parameter configurations, or execution priority settings.

Note

This is currently a placeholder class designed for future extensibility. The structure provides a foundation for implementing sophisticated action systems based on classification results.
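Even as a placeholder it can be supplied explicitly when constructing a guide; a minimal sketch (the instruction text is illustrative):

from framework.base import ClassifierActions, TaskClassifierGuide

guide = TaskClassifierGuide(
    instructions="Activate when the user asks about archived accelerator data",
    examples=[],
    actions_if_true=ClassifierActions()  # placeholder: no concrete actions defined yet
)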

See also

TaskClassifierGuide : Classification guidance using action specifications
CapabilityMatch : Classification results that trigger actions

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

See also

State and Context Management

State and context management systems used by components

Registry System

Registry system for component management

Configuration System

Configuration system for component settings

Registry and Discovery

Complete guide to component registration patterns