Recovery Coordination#

Router-based recovery strategies and infrastructure error coordination.

The framework implements intelligent recovery coordination through a centralized router system that automatically determines recovery strategies based on error classification. The system coordinates between retry mechanisms, orchestrator replanning, and graceful termination.

Router Recovery System#

Router Conditional Edge#

framework.infrastructure.router_node.router_conditional_edge(state)[source]#

LangGraph conditional edge function for dynamic routing.

This is the main export of this module: a pure conditional edge function that determines which node should execute next based on the agent state.

Follows LangGraph native patterns where conditional edge functions take only the state parameter and handle logging internally.

Manual retry handling:

  • Checks for errors and retry count first

  • Routes retriable errors back to the same capability if retries are available

  • Routes to the error node when retries are exhausted

  • Routes critical/replanning errors immediately

Parameters:

state (AgentState) – Current agent state containing all execution context

Returns:

Name of next node to execute or “END” to terminate

Return type:

str

Central recovery coordination implementing the complete recovery strategy.
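In graph construction, this function is attached as the conditional edge for the router node. The following is a minimal wiring sketch: StateGraph, add_conditional_edges, and END are standard LangGraph API, while the node names, registration style, and path map are illustrative assumptions:

from langgraph.graph import StateGraph, END

from framework.infrastructure.router_node import RouterNode, router_conditional_edge
from framework.state import AgentState

workflow = StateGraph(AgentState)
workflow.add_node("router", RouterNode().langgraph_node)  # assumed registration style
# ... orchestrator, classifier, error, and capability nodes registered here ...

# The conditional edge returns the name of the next node, or "END".
workflow.add_conditional_edges(
    "router",
    router_conditional_edge,
    {
        "orchestrator": "orchestrator",  # replanning
        "classifier": "classifier",      # reclassification
        "error": "error",                # terminal error response
        "END": END,                      # graceful termination
        # ... capability names map to their registered nodes as well ...
    },
)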

Recovery Flow:

  1. Manual Retry Handling (checked first):

     • RETRIABLE errors: retry with backoff if attempts remain

     • REPLANNING errors: route to the orchestrator if planning attempts remain

     • RECLASSIFICATION errors: route to the classifier if reclassification attempts remain

     • CRITICAL errors: route to the error node immediately

  2. Normal Routing Logic: check the execution state and route to the next capability or terminate (a condensed sketch follows).
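The sketch below mirrors this documented priority order. It assumes hypothetical state keys (retry_count, plan_attempts, and similar) and treats the ErrorSeverity members as matching the severities named above; the actual logic lives in router_conditional_edge:

from framework.base.errors import ErrorSeverity
from framework.state import AgentState

def route_sketch(state: AgentState) -> str:
    # Illustrative only: mirrors the documented priority order.
    error_info = state.get("control_error_info")
    if error_info:
        severity = error_info["classification"].severity
        if severity == ErrorSeverity.CRITICAL:
            return "error"  # route to the error node immediately
        if severity == ErrorSeverity.RETRIABLE:
            if state.get("retry_count", 0) < state.get("max_retries", 3):
                return state["current_capability"]  # retry the same capability
            return "error"  # retries exhausted
        if severity == ErrorSeverity.REPLANNING:
            if state.get("plan_attempts", 0) < state.get("max_planning_attempts", 3):
                return "orchestrator"
            return "error"
        if severity == ErrorSeverity.RECLASSIFICATION:
            if state.get("reclassification_count", 0) < state.get("max_reclassifications", 1):
                return "classifier"
            return "error"
    # Normal routing: advance through the execution plan or terminate.
    plan = state.get("execution_plan") or []
    step = state.get("current_step_index", 0)
    return plan[step] if step < len(plan) else "END"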

RouterNode Infrastructure#

class framework.infrastructure.router_node.RouterNode[source]#

Bases: BaseInfrastructureNode

Central routing decision node for the Alpha Berkeley Agent Framework.

This node serves as the single decision-making authority that determines what should happen next based on the current agent state. It performs no business logic, only routing decisions and metadata management.

The actual routing is handled by the router_conditional_edge function.

Infrastructure node that coordinates routing decisions and state management.

name: str = 'router'#
description: str = 'Central routing decision authority'#
async static execute(state, **kwargs)[source]#

Router node execution: updates routing metadata only.

This node serves as the entry point and routing hub but performs no routing logic itself. The actual routing decision is made by the conditional edge function, which keeps the logic DRY.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional LangGraph parameters

Returns:

Dictionary of state updates for routing metadata

Return type:

Dict[str, Any]
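Because the decision logic lives in the conditional edge, the node body can stay trivial. A minimal sketch of what "updates routing metadata only" can look like, with assumed field names:

import time
from typing import Any, Dict

from framework.state import AgentState

async def execute_sketch(state: AgentState, **kwargs) -> Dict[str, Any]:
    # Record bookkeeping only; no routing decision is made here.
    return {
        "router_visit_count": state.get("router_visit_count", 0) + 1,
        "router_last_visited": time.time(),
    }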

async langgraph_node(**kwargs)#

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]

Orchestrator Replanning#

OrchestrationNode#

async static OrchestrationNode.execute(state, **kwargs)[source]#

Create execution plans with LangGraph native interrupt support.

This implementation creates execution plans from task requirements and handles planning mode with native LangGraph interrupts for approval workflows.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional LangGraph parameters

Returns:

Dictionary of state updates for LangGraph

Return type:

Dict[str, Any]

Handles replanning when REPLANNING errors are encountered (see the sketch after this list):

  • Plan Creation: Generate new execution plan based on current task

  • Capability Validation: Ensure all planned capabilities exist

  • State Updates: Clear error state and increment plan counter

  • Limits: Respect maximum planning attempts to prevent infinite loops
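A sketch of this bookkeeping, assuming hypothetical helpers (create_execution_plan, CAPABILITY_REGISTRY) and state keys; only task_current_task and control_error_info appear in the documented state:

from typing import Any, Dict

from framework.state import AgentState

async def replan_sketch(state: AgentState) -> Dict[str, Any]:
    attempts = state.get("plan_attempts", 0)
    if attempts >= state.get("max_planning_attempts", 3):
        # Planning budget exhausted; raising escalates to the error node.
        raise RuntimeError("Maximum planning attempts exceeded")

    # Generate a fresh plan for the current task (hypothetical helper).
    plan = await create_execution_plan(state["task_current_task"])

    # Validate that every planned capability is registered.
    unknown = [step for step in plan if step not in CAPABILITY_REGISTRY]
    if unknown:
        raise ValueError(f"Planned capabilities not registered: {unknown}")

    return {
        "execution_plan": plan,
        "plan_attempts": attempts + 1,
        "control_error_info": None,  # clear the REPLANNING error that triggered this
    }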

Classifier Reclassification#

ClassificationNode#

async static ClassificationNode.execute(state, **kwargs)[source]#

Main classification logic with sophisticated capability selection and reclassification handling.

Analyzes user tasks and selects appropriate capabilities using parallel LLM-based classification. Handles both initial classification and reclassification scenarios with state preservation.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional LangGraph parameters

Returns:

Dictionary of state updates for LangGraph

Return type:

Dict[str, Any]

Handles reclassification when RECLASSIFICATION errors are encountered (see the sketch after this list):

  • Context Awareness: Uses previous failure context for improved classification

  • Capability Reselection: Analyzes available capabilities with failure context

  • State Updates: Clear reclassification flags and increment counters

  • Limits: Respect maximum reclassification attempts (configurable via max_reclassifications)

  • Fresh Analysis: Resets planning state for completely new capability selection
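A comparable sketch for reclassification, again with hypothetical helper and key names; only max_reclassifications is named in the documentation:

from typing import Any, Dict

from framework.state import AgentState

async def reclassify_sketch(state: AgentState, max_reclassifications: int = 1) -> Dict[str, Any]:
    count = state.get("reclassification_count", 0)
    if count >= max_reclassifications:
        raise RuntimeError("Maximum reclassification attempts exceeded")

    # Re-run capability selection with the failure context included
    # (hypothetical stand-in for the parallel LLM classification).
    capabilities = await select_capabilities(
        task=state["task_current_task"],
        failure_context=state.get("control_error_info"),
    )

    return {
        "selected_capabilities": capabilities,
        "reclassification_count": count + 1,
        "execution_plan": None,      # reset planning for a completely fresh selection
        "control_error_info": None,  # clear the reclassification flag
    }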

Error Response Generation#

ErrorNode System#

class framework.infrastructure.error_node.ErrorNode[source]#

Bases: BaseInfrastructureNode

Generate comprehensive, user-friendly error responses with intelligent analysis.

The ErrorNode serves as the centralized error response generation system for the Alpha Berkeley Agent Framework. It transforms technical error information into comprehensive user responses by combining structured factual reports with context-aware LLM analysis and recovery suggestions.

This infrastructure node operates as the final destination in the error handling pipeline, ensuring that all system failures result in meaningful, actionable information for users. The node implements a robust two-phase approach to error response generation with multiple fallback mechanisms to guarantee response delivery even under adverse conditions.

Architecture Overview:

The error response generation follows a structured two-phase approach:

  1. Structured Report Generation:

     • Extracts error details from agent state control_error_info

     • Formats using ErrorClassification.format_for_llm() for consistency

     • Adds execution statistics, timing data, and retry information

     • Generates step-by-step execution summaries with success/failure tracking

  2. LLM Analysis Phase:

     • Provides error context and available system capabilities to the LLM

     • Generates intelligent explanations of failure causes

     • Produces context-aware recovery suggestions and next steps

     • Integrates with the framework prompt system for consistent analysis quality

Error Recovery Strategy:

The node implements multiple layers of error handling to ensure reliability (see the sketch below):

  • Comprehensive fallback response if LLM generation fails

  • Self-classification of internal errors as FATAL to prevent infinite loops

  • Structured logging of all error generation attempts for monitoring

  • Guaranteed response delivery through robust exception handling
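The sketch below illustrates the fallback layering; the helper name (generate_llm_analysis) is an assumption, not the node's actual code:

import logging

from langchain_core.messages import AIMessage

logger = logging.getLogger(__name__)

async def respond_sketch(report: str, state) -> dict:
    try:
        # Phase 2: LLM analysis of the structured report (hypothetical helper).
        analysis = await generate_llm_analysis(report, state)
        content = f"{report}\n\n{analysis}"
    except Exception:
        # Fallback layer: the structured report alone still reaches the user.
        logger.exception("LLM analysis failed; returning structured report only")
        content = report
    return {"messages": [AIMessage(content=content)]}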

Integration Points:
  • Input: Pre-classified errors from capability decorators via agent state

  • Streaming: Real-time progress updates through LangGraph streaming system

  • Output: AIMessage objects formatted for direct user presentation

  • Monitoring: Comprehensive logging integration for operational visibility

Warning

The ErrorNode must never raise unhandled exceptions as it serves as the final error handling mechanism. All internal errors are caught and result in structured fallback responses.

Note

Error classification within this node always uses FATAL severity to prevent recursive error handling that could lead to infinite loops or system instability.

Examples

The ErrorNode is typically invoked automatically by the framework, but can be tested with manual state construction:

>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Construct agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.CRITICAL,
...         user_message="Database connection timeout",
...         metadata={"timeout": 30, "host": "db.example.com"}
...     ),
...     'capability_name': 'database_query',
...     'execution_time': 31.5
... }
>>> state['task_current_task'] = "Retrieve user profile data"
>>>
>>> # Execute error response generation
>>> result = await ErrorNode.execute(state)
>>> print(f"Response type: {type(result['messages'][0])}")
Response type: <class 'langchain_core.messages.ai.AIMessage'>

Framework integration through error decorator:

>>> @capability("database_operations")
... async def query_user_data(user_id: int, state: AgentState):
...     # This will automatically route to ErrorNode on failure
...     connection = await get_db_connection()
...     return await connection.fetch_user(user_id)

See also

ErrorContext : Data structure for error response generation

framework.base.errors.ErrorClassification : Error classification system

framework.base.decorators.capability() : Capability decorator with error handling

framework.state.AgentState : Agent state management system

Provides comprehensive error handling with LLM-powered analysis and structured error reports.

name: str = 'error'#
description: str = 'Error Response Generation'#
static classify_error(exc, context)[source]#

Classify internal ErrorNode failures with FATAL severity to prevent infinite loops.

This method handles the critical responsibility of classifying errors that occur within the error response generation system itself. All such errors are automatically classified as FATAL to ensure clean termination and prevent recursive error handling scenarios that could destabilize the entire system.

The FATAL classification ensures that if the error response generation mechanism fails, execution terminates immediately rather than attempting additional error recovery operations that could compound the original problem or create infinite error handling loops.

Parameters:
  • exc (Exception) – Exception that occurred during error response generation process

  • context (dict) – Execution context containing node information, timing data, and state

Returns:

Error classification with FATAL severity and diagnostic metadata

Return type:

ErrorClassification

Warning

This method should only be called by the framework’s error handling system. Manual invocation could disrupt the error classification hierarchy.

Note

The FATAL severity ensures immediate execution termination without further error recovery attempts, preventing system instability.

Examples

Framework automatic error classification:

>>> try:
...     # ErrorNode internal operation fails
...     await ErrorNode.execute(state)
... except Exception as e:
...     classification = ErrorNode.classify_error(e, context)
...     print(f"Severity: {classification.severity.value}")
Severity: fatal

Error classification structure:

>>> context = {"node_name": "error", "execution_time": 1.2}
>>> exc = RuntimeError("LLM generation failed")
>>> classification = ErrorNode.classify_error(exc, context)
>>> print(classification.metadata["technical_details"])
Error node failure: LLM generation failed
async static execute(state, **kwargs)[source]#

Generate comprehensive error response with structured analysis and LLM insights.

This method orchestrates the complete error response generation pipeline, transforming technical error information into user-friendly responses with actionable recovery suggestions. The process combines factual error reporting with intelligent analysis to provide maximum value to users encountering issues.

The execution follows a carefully designed two-phase approach that ensures robust error handling even when components of the error generation system itself experience failures. Streaming progress updates keep users informed during the response generation process.

Processing Pipeline:
  1. Context Extraction: Reads error details from agent state including error classification, execution statistics, and step-by-step history

  2. Context Population: Enriches error context with execution timeline, successful operations, and failure categorization

  3. Structured Report Generation: Creates factual error report using standardized formatting and execution statistics

  4. LLM Analysis: Generates intelligent explanations and recovery suggestions based on error context and available capabilities

  5. Response Assembly: Combines structured report with LLM analysis into coherent user response

Error Handling Strategy:
  • Comprehensive exception handling prevents method failure

  • Streaming progress updates provide real-time feedback

  • Automatic fallback to structured response if LLM generation fails

  • All failures logged for operational monitoring and debugging

Parameters:
  • state (AgentState) – Agent state containing error information in control_error_info field

  • kwargs (dict) – Additional LangGraph execution parameters including config and streaming

Returns:

Dictionary containing AIMessage with formatted error response for user presentation

Return type:

dict[str, list[AIMessage]]

Note

This method is designed to never raise exceptions. All internal errors result in structured fallback responses to ensure users receive meaningful information regardless of system state.

Warning

The method expects error information to be present in state['control_error_info']. Missing error information will result in fallback responses with generic messaging.

Examples

Standard error response generation:

>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Prepare agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.REPLANNING,
...         user_message="API rate limit exceeded",
...         metadata={"retry_after": 300}
...     ),
...     'capability_name': 'external_api_call',
...     'execution_time': 5.2
... }
>>> state['task_current_task'] = "Fetch weather data"
>>>
>>> # Generate error response
>>> result = await ErrorNode.execute(state)
>>> message = result['messages'][0]
>>> print(f"Response length: {len(message.content)} characters")
Response length: 847 characters

Error response with execution history:

>>> state['execution_step_results'] = {
...     'step_0': {
...         'step_index': 0,
...         'capability': 'input_validation',
...         'task_objective': 'Validate API parameters',
...         'success': True
...     },
...     'step_1': {
...         'step_index': 1,
...         'capability': 'external_api_call',
...         'task_objective': 'Fetch weather data',
...         'success': False
...     }
... }
>>> result = await ErrorNode.execute(state)
>>> # Response includes execution summary with successful/failed steps

See also

_create_error_context_from_state() : Error context extraction

_generate_error_response() : Response generation pipeline

AIMessage : Response message format

async langgraph_node(**kwargs)#

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes now use get_stream_writer() and get_config() directly for pure LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]

Infrastructure Error Classification#

Classification Node#

static ClassificationNode.classify_error(exc, context)[source]#

Built-in error classification for classifier operations.

Parameters:
  • exc (Exception) – Exception that occurred

  • context (Dict[str, Any]) – Error context information

Returns:

Classification with severity and retry guidance

Return type:

ErrorClassification

Built-in error classification for classifier operations with LLM-aware retry logic.
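A sketch of what LLM-aware classification can look like, treating transient transport failures as retriable; the exception types chosen here are illustrative assumptions:

from typing import Any, Dict

from framework.base.errors import ErrorClassification, ErrorSeverity

def classify_sketch(exc: Exception, context: Dict[str, Any]) -> ErrorClassification:
    if isinstance(exc, (TimeoutError, ConnectionError)):
        # Transient LLM transport problems are worth retrying.
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message="Temporary LLM service issue during classification",
            metadata={"node_name": context.get("node_name")},
        )
    # Anything else escalates to the error node.
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"Classification failed: {exc}",
        metadata={"node_name": context.get("node_name")},
    )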

static ClassificationNode.get_retry_policy()[source]#

Custom retry policy for LLM-based classification operations.

Classification uses parallel LLM calls for capability selection and can be flaky due to:

  • Multiple concurrent LLM requests

  • Network timeouts to LLM services

  • LLM provider rate limiting

  • Classification model variability

Use more attempts with moderate delays for better reliability.


Return type:

Dict[str, Any]
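The exact keys of the returned dictionary are not documented here; a plausible shape, given the "more attempts with moderate delays" guidance (all key names and values are assumptions):

from typing import Any, Dict

def get_retry_policy_sketch() -> Dict[str, Any]:
    return {
        "max_attempts": 4,      # extra headroom for flaky parallel LLM calls
        "delay_seconds": 2.0,   # moderate base delay between attempts
        "backoff_factor": 1.5,  # gentle exponential backoff
    }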

See also

Classification System

Error classification and severity management

Exception Reference

Complete exception hierarchy with inheritance structure

Execution Control

ErrorNode, RouterNode, and complete execution control flow