"""Base Infrastructure Classes - System Component Architecture

This module provides the BaseInfrastructureNode class and supporting components
for building infrastructure nodes in the ALS Expert framework. Infrastructure
nodes handle system-level operations like orchestration, classification, routing,
and monitoring that are essential for agent operation but distinct from business
logic capabilities.

The infrastructure architecture emphasizes simplicity, reliability, and fast
failure detection. Infrastructure nodes are designed to fail fast with clear
error messages rather than attempting complex recovery strategies, since they
handle system-critical functions that require immediate attention.

Key Infrastructure Components:

- Task extraction and processing
- Request classification and routing
- Execution orchestration and planning
- Error handling and recovery coordination
- State management and validation
- Monitoring and performance tracking

The infrastructure pattern follows these principles:

1. **Convention-based validation**: Required components enforced through reflection
2. **LangGraph-native integration**: Full streaming, configuration, and checkpoint support
3. **Fast failure detection**: Conservative error handling with immediate failure
4. **State management**: Pure dictionary operations for LangGraph compatibility
5. **Execution tracking**: Comprehensive timing and performance monitoring

.. note::

   Infrastructure nodes use the @infrastructure_node decorator for LangGraph
   integration and should focus on orchestration rather than business logic.

.. warning::

   Infrastructure nodes use conservative retry policies since they handle
   system-critical functions. Most errors are treated as critical by default.

.. seealso::

   :func:`infrastructure_node` : Decorator for LangGraph integration
   :class:`BaseCapability` : Business logic component base class
   :mod:`framework.state` : State management and structure definitions
"""
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, TYPE_CHECKING

from framework.base.errors import ErrorClassification, ErrorSeverity

# Import types for type hints
if TYPE_CHECKING:
    from framework.state import AgentState


class BaseInfrastructureNode(ABC):
    """Base class for infrastructure nodes in the LangGraph-native architecture.

    This class provides the foundation for all infrastructure components in the
    ALS Expert framework. Infrastructure nodes handle system-level operations
    that orchestrate, route, classify, and monitor agent execution. Unlike
    capabilities which contain business logic, infrastructure nodes focus on
    system coordination and management.

    The BaseInfrastructureNode class enforces a strict contract through
    reflection-based validation and provides standardized integration with
    LangGraph's execution model. Infrastructure nodes are designed for fast
    failure detection and minimal retry attempts since they handle
    system-critical functions.

    Infrastructure Node Responsibilities:

    - **Task Extraction**: Parse and structure user requests into actionable tasks
    - **Classification**: Determine which capabilities should handle specific requests
    - **Orchestration**: Plan and coordinate execution sequences across capabilities
    - **Routing**: Direct execution flow based on state conditions and results
    - **Monitoring**: Track execution progress and system health
    - **Error Coordination**: Handle system-level error recovery and routing

    Required Components (enforced at decoration time):

    - name: Infrastructure node identifier for routing and logging
    - description: Human-readable description for documentation
    - execute(): Async static method containing orchestration logic
    - classify_error(): Error classification method (inherited or custom)
    - get_retry_policy(): Retry configuration method (inherited or custom)

    Architecture Integration:
        Infrastructure nodes integrate with the framework through:

        1. **LangGraph Integration**: Via @infrastructure_node decorator
        2. **State Management**: Pure dictionary operations for serialization
        3. **Error Handling**: Conservative policies with fast failure detection
        4. **Streaming**: Native LangGraph streaming for real-time updates
        5. **Configuration**: Access to LangGraph's configuration system

    Example::

        @infrastructure_node
        class TaskExtractionNode(BaseInfrastructureNode):
            name = "task_extraction"
            description = "Task Extraction and Processing"

            @staticmethod
            async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
                # Explicit logger retrieval - professional practice
                from configs.logger import get_logger
                logger = get_logger("framework", "task_extraction")

                # Use get_stream_writer() for pure LangGraph streaming
                from langgraph.config import get_stream_writer
                streaming = get_stream_writer()

                if streaming:
                    streaming({"event_type": "status", "message": "Extracting task", "progress": 0.3})
                logger.info("Starting task extraction")

                # Extract and process task from flat state structure
                task = state.get("task_current_task", "")

                if streaming:
                    streaming({"event_type": "status", "message": "Task extraction complete", "progress": 1.0, "complete": True})

                # Return state updates for flat structure
                return {
                    "task_current_task": task,
                    "task_depends_on_chat_history": True,
                    "task_depends_on_user_memory": False
                }

            @staticmethod
            def classify_error(exc: Exception, context: dict) -> ErrorClassification:
                # Optional error classification
                return ErrorClassification(severity=ErrorSeverity.RETRIABLE, ...)

    :param name: Infrastructure node identifier (required class attribute)
    :type name: str
    :param description: Human-readable description (required class attribute)
    :type description: str

    .. note::
       Infrastructure nodes use the @infrastructure_node decorator which handles
       all LangGraph integration, parameter injection, and error handling.

    .. warning::
       The name and description class attributes are required. The execute method
       must be implemented as a static method and should return state updates.
    """

    # Required class attributes - must be overridden in subclasses.
    # They default to None on the base class, hence Optional[str].
    name: Optional[str] = None
    description: Optional[str] = None

    @staticmethod
    @abstractmethod
    async def execute(
        state: 'AgentState',
        **kwargs
    ) -> Dict[str, Any]:
        """Execute the infrastructure operation with comprehensive system coordination.

        This is the core method that all infrastructure nodes must implement.
        It contains the orchestration, routing, or monitoring logic and integrates
        with the framework's state management system. The method should be
        implemented as a static method to support LangGraph's execution model.

        Infrastructure nodes should focus on system coordination rather than
        business logic. They receive the complete agent state and return updates
        that LangGraph automatically merges. The @infrastructure_node decorator
        provides timing, error handling, and execution tracking.

        Common Infrastructure Patterns:

        1. **Task Extraction**: Parse user input into structured task information
        2. **Classification**: Analyze requests to determine capability routing
        3. **Orchestration**: Create execution plans with capability sequences
        4. **Routing**: Direct flow based on state conditions and results
        5. **Monitoring**: Track progress and system health metrics

        :param state: Current agent state containing all execution context and data
        :type state: AgentState
        :param kwargs: Additional keyword parameters supplied by the caller
        :type kwargs: dict
        :return: Dictionary of state updates for LangGraph to merge into agent state
        :rtype: Dict[str, Any]
        :raises ValidationError: May be raised by implementations when required
            state data is missing or invalid
        :raises InfrastructureError: May be raised by implementations for
            infrastructure-specific operation failures

        Example::

            @staticmethod
            async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
                # Explicit logger retrieval - professional practice
                from configs.logger import get_logger
                logger = get_logger("framework", "orchestrator")

                # Define streaming helper here for step awareness
                from configs.streaming import get_streamer
                streamer = get_streamer("framework", "orchestrator", state)

                streamer.status("Starting orchestration")
                logger.info("Starting execution planning")

                # Infrastructure logic
                plan = create_execution_plan(state)

                streamer.status("Orchestration complete")
                logger.info("Execution plan created")

                # Return state updates
                return {
                    "planning_execution_plan": plan,
                    "planning_ready_for_execution": True
                }

        .. note::
           The base implementation is an abstract placeholder with an empty
           body; it performs no work and does not raise. Because the method is
           marked abstract, a subclass that does not override it cannot be
           instantiated.

        .. note::
           Infrastructure nodes should focus on orchestration, routing, and state
           management logic. Retrieve loggers explicitly using get_logger() for
           professional code. The @infrastructure_node decorator handles timing,
           error handling, and retry policies. Use get_streamer() for streaming
           updates.
        """
        pass

    @staticmethod
    def classify_error(exc: Exception, context: dict) -> 'ErrorClassification':
        """Classify errors for infrastructure-specific error handling and recovery.

        This method provides default error classification for all infrastructure
        nodes with a conservative approach: every exception is classified as
        critical. Infrastructure nodes handle system-critical functions like
        orchestration and routing, so failures typically require immediate
        attention rather than automatic retry attempts.

        The default implementation prioritizes system stability by failing fast
        with clear error messages. Subclasses should override this method only
        when specific infrastructure components can benefit from retry logic
        (e.g., LLM-based orchestrators that may encounter temporary API issues).

        :param exc: The exception that occurred during infrastructure operation
        :type exc: Exception
        :param context: Error context; only the ``infrastructure_node`` key is
            read here. A missing key (or a ``None`` context) falls back to
            ``'unknown_infrastructure_node'``.
        :type context: dict
        :return: Classification with ``ErrorSeverity.CRITICAL``, a user message
            naming the node and the exception, and the exception text under
            ``metadata["technical_details"]``
        :rtype: ErrorClassification

        .. note::
           The context dictionary is expected to include:

           - ``infrastructure_node``: node name for identification
           - ``execution_time``: time spent before failure
           - ``current_state``: agent state at time of error

        Example::

            @staticmethod
            def classify_error(exc: Exception, context: dict) -> ErrorClassification:
                # Retry network timeouts for LLM-based infrastructure
                if isinstance(exc, (ConnectionError, TimeoutError)):
                    return ErrorClassification(
                        severity=ErrorSeverity.RETRIABLE,
                        user_message="Network timeout, retrying...",
                        metadata={"technical_details": str(exc)}
                    )
                return ErrorClassification(
                    severity=ErrorSeverity.CRITICAL,
                    user_message=f"Infrastructure error: {exc}",
                    metadata={"technical_details": str(exc)}
                )
        """
        # The classifier runs on the error path, so it must not fail itself:
        # tolerate a missing context instead of raising AttributeError.
        node_name = (context or {}).get(
            'infrastructure_node', 'unknown_infrastructure_node'
        )
        return ErrorClassification(
            severity=ErrorSeverity.CRITICAL,
            user_message=f"Infrastructure error in {node_name}: {exc}",
            metadata={"technical_details": str(exc)}
        )

    @staticmethod
    def get_retry_policy() -> Dict[str, Any]:
        """Get conservative retry policy configuration for infrastructure operations.

        This method provides retry configuration optimized for infrastructure
        nodes that handle system-critical functions. The default policy uses
        conservative settings with minimal retry attempts and fast failure
        detection to maintain system stability.

        Infrastructure nodes should generally fail fast rather than retry
        extensively, since failures often indicate system-level issues that
        require immediate attention. Override this method only for specific
        infrastructure components that can benefit from retry logic.

        :return: Dictionary with the keys ``max_attempts``, ``delay_seconds``
            and ``backoff_factor``
        :rtype: Dict[str, Any]

        Example::

            @staticmethod
            def get_retry_policy() -> Dict[str, Any]:
                return {
                    "max_attempts": 3,      # More retries for LLM-based infrastructure
                    "delay_seconds": 1.0,   # Longer delay for external service calls
                    "backoff_factor": 2.0   # Exponential backoff
                }

        .. note::
           The router uses this configuration to determine retry behavior.
           Infrastructure default: 2 attempts, 0.2s delay, backoff factor 1.2.
           This prioritizes fast failure detection over retry persistence.
        """
        return {
            "max_attempts": 2,       # Conservative for infrastructure
            "delay_seconds": 0.2,    # Fast retry for infrastructure
            "backoff_factor": 1.2    # Minimal backoff
        }

    def __repr__(self) -> str:
        """Return a string representation of the infrastructure node for debugging.

        Provides a concise string representation that includes both the Python
        class name and the infrastructure node's registered name. This is useful
        for debugging, logging, and development workflows where infrastructure
        nodes need to be identified clearly.

        :return: String representation including class name and node name
        :rtype: str

        Example:
            >>> node = TaskExtractionNode()
            >>> repr(node)
            '<TaskExtractionNode: task_extraction>'

        .. note::
           The format follows the pattern '<ClassName: node_name>' for
           consistency across all framework components.
        """
        return f"<{self.__class__.__name__}: {self.name}>"