Data Management

Data orchestration framework for integrating heterogeneous data sources into agent workflows with provider discovery, concurrent retrieval, and LLM-optimized formatting.

Note

For implementation guides and examples, see Data Integration.

Core Components

DataSourceManager()

Unified data source management system.

DataRetrievalResult([context_data, ...])

Result of data retrieval from multiple sources.

DataSourceProvider()

Abstract base class for all data source providers.

DataSourceContext(source_name, context_type, ...)

Container for data source retrieval results.

DataSourceRequest(user_id, requester[, ...])

Generic data source request with query and metadata support.

DataSourceRequester(component_type, ...)

Information about the component requesting data from a data source.

get_data_source_manager()

Get the global data source manager instance.

create_data_source_request(state, requester)

Create a data source request from AgentState and requester information.

Management Classes

class framework.data_management.DataSourceManager

Bases: object

Unified data source management system.

Replaces the earlier DataSourceRegistry and DataSourceIntegrationService with a single manager that supports both core (framework) and application-specific data sources.

__init__()

register_provider(provider)

Register a data source provider.

Providers are queried in registration order (framework providers first, then application providers).

get_responding_providers(request)

Get all providers that should respond to the given request, in registration order.

Parameters:

request (DataSourceRequest) – Data source request with requester information

Returns:

List of providers that should respond in registration order (framework first, then applications)

Return type:

List[DataSourceProvider]
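The ordering guarantee can be pictured as a manager that stores providers in a plain list and filters them through each provider's should_respond(). The following is a minimal sketch of that idea, not the framework's implementation. MiniManager, StubProvider, and the component types "task_extraction" and "capability" are hypothetical. The request classes are local mirrors of the documented fields rather than imports from framework.data_management.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


# Local mirrors of the documented request models (not imported from the framework).
@dataclass
class DataSourceRequester:
    component_type: str
    component_name: str


@dataclass
class DataSourceRequest:
    user_id: Optional[str]
    requester: DataSourceRequester
    query: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


class StubProvider:
    """Hypothetical provider that responds only to the listed component types."""

    def __init__(self, name: str, component_types: List[str]) -> None:
        self.name = name
        self._component_types = set(component_types)

    def should_respond(self, request: DataSourceRequest) -> bool:
        # Fast, I/O-free check based only on the requester.
        return request.requester.component_type in self._component_types


class MiniManager:
    """Sketch of registration-ordered filtering; not the framework's implementation."""

    def __init__(self) -> None:
        self._providers: List[StubProvider] = []

    def register_provider(self, provider: StubProvider) -> None:
        # A list keeps insertion order, so providers registered first
        # (framework providers) are also returned first.
        self._providers.append(provider)

    def get_responding_providers(self, request: DataSourceRequest) -> List[StubProvider]:
        return [p for p in self._providers if p.should_respond(request)]


# Provider names and component types below are hypothetical.
manager = MiniManager()
manager.register_provider(StubProvider("core_memory", ["task_extraction", "capability"]))
manager.register_provider(StubProvider("app_logbook", ["capability"]))

request = DataSourceRequest("alice", DataSourceRequester("capability", "respond"))
print([p.name for p in manager.get_responding_providers(request)])
# prints ['core_memory', 'app_logbook']
```

Because filtering never reorders the list, a later application provider can rely on framework providers having been consulted first.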

async retrieve_all_context(request, timeout_seconds=30.0)

Retrieve context from all responding data sources.

Parameters:
  • request (DataSourceRequest) – Data source request with requester information

  • timeout_seconds (float) – Maximum time, in seconds, to wait for all data sources

Returns:

DataRetrievalResult containing all successfully retrieved data

Return type:

DataRetrievalResult
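The module summary describes this step as concurrent retrieval bounded by timeout_seconds. The following is a minimal sketch of that pattern, using asyncio.wait with one shared deadline. SleepyProvider is hypothetical. Counting timeouts, exceptions, and None results as failures is an assumption, not documented behavior.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class DataRetrievalResult:
    # Local mirror of the documented fields.
    context_data: Dict[str, Any] = field(default_factory=dict)
    successful_sources: List[str] = field(default_factory=list)
    failed_sources: List[str] = field(default_factory=list)
    total_sources_attempted: int = 0
    retrieval_time_ms: Optional[float] = None


class SleepyProvider:
    """Hypothetical provider: waits `delay` seconds, then returns `value`."""

    def __init__(self, name: str, delay: float, value: Optional[str]) -> None:
        self.name, self.delay, self.value = name, delay, value

    async def retrieve_data(self, request: Any) -> Optional[str]:
        await asyncio.sleep(self.delay)
        return self.value


async def retrieve_all_context(providers, request, timeout_seconds: float = 30.0) -> DataRetrievalResult:
    """Sketch only; not the framework's implementation."""
    start = time.perf_counter()
    result = DataRetrievalResult(total_sources_attempted=len(providers))
    tasks = [asyncio.create_task(p.retrieve_data(request)) for p in providers]
    if tasks:
        # One shared deadline for all providers; unfinished tasks are cancelled.
        done, pending = await asyncio.wait(tasks, timeout=timeout_seconds)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)  # let cancellation settle
        for provider, task in zip(providers, tasks):
            if task in done and task.exception() is None and task.result() is not None:
                result.context_data[provider.name] = task.result()
                result.successful_sources.append(provider.name)
            else:
                # Assumption: timeouts, exceptions, and None results all count as failures.
                result.failed_sources.append(provider.name)
    result.retrieval_time_ms = (time.perf_counter() - start) * 1000
    return result


providers = [
    SleepyProvider("memory", 0.01, "User prefers metric units"),
    SleepyProvider("empty", 0.01, None),
    SleepyProvider("slow", 5.0, "too late"),
]
result = asyncio.run(retrieve_all_context(providers, request=None, timeout_seconds=0.2))
print(result.successful_sources, result.failed_sources)
# prints ['memory'] ['empty', 'slow']
```

A single shared deadline keeps the total wait close to timeout_seconds regardless of how many providers respond. A per-provider timeout would not give that guarantee.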

get_provider(provider_name)

Get a specific data source provider by name.

Parameters:

provider_name (str) – Name of the data source provider to retrieve

Returns:

DataSourceProvider if found, None otherwise

Return type:

DataSourceProvider | None

async retrieve_from_provider(provider_name, request)

Retrieve data from a specific provider by name.

Parameters:
  • provider_name (str) – Name of the data source provider

  • request (DataSourceRequest) – Data source request

Returns:

DataSourceContext if successful, None if provider not found or retrieval failed

Return type:

DataSourceContext | None
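Both name-based lookups return None when the provider is not registered. retrieve_from_provider() also returns None when retrieval fails. The following sketch shows that contract using a dict keyed by provider name. EchoProvider and MiniManager are hypothetical. The string result stands in for a DataSourceContext. Catching every Exception is an assumption about how failures are absorbed.

```python
import asyncio
from typing import Any, Dict, Optional


class EchoProvider:
    """Hypothetical provider that echoes the request, or raises if told to."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name, self.fail = name, fail

    async def retrieve_data(self, request: Any) -> Optional[str]:
        if self.fail:
            raise ConnectionError("backend unavailable")
        return f"{self.name}: {request}"


class MiniManager:
    """Sketch of name-based lookup; not the framework's implementation."""

    def __init__(self) -> None:
        self._providers: Dict[str, EchoProvider] = {}

    def register_provider(self, provider: EchoProvider) -> None:
        self._providers[provider.name] = provider

    def get_provider(self, provider_name: str) -> Optional[EchoProvider]:
        return self._providers.get(provider_name)

    async def retrieve_from_provider(self, provider_name: str, request: Any) -> Optional[str]:
        provider = self.get_provider(provider_name)
        if provider is None:
            return None  # unknown provider
        try:
            return await provider.retrieve_data(request)
        except Exception:
            return None  # retrieval failed (assumed to be swallowed)


manager = MiniManager()
manager.register_provider(EchoProvider("notes"))
manager.register_provider(EchoProvider("flaky", fail=True))
print(asyncio.run(manager.retrieve_from_provider("notes", "q")))    # prints notes: q
print(asyncio.run(manager.retrieve_from_provider("flaky", "q")))    # prints None
print(asyncio.run(manager.retrieve_from_provider("missing", "q")))  # prints None
```

Because both failure modes collapse into None, a caller that needs to tell them apart can call get_provider() first.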

class framework.data_management.DataRetrievalResult(context_data=<factory>, successful_sources=<factory>, failed_sources=<factory>, total_sources_attempted=0, retrieval_time_ms=None)

Bases: object

Result of data retrieval from multiple sources.

context_data: Dict[str, DataSourceContext]
successful_sources: List[str]
failed_sources: List[str]
total_sources_attempted: int = 0
retrieval_time_ms: float | None = None
property has_data: bool

Check if any data was successfully retrieved.

property success_rate: float

Calculate the success rate of data retrieval.

get_summary()

Get a summary of the retrieval results.

Return type:

Dict[str, Any]

__init__(context_data=<factory>, successful_sources=<factory>, failed_sources=<factory>, total_sources_attempted=0, retrieval_time_ms=None)
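The <factory> defaults in the signature are how dataclass fields declared with field(default_factory=...) are rendered, so each instance gets its own dict and lists. The following sketch mirrors the class locally and derives the two properties. The formulas are assumptions, not documented behavior: has_data means "context_data is non-empty", and success_rate is successful_sources divided by total_sources_attempted, or 0.0 when nothing was attempted. The get_summary keys are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class DataRetrievalResult:
    # default_factory gives every instance fresh containers (shown as <factory>).
    context_data: Dict[str, Any] = field(default_factory=dict)
    successful_sources: List[str] = field(default_factory=list)
    failed_sources: List[str] = field(default_factory=list)
    total_sources_attempted: int = 0
    retrieval_time_ms: Optional[float] = None

    @property
    def has_data(self) -> bool:
        # Assumption: "any data" means at least one entry in context_data.
        return bool(self.context_data)

    @property
    def success_rate(self) -> float:
        # Assumption: a fraction in [0, 1]; 0.0 when nothing was attempted.
        if self.total_sources_attempted == 0:
            return 0.0
        return len(self.successful_sources) / self.total_sources_attempted

    def get_summary(self) -> Dict[str, Any]:
        # Hypothetical summary keys chosen for illustration.
        return {
            "has_data": self.has_data,
            "success_rate": self.success_rate,
            "successful_sources": list(self.successful_sources),
            "failed_sources": list(self.failed_sources),
            "retrieval_time_ms": self.retrieval_time_ms,
        }


result = DataRetrievalResult(
    context_data={"user_memory": "Prefers metric units"},
    successful_sources=["user_memory"],
    failed_sources=["logbook"],
    total_sources_attempted=2,
    retrieval_time_ms=12.5,
)
print(result.has_data, result.success_rate)  # prints True 0.5
```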

Provider Interfaces

class framework.data_management.DataSourceProvider

Bases: ABC

Abstract base class for all data source providers.

Data source providers are responsible for:

1. Determining if they can provide data for the current context
2. Retrieving data from their specific source
3. Returning data in a standardized format

abstract property name: str

Unique identifier for this data source provider.

abstract property context_type: str

Context type this provider creates.

Should match a registered context type in the context registry for type validation and LLM prompt formatting.

abstractmethod async retrieve_data(request)

Retrieve data from this source given the current request.

Parameters:

request (DataSourceRequest) – Data source request containing user info, session context, and requester details

Returns:

DataSourceContext with retrieved data, or None if no data available

Raises:

Implementations should handle all internal exceptions and return None rather than raising, unless the exception represents a critical system failure.

Return type:

DataSourceContext | None

abstractmethod should_respond(request)

Determine if this data source should respond to the given request.

This should be a fast check (no I/O) that determines whether it makes sense to call retrieve_data() for the given request.

Parameters:

request (DataSourceRequest) – Data source request with requester information

Returns:

True if this data source should provide data for this request

Return type:

bool

property description: str

Human-readable description of this data source.

get_config_requirements()

Get configuration requirements for this data source.

Returns a dictionary describing what configuration this data source needs. This can be used for validation and documentation.

Return type:

Dict[str, Any]

async health_check()

Perform a health check for this data source.

This is an optional method that can be implemented by data sources that need to verify connectivity or service availability.

Returns:

True if the data source is healthy and available

Return type:

bool

format_for_prompt(context)

Format this data source’s context for inclusion in LLM prompts.

Each data source provider can override this to control exactly how its data appears in LLM prompts, including section headers and formatting.

Parameters:

context (DataSourceContext) – The DataSourceContext returned by retrieve_data()

Returns:

Formatted string ready for inclusion in LLM prompts

Return type:

str
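The sketch below shows how the three responsibilities and the abstract members above might fit together in a custom provider. It is a sketch under stated assumptions. It mirrors the documented interface locally so that it runs on its own. UserNotesProvider, the "USER_NOTES" context type, the "capability" component type, and the prompt layout are hypothetical. In a real application the models and DataSourceProvider would be imported from framework.data_management, and the provider could then be passed to register_provider().

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


# Local mirrors of the documented models (subset of fields); a real
# provider would import these from framework.data_management instead.
@dataclass
class DataSourceRequester:
    component_type: str
    component_name: str


@dataclass
class DataSourceRequest:
    user_id: Optional[str]
    requester: DataSourceRequester
    query: Optional[str] = None


@dataclass
class DataSourceContext:
    source_name: str
    context_type: str
    data: Any
    metadata: Dict[str, Any] = field(default_factory=dict)
    provider: Optional["DataSourceProvider"] = None


class DataSourceProvider(ABC):
    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def context_type(self) -> str: ...

    @abstractmethod
    async def retrieve_data(self, request: DataSourceRequest) -> Optional[DataSourceContext]: ...

    @abstractmethod
    def should_respond(self, request: DataSourceRequest) -> bool: ...

    def format_for_prompt(self, context: DataSourceContext) -> str:
        return str(context.data)  # overridable default


class UserNotesProvider(DataSourceProvider):
    """Hypothetical provider serving per-user notes from an in-memory dict."""

    def __init__(self, store: Dict[str, List[str]]) -> None:
        self._store = store

    @property
    def name(self) -> str:
        return "user_notes"

    @property
    def context_type(self) -> str:
        return "USER_NOTES"  # hypothetical; should match a registered context type

    def should_respond(self, request: DataSourceRequest) -> bool:
        # Fast check, no I/O: notes only make sense for a known user.
        return request.user_id is not None

    async def retrieve_data(self, request: DataSourceRequest) -> Optional[DataSourceContext]:
        try:
            notes = self._store[request.user_id or ""]  # stands in for a backend call
        except KeyError:
            return None  # non-critical failure: report "no data" instead of raising
        if not notes:
            return None
        return DataSourceContext(self.name, self.context_type, notes,
                                 metadata={"count": len(notes)}, provider=self)

    def format_for_prompt(self, context: DataSourceContext) -> str:
        bullets = "\n".join(f"- {note}" for note in context.data)
        return f"User notes:\n{bullets}"


provider = UserNotesProvider({"alice": ["Prefers metric units"]})
request = DataSourceRequest("alice", DataSourceRequester("capability", "respond"))
context = asyncio.run(provider.retrieve_data(request))
print(provider.format_for_prompt(context))
# prints:
# User notes:
# - Prefers metric units
```

Keeping should_respond() free of I/O lets the manager filter providers cheaply before starting any retrieval. Converting the expected KeyError into None follows the documented "return None rather than raising" contract.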

class framework.data_management.DataSourceContext(source_name, context_type, data, metadata=<factory>, provider=None)

Bases: object

Container for data source retrieval results.

This standardized format allows different data sources to return results in a consistent way while preserving source-specific metadata.

source_name: str
context_type: str
data: Any
metadata: Dict[str, Any]
provider: DataSourceProvider | None = None
format_for_prompt()

Format this context for inclusion in LLM prompts.

Delegates to the provider’s format_for_prompt method if available, otherwise falls back to default formatting.

Return type:

str

get_summary()

Get a summary of this data source context for logging/debugging.

Return type:

Dict[str, Any]

__init__(source_name, context_type, data, metadata=<factory>, provider=None)
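The delegation described for format_for_prompt() amounts to a single branch on provider. The following is a minimal sketch, assuming a plain "[source_name] data" fallback. The framework's actual default format is not documented on this page. HeaderProvider and the get_summary keys are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


class HeaderProvider:
    """Hypothetical provider with a custom prompt layout."""

    def format_for_prompt(self, context: "DataSourceContext") -> str:
        return f"## {context.source_name.upper()}\n{context.data}"


@dataclass
class DataSourceContext:
    source_name: str
    context_type: str
    data: Any
    metadata: Dict[str, Any] = field(default_factory=dict)
    provider: Optional[Any] = None

    def format_for_prompt(self) -> str:
        # Delegate to the provider when one is attached ...
        if self.provider is not None:
            return self.provider.format_for_prompt(self)
        # ... otherwise fall back to a hypothetical default layout.
        return f"[{self.source_name}] {self.data}"

    def get_summary(self) -> Dict[str, Any]:
        # Hypothetical keys: enough to log without dumping the payload.
        return {
            "source_name": self.source_name,
            "context_type": self.context_type,
            "has_provider": self.provider is not None,
            "metadata_keys": sorted(self.metadata),
        }


plain = DataSourceContext("user_notes", "USER_NOTES", "Prefers metric units")
custom = DataSourceContext("user_notes", "USER_NOTES", "Prefers metric units",
                           provider=HeaderProvider())
print(plain.format_for_prompt())   # prints [user_notes] Prefers metric units
print(custom.format_for_prompt())  # prints "## USER_NOTES", then the data on the next line
```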

Request Models

class framework.data_management.DataSourceRequest(user_id, requester, query=None, metadata=<factory>)

Bases: object

Generic data source request with query and metadata support.

Provides a flexible interface for passing specific queries and contextual metadata to data source providers, enabling targeted retrieval.

user_id: str | None
requester: DataSourceRequester
query: str | None = None
metadata: Dict[str, Any]
__init__(user_id, requester, query=None, metadata=<factory>)

class framework.data_management.DataSourceRequester(component_type, component_name)

Bases: object

Information about the component requesting data from a data source.

Enables data sources to make decisions about whether to respond based on the requesting component and execution context.

component_type: str
component_name: str
__init__(component_type, component_name)

Utility Functions

framework.data_management.get_data_source_manager()

Get the global data source manager instance.

Loads all data sources from the registry system. Providers are queried in registration order (framework first, then applications).

Return type:

DataSourceManager
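A "global instance" is commonly implemented as a lazily built module-level singleton. The following sketch shows that pattern under stated assumptions and is not the framework's code. The stand-in DataSourceManager records provider names as strings. _discover_providers() is a hypothetical placeholder for the registry lookup, and its provider names are invented for illustration. The sketch is also not thread-safe.

```python
from typing import List, Optional


class DataSourceManager:
    """Stand-in manager that only records registrations (strings for brevity)."""

    def __init__(self) -> None:
        self.provider_names: List[str] = []

    def register_provider(self, provider: str) -> None:
        self.provider_names.append(provider)


def _discover_providers() -> List[str]:
    # Hypothetical stand-in for the registry lookup: framework providers
    # are listed before application providers.
    return ["core_user_memory", "app_logbook"]


_manager: Optional[DataSourceManager] = None


def get_data_source_manager() -> DataSourceManager:
    global _manager
    if _manager is None:  # build and populate once, on first use
        _manager = DataSourceManager()
        for provider in _discover_providers():
            _manager.register_provider(provider)
    return _manager


print(get_data_source_manager().provider_names)  # prints ['core_user_memory', 'app_logbook']
```

Repeated calls return the same object, so providers are registered only once.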

framework.data_management.create_data_source_request(state, requester, query=None, metadata=None)

Create a data source request from AgentState and requester information.

Parameters:
  • state (AgentState) – AgentState instance (TypedDict)

  • requester (DataSourceRequester) – Information about the requesting component

  • query (str | None) – Optional specific query for the data source

  • metadata (Dict[str, Any] | None) – Optional metadata for provider-specific context

Returns:

DataSourceRequest with user context and query information

Return type:

DataSourceRequest
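Because AgentState is a TypedDict, this helper mainly copies values out of a dictionary. The following sketch assumes the user id is stored under a hypothetical "user_id" key; the real state layout is not documented on this page. It shows how a calling component might construct its requester and request. The AgentState subset and the "capability"/"respond" requester values are illustrative only.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, TypedDict


class AgentState(TypedDict, total=False):
    # Hypothetical subset of the real AgentState; the key name is an assumption.
    user_id: str


@dataclass
class DataSourceRequester:
    component_type: str
    component_name: str


@dataclass
class DataSourceRequest:
    user_id: Optional[str]
    requester: DataSourceRequester
    query: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


def create_data_source_request(
    state: AgentState,
    requester: DataSourceRequester,
    query: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> DataSourceRequest:
    return DataSourceRequest(
        user_id=state.get("user_id"),  # None when the state carries no user
        requester=requester,
        query=query,
        metadata=dict(metadata or {}),  # copy so the caller's dict is not shared
    )


state: AgentState = {"user_id": "alice"}
requester = DataSourceRequester("capability", "respond")
req = create_data_source_request(state, requester, query="unit preferences")
print(req.user_id, req.query, req.metadata)  # prints alice unit preferences {}
```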

See also

Data Integration

Complete implementation guide and examples

framework.services.memory_storage.UserMemoryProvider

Example core data source provider implementation