Building Your First Capability#
Create a production-ready capability using the Alpha Berkeley Framework’s core patterns.
📚 What You’ll Learn
Key Concepts:
Implement BaseCapability with @capability_node decorator
Work with AgentState and context storage using StateManager utilities
Register capabilities for framework discovery
Handle errors and streaming properly
Create CapabilityContext classes for data exchange
Prerequisites: Python environment with Alpha Berkeley Framework installed
Time Investment: 30-45 minutes for complete implementation
Core Concepts#
Capabilities are business logic components that:
Extend BaseCapability and use @capability_node decorator
Store results in Pydantic context objects
Return state updates for LangGraph integration
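Put together, a minimal capability has the shape sketched below. The class and context names here are illustrative placeholders, not framework APIs; the real implementation is built step by step in the sections that follow.
# Minimal shape of a capability (illustrative names; the full version is built below)
from typing import Dict, Any

from framework.base import BaseCapability, capability_node
from framework.state import AgentState, StateManager


@capability_node
class MinimalCapability(BaseCapability):
    name = "minimal_example"
    description = "Smallest possible capability sketch"
    provides = ["MY_DATA"]
    requires = []

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        context = ...  # a Pydantic CapabilityContext instance (see Step 1)
        return StateManager.store_context(state, "MY_DATA", "my_key", context)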
Implementation Steps#
Step 1: Create Context Class#
from typing import ClassVar

from pydantic import Field

from framework.context.base import CapabilityContext


class ProcessedDataContext(CapabilityContext):
    """Context for processed user query data."""

    CONTEXT_TYPE: ClassVar[str] = "PROCESSED_DATA"
    CONTEXT_CATEGORY: ClassVar[str] = "ANALYSIS_RESULTS"

    query: str = Field(..., description="Original user query")
    word_count: int = Field(..., description="Number of words")
    contains_numbers: bool = Field(..., description="Whether query contains numbers")

    def get_access_details(self, key: str) -> dict:
        return {
            "summary": f"Processed data for query: '{self.query}'",
            "word_count": self.word_count,
            "key": key,
        }

    def get_human_summary(self, key: str) -> dict:
        return {
            "title": "Query Analysis Results",
            "content": f"Analyzed '{self.query}' - {self.word_count} words",
        }
Step 2: Implement Capability#
from typing import Dict, Any
from datetime import datetime
from framework.base import BaseCapability, capability_node
from framework.state import AgentState, StateManager
from configs.logger import get_logger
from configs.streaming import get_streamer
from applications.my_app.context_classes import ProcessedDataContext
logger = get_logger("my_app", "data_processor")
@capability_node
class DataProcessorCapability(BaseCapability):
    """Process and analyze user data requests."""

    name = "data_processor"
    description = "Process and analyze user data requests"
    provides = ["PROCESSED_DATA"]
    requires = []

    @staticmethod
    async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
        """Execute the capability's core business logic."""
        current_task = StateManager.get_current_task(state)
        streamer = get_streamer("my_app", "data_processor")

        try:
            streamer.status("Processing your request...")

            # Process the user query (guard against a missing task)
            user_query = current_task.lower() if current_task else ""
            processed_data = {
                "word_count": len(user_query.split()),
                "contains_numbers": any(char.isdigit() for char in user_query),
            }

            # Create structured context (query is a required str field, so fall back to "")
            context = ProcessedDataContext(
                query=current_task or "",
                word_count=processed_data["word_count"],
                contains_numbers=processed_data["contains_numbers"],
            )

            # Store context and return state updates
            context_key = f"data_processing_{datetime.now().timestamp()}"
            context_updates = StateManager.store_context(
                state, "PROCESSED_DATA", context_key, context
            )

            streamer.status("Processing completed!")
            return context_updates

        except Exception as e:
            logger.error(f"Processing error: {e}")
            raise
Step 3: Register Your Capability#
from framework.registry import (
    RegistryConfigProvider, RegistryConfig,
    CapabilityRegistration, ContextClassRegistration
)


class MyAppRegistryProvider(RegistryConfigProvider):
    def get_registry_config(self) -> RegistryConfig:
        return RegistryConfig(
            capabilities=[
                CapabilityRegistration(
                    name="data_processor",
                    module_path="applications.my_app.capabilities.data_processor",
                    class_name="DataProcessorCapability",
                    description="Process and analyze user data requests",
                    provides=["PROCESSED_DATA"],
                    requires=[]
                )
            ],
            context_classes=[
                ContextClassRegistration(
                    context_type="PROCESSED_DATA",
                    module_path="applications.my_app.context_classes",
                    class_name="ProcessedDataContext",
                    description="Structured results from user query analysis"
                )
            ]
        )
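A quick way to catch the name mismatches discussed under Common Issues below is a local sanity check that compares the registration against the capability class attributes. This is a sketch in plain Python over the objects defined above, not a framework API, and it assumes RegistryConfig exposes its capabilities list as an attribute.
# Hypothetical sanity check: registration entries should mirror the class attributes
from applications.my_app.capabilities.data_processor import DataProcessorCapability

config = MyAppRegistryProvider().get_registry_config()
registration = config.capabilities[0]

assert registration.name == DataProcessorCapability.name
assert registration.provides == DataProcessorCapability.provides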
Step 4: Test Your Capability#
import asyncio

from framework.state import StateManager
from applications.my_app.capabilities.data_processor import DataProcessorCapability


async def test_capability():
    # Create test state
    state = StateManager.create_fresh_state("Hello world, this has 123 numbers!")

    # Execute capability
    capability = DataProcessorCapability()
    result = await capability.execute(state)

    print("Capability result:", result)
    print("Context data keys:", list(result.get("capability_context_data", {}).keys()))


if __name__ == "__main__":
    asyncio.run(test_capability())
Essential Patterns#
Error Handling#
Add custom error classification for domain-specific retry logic:
# At module level, so the return annotation resolves when the class is defined:
from framework.base.errors import ErrorClassification, ErrorSeverity

# Inside the capability class:
@staticmethod
def classify_error(exc: Exception, context: dict) -> ErrorClassification:
    if isinstance(exc, (ConnectionError, TimeoutError)):
        return ErrorClassification(
            severity=ErrorSeverity.RETRIABLE,
            user_message="Temporary service issue, retrying...",
            metadata={"technical_details": str(exc)}
        )
    return ErrorClassification(
        severity=ErrorSeverity.CRITICAL,
        user_message=f"Processing error: {exc}",
        metadata={"technical_details": str(exc)}
    )
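Because classify_error is a plain static method, you can exercise it directly without running the agent. The sketch below assumes you added the method above to DataProcessorCapability.
# Exercising the classifier directly (assumes classify_error was added to DataProcessorCapability)
from framework.base.errors import ErrorSeverity

retriable = DataProcessorCapability.classify_error(TimeoutError("slow backend"), context={})
assert retriable.severity == ErrorSeverity.RETRIABLE

critical = DataProcessorCapability.classify_error(ValueError("bad input"), context={})
assert critical.severity == ErrorSeverity.CRITICAL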
Streaming Updates#
Provide progress feedback during operations:
@staticmethod
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
    streamer = get_streamer("my_app", "my_capability")

    streamer.status("Phase 1: Extracting data...")
    raw_data = await extract_data(state)           # placeholder for your extraction logic

    streamer.status("Phase 2: Processing data...")
    processed_data = await process_data(raw_data)  # placeholder for your processing logic

    streamer.status("Phase 3: Storing results...")
    context = create_context(processed_data)       # placeholder that builds a CapabilityContext

    return StateManager.store_context(state, "MY_DATA", "my_key", context)
Common Issues#
“Capability not found in registry”
Ensure exact name matching in registration:
# In capability class:
name = "data_processor"
# In registry:
CapabilityRegistration(name="data_processor", ...) # Must match exactly
“execute method not found”
Ensure execute is a static method:
@staticmethod  # Required
async def execute(state: AgentState, **kwargs) -> Dict[str, Any]:
    pass
“Context serialization error”
Use only JSON-compatible types:
# ✅ Correct
timestamp: str  # Use an ISO string
data: Dict[str, Any]

# ❌ Wrong
timestamp: datetime  # Not JSON serializable
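If a context needs a timestamp, one common pattern is to store it as an ISO 8601 string at creation time. This is a sketch of a single field on a CapabilityContext subclass, reusing the pydantic Field import from Step 1:
from datetime import datetime, timezone

# Stored as an ISO 8601 string so the context stays JSON-serializable
created_at: str = Field(
    default_factory=lambda: datetime.now(timezone.utc).isoformat(),
    description="UTC creation time (ISO 8601 string)",
)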
Next Steps#
Essential:
Running and Testing - Learn debugging and deployment
State and Context Essentials - Master state management
Advanced:
Context Management System - Advanced context patterns
Error Handling - Comprehensive error handling
API Reference:
Base Components - BaseCapability documentation
State and Context Management - StateManager utilities