# Source code for framework.services.memory_storage.storage_manager
"""
Memory Storage Manager
Simple file-based memory manager for user memory storage.
Extracted from services.ALS_expert.utils.memory_manager
"""
import os
import json
import logging
from typing import Optional, List
from pathlib import Path
from datetime import datetime
from configs.config import get_framework_service_config, get_session_info, get_agent_dir
from framework.state import UserMemories
try:
from langgraph.config import get_config
except ImportError:
get_config = None
from .models import MemoryContent
logger = logging.getLogger(__name__)
[docs]
class MemoryStorageManager:
    """Simple file-based memory manager for user memory storage.

    Provides persistent storage of user memory entries in JSON format. Each
    user's memory is stored in a separate JSON file named after the sanitized
    user ID. Files are replaced atomically on save, but no locking is
    performed: concurrent writers for the same user are not coordinated and
    the last completed write wins.
    """

    # Single source of truth for the timestamp layout stored in the JSON files.
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M"

    def __init__(self, memory_directory: str):
        """Initialize memory manager with storage directory.

        Resolves the directory to an absolute path, creates it (including
        parents) if it doesn't exist, and logs the resolved location.

        :param memory_directory: Directory path for storing memory files
        :type memory_directory: str
        :raises OSError: If directory cannot be created or accessed
        """
        self.memory_dir = Path(memory_directory).resolve()
        self.memory_dir.mkdir(exist_ok=True, parents=True)
        logger.info(f"Memory manager initialized with directory: {self.memory_dir}")

    def _get_memory_file_path(self, user_id: str) -> Path:
        """Get path to user's memory file.

        Only alphanumeric characters, ``-`` and ``_`` of the user ID are kept
        for the file name, so the result always stays inside the memory
        directory. Distinct IDs that differ only in removed characters map to
        the same file.

        :param user_id: User identifier for memory file
        :type user_id: str
        :return: Path to the user's memory JSON file
        :rtype: Path
        :raises ValueError: If user_id is empty, whitespace-only, or contains
            no characters that are valid in a file name
        """
        if not user_id or not user_id.strip():
            raise ValueError("User ID cannot be empty")
        # Sanitize user_id for filename
        safe_user_id = "".join(c for c in user_id if c.isalnum() or c in "-_")
        if not safe_user_id:
            # Without this check IDs such as "../" or "@@@" would all share
            # the single file ".json".
            raise ValueError("User ID contains no valid filename characters")
        return self.memory_dir / f"{safe_user_id}.json"

    def _load_memory_data(self, user_id: str) -> List[dict]:
        """Load memory entries from user's JSON file.

        :param user_id: User identifier
        :type user_id: str
        :return: List of memory entry dictionaries
        :rtype: List[dict]

        .. note::
           Returns empty list if the file doesn't exist, cannot be read or
           parsed, or does not hold a list under ``"entries"``. Callers cannot
           distinguish "no memory" from "unreadable memory"; a later save
           therefore replaces an unreadable file.
        """
        try:
            memory_file = self._get_memory_file_path(user_id)
            if not memory_file.exists():
                return []
            with open(memory_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            entries = data.get("entries", [])
            if not isinstance(entries, list):
                # e.g. "entries": null -- never hand a non-list to callers
                # that iterate over or append to the result.
                logger.error(f"Error loading memory for user {user_id}: 'entries' is not a list")
                return []
            return entries
        except Exception as e:
            logger.error(f"Error loading memory for user {user_id}: {e}")
            return []

    def _save_memory_data(self, user_id: str, entries: List[dict]) -> bool:
        """Save memory entries to user's JSON file.

        :param user_id: User identifier
        :type user_id: str
        :param entries: List of memory entry dictionaries to save
        :type entries: List[dict]
        :return: True if save was successful, False otherwise
        :rtype: bool

        .. note::
           The data is written to a sibling ``*.tmp`` file first and then
           moved over the target with :func:`os.replace`, so readers never
           observe a partially written file.
        """
        temp_file = None
        try:
            memory_file = self._get_memory_file_path(user_id)
            data = {
                "user_id": user_id,
                "last_updated": datetime.now().strftime(self._TIMESTAMP_FORMAT),
                "entries": entries,
            }
            # Same directory as the target so the final rename stays on one
            # filesystem.
            temp_file = memory_file.with_name(f"{memory_file.name}.tmp")
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            os.replace(temp_file, memory_file)
            return True
        except Exception as e:
            logger.error(f"Error saving memory for user {user_id}: {e}")
            if temp_file is not None:
                try:
                    temp_file.unlink()
                except OSError:
                    # Best-effort cleanup: the temp file may never have been
                    # created, and the save is reported as failed either way.
                    pass
            return False

    def get_user_memory(self, user_id: str) -> str:
        """Get user's memory content as formatted string.

        Entries with empty content are skipped; each remaining entry becomes
        one ``[timestamp] content`` line.

        :param user_id: User identifier
        :type user_id: str
        :return: Formatted memory content with timestamps, empty string if no memory
        :rtype: str

        Examples:
            Retrieving formatted memory::

                >>> manager = MemoryStorageManager("/path/to/memory")
                >>> memory_text = manager.get_user_memory("user123")
                >>> print(memory_text)
                [2025-01-15 14:30] User prefers morning meetings
                [2025-01-15 15:45] Working on project Alpha
        """
        try:
            entries = self._load_memory_data(user_id)
            if not entries:
                return ""
            # Format as readable text
            formatted_entries = []
            for entry in entries:
                timestamp = entry.get("timestamp", "")
                content = entry.get("content", "")
                if content:
                    formatted_entries.append(f"[{timestamp}] {content}")
            return "\n".join(formatted_entries)
        except Exception as e:
            logger.error(f"Error retrieving memory for user {user_id}: {e}")
            return ""

    def get_all_memory_entries(self, user_id: str) -> List[MemoryContent]:
        """Get all user memory entries as list of MemoryContent objects.

        :param user_id: User identifier
        :type user_id: str
        :return: List of structured memory content objects
        :rtype: List[MemoryContent]

        .. note::
           Timestamps that are missing or cannot be parsed are replaced by the
           current time. Entries whose content is empty, whitespace-only, or
           not a string are skipped. Any other failure yields an empty list.
        """
        try:
            memory_contents = []
            for entry in self._load_memory_data(user_id):
                content = entry.get("content", "")
                # A null / non-string content must only drop this entry, not
                # abort the whole listing through the outer handler.
                if not isinstance(content, str) or not content.strip():
                    continue
                timestamp_str = entry.get("timestamp", "")
                try:
                    # Parse timestamp string back to datetime
                    timestamp = datetime.strptime(timestamp_str, self._TIMESTAMP_FORMAT)
                except (ValueError, TypeError) as e:
                    # TypeError covers a non-string timestamp (e.g. JSON null).
                    logger.warning(f"Failed to parse timestamp '{timestamp_str}' for user {user_id}: {e}")
                    # Use current time as fallback
                    timestamp = datetime.now()
                memory_contents.append(
                    MemoryContent(timestamp=timestamp, content=content.strip())
                )
            logger.info(f"Retrieved {len(memory_contents)} memory entries for user {user_id}")
            return memory_contents
        except Exception as e:
            logger.error(f"Error retrieving memory entries for user {user_id}: {e}")
            return []

    def add_memory_entry(self, user_id: str, memory_content: MemoryContent) -> bool:
        """Add new memory entry for user.

        Loads the user's existing entries, appends the new one (timestamp
        rendered at minute precision, content stripped) and saves the result.

        :param user_id: User identifier
        :type user_id: str
        :param memory_content: Memory content to add
        :type memory_content: MemoryContent
        :return: True if entry was added successfully, False otherwise
        :rtype: bool

        Examples:
            Adding a memory entry::

                >>> from datetime import datetime
                >>> manager = MemoryStorageManager("/path/to/memory")
                >>> entry = MemoryContent(
                ...     timestamp=datetime.now(),
                ...     content="User completed training module"
                ... )
                >>> success = manager.add_memory_entry("user123", entry)
                >>> print(f"Entry added: {success}")
        """
        try:
            entries = self._load_memory_data(user_id)
            # Add new entry
            new_entry = {
                "timestamp": memory_content.timestamp.strftime(self._TIMESTAMP_FORMAT),
                "content": memory_content.content.strip(),
            }
            entries.append(new_entry)
            success = self._save_memory_data(user_id, entries)
            if success:
                logger.info(f"Added memory entry for user {user_id}: {memory_content.content[:50]}...")
            return success
        except Exception as e:
            logger.error(f"Error adding memory entry for user {user_id}: {e}")
            return False

    def get_memories_from_state(self, state) -> UserMemories:
        """Get user memory from agent state as UserMemories object.

        :param state: Agent state containing user session context. It is not
            read here; the user ID comes from the session info instead.
        :type state: AgentState
        :return: UserMemories object with content list
        :rtype: UserMemories

        .. note::
           Returns empty UserMemories if user_id is not available in config,
           including when LangGraph's ``get_config`` could not be imported.
        """
        # Extract user_id from LangGraph config (not from state).
        # Start from None so that a missing LangGraph import cannot leave the
        # name unbound when it is checked below.
        user_id = None
        try:
            if get_config:
                # Return value is not needed; presumably this call fails when
                # no LangGraph run is active, which the handler below turns
                # into "no user" -- TODO confirm against get_session_info().
                get_config()
                session_info = get_session_info()
                user_id = session_info.get("user_id")
        except Exception:
            user_id = None
        if not user_id:
            logger.warning("No user_id found in LangGraph config, skipping memory retrieval")
            return UserMemories(entries=[])
        try:
            entries = self._load_memory_data(user_id)
            # Extract just the content for UserMemories
            content_list = [entry["content"] for entry in entries if entry.get("content")]
            return UserMemories(entries=content_list)
        except Exception as e:
            logger.warning(f"Failed to retrieve user memory for {user_id}: {e}")
            return UserMemories(entries=[])
# Global memory storage manager instance.
# Stays None until get_memory_storage_manager() creates the manager on its
# first call; later calls return the object stored here.
_memory_storage_manager: Optional[MemoryStorageManager] = None
[docs]
def get_memory_storage_manager() -> MemoryStorageManager:
    """Return the shared MemoryStorageManager, building it on first use.

    The first call asks ``get_agent_dir`` for the ``user_memory_dir`` location,
    creates a manager for that directory and stores it in the module-level
    cache. Every later call hands back that same cached object.

    :return: Global memory storage manager instance
    :rtype: MemoryStorageManager

    .. note::
       Uses lazy initialization and global caching for efficiency.
    """
    global _memory_storage_manager
    # Fast path: the manager was already created by an earlier call.
    if _memory_storage_manager is not None:
        return _memory_storage_manager
    # The storage location comes from configuration via get_agent_dir.
    storage_root = get_agent_dir('user_memory_dir')
    _memory_storage_manager = MemoryStorageManager(storage_root)
    return _memory_storage_manager