Gateway Architecture#

πŸ“š What You’ll Learn

Key Concepts:

  • How Gateway centralizes all message processing logic

  • State lifecycle management and conversation handling

  • Slash command integration and approval workflow coordination

  • Interface implementation patterns

Prerequisites: Understanding of State Management Architecture

Time Investment: 15 minutes for complete understanding

Core Concept#

Gateway serves as the single entry point for all message processing, eliminating interface duplication and ensuring consistent state management across CLI, web, and API interfaces. As an external coordinator, it operates outside the compiled LangGraph to handle preprocessing operations.

Problem Solved: Without centralized processing, each interface duplicates message logic, state creation, and approval handling.

Solution: All interfaces call Gateway.process_message() - Gateway handles preprocessing, interfaces handle presentation.

Architecture#

from framework.infrastructure.gateway import Gateway

# Universal pattern for all interfaces
gateway = Gateway()
result = await gateway.process_message(user_input, graph, config)

# Execute based on result type
                if result.resume_command:
    # Approval flow resumption
    response = await graph.ainvoke(result.resume_command, config=config)
                elif result.agent_state:
    # Normal conversation turn
    response = await graph.ainvoke(result.agent_state, config=config)

Key Features#

State Authority

Gateway is the only component that creates agent state, ensuring consistency.

Slash Commands

Built-in parsing for /planning:on, /approval:enabled, /debug:on, and performance bypass commands.

Approval Integration

Automatic detection of approval/rejection responses with LangGraph interrupt handling.

Interface Agnostic

Same processing logic for CLI, OpenWebUI, APIs, or custom interfaces.

Implementation Patterns#

Simple Interface Integration

class MyInterface:
    def __init__(self):
        self.gateway = Gateway()
        self.graph = create_graph()

    async def handle_message(self, message: str) -> str:
        result = await self.gateway.process_message(message, self.graph, self.config)

        if result.error:
            return f"Error: {result.error}"

        # Execute and extract response
        execution_input = result.resume_command or result.agent_state
        response = await self.graph.ainvoke(execution_input, config=self.config)
        return self._extract_response(response)

Streaming Interface Integration

async def handle_streaming(self, message: str):
    result = await self.gateway.process_message(message, self.graph, self.config)

        if result.error:
        yield {"error": result.error}
            return

    # Stream execution
        execution_input = result.resume_command or result.agent_state
        async for chunk in self.graph.astream(execution_input, config=self.config):
        yield chunk

State Management#

Gateway automatically handles:

  • Fresh state creation for new conversation turns

  • Persistent field preservation (execution history, user preferences)

  • Slash command application before execution

  • Approval state injection for interrupt resumption

State Creation Pattern:

# Gateway handles this automatically
fresh_state = StateManager.create_fresh_state(
    user_input=cleaned_message,
    current_state=current_state  # Preserves persistent fields
)

Slash Commands#

Gateway parses and applies slash commands automatically:

Planning and Execution Control:

  • /planning or /planning:on - Enable planning mode

  • /planning:off - Disable planning mode (default)

  • /approval:on - Enable approval workflows

  • /approval:selective - Selective approval mode

  • /debug:on - Enable debug logging

Performance Optimization:

  • /task:off - Bypass task extraction (use full chat history)

  • /task:on - Enable task extraction (default)

  • /caps:off - Bypass capability selection (activate all capabilities)

  • /caps:on - Enable capability selection (default)

Commands are parsed from user input and applied to agent_control state before execution and can temporarily change bypass settings for the current conversation session, overriding system defaults.

πŸ’‘ Slash Command Examples

Real CLI Sessions Demonstrating Different Modes

Human Approval Workflow Example

πŸ‘€ You: /planning What's the weather in San Francisco?
πŸ”„ Processing: /planning What's the weather in San Francisco?
βœ… Processed commands: ['planning']
πŸ”„ Extracting actionable task from conversation
πŸ”„ Analyzing task requirements...
πŸ”„ Generating execution plan...
πŸ”„ Requesting plan approval...

⚠️ **HUMAN APPROVAL REQUIRED** ⚠️

**Planned Steps (2 total):**
**Step 1:** Retrieve current weather conditions for San Francisco including temperature, weather conditions, and timestamp (current_weather)
**Step 2:** Present the current weather information for San Francisco to the user in a clear and readable format (respond)

**To proceed, respond with:**
- **`yes`** to approve and execute the plan
- **`no`** to cancel this operation

πŸ‘€ You: yes
πŸ”„ Processing: yes
πŸ”„ Resuming from interrupt...
πŸ”„ Using approved execution plan
πŸ”„ Executing current_weather... (10%)
πŸ”„ Weather retrieved: San Francisco - 18.0Β°C
πŸ”„ Executing respond... (10%)
πŸ“Š Execution completed (execution_step_results: 2 records)

πŸ€– Here is the current weather in San Francisco:
As of today, the weather in San Francisco is **18.0Β°C and Partly Cloudy**.

Planning mode provides transparent oversight of multi-step operations before execution begins.

Bypass Mode Examples for Faster Response Times

# Example 1: Task extraction bypass for context-rich queries
πŸ‘€ You: /task:off Analyze the correlation we discussed earlier
πŸ”„ Processing: /task:off Analyze the correlation we discussed earlier
βœ… Processed commands: ['task:off']
πŸ”„ Bypassing task extraction - using full conversation context
πŸ”„ Analyzing task requirements...
πŸ”„ Classification completed with 3 active capabilities
πŸ”„ Generating execution plan...

# Example 2: Full bypass for quick status queries
πŸ‘€ You: /task:off /caps:off What's the current beam status?
πŸ”„ Processing: /task:off /caps:off What's the current beam status?
βœ… Processed commands: ['task:off', 'caps:off']
πŸ”„ Bypassing task extraction - using full conversation context
πŸ”„ Bypassing capability selection - activating all capabilities
πŸ”„ Generating execution plan...

Performance Comparison:

  • Normal Mode: Task Extraction β†’ Classification β†’ Orchestration β†’ Execution (3 LLM calls)

  • Task Bypass: Classification β†’ Orchestration β†’ Execution (2 LLM call)

  • Caps Bypass: Task Extraction β†’ Orchestration β†’ Execution (2 LLM call)

  • Full Bypass: Orchestration β†’ Execution (1 preprocessing LLM calls)

Approval Workflow Integration#

Gateway automatically detects approval responses using LLM classification and creates resume commands:

LLM-Powered Approval Detection:

Uses the configured approval model from framework.models to classify user responses as approval or rejection.

Approval Model Configuration:

Configured in src/framework/config.yml under framework.models.approval (Ollama with Mistral 7B by default).

Fail-Safe Behavior:

If LLM classification fails for any reason, the system defaults to β€œnot approved” and logs a clear warning message.

Resume Command Creation:

Gateway extracts interrupt payload and injects approval decision into agent state for processing.

Error Handling#

Gateway provides graceful error handling:

# Gateway returns structured results
@dataclass
class GatewayResult:
    agent_state: Optional[Dict[str, Any]] = None
    resume_command: Optional[Command] = None
    error: Optional[str] = None
    slash_commands_processed: List[str] = None
    approval_detected: bool = False

Common error scenarios: - Interrupt detection failures fall back to new message processing - State access errors are handled gracefully - Approval parsing failures provide clear guidance

Validation#

The Gateway pattern is validated by existing interfaces:

  • CLI Interface (interfaces/CLI/direct_conversation.py)

  • OpenWebUI Pipeline (interfaces/openwebui/main.py)

Both follow the documented patterns exactly, providing real-world validation.

Best Practices#

Do: - Always use Gateway for message processing - Handle both normal and approval flows - Implement proper error handling for Gateway results

Don’t: - Create agent state manually in interfaces - Duplicate approval detection logic - Parse slash commands outside Gateway

Integration Example#

Complete CLI interface using Gateway:

class CLIInterface:
    def __init__(self):
        self.gateway = Gateway()
        self.graph = create_graph()
        self.config = {"configurable": {"thread_id": "cli_session"}}

    async def conversation_loop(self):
        while True:
            user_input = input("You: ").strip()
            if user_input.lower() in ['exit', 'quit']:
                break

            # Process through Gateway
            result = await self.gateway.process_message(
                user_input, self.graph, self.config
            )

            if result.error:
                print(f"Error: {result.error}")
                continue

            # Execute appropriate flow
            if result.resume_command:
                response = await self.graph.ainvoke(result.resume_command, self.config)
            else:
                response = await self.graph.ainvoke(result.agent_state, self.config)

            print(f"Agent: {self._extract_message(response)}")

Gateway Architecture provides the foundation for consistent, reliable message processing across all interfaces in the Alpha Berkeley Framework.

See also

Gateway

API reference for Gateway classes and functions

State Management Architecture

State lifecycle management and conversation handling

Human Approval

Advanced approval workflow integration patterns