"""
ContextManager - Simplified LangGraph-Native Context Management
Ultra-simplified context manager using Pydantic for automatic serialization.
This eliminates 90% of the complexity from the previous implementation while
maintaining full LangGraph compatibility for checkpointing and serialization.
Key simplifications:
- Uses Pydantic's .model_dump() and .model_validate() for serialization
- No custom reflection-based serialization logic
- No complex type conversion or property detection
- No DotDict utilities needed
- Direct registry lookup without extensive validation
"""
import json
import logging
from typing import Dict, Any, Optional, List, TYPE_CHECKING
from datetime import datetime
from pathlib import Path
from configs.logger import get_logger
if TYPE_CHECKING:
from framework.context.base import CapabilityContext
from framework.state.state import AgentState
logger = get_logger("framework", "base")
[docs]
class ContextManager:
"""Simplified LangGraph-native context manager using Pydantic serialization.
This class provides sophisticated functionality over dictionary data while storing
everything in LangGraph-compatible dictionary format. It uses Pydantic's built-in
serialization capabilities to eliminate complex custom logic.
The data is stored as: {context_type: {context_key: {field: value}}}
"""
[docs]
def __init__(self, state: 'AgentState'):
"""Initialize ContextManager with agent state.
Args:
state: Full AgentState containing capability_context_data
Raises:
TypeError: If state is not an AgentState dictionary
ValueError: If state doesn't contain capability_context_data key
"""
if not isinstance(state, dict):
raise TypeError(f"ContextManager expects AgentState dictionary, got {type(state)}")
if 'capability_context_data' not in state:
raise ValueError("AgentState must contain 'capability_context_data' key")
self._data = state['capability_context_data']
self._object_cache: Dict[str, Dict[str, 'CapabilityContext']] = {}
[docs]
def __getattr__(self, context_type: str):
"""Enable dot notation access to context data with lazy namespace creation."""
if context_type.startswith('_'):
# For private attributes, use normal attribute access
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{context_type}'")
if context_type in self._data:
# Create a namespace for this context type with lazy object reconstruction
namespace = ContextNamespace(self, context_type)
return namespace
# If not found in _data, raise AttributeError to maintain normal Python behavior
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{context_type}'")
[docs]
def set_context(self, context_type: str, key: str, value: 'CapabilityContext', skip_validation: bool = False) -> None:
"""Store context using Pydantic's built-in serialization.
Args:
context_type: Type of context (e.g., "PV_ADDRESSES")
key: Unique key for this context instance
value: CapabilityContext object to store
skip_validation: Skip registry validation (useful for testing)
"""
# Validate using registry (unless skipped for testing)
if not skip_validation:
try:
# Import registry here to avoid circular imports
from framework.registry import get_registry
registry = get_registry()
# Check if registry is initialized before validation
if hasattr(registry, '_registries') and registry._registries:
# Validate context type is recognized
if not registry.is_valid_context_type(context_type):
raise ValueError(f"Unknown context type: {context_type}. Valid types: {registry.get_all_context_types()}")
# Validate value is correct type for context type
expected_type = registry.get_context_class(context_type)
if expected_type is not None and not isinstance(value, expected_type):
raise ValueError(f"Context type {context_type} expects {expected_type}, got {type(value)}")
else:
# Registry not initialized - just log a warning and continue
logger.warning(f"Registry not initialized, skipping validation for {context_type}")
except ImportError:
# If registry is not available yet, skip validation
logger.debug(f"Registry not available, skipping validation for {context_type}")
# Use Pydantic's built-in .model_dump() method for serialization
if context_type not in self._data:
self._data[context_type] = {}
self._data[context_type][key] = value.model_dump()
# Update cache
if context_type not in self._object_cache:
self._object_cache[context_type] = {}
self._object_cache[context_type][key] = value
logger.debug(f"Stored context: {context_type}.{key} = {type(value).__name__}")
[docs]
def get_context(self, context_type: str, key: str) -> Optional['CapabilityContext']:
"""Retrieve using Pydantic's .model_validate() for reconstruction.
Args:
context_type: Type of context to retrieve
key: Key of the context instance
Returns:
Reconstructed CapabilityContext object or None if not found
"""
# Check cache first
if (context_type in self._object_cache and
key in self._object_cache[context_type]):
cached_obj = self._object_cache[context_type][key]
logger.debug(f"Retrieved cached context: {context_type}.{key} = {type(cached_obj).__name__}")
return cached_obj
# Get raw dictionary data
raw_data = self._data.get(context_type, {}).get(key)
if raw_data is None:
return None
# Get context class from registry
context_class = self._get_context_class(context_type)
if context_class is None:
logger.warning(f"Unknown context type: {context_type}")
return None
# Use Pydantic's model_validate for reconstruction
try:
reconstructed_obj = context_class.model_validate(raw_data)
# Cache the reconstructed object
if context_type not in self._object_cache:
self._object_cache[context_type] = {}
self._object_cache[context_type][key] = reconstructed_obj
logger.debug(f"Retrieved and cached context: {context_type}.{key} = {type(reconstructed_obj).__name__}")
return reconstructed_obj
except Exception as e:
logger.error(f"Failed to reconstruct {context_type}: {e}")
return None
[docs]
def get_all_of_type(self, context_type: str) -> Dict[str, 'CapabilityContext']:
"""Get all contexts of a specific type as reconstructed objects.
Args:
context_type: Type of context to retrieve
Returns:
Dictionary of key -> CapabilityContext objects
"""
result = {}
context_keys = self._data.get(context_type, {}).keys()
for key in context_keys:
context_obj = self.get_context(context_type, key)
if context_obj:
result[key] = context_obj
return result
[docs]
def get_all(self) -> Dict[str, Any]:
"""Get all context data in flattened format for reporting/summary purposes.
Returns:
Dictionary with flattened keys in format "context_type.key" -> context object
"""
flattened = {}
for context_type in self._data.keys():
contexts_dict = self.get_all_of_type(context_type)
for key, context in contexts_dict.items():
flattened_key = f"{context_type}.{key}"
flattened[flattened_key] = context
return flattened
[docs]
def get_context_access_description(self, context_filter: Optional[List[Dict[str, str]]] = None) -> str:
"""Create detailed description of available context data for use in prompts.
Args:
context_filter: Optional list of context filter dictionaries
Returns:
Formatted string description of available context data
"""
if not self._data:
return "No context data available."
description_parts = []
description_parts.append("The agent context is available via the 'context' object with dot notation access:")
description_parts.append("")
# Determine which contexts to show based on context_filter
contexts_to_show = {}
if context_filter and isinstance(context_filter, list) and context_filter:
# Filter to only show contexts referenced in context_filter
for filter_dict in context_filter:
for context_type, context_key in filter_dict.items():
if context_type in self._data and context_key in self._data[context_type]:
if context_type not in contexts_to_show:
contexts_to_show[context_type] = {}
# Reconstruct the object for access details
context_obj = self.get_context(context_type, context_key)
if context_obj:
contexts_to_show[context_type][context_key] = context_obj
else:
# Show all contexts (reconstruct all objects)
for context_type in self._data.keys():
contexts_to_show[context_type] = self.get_all_of_type(context_type)
if not contexts_to_show:
return "No relevant context data available for the specified context filter."
for context_type, contexts_dict in contexts_to_show.items():
if isinstance(contexts_dict, dict):
description_parts.append(f"• context.{context_type}:")
for key, context_obj in contexts_dict.items():
# Use the get_access_details method with the actual key name
if hasattr(context_obj, 'get_access_details'):
try:
details = context_obj.get_access_details(key_name=key)
if isinstance(details, dict):
description_parts.append(f" └── {key}")
details_str = json.dumps(details, indent=6, default=str)
description_parts.append(f" └── Details: {details_str}")
else:
description_parts.append(f" └── {key}: {str(details)}")
except Exception as e:
description_parts.append(f" └── {key}: {type(context_obj).__name__} object (get_access_details error: {e})")
else:
description_parts.append(f" └── {key}: {type(context_obj).__name__} object (no get_access_details method available)")
description_parts.append("")
return "\n".join(description_parts)
[docs]
def get_human_summaries(self, step: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Get human summaries for specific step contexts or all contexts.
Args:
step: Optional step dict. If provided, extract contexts from step.inputs.
If None, get human summaries for all available contexts.
Returns:
Dict with flattened keys "context_type.key" -> human_summary_data
"""
# Step 1: Get contexts (filtered by step or all)
if step is not None:
# Get specific step contexts and convert to flattened format
try:
step_contexts = self.extract_from_step(step, {})
contexts_dict = {}
# Convert to flattened format like get_all() returns
for input_spec in step.get('inputs', []):
if isinstance(input_spec, dict):
for context_type, key_name in input_spec.items():
if context_type in step_contexts:
flattened_key = f"{context_type}.{key_name}"
contexts_dict[flattened_key] = step_contexts[context_type]
except Exception as e:
logger.error(f"Error extracting step contexts: {e}")
contexts_dict = self.get_all() # Fallback to all contexts
else:
# Get all contexts in flattened format
contexts_dict = self.get_all()
# Step 2: Convert contexts to human summaries (single consolidated logic)
return self._contexts_to_human_summaries(contexts_dict)
def _contexts_to_human_summaries(self, contexts_dict: Dict[str, 'CapabilityContext']) -> Dict[str, Any]:
"""Convert flattened contexts dict to human summaries dict.
Args:
contexts_dict: Dict with "context_type.key" -> context_object format
Returns:
Dict with "context_type.key" -> human_summary_data format
"""
summaries = {}
for flattened_key, context_object in contexts_dict.items():
# Extract key name from flattened key
if '.' in flattened_key:
context_type, key_name = flattened_key.split('.', 1)
else:
context_type, key_name = flattened_key, flattened_key
if hasattr(context_object, 'get_human_summary'):
summaries[flattened_key] = context_object.get_human_summary(key_name)
else:
# Fallback for context objects without human summary
summaries[flattened_key] = {
"type": context_type,
"raw_data": str(context_object)[:200] + "..." if len(str(context_object)) > 200 else str(context_object)
}
return summaries
[docs]
def get_raw_data(self) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Get the raw dictionary data for state updates.
Returns:
The raw dictionary data for LangGraph state updates
"""
return self._data
[docs]
def save_context_to_file(self, folder_path: Path, filename: str = "context.json") -> Path:
"""Save capability context data to a JSON file in the specified folder.
This method always saves the current context data to ensure it reflects
the latest state. It uses the same serialization format as the state system.
Args:
folder_path: Path to the folder where the context file should be saved
filename: Name of the context file (default: "context.json")
Returns:
Path to the saved context file
Raises:
OSError: If file cannot be written
TypeError: If context data cannot be serialized
ValueError: If filename is empty or contains invalid characters
"""
if not isinstance(folder_path, Path):
folder_path = Path(folder_path)
if not filename or not filename.strip():
raise ValueError("Filename cannot be empty")
# Ensure filename has .json extension if not provided
if not filename.endswith('.json'):
filename = f"{filename}.json"
# Ensure folder exists
folder_path.mkdir(parents=True, exist_ok=True)
context_file = folder_path / filename
try:
# Save using standard JSON (data is already JSON-serializable via Pydantic)
with open(context_file, 'w', encoding='utf-8') as f:
json.dump(self._data, f, indent=2, ensure_ascii=False, default=str)
logger.info(f"Saved context data to: {context_file}")
return context_file
except Exception as e:
logger.error(f"Failed to save context to {context_file}: {e}")
raise
def _get_context_class(self, context_type: str) -> Optional[type]:
"""Get context class from registry or direct mapping.
Args:
context_type: The context type string
Returns:
Context class or None if not found
"""
try:
# Import registry here to avoid circular imports
from framework.registry import get_registry
registry = get_registry()
return registry.get_context_class(context_type)
except Exception as e:
logger.error(f"Failed to get context class for {context_type}: {e}")
raise ValueError(f"Registry not available, cannot get context class for {context_type}")
[docs]
class ContextNamespace:
"""Namespace object that provides dot notation access to context objects."""
[docs]
def __init__(self, context_manager: ContextManager, context_type: str):
self._context_manager = context_manager
self._context_type = context_type
[docs]
def __getattr__(self, key: str):
"""Get context object by key with lazy reconstruction."""
context_obj = self._context_manager.get_context(self._context_type, key)
if context_obj is not None:
return context_obj
raise AttributeError(f"Context '{self._context_type}' has no key '{key}'")
[docs]
def __setattr__(self, key: str, value):
"""Set context object by key."""
if key.startswith('_'):
# Set private attributes normally
super().__setattr__(key, value)
else:
# This would require the value to be a CapabilityContext object
# For now, raise an error as direct assignment should go through set_context
raise AttributeError(f"Cannot directly assign to context key '{key}'. Use context_manager.set_context() instead.")