Execution Control#
Infrastructure components that control the flow of execution including routing decisions, error handling, and conditional logic.
Router Node#
- class framework.infrastructure.router_node.RouterNode[source]#
Bases:
BaseInfrastructureNode
Central routing decision node for the Alpha Berkeley Agent Framework.
This node serves as the single decision-making authority that determines what should happen next based on the current agent state. It performs no business logic, handling only routing decisions and metadata management.
The actual routing is handled by the router_conditional_edge function.
- name: str = 'router'#
- description: str = 'Central routing decision authority'#
- async static execute(state, **kwargs)[source]#
Router node execution - updates routing metadata only.
This node serves as the entry point and routing hub but performs no routing logic itself. The actual routing decision is made by the conditional edge function, which keeps the logic DRY and avoids duplication.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional LangGraph parameters
- Returns:
Dictionary of state updates for routing metadata
- Return type:
Dict[str, Any]
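Example:
A minimal sketch of calling the router node directly (illustrative only; in normal operation LangGraph invokes this node as part of the compiled graph, and the bare AgentState() construction is an assumption borrowed from the ErrorNode examples later in this section):
import asyncio

from framework.infrastructure.router_node import RouterNode
from framework.state import AgentState

async def main():
    state = AgentState()  # assumed: the framework normally populates this state
    updates = await RouterNode.execute(state)  # dict of routing metadata updates
    print(updates)

asyncio.run(main())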
- __repr__()#
Return a string representation of the infrastructure node for debugging.
Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.
- Returns:
String representation including class name and node name
- Return type:
str
Example
>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'
Note
The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.
- static classify_error(exc, context)#
Classify errors for infrastructure-specific error handling and recovery.
This method provides default error classification for all infrastructure nodes with a conservative approach that treats most errors as critical. Infrastructure nodes handle system-critical functions like orchestration and routing, so failures typically require immediate attention rather than automatic retry attempts.
The default implementation prioritizes system stability by failing fast with clear error messages. Subclasses should override this method only when specific infrastructure components can benefit from retry logic (e.g., LLM-based orchestrators that may encounter temporary API issues).
- Parameters:
exc (Exception) – The exception that occurred during infrastructure operation
context (dict) – Error context including node info, execution state, and timing
- Returns:
Error classification with severity and recovery strategy
- Return type:
ErrorClassification
Note
The context dictionary includes:
infrastructure_node: node name for identification
execution_time: time spent before failure
current_state: agent state at time of error
Example:
@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    # Retry network timeouts for LLM-based infrastructure
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message="Network timeout, retrying...",
            metadata={"technical_details": str(exc)}
        )
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"Infrastructure error: {exc}",
        metadata={"technical_details": str(exc)}
    )
Note
Infrastructure nodes should generally fail fast, so the default implementation treats most errors as critical. Override this method for infrastructure that can benefit from retries (e.g., LLM-based nodes).
- static get_retry_policy()#
Get conservative retry policy configuration for infrastructure operations.
This method provides retry configuration optimized for infrastructure nodes that handle system-critical functions. The default policy uses conservative settings with minimal retry attempts and fast failure detection to maintain system stability.
Infrastructure nodes should generally fail fast rather than retry extensively, since failures often indicate system-level issues that require immediate attention. Override this method only for specific infrastructure components that can benefit from retry logic.
- Returns:
Dictionary containing conservative retry configuration parameters
- Return type:
Dict[str, Any]
Note
Infrastructure default policy: 2 attempts, 0.2s delay, minimal backoff. This prioritizes fast failure detection over retry persistence.
Example:
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 3,     # More retries for LLM-based infrastructure
        "delay_seconds": 1.0,  # Longer delay for external service calls
        "backoff_factor": 2.0  # Exponential backoff
    }
Note
The router uses this configuration to determine retry behavior. Infrastructure default: 2 attempts, 0.2s delay, minimal backoff.
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual error handling.
This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional parameters from LangGraph
- Returns:
State updates dictionary
- Return type:
Dict[str, Any]
- framework.infrastructure.router_node.router_conditional_edge(state)[source]#
LangGraph conditional edge function for dynamic routing.
This is the main export of this module - a pure conditional edge function that determines which node should execute next based on agent state.
Follows LangGraph native patterns where conditional edge functions take only the state parameter and handle logging internally.
Manual retry handling:
- Checks for errors and retry count first
- Routes retriable errors back to the same capability if retries are available
- Routes to the error node when retries are exhausted
- Routes critical/replanning errors immediately
- Parameters:
state (AgentState) – Current agent state containing all execution context
- Returns:
Name of next node to execute or “END” to terminate
- Return type:
str
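Example:
A minimal wiring sketch showing how the router node and conditional edge could be attached to a LangGraph StateGraph (illustrative only; the framework builds and registers this graph automatically, and the path map below is an assumption rather than the real routing table):
from langgraph.graph import StateGraph, END

from framework.infrastructure.error_node import ErrorNode
from framework.infrastructure.router_node import RouterNode, router_conditional_edge
from framework.state import AgentState

builder = StateGraph(AgentState)
builder.add_node("router", RouterNode().langgraph_node)
builder.add_node("error", ErrorNode().langgraph_node)
# ... capability nodes would be added here ...

# router_conditional_edge returns the next node name, or "END" to terminate.
builder.add_conditional_edges(
    "router",
    router_conditional_edge,
    {"error": "error", "END": END},  # assumed path map; real wiring covers all capabilities
)
graph = builder.compile()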
Error Node#
- class framework.infrastructure.error_node.ErrorNode[source]#
Bases:
BaseInfrastructureNode
Generate comprehensive, user-friendly error responses with intelligent analysis.
The ErrorNode serves as the centralized error response generation system for the Alpha Berkeley Agent Framework. It transforms technical error information into comprehensive user responses by combining structured factual reports with context-aware LLM analysis and recovery suggestions.
This infrastructure node operates as the final destination in the error handling pipeline, ensuring that all system failures result in meaningful, actionable information for users. The node implements a robust two-phase approach to error response generation with multiple fallback mechanisms to guarantee response delivery even under adverse conditions.
- Architecture Overview:
The error response generation follows a structured two-phase approach:
Structured Report Generation:
- Extracts error details from agent state control_error_info
- Formats using ErrorClassification.format_for_llm() for consistency
- Adds execution statistics, timing data, and retry information
- Generates step-by-step execution summaries with success/failure tracking
LLM Analysis Phase:
- Provides error context and available system capabilities to the LLM
- Generates intelligent explanations of failure causes
- Produces context-aware recovery suggestions and next steps
- Integrates with the framework prompt system for consistent analysis quality
- Error Recovery Strategy:
The node implements multiple layers of error handling to ensure reliability:
- Comprehensive fallback response if LLM generation fails
- Self-classification of internal errors as FATAL to prevent infinite loops
- Structured logging of all error generation attempts for monitoring
- Guaranteed response delivery through robust exception handling
- Integration Points:
Input: Pre-classified errors from capability decorators via agent state
Streaming: Real-time progress updates through LangGraph streaming system
Output: AIMessage objects formatted for direct user presentation
Monitoring: Comprehensive logging integration for operational visibility
Warning
The ErrorNode must never raise unhandled exceptions as it serves as the final error handling mechanism. All internal errors are caught and result in structured fallback responses.
Note
Error classification within this node always uses FATAL severity to prevent recursive error handling that could lead to infinite loops or system instability.
Examples
The ErrorNode is typically invoked automatically by the framework, but can be tested with manual state construction:
>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Construct agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.CRITICAL,
...         user_message="Database connection timeout",
...         metadata={"timeout": 30, "host": "db.example.com"}
...     ),
...     'capability_name': 'database_query',
...     'execution_time': 31.5
... }
>>> state['task_current_task'] = "Retrieve user profile data"
>>>
>>> # Execute error response generation
>>> result = await ErrorNode.execute(state)
>>> print(f"Response type: {type(result['messages'][0])}")
<class 'langchain_core.messages.ai.AIMessage'>
Framework integration through error decorator:
>>> @capability("database_operations")
... async def query_user_data(user_id: int, state: AgentState):
...     # This will automatically route to ErrorNode on failure
...     connection = await get_db_connection()
...     return await connection.fetch_user(user_id)
See also
ErrorContext
Data structure for error response generation
framework.base.errors.ErrorClassification
Error classification system
framework.base.decorators.capability()
Capability decorator with error handling
framework.state.AgentState
Agent state management system
- name: str = 'error'#
- description: str = 'Error Response Generation'#
- static classify_error(exc, context)[source]#
Classify internal ErrorNode failures with FATAL severity to prevent infinite loops.
This method handles the critical responsibility of classifying errors that occur within the error response generation system itself. All such errors are automatically classified as FATAL to ensure clean termination and prevent recursive error handling scenarios that could destabilize the entire system.
The FATAL classification ensures that if the error response generation mechanism fails, execution terminates immediately rather than attempting additional error recovery operations that could compound the original problem or create infinite error handling loops.
- Parameters:
exc (Exception) – Exception that occurred during error response generation process
context (dict) – Execution context containing node information, timing data, and state
- Returns:
Error classification with FATAL severity and diagnostic metadata
- Return type:
ErrorClassification
Warning
This method should only be called by the framework’s error handling system. Manual invocation could disrupt the error classification hierarchy.
Note
The FATAL severity ensures immediate execution termination without further error recovery attempts, preventing system instability.
Examples
Framework automatic error classification:
>>> try:
...     # ErrorNode internal operation fails
...     await ErrorNode.execute(state)
... except Exception as e:
...     classification = ErrorNode.classify_error(e, context)
...     print(f"Severity: {classification.severity.value}")
fatal
Error classification structure:
>>> context = {"node_name": "error", "execution_time": 1.2}
>>> exc = RuntimeError("LLM generation failed")
>>> classification = ErrorNode.classify_error(exc, context)
>>> print(classification.metadata["technical_details"])
Error node failure: LLM generation failed
- async static execute(state, **kwargs)[source]#
Generate comprehensive error response with structured analysis and LLM insights.
This method orchestrates the complete error response generation pipeline, transforming technical error information into user-friendly responses with actionable recovery suggestions. The process combines factual error reporting with intelligent analysis to provide maximum value to users encountering issues.
The execution follows a carefully designed two-phase approach that ensures robust error handling even when components of the error generation system itself experience failures. Streaming progress updates keep users informed during the response generation process.
- Processing Pipeline:
Context Extraction: Reads error details from agent state including error classification, execution statistics, and step-by-step history
Context Population: Enriches error context with execution timeline, successful operations, and failure categorization
Structured Report Generation: Creates factual error report using standardized formatting and execution statistics
LLM Analysis: Generates intelligent explanations and recovery suggestions based on error context and available capabilities
Response Assembly: Combines structured report with LLM analysis into coherent user response
- Error Handling Strategy:
Comprehensive exception handling prevents method failure
Streaming progress updates provide real-time feedback
Automatic fallback to structured response if LLM generation fails
All failures logged for operational monitoring and debugging
- Parameters:
state (AgentState) – Agent state containing error information in control_error_info field
kwargs (dict) – Additional LangGraph execution parameters including config and streaming
- Returns:
Dictionary containing AIMessage with formatted error response for user presentation
- Return type:
dict[str, list[AIMessage]]
Note
This method is designed to never raise exceptions. All internal errors result in structured fallback responses to ensure users receive meaningful information regardless of system state.
Warning
The method expects error information to be present in state['control_error_info']. Missing error information will result in fallback responses with generic messaging.
Examples
Standard error response generation:
>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Prepare agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.REPLANNING,
...         user_message="API rate limit exceeded",
...         metadata={"retry_after": 300}
...     ),
...     'capability_name': 'external_api_call',
...     'execution_time': 5.2
... }
>>> state['task_current_task'] = "Fetch weather data"
>>>
>>> # Generate error response
>>> result = await ErrorNode.execute(state)
>>> message = result['messages'][0]
>>> print(f"Response length: {len(message.content)} characters")
Response length: 847 characters
Error response with execution history:
>>> state['execution_step_results'] = {
...     'step_0': {
...         'step_index': 0,
...         'capability': 'input_validation',
...         'task_objective': 'Validate API parameters',
...         'success': True
...     },
...     'step_1': {
...         'step_index': 1,
...         'capability': 'external_api_call',
...         'task_objective': 'Fetch weather data',
...         'success': False
...     }
... }
>>> result = await ErrorNode.execute(state)
>>> # Response includes execution summary with successful/failed steps
See also
_create_error_context_from_state()
Error context extraction
_generate_error_response()
Response generation pipeline
AIMessage
Response message format
- __repr__()#
Return a string representation of the infrastructure node for debugging.
Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.
- Returns:
String representation including class name and node name
- Return type:
str
Example
>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'
Note
The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.
- static get_retry_policy()#
Get conservative retry policy configuration for infrastructure operations.
This method provides retry configuration optimized for infrastructure nodes that handle system-critical functions. The default policy uses conservative settings with minimal retry attempts and fast failure detection to maintain system stability.
Infrastructure nodes should generally fail fast rather than retry extensively, since failures often indicate system-level issues that require immediate attention. Override this method only for specific infrastructure components that can benefit from retry logic.
- Returns:
Dictionary containing conservative retry configuration parameters
- Return type:
Dict[str, Any]
Note
Infrastructure default policy: 2 attempts, 0.2s delay, minimal backoff. This prioritizes fast failure detection over retry persistence.
Example:
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 3,     # More retries for LLM-based infrastructure
        "delay_seconds": 1.0,  # Longer delay for external service calls
        "backoff_factor": 2.0  # Exponential backoff
    }
Note
The router uses this configuration to determine retry behavior. Infrastructure default: 2 attempts, 0.2s delay, minimal backoff.
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual error handling.
This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional parameters from LangGraph
- Returns:
State updates dictionary
- Return type:
Dict[str, Any]
- class framework.infrastructure.error_node.ErrorContext(error_classification, current_task, failed_operation, total_operations=0, execution_time=None, retry_count=None, successful_steps=None, failed_steps=None)[source]#
Bases:
object
Comprehensive error context for generating detailed error responses.
This data structure encapsulates all information required to generate meaningful error reports including original error classification, execution statistics, and step-by-step execution history. The class serves as the primary data container for the error response generation pipeline, ensuring consistent access to error details and execution context across all error handling components.
The ErrorContext integrates with the ErrorClassification system to maintain authoritative error information while adding execution-specific details such as timing, retry attempts, and step-by-step progress tracking.
- Parameters:
error_classification (ErrorClassification) – Complete error classification with severity, messages, and metadata
current_task (str) – Human-readable description of the high-level task being executed
failed_operation (str) – Specific operation or capability name that encountered the error
total_operations (int) – Total number of operations attempted in current execution cycle
execution_time (float, optional) – Duration in seconds from start to error occurrence
retry_count (int, optional) – Number of retry attempts made before final failure
successful_steps (list[str], optional) – Chronological list of execution steps that completed successfully
failed_steps (list[str], optional) – Chronological list of execution steps that failed during execution
Note
The class automatically initializes list fields to empty lists if None is provided, ensuring safe iteration over step results in error report generation.
Warning
This class is designed for read-only access during error response generation. Modifying fields after creation may lead to inconsistent error reports.
Examples
Basic error context creation:
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>> classification = ErrorClassification(
...     severity=ErrorSeverity.CRITICAL,
...     user_message="Database connection failed",
...     metadata={"host": "db.example.com"}
... )
>>> context = ErrorContext(
...     error_classification=classification,
...     current_task="Fetch user data",
...     failed_operation="database_query",
...     execution_time=2.5
... )
>>> print(f"Severity: {context.error_severity.value}")
critical
Context with execution history:
>>> context = ErrorContext(
...     error_classification=classification,
...     current_task="Process user request",
...     failed_operation="user_authentication",
...     total_operations=3,
...     successful_steps=["Step 1: Validate input", "Step 2: Load config"],
...     failed_steps=["Step 3: Authenticate user - Failed"]
... )
>>> print(f"Progress: {len(context.successful_steps)}/{context.total_operations}")
Progress: 2/3
See also
framework.base.errors.ErrorClassification
Error classification system
ErrorNode
Primary consumer of ErrorContext instances
- error_classification: ErrorClassification#
- current_task: str#
- failed_operation: str#
- total_operations: int = 0#
- execution_time: float | None = None#
- retry_count: int | None = None#
- successful_steps: List[str] = None#
- failed_steps: List[str] = None#
- property error_severity: ErrorSeverity#
Extract error severity level from the underlying error classification.
- Returns:
Severity level indicating error impact and recovery strategy
- Return type:
ErrorSeverity
Note
Severity levels guide error handling strategy: RETRIABLE errors may be retried, REPLANNING errors require task modification, CRITICAL errors need user intervention, and FATAL errors terminate execution.
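Example:
A sketch of how those severity levels translate into recovery strategies (illustrative only; in the framework this dispatch happens inside router_conditional_edge, not in user code):
from framework.base.errors import ErrorSeverity

def describe_recovery(severity: ErrorSeverity) -> str:
    # Mirrors the strategy described in the note above for each level.
    if severity is ErrorSeverity.RETRIABLE:
        return "retry the failed capability"
    if severity is ErrorSeverity.REPLANNING:
        return "modify the task and re-plan"
    if severity is ErrorSeverity.CRITICAL:
        return "surface the error to the user for intervention"
    return "terminate execution"  # FATAL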
- property error_message: str#
Extract user-friendly error message from the error classification.
- Returns:
Human-readable error message suitable for user presentation, with automatic fallback to generic message if none provided
- Return type:
str
Note
This property ensures that error responses always contain a meaningful message even when the original error classification lacks user-facing text.
- property capability_name: str | None#
Extract the name of the specific capability that encountered the error.
- Returns:
Name of the failing capability if available in context, None otherwise
- Return type:
str, optional
Note
This property accesses a dynamically set attribute (_capability_name) that is populated during error context creation from agent state.
- __init__(error_classification, current_task, failed_operation, total_operations=0, execution_time=None, retry_count=None, successful_steps=None, failed_steps=None)#
Core Models#
Execution control uses models defined in the core framework:
See also
ErrorClassification
Error classification system
ErrorSeverity
Error severity levels
BaseInfrastructureNode
Base class for infrastructure components
Registration#
RouterNode is automatically registered as:
NodeRegistration(
name="router",
module_path="framework.infrastructure.router_node",
function_name="RouterNode",
description="Central routing decision authority"
)
ErrorNode is automatically registered as:
NodeRegistration(
name="error",
module_path="framework.infrastructure.error_node",
function_name="ErrorNode",
description="Error response generation"
)
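Example:
For orientation, the registration fields can be read as "import module_path and look up function_name". The snippet below resolves an entry by hand; it is illustrative only, not the framework's actual registry code:
import importlib

module = importlib.import_module("framework.infrastructure.router_node")
node_cls = getattr(module, "RouterNode")
print(node_cls.name, "-", node_cls.description)  # router - Central routing decision authority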
See also
- Prompt System
Prompt customization system
- Error Handling
Implementation details and usage patterns