Source code for framework.data_management.manager

"""
Data Source Manager

Unified data source management system that replaces both the registry and integration service
with a cleaner approach supporting core and application-specific data sources.
"""

import logging
import asyncio
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
import time

from .providers import DataSourceContext, DataSourceProvider
from .request import DataSourceRequest

logger = logging.getLogger(__name__)

[docs] @dataclass class DataRetrievalResult: """Result of data retrieval from multiple sources.""" context_data: Dict[str, DataSourceContext] = field(default_factory=dict) successful_sources: List[str] = field(default_factory=list) failed_sources: List[str] = field(default_factory=list) total_sources_attempted: int = 0 retrieval_time_ms: Optional[float] = None @property def has_data(self) -> bool: """Check if any data was successfully retrieved.""" return bool(self.context_data) @property def success_rate(self) -> float: """Calculate the success rate of data retrieval.""" if self.total_sources_attempted == 0: return 0.0 return len(self.successful_sources) / self.total_sources_attempted
[docs] def get_summary(self) -> Dict[str, Any]: """Get a summary of the retrieval results.""" return { 'sources_attempted': self.total_sources_attempted, 'sources_successful': len(self.successful_sources), 'sources_failed': len(self.failed_sources), 'success_rate': self.success_rate, 'context_types_retrieved': list(set(ctx.context_type for ctx in self.context_data.values())), 'retrieval_time_ms': self.retrieval_time_ms }
[docs] class DataSourceManager: """ Unified data source management system. Replaces both DataSourceRegistry and DataSourceIntegrationService with a cleaner architecture that supports core and application-specific data sources. """
[docs] def __init__(self): self._providers: Dict[str, DataSourceProvider] = {} self._initialized = False
[docs] def register_provider(self, provider: DataSourceProvider) -> None: """ Register a data source provider. Providers are queried in registration order (framework providers first, then application providers). """ self._providers[provider.name] = provider logger.info(f"Registered data source: {provider.name}")
[docs] def get_responding_providers(self, request: DataSourceRequest) -> List[DataSourceProvider]: """ Get all providers that should respond to the current request in registration order. Args: request: Data source request with requester information Returns: List of providers that should respond in registration order (framework first, then applications) """ return [p for p in self._providers.values() if p.should_respond(request)]
[docs] async def retrieve_all_context(self, request: DataSourceRequest, timeout_seconds: float = 30.0) -> DataRetrievalResult: """ Retrieve context from all responding data sources. Args: request: Data source request with requester information timeout_seconds: Maximum time to wait for all data sources Returns: DataRetrievalResult containing all successfully retrieved data """ start_time = time.time() # Get responding providers in registration order providers = self.get_responding_providers(request) if not providers: logger.info("No data sources available for current context") return DataRetrievalResult(total_sources_attempted=0) logger.info(f"Retrieving context from {len(providers)} data sources") # Create retrieval tasks for all providers tasks = [] for provider in providers: task = asyncio.create_task( self._retrieve_from_provider(provider, request), name=f"retrieve_{provider.name}" ) tasks.append((provider.name, task)) # Wait for all tasks with timeout try: results = await asyncio.wait_for( asyncio.gather(*[task for _, task in tasks], return_exceptions=True), timeout=timeout_seconds ) except asyncio.TimeoutError: logger.warning(f"Data source retrieval timed out after {timeout_seconds}s") # Cancel remaining tasks for _, task in tasks: if not task.done(): task.cancel() results = [None] * len(tasks) # Treat all as failed # Process results context_data = {} successful_sources = [] failed_sources = [] for (provider_name, _), result in zip(tasks, results): if isinstance(result, Exception): logger.warning(f"Data retrieval failed for {provider_name}: {result}") failed_sources.append(provider_name) elif result is not None: context_data[provider_name] = result successful_sources.append(provider_name) logger.debug(f"Successfully retrieved data from {provider_name}") else: failed_sources.append(provider_name) retrieval_time_ms = (time.time() - start_time) * 1000 retrieval_result = DataRetrievalResult( context_data=context_data, successful_sources=successful_sources, failed_sources=failed_sources, total_sources_attempted=len(providers), retrieval_time_ms=retrieval_time_ms ) logger.info(f"Data retrieval complete: {retrieval_result.get_summary()}") return retrieval_result
[docs] def get_provider(self, provider_name: str) -> Optional[DataSourceProvider]: """ Get a specific data source provider by name. Args: provider_name: Name of the data source provider to retrieve Returns: DataSourceProvider if found, None otherwise """ return self._providers.get(provider_name)
[docs] async def retrieve_from_provider(self, provider_name: str, request: DataSourceRequest) -> Optional[DataSourceContext]: """ Retrieve data from a specific provider by name. Args: provider_name: Name of the data source provider request: Data source request Returns: DataSourceContext if successful, None if provider not found or retrieval failed """ provider = self.get_provider(provider_name) if not provider: logger.warning(f"Data source provider '{provider_name}' not found") return None if not provider.should_respond(request): logger.debug(f"Provider '{provider_name}' chose not to respond to request") return None return await self._retrieve_from_provider(provider, request)
async def _retrieve_from_provider(self, provider: DataSourceProvider, request: DataSourceRequest) -> Optional[DataSourceContext]: """ Retrieve data from a single provider with error handling. Args: provider: The data source provider to retrieve from request: Data source request Returns: DataSourceContext if successful, None if failed """ try: logger.debug(f"Retrieving data from {provider.name}") return await provider.retrieve_data(request) except Exception as e: logger.warning(f"Failed to retrieve data from {provider.name}: {e}") return None
# Global manager instance _data_source_manager: Optional[DataSourceManager] = None
[docs] def get_data_source_manager() -> DataSourceManager: """ Get the global data source manager instance. Loads all data sources from the registry system. Providers are queried in registration order (framework first, then applications). """ global _data_source_manager if _data_source_manager is None: _data_source_manager = DataSourceManager() # Load all data sources from registry try: from framework.registry import get_registry registry = get_registry() # Get all data sources from registry registry_data_sources = registry.get_all_data_sources() for provider in registry_data_sources: _data_source_manager.register_provider(provider) logger.info(f"Loaded {len(registry_data_sources)} data sources from registry") except Exception as e: logger.warning(f"Failed to load data sources from registry: {e}") return _data_source_manager