Message and Execution Flow#
The Alpha Berkeley Framework implements a router-controlled message processing system that transforms user input into agent responses through coordinated infrastructure components.
📚 What You’ll Learn
Key Concepts:
Router-controlled architecture and message processing flow
Gateway preprocessing and state management
Infrastructure component coordination
Capability execution patterns
Error handling and approval workflow integration
Prerequisites: Understanding of State Management Architecture (AgentState), Context Management System (ContextManager), and Registry and Discovery (Registry systems)
Time Investment: 45-60 minutes for complete understanding
Architecture Overview#
The framework uses a router-controlled architecture where a central RouterNode determines execution flow based on agent state:
Core Components:
Gateway: Single entry point for message preprocessing and state creation
RouterNode: Central routing authority that determines next execution steps
TaskExtractionNode: Converts conversations into actionable task descriptions
ClassificationNode: Selects appropriate capabilities based on task analysis
OrchestrationNode: Creates detailed execution plans for multi-step workflows
Capabilities: Registry-discovered components that execute business logic
RespondCapability: Final response generation
ErrorNode: Error handling and recovery
ClarifyCapability: Clarification requests for ambiguous tasks
Execution Flow:
Gateway processes user input and creates fresh state
Router determines next step based on current state
Infrastructure nodes (task extraction, classification, orchestration) prepare execution
Router routes to appropriate capabilities for business logic execution
Router coordinates multi-step workflows
Response generation completes the cycle
Gateway: Single Entry Point#
The Gateway handles all message preprocessing and state creation, and decides whether a message starts a new conversation turn or resumes an interrupted approval flow.
from framework.infrastructure.gateway import Gateway
from framework.graph import create_graph

async def process_user_message(user_input: str) -> None:
    # Initialize Gateway and graph
    gateway = Gateway()
    graph = create_graph()
    config = {"configurable": {"thread_id": "session_id"}}

    # Gateway processes message and returns execution-ready result
    result = await gateway.process_message(user_input, graph, config)

    if result.error:
        print(f"Gateway error: {result.error}")
        return

    # Execute based on result type
    if result.resume_command:
        # Approval workflow resumption
        final_state = await graph.ainvoke(result.resume_command, config=config)
    elif result.agent_state:
        # Normal conversation flow
        final_state = await graph.ainvoke(result.agent_state, config=config)
Gateway Result Types:
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from langgraph.types import Command  # resume payload for interrupted runs

@dataclass
class GatewayResult:
    # For normal conversation flow
    agent_state: Optional[Dict[str, Any]] = None

    # For interrupt/approval flow
    resume_command: Optional[Command] = None

    # Processing metadata
    slash_commands_processed: Optional[List[str]] = None
    approval_detected: bool = False
    is_interrupt_resume: bool = False

    # Error handling
    error: Optional[str] = None
Gateway Processing Functions:
class Gateway:
    async def process_message(self, user_input: str, compiled_graph, config) -> GatewayResult:
        # Check for pending interrupts (approval workflow)
        if self._has_pending_interrupts(compiled_graph, config):
            return await self._handle_interrupt_flow(user_input, compiled_graph, config)

        # Process as new conversation turn
        return await self._handle_new_message_flow(user_input, compiled_graph, config)

    async def _handle_new_message_flow(self, user_input: str, compiled_graph, config) -> GatewayResult:
        # Parse slash commands
        slash_commands, cleaned_message = self._parse_slash_commands(user_input)

        # Get current state to preserve context
        current_state = None
        if compiled_graph and config:
            graph_state = compiled_graph.get_state(config)
            current_state = graph_state.values if graph_state else None

        # Create fresh state with context preservation
        fresh_state = StateManager.create_fresh_state(
            user_input=cleaned_message.strip() if cleaned_message.strip() else user_input,
            current_state=current_state
        )

        # Apply slash commands
        if slash_commands:
            agent_control_updates = self._apply_slash_commands(slash_commands)
            fresh_state['agent_control'].update(agent_control_updates)

        return GatewayResult(
            agent_state=fresh_state,
            slash_commands_processed=list(slash_commands.keys()) if slash_commands else []
        )
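The slash-command helpers themselves are not shown above. As a rough illustration of the pattern (not the framework's actual implementation), a minimal parser might look like the following; the recognized commands (/planning, /verbose) and the return shape are assumptions for illustration only:

import re
from typing import Dict, Tuple

def parse_slash_commands_sketch(user_input: str) -> Tuple[Dict[str, bool], str]:
    """Minimal sketch: strip leading slash commands from a message.

    The command names here (/planning, /verbose) are hypothetical examples;
    the real Gateway defines its own command set and semantics.
    """
    commands: Dict[str, bool] = {}
    cleaned = user_input
    for match in re.finditer(r"(?<!\S)/(planning|verbose)\b", user_input):
        commands[match.group(1)] = True
        cleaned = cleaned.replace(match.group(0), "", 1)
    return commands, cleaned.strip()

# "/planning find the beam current" -> ({"planning": True}, "find the beam current")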
Router-Controlled Execution Flow#
The RouterNode serves as the central decision-making authority, determining execution flow based on agent state.
import time

from framework.infrastructure.router_node import RouterNode, router_conditional_edge

@infrastructure_node(quiet=True)
class RouterNode(BaseInfrastructureNode):
    name = "router"
    description = "Central routing decision authority"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Update routing metadata only
        return {
            "control_routing_timestamp": time.time(),
            "control_routing_count": state.get("control_routing_count", 0) + 1
        }
Router Conditional Edge Logic:
def router_conditional_edge(state: AgentState) -> str:
    """Central routing logic that determines next execution step."""
    # Check for errors first
    if state.get('control_has_error'):
        return "error"

    # Check if task extraction needed
    if not state.get('task_current_task'):
        return "task_extraction"

    # Check if classification needed
    if not state.get('planning_active_capabilities'):
        return "classifier"

    # Check if orchestration needed
    if not state.get('planning_execution_plan'):
        return "orchestrator"

    # Route to next capability in execution plan
    current_step_index = state.get('planning_current_step_index', 0)
    execution_plan = state.get('planning_execution_plan', {})
    steps = execution_plan.get('steps', [])

    if current_step_index < len(steps):
        current_step = steps[current_step_index]
        return current_step.get('capability', 'respond')

    # Execution complete - generate response
    return "respond"
Task Extraction#
TaskExtractionNode converts conversation history into structured, actionable tasks.
from framework.infrastructure.task_extraction_node import TaskExtractionNode
from framework.prompts.defaults.task_extraction import ExtractedTask

@infrastructure_node
class TaskExtractionNode(BaseInfrastructureNode):
    name = "task_extraction"
    description = "Task Extraction and Processing"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Get conversation messages
        messages = state["messages"]

        # Extract task using LLM (retrieval_result and logger are prepared
        # earlier in the node; elided here for brevity)
        extracted_task = await _extract_task(messages, retrieval_result, logger)

        # Update state with task information
        return {
            "task_current_task": extracted_task.task,
            "task_depends_on_chat_history": extracted_task.depends_on_chat_history,
            "task_depends_on_user_memory": extracted_task.depends_on_user_memory
        }
ExtractedTask Structure:
from pydantic import BaseModel, Field

class ExtractedTask(BaseModel):
    task: str = Field(description="Clear, actionable task description")
    depends_on_chat_history: bool = Field(description="Whether task needs conversation context")
    depends_on_user_memory: bool = Field(description="Whether task needs persistent user context")
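For example, a follow-up message such as "plot that again for the last hour" might extract to something along these lines (illustrative values, not actual model output):

ExtractedTask(
    task="Plot the previously retrieved beam current data for the last hour",
    depends_on_chat_history=True,   # "that" refers to an earlier result
    depends_on_user_memory=False
)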
Classification#
ClassificationNode analyzes tasks and selects appropriate capabilities.
from framework.infrastructure.classification_node import ClassificationNode, select_capabilities

@infrastructure_node
class ClassificationNode(BaseInfrastructureNode):
    name = "classifier"
    description = "Task Classification and Capability Selection"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Get current task
        current_task = state.get("task_current_task")

        # Get available capabilities from registry
        registry = get_registry()
        available_capabilities = registry.get_all_capabilities()

        # Select capabilities using LLM analysis
        active_capabilities = await select_capabilities(
            task=current_task,
            available_capabilities=available_capabilities,
            state=state,
            logger=logger
        )

        return {
            "planning_active_capabilities": active_capabilities,
            "planning_execution_plan": None,
            "planning_current_step_index": 0
        }
Orchestration#
OrchestrationNode creates detailed execution plans with LLM coordination.
from framework.infrastructure.orchestration_node import OrchestrationNode
from framework.base.planning import ExecutionPlan, PlannedStep

@infrastructure_node
class OrchestrationNode(BaseInfrastructureNode):
    name = "orchestrator"
    description = "Execution Planning and Orchestration"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Get planning context
        current_task = state.get('task_current_task')
        active_capabilities = state.get('planning_active_capabilities', [])

        # Create execution plan using LLM
        execution_plan = await create_execution_plan(
            task=current_task,
            capabilities=active_capabilities,
            state=state
        )

        # Handle planning mode (approval workflow)
        if _is_planning_mode_enabled(state):
            await _handle_planning_mode(execution_plan, current_task, logger, streamer)
            # Execution pauses here until user approval

        return {
            "planning_execution_plan": execution_plan,
            "planning_current_step_index": 0
        }
ExecutionPlan Structure:
execution_plan = ExecutionPlan(
    steps=[
        PlannedStep(
            context_key="search_step",
            capability="pv_address_finding",
            task_objective="Find beam current PV addresses",
            success_criteria="PV addresses discovered",
            expected_output="PV_ADDRESSES",
            inputs=[]
        ),
        PlannedStep(
            context_key="analysis_step",
            capability="data_analysis",
            task_objective="Analyze beam current data",
            success_criteria="Analysis completed",
            expected_output="ANALYSIS_RESULTS",
            inputs=[{"PV_ADDRESSES": "search_step"}]
        )
    ]
)
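When planning mode is enabled, orchestration pauses after a plan like the one above is presented, and a later user message resumes execution through the Gateway's resume_command path shown earlier. Under the hood this relies on LangGraph's interrupt/resume mechanism; the sketch below shows only that general pattern, and the approval payload shape is an assumption for illustration:

from langgraph.types import Command

async def resume_after_approval(graph, config):
    # The Gateway detects an approval message while an interrupt is pending and
    # returns a resume_command instead of a fresh agent_state; resuming continues
    # the paused run from the interrupt point. Payload shape is illustrative only.
    resume_command = Command(resume={"approved": True})
    return await graph.ainvoke(resume_command, config=config)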
Capability Execution#
Capabilities execute business logic according to the orchestrated plan.
from framework.base import BaseCapability
from framework.decorators import capability_node
from framework.state import StateManager
from framework.context import ContextManager

@capability_node
class ExampleCapability(BaseCapability):
    name = "example_capability"
    description = "Example capability implementation"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Get current execution step
        current_step = StateManager.get_current_step(state)
        context = ContextManager(state)

        # Access input data from previous steps
        step_inputs = current_step.get('inputs', [])
        for input_spec in step_inputs:
            for context_type, context_key in input_spec.items():
                input_data = context.get_context(context_type, context_key)
                # Use input_data for processing

        # Execute capability business logic
        result = await perform_business_logic()

        # Store results using StateManager
        return StateManager.store_context(
            state, "RESULTS", current_step.get('context_key'), result
        )
State Management#
StateManager provides utilities for state creation and context storage.
from framework.state import StateManager, AgentState

class StateManager:
    @staticmethod
    def create_fresh_state(
        user_input: str,
        current_state: Optional[AgentState] = None
    ) -> AgentState:
        """Create fresh state preserving only capability context data."""
        # Implementation creates new state with preserved context

    @staticmethod
    def get_current_step(state: AgentState) -> PlannedStep:
        """Get current execution step from orchestration plan."""
        # Implementation extracts current step

    @staticmethod
    def store_context(
        state: AgentState,
        context_type: str,
        context_key: str,
        context_data: Any
    ) -> Dict[str, Any]:
        """Store capability results in context system."""
        # Implementation stores context data
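As a rough sketch of how this plays out across turns (grounded in the Gateway flow above, with illustrative user input):

# Previous turn's state, as read by the Gateway before creating a new one.
previous_state = graph.get_state(config).values

# The new state resets planning and control fields, but capability context
# data from earlier turns is preserved so follow-ups can reference prior results.
fresh_state = StateManager.create_fresh_state(
    user_input="Now plot those PVs for the last hour",
    current_state=previous_state
)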
Context Management#
ContextManager provides access to capability context data with Pydantic serialization.
from framework.context.context_manager import ContextManager

class ContextManager:
    def __init__(self, state: AgentState):
        self._data = state['capability_context_data']
        self._object_cache = {}

    def get_context(self, context_type: str, key: str) -> Optional[CapabilityContext]:
        """Retrieve context object with automatic Pydantic reconstruction."""
        # Implementation reconstructs context objects

    def set_context(self, context_type: str, key: str, value: CapabilityContext) -> None:
        """Store context object with automatic Pydantic serialization."""
        # Implementation stores context data

    def get_all_of_type(self, context_type: str) -> Dict[str, CapabilityContext]:
        """Get all context objects of specified type."""
        # Implementation returns all matching contexts
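Tying this back to the ExecutionPlan example above, a downstream step resolves its declared inputs by context type and key:

context = ContextManager(state)

# The data_analysis step declared inputs=[{"PV_ADDRESSES": "search_step"}],
# so it retrieves the PV_ADDRESSES context stored by the search step.
pv_context = context.get_context("PV_ADDRESSES", "search_step")

# All contexts of a given type can also be listed.
all_pv_contexts = context.get_all_of_type("PV_ADDRESSES")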
Error Handling#
ErrorNode handles error recovery and response generation.
from framework.infrastructure.error_node import ErrorNode
from langchain_core.messages import AIMessage

@infrastructure_node
class ErrorNode(BaseInfrastructureNode):
    name = "error"
    description = "Error Response Generation"

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Generate error response based on error context
        error_info = state.get('control_error_info', {})
        error_response = await generate_error_response(error_info)

        return {
            "messages": [AIMessage(content=error_response)]
        }
Complete Working Example#
from framework.infrastructure.gateway import Gateway
from framework.graph import create_graph
from framework.registry import get_registry

async def complete_message_processing():
    # Initialize framework
    registry = get_registry()
    gateway = Gateway()
    graph = create_graph(registry)
    config = {"configurable": {"thread_id": "demo"}}

    # Process user message
    user_message = "Find beam current PV addresses"

    # Gateway preprocessing
    result = await gateway.process_message(user_message, graph, config)

    if result.error:
        print(f"Error: {result.error}")
        return

    # Execute through router-controlled flow
    final_state = await graph.ainvoke(result.agent_state, config=config)

    # Access results
    messages = final_state.get("messages", [])
    final_response = messages[-1].content if messages else "No response generated"
    print(f"Response: {final_response}")
Graph Construction#
The framework uses LangGraph with router-controlled conditional edges.
from langgraph.graph import StateGraph

from framework.infrastructure.router_node import router_conditional_edge
from framework.registry import get_registry

def create_graph(registry: RegistryManager) -> StateGraph:
    # Get all nodes from registry
    all_nodes = registry.get_all_nodes().items()

    # Create StateGraph
    workflow = StateGraph(AgentState)

    # Add all nodes
    for name, node_callable in all_nodes:
        workflow.add_node(name, node_callable)

    # Set up router-controlled flow
    workflow.set_entry_point("router")
    workflow.add_conditional_edges("router", router_conditional_edge)

    # All nodes route back to router
    for name, _ in all_nodes:
        if name != "router":
            workflow.add_edge(name, "router")

    return workflow.compile()
Next Steps#
Infrastructure Components: Detailed documentation for each infrastructure component
Capability Development: Building custom capabilities for the framework
Advanced Patterns: Complex workflows and approval systems
API Reference: Complete method documentation
See also
- Gateway Architecture: Gateway implementation details
- Task Extraction: Task extraction system documentation
- Classification and Routing: Classification and routing systems
- Orchestrator Planning: Orchestration and planning system