Execution Control

Infrastructure components that control the flow of execution, including routing decisions, error handling, and conditional logic.

Router Node

class framework.infrastructure.router_node.RouterNode

Bases: BaseInfrastructureNode

Central routing decision node for the Alpha Berkeley Agent Framework.

This node serves as the single decision-making authority that determines what should happen next based on the current agent state. It performs no business logic, only routing decisions and metadata management.

The actual routing is handled by the router_conditional_edge function.

name: str = 'router'
description: str = 'Central routing decision authority'
async static execute(state, **kwargs)

Router node execution - updates routing metadata only.

This node serves as the entry point and routing hub, but performs no routing logic itself. The actual routing decision is made by the conditional edge function, which keeps the routing logic in one place and avoids duplication.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional LangGraph parameters

Returns:

Dictionary of state updates for routing metadata

Return type:

Dict[str, Any]
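The split between the node and the conditional edge described above can be sketched as follows. This is a minimal, self-contained illustration that uses a plain dict as a stand-in for AgentState. The class and function names, and the state keys routing_count and next_step, are hypothetical and are not taken from the framework.

```python
import asyncio
from typing import Any, Dict


class SketchRouterNode:
    """Stand-in for a router node: it records metadata and decides nothing."""

    name = "router"

    @staticmethod
    async def execute(state: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
        # Return only the routing metadata that changed (hypothetical key).
        return {"routing_count": state.get("routing_count", 0) + 1}


def sketch_conditional_edge(state: Dict[str, Any]) -> str:
    """Stand-in for the edge function: the single place that picks the next node."""
    # "next_step" is a hypothetical key naming the pending capability, if any.
    return state.get("next_step") or "END"


async def run_once(state: Dict[str, Any]) -> str:
    # Merge the node's updates into the state, then ask the edge where to go.
    state = {**state, **await SketchRouterNode.execute(state)}
    return sketch_conditional_edge(state)
```

Calling asyncio.run(run_once({"next_step": "database_query"})) returns the name of the pending step, while an empty state routes to "END".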

__repr__()

Return a string representation of the infrastructure node for debugging.

Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.

Returns:

String representation including class name and node name

Return type:

str

Example

>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'

Note

The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.
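A representation in this format could be produced along the following lines. The base class below is a hypothetical stand-in, not the framework's BaseInfrastructureNode; only the output pattern comes from the note above.

```python
class SketchInfrastructureNode:
    """Hypothetical base class illustrating the repr format only."""

    name: str = "base"

    def __repr__(self) -> str:
        # Combine the Python class name with the node's registered name.
        return f"<{type(self).__name__}: {self.name}>"


class SketchRouterNode(SketchInfrastructureNode):
    # Subclasses only need to set their registered name.
    name = "router"
```

Because the class name is read from type(self), subclasses inherit the behavior without overriding __repr__.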

static classify_error(exc, context)

Classify errors for infrastructure-specific error handling and recovery.

This method provides default error classification for all infrastructure nodes with a conservative approach that treats most errors as critical. Infrastructure nodes handle system-critical functions like orchestration and routing, so failures typically require immediate attention rather than automatic retry attempts.

The default implementation prioritizes system stability by failing fast with clear error messages. Subclasses should override this method only when specific infrastructure components can benefit from retry logic (e.g., LLM-based orchestrators that may encounter temporary API issues).

Parameters:
  • exc (Exception) – The exception that occurred during infrastructure operation

  • context (dict) – Error context including node info, execution state, and timing

Returns:

Error classification with severity and recovery strategy

Return type:

ErrorClassification

Note

The context dictionary includes:

  • infrastructure_node: node name for identification

  • execution_time: time spent before failure

  • current_state: agent state at time of error

Example:

from framework.base.errors import ErrorClassification, ErrorSeverity

@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    # Retry network timeouts for LLM-based infrastructure
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message="Network timeout, retrying...",
            metadata={"technical_details": str(exc)}
        )
    # Everything else fails fast as a critical error
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"Infrastructure error: {exc}",
        metadata={"technical_details": str(exc)}
    )

Note

Infrastructure nodes should generally fail fast, so the default implementation treats most errors as critical. Override this method for infrastructure that can benefit from retries (e.g., LLM-based nodes).

static get_retry_policy()

Get conservative retry policy configuration for infrastructure operations.

This method provides retry configuration optimized for infrastructure nodes that handle system-critical functions. The default policy uses conservative settings with minimal retry attempts and fast failure detection to maintain system stability.

Infrastructure nodes should generally fail fast rather than retry extensively, since failures often indicate system-level issues that require immediate attention. Override this method only for specific infrastructure components that can benefit from retry logic.

Returns:

Dictionary containing conservative retry configuration parameters

Return type:

Dict[str, Any]

Note

Infrastructure default policy: 2 attempts, 0.2s delay, minimal backoff. This prioritizes fast failure detection over retry persistence.

Example:

from typing import Any, Dict

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 3,  # More retries for LLM-based infrastructure
        "delay_seconds": 1.0,  # Longer delay for external service calls
        "backoff_factor": 2.0  # Exponential backoff
    }

Note

The router uses this configuration to determine retry behavior.
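To make the three policy keys concrete, here is a small sketch of how a delay schedule might be derived from them. It assumes that max_attempts counts the initial attempt and that the delay grows by backoff_factor after every retry. Both points are assumptions, and retry_delays is a hypothetical helper rather than part of the framework.

```python
from typing import Any, Dict, List


def retry_delays(policy: Dict[str, Any]) -> List[float]:
    """Return the wait before each retry, assuming exponential backoff."""
    # Assumption: the first attempt is not a retry, so there are max_attempts - 1 waits.
    retries = max(policy["max_attempts"] - 1, 0)
    return [
        policy["delay_seconds"] * policy["backoff_factor"] ** i
        for i in range(retries)
    ]
```

Under these assumptions, the override shown above (3 attempts, 1.0s delay, factor 2.0) yields waits of 1.0s and then 2.0s.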

async langgraph_node(**kwargs)

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes call get_stream_writer() and get_config() directly for native LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]

framework.infrastructure.router_node.router_conditional_edge(state)

LangGraph conditional edge function for dynamic routing.

This is the main export of this module - a pure conditional edge function that determines which node should execute next based on agent state.

Follows LangGraph native patterns where conditional edge functions take only the state parameter and handle logging internally.

Manual retry handling:
  • Checks for errors and retry count first

  • Routes retriable errors back to same capability if retries available

  • Routes to error node when retries exhausted

  • Routes critical/replanning errors immediately

Parameters:

state (AgentState) – Current agent state containing all execution context

Returns:

Name of next node to execute or “END” to terminate

Return type:

str
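The retry handling listed above can be sketched as a pure function of the state. This is an illustration only: the state is a plain dict, the keys error, retry_count and next_step are hypothetical, and the target node names "error" and "orchestrator" are assumptions about where critical and replanning errors are sent.

```python
from typing import Any, Dict


def sketch_route(state: Dict[str, Any], max_attempts: int = 2) -> str:
    """Pick the next node name from a dict-based stand-in for AgentState."""
    error = state.get("error")  # hypothetical key: {"severity", "capability"}
    if error is None:
        # No error: continue with the pending step, or finish.
        return state.get("next_step") or "END"
    if error["severity"] == "retriable":
        # Retry the same capability while attempts remain.
        if state.get("retry_count", 0) < max_attempts - 1:
            return error["capability"]
        return "error"  # retries exhausted
    if error["severity"] == "replanning":
        return "orchestrator"  # assumed name for the planning node
    return "error"  # critical and any other severity skip retries entirely
```

Keeping the function free of side effects other than logging makes each branch easy to test with a hand-built state.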

Error Node

class framework.infrastructure.error_node.ErrorNode

Bases: BaseInfrastructureNode

Generate comprehensive, user-friendly error responses with intelligent analysis.

The ErrorNode serves as the centralized error response generation system for the Alpha Berkeley Agent Framework. It transforms technical error information into comprehensive user responses by combining structured factual reports with context-aware LLM analysis and recovery suggestions.

This infrastructure node operates as the final destination in the error handling pipeline, ensuring that all system failures result in meaningful, actionable information for users. The node implements a robust two-phase approach to error response generation with multiple fallback mechanisms to guarantee response delivery even under adverse conditions.

Architecture Overview:

The error response generation follows a structured two-phase approach:

  1. Structured Report Generation:

       • Extracts error details from agent state control_error_info

       • Formats using ErrorClassification.format_for_llm() for consistency

       • Adds execution statistics, timing data, and retry information

       • Generates step-by-step execution summaries with success/failure tracking

  2. LLM Analysis Phase:

       • Provides error context and available system capabilities to LLM

       • Generates intelligent explanations of failure causes

       • Produces context-aware recovery suggestions and next steps

       • Integrates with framework prompt system for consistent analysis quality

Error Recovery Strategy:

The node implements multiple layers of error handling to ensure reliability:
  • Comprehensive fallback response if LLM generation fails

  • Self-classification of internal errors as FATAL to prevent infinite loops

  • Structured logging of all error generation attempts for monitoring

  • Guaranteed response delivery through robust exception handling

Integration Points:
  • Input: Pre-classified errors from capability decorators via agent state

  • Streaming: Real-time progress updates through LangGraph streaming system

  • Output: AIMessage objects formatted for direct user presentation

  • Monitoring: Comprehensive logging integration for operational visibility

Warning

The ErrorNode must never raise unhandled exceptions as it serves as the final error handling mechanism. All internal errors are caught and result in structured fallback responses.

Note

Error classification within this node always uses FATAL severity to prevent recursive error handling that could lead to infinite loops or system instability.
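The two-phase approach with a guaranteed fallback can be illustrated with a short sketch. It is not the ErrorNode implementation: build_error_response, the analyze hook, and the report wording are hypothetical, and the input is a plain dict that borrows the capability_name and user_message names used elsewhere on this page.

```python
from typing import Callable, Dict


def build_error_response(info: Dict[str, str], analyze: Callable[[str], str]) -> str:
    """Combine a factual report with optional analysis; never raise."""
    # Phase 1: structured report built only from known facts.
    report = (
        f"Operation '{info.get('capability_name', 'unknown')}' failed: "
        f"{info.get('user_message', 'no details available')}"
    )
    try:
        # Phase 2: analysis, for example an LLM call behind the `analyze` hook.
        return f"{report}\n\n{analyze(report)}"
    except Exception:
        # Fallback: deliver the structured report alone.
        return report


def failing_analysis(report: str) -> str:
    """Simulates an analysis backend that is unavailable."""
    raise RuntimeError("LLM generation failed")
```

Passing failing_analysis as the hook still returns the structured report, which is the property the warning above requires of the final error handler.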

Examples

The ErrorNode is typically invoked automatically by the framework, but can be tested with manual state construction:

>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Construct agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.CRITICAL,
...         user_message="Database connection timeout",
...         metadata={"timeout": 30, "host": "db.example.com"}
...     ),
...     'capability_name': 'database_query',
...     'execution_time': 31.5
... }
>>> state['task_current_task'] = "Retrieve user profile data"
>>>
>>> # Execute error response generation
>>> result = await ErrorNode.execute(state)
>>> print(f"Response type: {type(result['messages'][0])}")
Response type: <class 'langchain_core.messages.ai.AIMessage'>

Framework integration through the capability decorator:

>>> @capability("database_operations")
... async def query_user_data(user_id: int, state: AgentState):
...     # This will automatically route to ErrorNode on failure
...     connection = await get_db_connection()
...     return await connection.fetch_user(user_id)

See also

  • ErrorContext : Data structure for error response generation

  • framework.base.errors.ErrorClassification : Error classification system

  • framework.base.decorators.capability() : Capability decorator with error handling

  • framework.state.AgentState : Agent state management system

name: str = 'error'
description: str = 'Error Response Generation'
static classify_error(exc, context)

Classify internal ErrorNode failures with FATAL severity to prevent infinite loops.

This method handles the critical responsibility of classifying errors that occur within the error response generation system itself. All such errors are automatically classified as FATAL to ensure clean termination and prevent recursive error handling scenarios that could destabilize the entire system.

The FATAL classification ensures that if the error response generation mechanism fails, execution terminates immediately rather than attempting additional error recovery operations that could compound the original problem or create infinite error handling loops.

Parameters:
  • exc (Exception) – Exception that occurred during error response generation process

  • context (dict) – Execution context containing node information, timing data, and state

Returns:

Error classification with FATAL severity and diagnostic metadata

Return type:

ErrorClassification

Warning

This method should only be called by the framework’s error handling system. Manual invocation could disrupt the error classification hierarchy.

Note

The FATAL severity ensures immediate execution termination without further error recovery attempts, preventing system instability.

Examples

Framework automatic error classification:

>>> try:
...     # ErrorNode internal operation fails
...     await ErrorNode.execute(state)
... except Exception as e:
...     classification = ErrorNode.classify_error(e, context)
...     print(f"Severity: {classification.severity.value}")
Severity: fatal

Error classification structure:

>>> context = {"node_name": "error", "execution_time": 1.2}
>>> exc = RuntimeError("LLM generation failed")
>>> classification = ErrorNode.classify_error(exc, context)
>>> print(classification.metadata["technical_details"])
Error node failure: LLM generation failed

async static execute(state, **kwargs)

Generate comprehensive error response with structured analysis and LLM insights.

This method orchestrates the complete error response generation pipeline, transforming technical error information into user-friendly responses with actionable recovery suggestions. The process combines factual error reporting with intelligent analysis to provide maximum value to users encountering issues.

The execution follows a carefully designed two-phase approach that ensures robust error handling even when components of the error generation system itself experience failures. Streaming progress updates keep users informed during the response generation process.

Processing Pipeline:
  1. Context Extraction: Reads error details from agent state including error classification, execution statistics, and step-by-step history

  2. Context Population: Enriches error context with execution timeline, successful operations, and failure categorization

  3. Structured Report Generation: Creates factual error report using standardized formatting and execution statistics

  4. LLM Analysis: Generates intelligent explanations and recovery suggestions based on error context and available capabilities

  5. Response Assembly: Combines structured report with LLM analysis into coherent user response

Error Handling Strategy:
  • Comprehensive exception handling prevents method failure

  • Streaming progress updates provide real-time feedback

  • Automatic fallback to structured response if LLM generation fails

  • All failures logged for operational monitoring and debugging

Parameters:
  • state (AgentState) – Agent state containing error information in control_error_info field

  • kwargs (dict) – Additional LangGraph execution parameters including config and streaming

Returns:

Dictionary containing AIMessage with formatted error response for user presentation

Return type:

dict[str, list[AIMessage]]

Note

This method is designed to never raise exceptions. All internal errors result in structured fallback responses to ensure users receive meaningful information regardless of system state.

Warning

The method expects error information to be present in state['control_error_info']. Missing error information will result in fallback responses with generic messaging.

Examples

Standard error response generation:

>>> from framework.state import AgentState
>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>>
>>> # Prepare agent state with error information
>>> state = AgentState()
>>> state['control_error_info'] = {
...     'classification': ErrorClassification(
...         severity=ErrorSeverity.REPLANNING,
...         user_message="API rate limit exceeded",
...         metadata={"retry_after": 300}
...     ),
...     'capability_name': 'external_api_call',
...     'execution_time': 5.2
... }
>>> state['task_current_task'] = "Fetch weather data"
>>>
>>> # Generate error response
>>> result = await ErrorNode.execute(state)
>>> message = result['messages'][0]
>>> print(f"Response length: {len(message.content)} characters")
Response length: 847 characters

Error response with execution history:

>>> state['execution_step_results'] = {
...     'step_0': {
...         'step_index': 0,
...         'capability': 'input_validation',
...         'task_objective': 'Validate API parameters',
...         'success': True
...     },
...     'step_1': {
...         'step_index': 1,
...         'capability': 'external_api_call',
...         'task_objective': 'Fetch weather data',
...         'success': False
...     }
... }
>>> result = await ErrorNode.execute(state)
>>> # Response includes execution summary with successful/failed steps
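As an illustration of how an execution summary might be derived from execution_step_results, the sketch below splits the entries into successful and failed steps in step order. The helper name and the "Step N: objective" label format are hypothetical; only the dictionary layout is taken from the example above.

```python
from typing import Any, Dict, List, Tuple


def summarize_steps(
    step_results: Dict[str, Dict[str, Any]]
) -> Tuple[List[str], List[str]]:
    """Split step results into (successful, failed) labels, in step order."""
    successful: List[str] = []
    failed: List[str] = []
    for step in sorted(step_results.values(), key=lambda s: s["step_index"]):
        # One-based label such as "Step 1: Validate API parameters".
        label = f"Step {step['step_index'] + 1}: {step['task_objective']}"
        (successful if step["success"] else failed).append(label)
    return successful, failed
```

Sorting by step_index rather than by dictionary key keeps the order correct even if the keys are not inserted chronologically.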

See also

  • _create_error_context_from_state() : Error context extraction

  • _generate_error_response() : Response generation pipeline

  • AIMessage : Response message format

__repr__()

Return a string representation of the infrastructure node for debugging.

Provides a concise string representation that includes both the Python class name and the infrastructure node’s registered name. This is useful for debugging, logging, and development workflows where infrastructure nodes need to be identified clearly.

Returns:

String representation including class name and node name

Return type:

str

Example

>>> node = TaskExtractionNode()
>>> repr(node)
'<TaskExtractionNode: task_extraction>'

Note

The format follows the pattern ‘<ClassName: node_name>’ for consistency across all framework components.

static get_retry_policy()

Get conservative retry policy configuration for infrastructure operations.

This method provides retry configuration optimized for infrastructure nodes that handle system-critical functions. The default policy uses conservative settings with minimal retry attempts and fast failure detection to maintain system stability.

Infrastructure nodes should generally fail fast rather than retry extensively, since failures often indicate system-level issues that require immediate attention. Override this method only for specific infrastructure components that can benefit from retry logic.

Returns:

Dictionary containing conservative retry configuration parameters

Return type:

Dict[str, Any]

Note

Infrastructure default policy: 2 attempts, 0.2s delay, minimal backoff. This prioritizes fast failure detection over retry persistence.

Example:

from typing import Any, Dict

@staticmethod
def get_retry_policy() -> Dict[str, Any]:
    return {
        "max_attempts": 3,  # More retries for LLM-based infrastructure
        "delay_seconds": 1.0,  # Longer delay for external service calls
        "backoff_factor": 2.0  # Exponential backoff
    }

Note

The router uses this configuration to determine retry behavior.

async langgraph_node(**kwargs)

LangGraph-native node function with manual error handling.

This function is called by LangGraph during execution. Infrastructure nodes call get_stream_writer() and get_config() directly for native LangGraph integration.

Parameters:
  • state (AgentState) – Current agent state

  • kwargs – Additional parameters from LangGraph

Returns:

State updates dictionary

Return type:

Dict[str, Any]

class framework.infrastructure.error_node.ErrorContext(error_classification, current_task, failed_operation, total_operations=0, execution_time=None, retry_count=None, successful_steps=None, failed_steps=None)

Bases: object

Comprehensive error context for generating detailed error responses.

This data structure encapsulates all information required to generate meaningful error reports including original error classification, execution statistics, and step-by-step execution history. The class serves as the primary data container for the error response generation pipeline, ensuring consistent access to error details and execution context across all error handling components.

The ErrorContext integrates with the ErrorClassification system to maintain authoritative error information while adding execution-specific details such as timing, retry attempts, and step-by-step progress tracking.

Parameters:
  • error_classification (ErrorClassification) – Complete error classification with severity, messages, and metadata

  • current_task (str) – Human-readable description of the high-level task being executed

  • failed_operation (str) – Specific operation or capability name that encountered the error

  • total_operations (int) – Total number of operations attempted in current execution cycle

  • execution_time (float, optional) – Duration in seconds from start to error occurrence

  • retry_count (int, optional) – Number of retry attempts made before final failure

  • successful_steps (list[str], optional) – Chronological list of execution steps that completed successfully

  • failed_steps (list[str], optional) – Chronological list of execution steps that failed during execution

Note

The class automatically initializes list fields to empty lists if None is provided, ensuring safe iteration over step results in error report generation.

Warning

This class is designed for read-only access during error response generation. Modifying fields after creation may lead to inconsistent error reports.

Examples

Basic error context creation:

>>> from framework.base.errors import ErrorClassification, ErrorSeverity
>>> classification = ErrorClassification(
...     severity=ErrorSeverity.CRITICAL,
...     user_message="Database connection failed",
...     metadata={"host": "db.example.com"}
... )
>>> context = ErrorContext(
...     error_classification=classification,
...     current_task="Fetch user data",
...     failed_operation="database_query",
...     execution_time=2.5
... )
>>> print(f"Severity: {context.error_severity.value}")
Severity: critical

Context with execution history:

>>> context = ErrorContext(
...     error_classification=classification,
...     current_task="Process user request",
...     failed_operation="user_authentication",
...     total_operations=3,
...     successful_steps=["Step 1: Validate input", "Step 2: Load config"],
...     failed_steps=["Step 3: Authenticate user - Failed"]
... )
>>> print(f"Progress: {len(context.successful_steps)}/{context.total_operations}")
Progress: 2/3

See also

  • framework.base.errors.ErrorClassification : Error classification system

  • ErrorNode : Primary consumer of ErrorContext instances

error_classification: ErrorClassification
current_task: str
failed_operation: str
total_operations: int = 0
execution_time: float | None = None
retry_count: int | None = None
successful_steps: List[str] = None
failed_steps: List[str] = None
__post_init__()

Initialize list fields to empty lists if None.
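The None-to-empty-list pattern can be sketched with a small dataclass. SketchContext is a hypothetical stand-in with a reduced field set; the presence of __post_init__ suggests ErrorContext is a dataclass, but that is an inference from this page rather than a confirmed detail.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SketchContext:
    """Hypothetical stand-in showing the None-to-empty-list pattern."""

    current_task: str
    successful_steps: Optional[List[str]] = None
    failed_steps: Optional[List[str]] = None

    def __post_init__(self) -> None:
        # A None default avoids sharing one mutable list between instances.
        if self.successful_steps is None:
            self.successful_steps = []
        if self.failed_steps is None:
            self.failed_steps = []
```

After construction both list fields can be iterated safely, and each instance owns its own lists.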

property error_severity: ErrorSeverity

Extract error severity level from the underlying error classification.

Returns:

Severity level indicating error impact and recovery strategy

Return type:

ErrorSeverity

Note

Severity levels guide error handling strategy: RETRIABLE errors may be retried, REPLANNING errors require task modification, CRITICAL errors need user intervention, and FATAL errors terminate execution.
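The four levels named in the note can be pictured as an enum with one handling strategy each. The enum below is a stand-in, not the framework's ErrorSeverity. The lowercase values for RETRIABLE and REPLANNING are assumed by analogy with the 'critical' and 'fatal' outputs shown in the examples on this page, and the strategy strings paraphrase the note.

```python
from enum import Enum


class SketchSeverity(Enum):
    """Stand-in enum; some values are assumptions (see the lead-in)."""

    RETRIABLE = "retriable"
    REPLANNING = "replanning"
    CRITICAL = "critical"
    FATAL = "fatal"


# Strategy per level, paraphrasing the note above.
STRATEGY = {
    SketchSeverity.RETRIABLE: "retry the operation",
    SketchSeverity.REPLANNING: "modify the task and plan again",
    SketchSeverity.CRITICAL: "ask the user to intervene",
    SketchSeverity.FATAL: "terminate execution",
}


def strategy_for(severity: SketchSeverity) -> str:
    """Look up the handling strategy for a severity level."""
    return STRATEGY[severity]
```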

property error_message: str

Extract user-friendly error message from the error classification.

Returns:

Human-readable error message suitable for user presentation, with automatic fallback to generic message if none provided

Return type:

str

Note

This property ensures that error responses always contain a meaningful message even when the original error classification lacks user-facing text.

property capability_name: str | None

Extract the name of the specific capability that encountered the error.

Returns:

Name of the failing capability if available in context, None otherwise

Return type:

str, optional

Note

This property accesses a dynamically set attribute (_capability_name) that is populated during error context creation from agent state.
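A property backed by a dynamically set attribute could look roughly like this. The class and the make_context helper are hypothetical; only the attribute name _capability_name comes from the note above.

```python
from typing import Optional


class SketchContext:
    """Hypothetical stand-in for a context object with an optional attribute."""

    @property
    def capability_name(self) -> Optional[str]:
        # getattr with a default returns None when the attribute was never set.
        return getattr(self, "_capability_name", None)


def make_context(capability: Optional[str] = None) -> SketchContext:
    """Build a context and attach the capability name only when one is known."""
    ctx = SketchContext()
    if capability is not None:
        ctx._capability_name = capability  # set dynamically, after construction
    return ctx
```

Using getattr with a default keeps the property safe to read on instances that were created without the attribute.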

__init__(error_classification, current_task, failed_operation, total_operations=0, execution_time=None, retry_count=None, successful_steps=None, failed_steps=None)

Core Models

Execution control uses models defined in the core framework:

See also

ErrorClassification

Error classification system

ErrorSeverity

Error severity levels

BaseInfrastructureNode

Base class for infrastructure components

Registration

RouterNode is automatically registered as:

NodeRegistration(
    name="router",
    module_path="framework.infrastructure.router_node",
    function_name="RouterNode",
    description="Central routing decision authority"
)

ErrorNode is automatically registered as:

NodeRegistration(
    name="error",
    module_path="framework.infrastructure.error_node",
    function_name="ErrorNode",
    description="Error response generation"
)
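A registration that stores a module path and an attribute name lends itself to lazy import. The sketch below shows one way such an entry could be resolved. SketchRegistration and resolve are hypothetical, and how the framework's registry actually loads nodes is not described on this page.

```python
import importlib
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SketchRegistration:
    """Stand-in with the four fields shown in the registrations above."""

    name: str
    module_path: str
    function_name: str
    description: str


def resolve(registration: SketchRegistration) -> Any:
    """Import the module on demand and return the named attribute."""
    module = importlib.import_module(registration.module_path)
    return getattr(module, registration.function_name)


# Demonstrated with a standard-library target so the sketch runs anywhere.
demo = SketchRegistration("demo", "collections", "OrderedDict", "Demo entry")
```

Deferring the import until resolve is called means that listing registrations does not load every node module up front.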

See also

Prompt System

Prompt customization system

Error Handling

Implementation details and usage patterns