Recovery Coordination#
Router-based recovery strategies and infrastructure error coordination.
The framework implements intelligent recovery coordination through a centralized router system that automatically determines recovery strategies based on error classification. The system coordinates between retry mechanisms, orchestrator replanning, and graceful termination.
Router Recovery System#
Router Conditional Edge#
- framework.infrastructure.router_node.router_conditional_edge(state)[source]#
LangGraph conditional edge function for dynamic routing.
This is the main export of this module: a pure conditional edge function that determines which node should execute next based on agent state.
Follows LangGraph native patterns where conditional edge functions take only the state parameter and handle logging internally.
Manual retry handling:

- Checks for errors and retry count first
- Routes retriable errors back to the same capability if retries are available
- Routes to the error node when retries are exhausted
- Routes critical/replanning errors immediately
- Parameters:
state (AgentState) – Current agent state containing all execution context
- Returns:
Name of next node to execute or “END” to terminate
- Return type:
str
Central recovery coordination implementing the complete recovery strategy.
Recovery Flow:
Manual Retry Handling (checked first):

- RETRIABLE errors: retry with backoff if attempts remain
- REPLANNING errors: route to orchestrator if planning attempts remain
- RECLASSIFICATION errors: route to classifier if reclassification attempts remain
- CRITICAL errors: route to error node immediately
Normal Routing Logic:

- Check execution state and route to the next capability or termination
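The recovery-first ordering above can be condensed into a plain conditional-edge sketch. This is illustrative only: the state keys (control_error_info, retry_count, plan, current_step) and the limit defaults are assumptions for demonstration, not the framework's actual state schema.

```python
def router_conditional_edge_sketch(state: dict) -> str:
    """Decide the next node: recovery routes are checked first, then normal flow."""
    error = state.get("control_error_info")
    if error is not None:
        severity = error["severity"]
        if severity == "CRITICAL":
            return "error"  # terminate via the error node immediately
        if severity == "RETRIABLE":
            if state.get("retry_count", 0) < state.get("max_retries", 3):
                return state["current_capability"]  # retry the same capability
            return "error"  # retries exhausted
        if severity == "REPLANNING":
            if state.get("plan_attempts", 0) < state.get("max_plans", 2):
                return "orchestrator"  # request a new execution plan
            return "error"
        if severity == "RECLASSIFICATION":
            if state.get("reclassify_count", 0) < state.get("max_reclassifications", 1):
                return "classifier"  # reselect capabilities with failure context
            return "error"
    # Normal routing: next planned capability, or END when the plan is done
    plan = state.get("plan", [])
    step = state.get("current_step", 0)
    return plan[step] if step < len(plan) else "END"
```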
RouterNode Infrastructure#
- class framework.infrastructure.router_node.RouterNode[source]#
Bases:
BaseInfrastructureNode
Central routing decision node for the Alpha Berkeley Agent Framework.
This node serves as the single decision-making authority that determines what should happen next based on the current agent state. It performs no business logic, only routing decisions and metadata management.
The actual routing is handled by the router_conditional_edge function.
Infrastructure node that coordinates routing decisions and state management.
- name: str = 'router'#
- description: str = 'Central routing decision authority'#
- async static execute(state, **kwargs)[source]#
Router node execution - updates routing metadata only.
This node serves as the entry point and routing hub but performs no routing logic itself. The actual routing decision is made by the conditional edge function, which keeps the logic DRY and avoids duplication.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional LangGraph parameters
- Returns:
Dictionary of state updates for routing metadata
- Return type:
Dict[str, Any]
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual error handling.
This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional parameters from LangGraph
- Returns:
State updates dictionary
- Return type:
Dict[str, Any]
Orchestrator Replanning#
OrchestrationNode#
- async static OrchestrationNode.execute(state, **kwargs)[source]#
Create execution plans with LangGraph native interrupt support.
This implementation creates execution plans from task requirements and handles planning mode with native LangGraph interrupts for approval workflows.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional LangGraph parameters
- Returns:
Dictionary of state updates for LangGraph
- Return type:
Dict[str, Any]
Handles replanning when REPLANNING errors are encountered:

- Plan Creation: Generate a new execution plan based on the current task
- Capability Validation: Ensure all planned capabilities exist
- State Updates: Clear error state and increment plan counter
- Limits: Respect maximum planning attempts to prevent infinite loops
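The replanning bookkeeping above can be sketched as a small state-update helper. The field names and the attempt limit are assumptions for illustration, not the orchestrator's actual schema.

```python
def apply_replan_updates(state: dict, new_plan: list,
                         available: set, max_plans: int = 3) -> dict:
    """Validate a new plan and produce the state updates a replan would emit."""
    # Capability validation: every planned capability must exist
    unknown = [cap for cap in new_plan if cap not in available]
    if unknown:
        raise ValueError(f"Plan references unknown capabilities: {unknown}")
    # Limits: refuse to loop forever on failed plans
    if state.get("plan_attempts", 0) >= max_plans:
        raise RuntimeError("Maximum planning attempts reached")
    return {
        "plan": new_plan,
        "current_step": 0,
        "control_error_info": None,  # clear the error state
        "plan_attempts": state.get("plan_attempts", 0) + 1,
    }
```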
Classifier Reclassification#
ClassificationNode#
- async static ClassificationNode.execute(state, **kwargs)[source]#
Main classification logic with sophisticated capability selection and reclassification handling.
Analyzes user tasks and selects appropriate capabilities using parallel LLM-based classification. Handles both initial classification and reclassification scenarios with state preservation.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional LangGraph parameters
- Returns:
Dictionary of state updates for LangGraph
- Return type:
Dict[str, Any]
Handles reclassification when RECLASSIFICATION errors are encountered:

- Context Awareness: Uses previous failure context for improved classification
- Capability Reselection: Analyzes available capabilities with failure context
- State Updates: Clear reclassification flags and increment counters
- Limits: Respect maximum reclassification attempts (configurable via max_reclassifications)
- Fresh Analysis: Resets planning state for completely new capability selection
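The reclassification bookkeeping can be sketched the same way. The field names and the max_reclassifications default here are illustrative assumptions, not the classifier's actual state keys.

```python
def apply_reclassification_updates(state: dict, max_reclassifications: int = 2) -> dict:
    """Reset planning state for a fresh capability selection, within limits."""
    count = state.get("reclassification_count", 0)
    if count >= max_reclassifications:
        raise RuntimeError("Maximum reclassification attempts reached")
    return {
        "reclassification_count": count + 1,   # increment the counter
        "needs_reclassification": False,       # clear the reclassification flag
        "plan": None,                          # fresh analysis: discard the old plan
        # carry the failure context forward for improved capability selection
        "failure_context": state.get("control_error_info"),
    }
```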
Error Response Generation#
ErrorNode System#
- class framework.infrastructure.error_node.ErrorNode[source]#
Bases:
BaseInfrastructureNode
Generate comprehensive, user-friendly error responses with intelligent analysis.
The ErrorNode serves as the centralized error response generation system for the Alpha Berkeley Agent Framework. It transforms technical error information into comprehensive user responses by combining structured factual reports with context-aware LLM analysis and recovery suggestions.
This infrastructure node operates as the final destination in the error handling pipeline, ensuring that all system failures result in meaningful, actionable information for users. The node implements a robust two-phase approach to error response generation with multiple fallback mechanisms to guarantee response delivery even under adverse conditions.
- Architecture Overview:
The error response generation follows a structured two-phase approach:
Structured Report Generation:

- Extracts error details from agent state control_error_info
- Formats using ErrorClassification.format_for_llm() for consistency
- Adds execution statistics, timing data, and retry information
- Generates step-by-step execution summaries with success/failure tracking
LLM Analysis Phase:

- Provides error context and available system capabilities to LLM
- Generates intelligent explanations of failure causes
- Produces context-aware recovery suggestions and next steps
- Integrates with framework prompt system for consistent analysis quality
- Error Recovery Strategy:
The node implements multiple layers of error handling to ensure reliability:

- Comprehensive fallback response if LLM generation fails
- Self-classification of internal errors as FATAL to prevent infinite loops
- Structured logging of all error generation attempts for monitoring
- Guaranteed response delivery through robust exception handling
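The two-phase approach with guaranteed delivery can be condensed into a minimal sketch. The helper names here (build_error_response, llm_analyze) are hypothetical stand-ins, not framework APIs:

```python
def build_error_response(report: str, llm_analyze) -> str:
    """Combine a structured report with LLM analysis, falling back if the LLM fails."""
    try:
        analysis = llm_analyze(report)  # phase 2: intelligent explanation
    except Exception:
        # Guaranteed delivery: fall back to the factual structured report alone
        return report + "\n\n(Automated analysis unavailable.)"
    return report + "\n\n" + analysis
```

Because the fallback path never re-enters the LLM, a failure in analysis generation cannot cascade into further error handling.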
- Integration Points:
- Input: Pre-classified errors from capability decorators via agent state
- Streaming: Real-time progress updates through LangGraph streaming system
- Output: AIMessage objects formatted for direct user presentation
- Monitoring: Comprehensive logging integration for operational visibility
Warning
The ErrorNode must never raise unhandled exceptions as it serves as the final error handling mechanism. All internal errors are caught and result in structured fallback responses.
Note
Error classification within this node always uses FATAL severity to prevent recursive error handling that could lead to infinite loops or system instability.
Examples
The ErrorNode is typically invoked automatically by the framework, but can be tested with manual state construction:
>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Construct agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.CRITICAL,
...         user_message="Database connection timeout",
...         metadata={"timeout": 30, "host": "db.example.com"}
...     ),
...     'capability_name': 'database_query',
...     'execution_time': 31.5
... }
>>> state['task_current_task'] = "Retrieve user profile data"
>>>
>>> # Execute error response generation
>>> result = await ErrorNode.execute(state)
>>> print(f"Response type: {type(result['messages'][0])}")
<class 'langchain_core.messages.ai.AIMessage'>
Framework integration through error decorator:
>>> @capability("database_operations")
... async def query_user_data(user_id: int, state: AgentState):
...     # This will automatically route to ErrorNode on failure
...     connection = await get_db_connection()
...     return await connection.fetch_user(user_id)
See also
- ErrorContext
Data structure for error response generation
- framework.base.errors.ErrorClassification
Error classification system
- framework.base.decorators.capability()
Capability decorator with error handling
- framework.state.AgentState
Agent state management system
Provides comprehensive error handling with LLM-powered analysis and structured error reports.
- name: str = 'error'#
- description: str = 'Error Response Generation'#
- static classify_error(exc, context)[source]#
Classify internal ErrorNode failures with FATAL severity to prevent infinite loops.
This method handles the critical responsibility of classifying errors that occur within the error response generation system itself. All such errors are automatically classified as FATAL to ensure clean termination and prevent recursive error handling scenarios that could destabilize the entire system.
The FATAL classification ensures that if the error response generation mechanism fails, execution terminates immediately rather than attempting additional error recovery operations that could compound the original problem or create infinite error handling loops.
- Parameters:
exc (Exception) – Exception that occurred during error response generation process
context (dict) – Execution context containing node information, timing data, and state
- Returns:
Error classification with FATAL severity and diagnostic metadata
- Return type:
ErrorClassification
Warning
This method should only be called by the framework’s error handling system. Manual invocation could disrupt the error classification hierarchy.
Note
The FATAL severity ensures immediate execution termination without further error recovery attempts, preventing system instability.
Examples
Framework automatic error classification:
>>> try:
...     # ErrorNode internal operation fails
...     await ErrorNode.execute(state)
... except Exception as e:
...     classification = ErrorNode.classify_error(e, context)
...     print(f"Severity: {classification.severity.value}")
fatal
Error classification structure:
>>> context = {"node_name": "error", "execution_time": 1.2}
>>> exc = RuntimeError("LLM generation failed")
>>> classification = ErrorNode.classify_error(exc, context)
>>> print(classification.metadata["technical_details"])
Error node failure: LLM generation failed
- async static execute(state, **kwargs)[source]#
Generate comprehensive error response with structured analysis and LLM insights.
This method orchestrates the complete error response generation pipeline, transforming technical error information into user-friendly responses with actionable recovery suggestions. The process combines factual error reporting with intelligent analysis to provide maximum value to users encountering issues.
The execution follows a carefully designed two-phase approach that ensures robust error handling even when components of the error generation system itself experience failures. Streaming progress updates keep users informed during the response generation process.
- Processing Pipeline:
Context Extraction: Reads error details from agent state including error classification, execution statistics, and step-by-step history
Context Population: Enriches error context with execution timeline, successful operations, and failure categorization
Structured Report Generation: Creates factual error report using standardized formatting and execution statistics
LLM Analysis: Generates intelligent explanations and recovery suggestions based on error context and available capabilities
Response Assembly: Combines structured report with LLM analysis into coherent user response
- Error Handling Strategy:
- Comprehensive exception handling prevents method failure
- Streaming progress updates provide real-time feedback
- Automatic fallback to structured response if LLM generation fails
- All failures logged for operational monitoring and debugging
- Parameters:
state (AgentState) – Agent state containing error information in control_error_info field
kwargs (dict) – Additional LangGraph execution parameters including config and streaming
- Returns:
Dictionary containing AIMessage with formatted error response for user presentation
- Return type:
dict[str, list[AIMessage]]
Note
This method is designed to never raise exceptions. All internal errors result in structured fallback responses to ensure users receive meaningful information regardless of system state.
Warning
The method expects error information to be present in state['control_error_info']. Missing error information will result in fallback responses with generic messaging.
Examples
Standard error response generation:
>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Prepare agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.REPLANNING,
...         user_message="API rate limit exceeded",
...         metadata={"retry_after": 300}
...     ),
...     'capability_name': 'external_api_call',
...     'execution_time': 5.2
... }
>>> state['task_current_task'] = "Fetch weather data"
>>>
>>> # Generate error response
>>> result = await ErrorNode.execute(state)
>>> message = result['messages'][0]
>>> print(f"Response length: {len(message.content)} characters")
Response length: 847 characters
Error response with execution history:
>>> state['execution_step_results'] = {
...     'step_0': {
...         'step_index': 0,
...         'capability': 'input_validation',
...         'task_objective': 'Validate API parameters',
...         'success': True
...     },
...     'step_1': {
...         'step_index': 1,
...         'capability': 'external_api_call',
...         'task_objective': 'Fetch weather data',
...         'success': False
...     }
... }
>>> result = await ErrorNode.execute(state)
>>> # Response includes execution summary with successful/failed steps
See also
- _create_error_context_from_state()
Error context extraction
- _generate_error_response()
Response generation pipeline
- AIMessage
Response message format
- async langgraph_node(**kwargs)#
LangGraph-native node function with manual error handling.
This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.
- Parameters:
state (AgentState) – Current agent state
kwargs – Additional parameters from LangGraph
- Returns:
State updates dictionary
- Return type:
Dict[str, Any]
Infrastructure Error Classification#
Classification Node#
- static ClassificationNode.classify_error(exc, context)[source]#
Built-in error classification for classifier operations.
- Parameters:
exc (Exception) – Exception that occurred
context (Dict[str, Any]) – Error context information
- Returns:
Classification with severity and retry guidance
- Return type:
ErrorClassification
- static ClassificationNode.get_retry_policy()[source]#
Custom retry policy for LLM-based classification operations.
Classification uses parallel LLM calls for capability selection and can be flaky due to:

- Multiple concurrent LLM requests
- Network timeouts to LLM services
- LLM provider rate limiting
- Classification model variability
Use more attempts with moderate delays for better reliability.
- Return type:
Dict[str, Any]
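A retry policy of this shape might look like the following sketch. The key names and values are illustrative assumptions, not the framework's actual policy schema:

```python
def get_retry_policy_sketch() -> dict:
    """More attempts with moderate delays, suited to flaky parallel LLM calls."""
    return {
        "max_attempts": 4,      # extra headroom for transient LLM failures
        "delay_seconds": 2.0,   # moderate base delay between attempts
        "backoff_factor": 1.5,  # gentle exponential backoff on each retry
    }
```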
See also
- Classification System
Error classification and severity management
- Exception Reference
Complete exception hierarchy with inheritance structure
- Execution Control
ErrorNode, RouterNode, and complete execution control flow