Source code for framework.data_management.providers
"""
Data Source Abstraction Layer
This module provides the base abstractions for integrating external data sources
into the ALS Expert Agent. Data sources can include user memory, knowledge graphs,
databases, APIs, and custom user-defined sources.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, TYPE_CHECKING
from dataclasses import dataclass, field
import logging
if TYPE_CHECKING:
from framework.state import AgentState
from .request import DataSourceRequest
logger = logging.getLogger(__name__)
[docs]
@dataclass
class DataSourceContext:
    """
    Container for data source retrieval results.

    This standardized format allows different data sources to return results
    in a consistent way while preserving source-specific metadata.

    Attributes are documented inline next to each field below.
    """

    source_name: str  # Unique identifier for the data source
    context_type: str  # Type of context data (for validation)
    data: Any  # The actual retrieved data
    metadata: Dict[str, Any] = field(default_factory=dict)  # Additional source metadata
    provider: Optional['DataSourceProvider'] = None  # Reference to the provider that created this context

    def format_for_prompt(self) -> str:
        """
        Format this context for inclusion in LLM prompts.

        Resolution order:

        1. If a provider is attached and exposes a callable
           ``format_for_prompt``, delegate to it, passing this context.
        2. Otherwise, if ``data`` has a ``format_for_prompt`` attribute,
           call it with no arguments.
        3. Otherwise, if ``data`` has a ``format_for_llm`` attribute,
           call it with no arguments.
        4. Otherwise, return ``str(data)``.

        Returns:
            The formatted text produced by the first applicable step.
        """
        # getattr with a default covers both "no provider attached" (None) and
        # "provider attached but has no formatter", so a provider that lacks
        # the method falls through to the data-based formatting instead of
        # raising AttributeError.
        provider_formatter = getattr(self.provider, 'format_for_prompt', None)
        if callable(provider_formatter):
            return provider_formatter(self)

        # Fallback formatting if no usable provider formatter
        if hasattr(self.data, 'format_for_prompt'):
            return self.data.format_for_prompt()
        if hasattr(self.data, 'format_for_llm'):
            return self.data.format_for_llm()
        return str(self.data)

    def get_summary(self) -> Dict[str, Any]:
        """
        Get a summary of this data source context for logging/debugging.

        Returns:
            A dict with the keys ``source_name``, ``context_type``,
            ``data_type`` (class name of ``data``), ``metadata`` (the same
            dict object held by this context, not a copy) and ``has_data``
            (``True`` unless ``data`` is ``None``).
        """
        return {
            'source_name': self.source_name,
            'context_type': self.context_type,
            'data_type': type(self.data).__name__,
            'metadata': self.metadata,
            'has_data': self.data is not None,
        }
[docs]
class DataSourceProvider(ABC):
    """
    Abstract base class for all data source providers.

    A concrete provider has three responsibilities:

    1. Decide whether it can provide data for the current context
       (``should_respond``).
    2. Retrieve data from its specific source (``retrieve_data``).
    3. Return that data in the standardized ``DataSourceContext`` format.

    Subclasses must implement ``name``, ``context_type``, ``retrieve_data``
    and ``should_respond``; the remaining members have defaults.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique identifier for this data source provider."""

    @property
    @abstractmethod
    def context_type(self) -> str:
        """
        Context type this provider creates.

        Should match a registered context type in the context registry
        for type validation and LLM prompt formatting.
        """

    @abstractmethod
    async def retrieve_data(self, request: 'DataSourceRequest') -> Optional[DataSourceContext]:
        """
        Retrieve data from this source given the current request.

        Args:
            request: Data source request containing user info, session
                context, and requester details.

        Returns:
            DataSourceContext with retrieved data, or None if no data is
            available.

        Raises:
            Implementations should handle all internal exceptions and return
            None rather than raising, unless the exception represents a
            critical system failure.
        """

    @abstractmethod
    def should_respond(self, request: 'DataSourceRequest') -> bool:
        """
        Determine if this data source should respond to the given request.

        This should be a fast check (no I/O) that determines whether it makes
        sense to call retrieve_data() for the given request.

        Args:
            request: Data source request with requester information.

        Returns:
            True if this data source should provide data for this request.
        """

    @property
    def description(self) -> str:
        """Human-readable description of this data source."""
        provider_name = self.name
        return f"Data source: {provider_name}"

    def get_config_requirements(self) -> Dict[str, Any]:
        """
        Get configuration requirements for this data source.

        Returns a dictionary describing what configuration this data source
        needs, usable for validation and documentation. The base
        implementation declares no requirements and returns a new empty dict.
        """
        requirements: Dict[str, Any] = {}
        return requirements

    async def health_check(self) -> bool:
        """
        Perform a health check for this data source.

        Optional hook for data sources that need to verify connectivity or
        service availability. The base implementation performs no check and
        always reports healthy.

        Returns:
            True if the data source is healthy and available.
        """
        return True

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        provider_name = self.name
        return f"{class_name}(name='{provider_name}')"