LangGraph Integration: Native StateGraph and Workflow Execution#

📚 What You’ll Learn

Key Concepts:

  • How the framework uses LangGraph’s StateGraph for workflow execution

  • Native state management with MessagesState integration

  • Checkpoint-based persistence for conversation continuity

  • Interrupt system for human approval workflows

  • Real-time streaming implementation

Prerequisites: Basic understanding of LangGraph concepts

Time Investment: 15-20 minutes for complete understanding

Overview#

The Alpha Berkeley Framework leverages LangGraph’s native features to build production-grade conversational agentic systems:

  1. StateGraph Workflow: Registry-based node discovery with router-controlled flow

  2. MessagesState Foundation: Native message handling with selective persistence

  3. Checkpoint System: PostgreSQL and memory-based conversation persistence

  4. Native Interrupts: Built-in human-in-the-loop for approval workflows

  5. Custom Streaming: Real-time status updates through LangGraph’s streaming API

Core LangGraph Integration#

StateGraph Workflow Creation#

The framework creates LangGraph workflows using registry-discovered components:

from langgraph.graph import StateGraph, END
from framework.state import AgentState
from framework.registry import RegistryManager

# Framework automatically creates StateGraph from registry
def create_graph(registry: RegistryManager, checkpointer=None):
    """Create LangGraph workflow with all framework components."""

    # Use framework's native state structure
    workflow = StateGraph(AgentState)

    # Add all nodes from registry (infrastructure + capabilities)
    all_nodes = registry.get_all_nodes()
    for name, node_callable in all_nodes.items():
        workflow.add_node(name, node_callable)

    # Router controls all flow via conditional edges;
    # router_conditional_edge returns the next node name (sketched below)
    workflow.add_conditional_edges("router", router_conditional_edge, {
        "task_extraction": "task_extraction",
        "classifier": "classifier",
        "orchestrator": "orchestrator",
        "pv_address_finding": "pv_address_finding",
        "respond": "respond",
        "END": END
    })

    # All other nodes route back to the router for the next decision
    for name in all_nodes:
        if name not in ["router", "respond", "clarify", "error"]:
            workflow.add_edge(name, "router")

    return workflow.compile(checkpointer=checkpointer)

Key integration points:

  • AgentState: Extends LangGraph’s MessagesState for compatibility

  • Node Functions: All decorators create LangGraph-compatible callables

  • Router Control: Central routing using LangGraph’s conditional edges (a sketch of the routing function follows below)

  • Automatic Compilation: Framework handles checkpointer configuration
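The router_conditional_edge function referenced above is not reproduced in this section. A minimal sketch of what such a routing function could look like, assuming the router node records its decision in a control_next_node field (a hypothetical name; the framework’s actual field may differ):

from framework.state import AgentState

def router_conditional_edge(state: AgentState) -> str:
    """Return the name of the next node based on the router's decision.

    Sketch only: assumes the router stored its decision in
    'control_next_node'; falls back to ending the graph.
    """
    return state.get("control_next_node", "END")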

Native State Management#

The framework’s state extends LangGraph’s MessagesState with selective persistence:

from typing import Annotated, Any, Dict, List, Optional

from langgraph.graph import MessagesState
# ExecutionPlan and the custom reducer live in the framework's state module
from framework.state import ExecutionPlan, merge_capability_context_data

class AgentState(MessagesState):
    """Framework state extending LangGraph's MessagesState."""

    # LangGraph automatically manages the 'messages' field

    # PERSISTENT: Only this field survives conversation turns
    capability_context_data: Annotated[
        Dict[str, Dict[str, Dict[str, Any]]],
        merge_capability_context_data  # Custom reducer
    ]

    # EXECUTION-SCOPED: Reset automatically each turn
    task_current_task: Optional[str]
    planning_active_capabilities: List[str]
    planning_execution_plan: Optional[ExecutionPlan]
    planning_current_step_index: int

State management features:

  • Native Messages: LangGraph handles message history automatically

  • Selective Persistence: Only capability_context_data persists across conversations

  • Custom Reducer: Framework provides specialized context merging (sketched below)

  • Type Safety: Full TypedDict definitions with proper annotations
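The framework’s merge_capability_context_data reducer is not reproduced here. A minimal sketch of a reducer with the same signature and shape, merging new per-capability entries over the existing nested dict (the real implementation may handle deletions, versioning, or deeper merges):

from typing import Any, Dict

ContextData = Dict[str, Dict[str, Dict[str, Any]]]

def merge_capability_context_data(existing: ContextData, new: ContextData) -> ContextData:
    """Merge new context entries into the existing nested structure (sketch)."""
    merged = {k: dict(v) for k, v in (existing or {}).items()}
    for context_type, entries in (new or {}).items():
        merged.setdefault(context_type, {}).update(entries)
    return merged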

Checkpoint-Based Persistence#

LangGraph’s checkpointer system provides automatic state persistence:

from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage
from framework.graph.graph_builder import create_async_postgres_checkpointer, create_graph

# Development: In-memory checkpointer
def create_development_graph(registry):
    checkpointer = MemorySaver()
    return create_graph(registry, checkpointer=checkpointer)

# Production: PostgreSQL checkpointer (wraps langgraph.checkpoint.postgres)
def create_production_graph(registry):
    checkpointer = create_async_postgres_checkpointer()
    return create_graph(registry, checkpointer=checkpointer)

# Usage with conversation continuity
config = {"configurable": {"thread_id": "user-123"}}

# First conversation
response1 = await graph.ainvoke(
    {"messages": [HumanMessage(content="Find beam current PVs")]},
    config=config
)

# Later conversation - automatically resumes with context
response2 = await graph.ainvoke(
    {"messages": [HumanMessage(content="Show me the analysis")]},
    config=config  # Same thread_id = same conversation
)
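Because every turn is checkpointed under the thread_id, the stored conversation can also be inspected directly through LangGraph’s state APIs:

# Inspect the latest checkpoint for a thread
snapshot = graph.get_state(config)
print(snapshot.values["messages"][-1].content)

# Walk back through earlier checkpoints of the same conversation
for earlier in graph.get_state_history(config):
    print(earlier.config["configurable"].get("checkpoint_id"))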

Human Approval Workflows#

The framework uses LangGraph’s native interrupt system for human-in-the-loop operations:

from typing import Any, Dict

from langgraph.types import interrupt
from framework.approval.approval_system import create_code_approval_interrupt

# In the Python executor service - request human approval
@staticmethod
async def analyze_code(state: PythonExecutionState) -> Dict[str, Any]:
    # Pull the generated code from the service state (field name illustrative)
    generated_code = state["generated_code"]

    # Analyze code for safety
    safety_analysis = analyze_code_safety(generated_code)

    if safety_analysis.requires_approval:
        # Create structured approval interrupt
        approval_data = create_code_approval_interrupt(
            code=generated_code,
            safety_concerns=safety_analysis.concerns,
            execution_environment="container"
        )

        # LangGraph interrupt - execution pauses here until resumed
        interrupt(approval_data)

    # Continue if no approval needed
    return {"analysis_complete": True}

Interrupt workflow:

from langgraph.types import Command

# 1. Service generates code and requests approval
result = await python_service.ainvoke(request, config=config)
# Execution pauses - interrupt created

# 2. Check for interrupts
graph_state = graph.get_state(config)
if graph_state.interrupts:
    interrupt_data = graph_state.interrupts[0]
    # Show approval UI with interrupt_data

# 3. Human approves/rejects via the UI
approved = True  # or False

# 4. Resume with the approval decision
resume_command = Command(resume={"approved": approved})
await graph.ainvoke(resume_command, config=config)

Service Integration with Interrupts#

The framework handles service calls that may generate interrupts:

from typing import Any, Dict

from framework.approval.approval_system import handle_service_with_interrupts
from framework.base import BaseCapability, capability_node  # illustrative import path
from framework.registry import get_registry
from framework.state import AgentState

@capability_node
class PythonExecutorCapability(BaseCapability):
    """Execute Python code with approval workflows."""

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Get Python executor service from the registry
        registry = get_registry()
        service = registry.get_service("python_executor")

        # python_code comes from an earlier step's context; logger and
        # config are supplied by the framework runtime (illustrative)
        try:
            result = await handle_service_with_interrupts(
                service=service,
                request={"code": python_code, "mode": "execute"},
                config=kwargs.get("config"),
                logger=logger,
                capability_name="python_executor"
            )
            return {"execution_results": result}

        except RuntimeError as e:
            return {"error": f"Service execution failed: {e}"}

Real-Time Streaming#

The framework provides real-time status updates through LangGraph’s streaming:

from typing import Any, Dict

from configs.streaming import get_streamer
from framework.base import BaseCapability, capability_node  # illustrative import path
from framework.state import AgentState

@capability_node
class DataAnalysisCapability(BaseCapability):
    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        # Get streaming helper scoped to this capability
        streamer = get_streamer("framework", "data_analysis", state)

        # Provide real-time status updates around each stage
        streamer.status("Loading data sources...")
        data = await load_data_sources()

        streamer.status("Performing analysis...")
        analysis = await perform_analysis(data)

        streamer.status("Analysis complete")

        return {"analysis_results": analysis}

# Client receives real-time updates
async for chunk in graph.astream(input_data, config, stream_mode="custom"):
    if chunk.get("event_type") == "status":
        print(f"Status: {chunk['message']}")

Production Configuration#

PostgreSQL Checkpointer Setup#

from framework.graph.graph_builder import create_async_postgres_checkpointer, create_graph
from framework.registry import get_registry

def create_production_checkpointer():
    """Create PostgreSQL checkpointer for production."""
    # Uses environment POSTGRESQL_URI or defaults to local
    return create_async_postgres_checkpointer()

# Production graph with persistence
production_graph = create_graph(
    registry=get_registry(),
    use_postgres=True  # Automatically uses PostgreSQL
)

Development vs Production#

# Development: Fast iteration with memory checkpointer
dev_graph = create_graph(registry, use_postgres=False)

# Production: Full persistence with PostgreSQL
prod_graph = create_graph(registry, use_postgres=True)

# Testing: No persistence for isolated tests
test_graph = create_graph(registry, checkpointer=None)

Benefits of Native LangGraph Integration#

  • Zero-Configuration Persistence: Automatic conversation memory across restarts

  • Built-in Human-in-the-Loop: Native approval workflows through interrupts

  • Production-Ready Streaming: Real-time status updates from all framework components

  • Fault Tolerance: Conversations survive system crashes and can resume from checkpoints

Common Integration Patterns#

Interrupt Handling#

# Check for pending interrupts
def check_interrupts(graph, config):
    state = graph.get_state(config)
    if state.interrupts:
        return {
            "has_interrupts": True,
            "interrupt_data": state.interrupts[0],
            "next": state.next
        }
    return {"has_interrupts": False}

Custom State Updates#

# Framework provides StateManager for consistent state updates
from framework.state import StateManager

# Store capability results (context_key and pv_data produced by earlier steps)
return StateManager.store_context(
    state, "PV_ADDRESSES", context_key, pv_data
)
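StateManager.store_context is expected to return a partial state update that LangGraph merges through the custom reducer shown earlier. A hedged sketch of that shape, assuming the nested context-type/key layout from AgentState (not the framework’s actual code):

def store_context(state, context_type, context_key, data):
    """Sketch of what such a helper might return (illustrative only)."""
    return {
        "capability_context_data": {
            context_type: {context_key: data}
        }
    }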

Troubleshooting#

Issue: State not persisting between conversations

# Problem: No checkpointer configured
graph = create_graph(registry)  # No persistence

# Solution: Configure checkpointer
graph = create_graph(registry, use_postgres=True)
config = {"configurable": {"thread_id": "user-123"}}

Issue: Streaming not working

# Problem: Default stream mode yields graph state, not status events
async for chunk in graph.astream(input_data, config):
    ...  # no custom status updates here

# Solution: Use custom stream mode
async for chunk in graph.astream(input_data, config, stream_mode="custom"):
    ...  # receives real-time status updates
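When both graph output and status updates are needed, LangGraph accepts a list of stream modes, and each chunk then arrives as a (mode, payload) tuple:

async for mode, chunk in graph.astream(
    input_data, config, stream_mode=["values", "custom"]
):
    if mode == "custom" and chunk.get("event_type") == "status":
        print(f"Status: {chunk['message']}")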

Next Steps#

  1. Build Components: Building Your First Capability - Create LangGraph-integrated capabilities

  2. Learn State Management: State Management Architecture - Deep dive into state patterns

  3. Production Setup: Container Deployment - Deploy with PostgreSQL checkpointing

  4. Advanced Patterns: Human Approval - Complex interrupt handling

See also

  • State and Context Management: Complete state structure and MessagesState integration

  • Gateway: LangGraph integration entry point for message processing

  • Human Approval: Interrupt system API for human-in-the-loop workflows

  • Container Management: Production deployment patterns with PostgreSQL checkpointing