Infrastructure Architecture: Classification-Orchestration Pipeline#
📚 What You’ll Learn
Key Concepts:
The complete classification-orchestration pipeline from user input to agent response
How Task Extraction, Classification, and Orchestration work together
The role of the Gateway as a single entry point and the Router for coordination
State-based routing and deterministic execution planning
Error handling and retry logic throughout the infrastructure
Prerequisites: Basic understanding of LangGraph and agentic frameworks
Time Investment: 10-15 minutes for complete understanding
Overview#
The Alpha Berkeley Framework uses a Classification-Orchestration Pipeline where every user request flows through task extraction, capability classification, execution planning, and deterministic routing. The Gateway provides a unified entry point while the Router coordinates infrastructure components through state-based decisions.
Core Components#
Gateway - Single Entry Point#
The Gateway serves as the only entry point for all message processing across all interfaces (CLI, OpenWebUI, etc.). Operating outside the compiled graph, it handles preprocessing like state creation and slash commands before graph execution.
from framework.infrastructure.gateway import Gateway
# All interfaces use Gateway as single entry point
gateway = Gateway()
result = await gateway.process_message(
user_input="Find beam current PV addresses",
compiled_graph=graph,
config=config
)
# Gateway returns structured result ready for execution
if result.resume_command:
# Handle approval/interrupt flow
await graph.ainvoke(result.resume_command, config=config)
elif result.agent_state:
# Handle normal conversation flow
await graph.ainvoke(result.agent_state, config=config)
elif result.error:
# Handle processing errors
print(f"Error: {result.error}")
Gateway responsibilities: - State reset for new conversation turns - Slash command parsing (/planning, /approval, /debug) - Approval response detection and resume command generation - Message preprocessing and initial state creation
Task Extraction - Conversation to Action#
Task Extraction converts conversational input into structured, actionable tasks using LLM analysis.
@infrastructure_node
class TaskExtractionNode(BaseInfrastructureNode):
name = "task_extraction"
description = "Task Extraction and Processing"
@staticmethod
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
messages = state["messages"]
# Extract structured task using LLM
processed_task = await asyncio.to_thread(
_extract_task, messages, retrieval_result, logger
)
return {
"task_current_task": processed_task.task,
"task_depends_on_chat_history": processed_task.depends_on_chat_history,
"task_depends_on_user_memory": processed_task.depends_on_user_memory
}
Classification - Capability Selection#
The Classification system analyzes tasks and selects appropriate capabilities using LLM-based analysis.
@infrastructure_node
class ClassificationNode(BaseInfrastructureNode):
name = "classifier"
description = "Task Classification and Capability Selection"
@staticmethod
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
current_task = state.get("task_current_task")
# Get available capabilities from registry
registry = get_registry()
available_capabilities = registry.get_all_capabilities()
# Select capabilities using LLM-based classification
active_capabilities = await select_capabilities(
task=current_task,
available_capabilities=available_capabilities,
state=state,
logger=logger
)
return {
"planning_active_capabilities": active_capabilities,
"planning_execution_plan": None, # Reset for orchestrator
"planning_current_step_index": 0
}
Orchestrator - Execution Planning#
The Orchestrator creates complete execution plans before any capability execution begins.
@infrastructure_node
class OrchestrationNode(BaseInfrastructureNode):
name = "orchestrator"
description = "Execution Planning and Orchestration"
@staticmethod
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
current_task = StateManager.get_current_task(state)
active_capability_names = state.get('planning_active_capabilities')
# Create execution plan using LLM
execution_plan = await asyncio.to_thread(
get_chat_completion,
message=system_prompt,
model_config=model_config,
output_model=ExecutionPlan
)
# Validate plan (check capabilities exist, ensure response step)
validated_plan = _validate_and_fix_execution_plan(
execution_plan, current_task, logger
)
return {
"planning_execution_plan": validated_plan,
"planning_current_step_index": 0
}
Message Processing Flow#
A user message flows through the complete architecture:
Gateway receives message and creates initial state
Router determines next action → “task_extraction” (no task yet)
Task Extraction analyzes conversation → returns structured task
Router checks state again → “classifier” (has task, no capabilities)
Classification selects relevant capabilities → returns capability list
Router checks state again → “orchestrator” (has capabilities, no plan)
Orchestrator creates execution plan → returns validated plan
Router executes plan step by step → routes through each capability → “END”
This pipeline ensures every request goes through the same structured analysis and planning before execution begins.
Graph Construction#
The framework uses LangGraph with router-controlled flow:
from framework.graph.graph_builder import create_graph
from framework.registry import get_registry
# Initialize registry and create graph
registry = get_registry()
graph = create_graph(registry, use_postgres=True)
# Graph structure:
# - Entry point: "router"
# - Router uses conditional edges to route to any registered node
# - All nodes route back to router except terminal nodes (respond, error)
# - Terminal nodes route to END
Error Handling and Retry Logic#
The framework includes sophisticated error handling with node-specific retry policies:
@infrastructure_node
class TaskExtractionNode(BaseInfrastructureNode):
@staticmethod
def classify_error(exc: Exception, context: dict):
# Retry on network/API timeouts
if isinstance(exc, (ConnectionError, TimeoutError)):
return ErrorClassification(
severity=ErrorSeverity.RETRIABLE,
user_message="Network timeout, retrying...",
metadata={"technical_details": str(exc)}
)
# Critical errors don't retry
return ErrorClassification(
severity=ErrorSeverity.CRITICAL,
user_message="Task extraction failed",
metadata={"technical_details": str(exc)}
)
@staticmethod
def get_retry_policy() -> Dict[str, Any]:
return {
"max_attempts": 3,
"delay_seconds": 1.0,
"backoff_factor": 1.5
}
Approval Workflows#
The framework supports human-in-the-loop approval for sensitive operations:
# Enable planning mode with slash command
user_input = "/planning Find and modify beam parameters"
gateway_result = await gateway.process_message(user_input, graph, config)
# Orchestrator will interrupt for approval
# User responds with "yes" or "no"
approval_result = await gateway.process_message("yes", graph, config)
# Execution resumes with approved plan
Interface Integration#
All interfaces use the same Gateway pattern:
# CLI Interface
class CLI:
async def _process_user_input(self, user_input: str):
result = await self._gateway.process_message(
user_input, self._graph, self._config
)
# Handle result...
# OpenWebUI Pipeline
class Pipeline:
def _execute_pipeline(self, user_message: str, ...):
result = loop.run_until_complete(
self._gateway.process_message(user_message, self._graph, config)
)
# Handle result...
Architecture Benefits#
- Reliability Through Single Entry Point
All message processing centralized in Gateway
Consistent preprocessing and state management
Single point of error handling and logging
- Performance Through Efficient Classification
Binary yes/no decisions for each capability
Only relevant capabilities loaded into orchestration context
State-based routing eliminates redundant processing
Deterministic execution plan following
- Maintainability Through Clear Separation
Task Extraction isolates conversation analysis
Classification handles capability selection
Orchestration manages execution planning
Router coordinates deterministic execution
Gateway and capabilities handle specific domain concerns
Next Steps#
Learn State Management: State Management Architecture - How state flows through the system
Explore Registry System: Registry and Discovery - How components are discovered and registered
Build Your First Capability: Building Your First Capability - Hands-on implementation
Understand Error Handling: Error Handling - Comprehensive error management
See also
- Gateway
Complete Gateway API for message processing and state management
- Execution Control
Router and routing logic for deterministic execution flow
- Task Extraction
Task extraction methods for conversation analysis
- Classification
Capability selection API for LLM-based classification