Core API Reference
The Core API provides the foundational layer of the Lobster AI system, including data management, client interfaces, and system orchestration. This module ha...
Overview
The Core API provides the foundational layer of the Lobster AI system, including data management, client interfaces, and system orchestration. This module handles multi-modal data storage, provenance tracking, and provides unified interfaces for both local and cloud execution.
DataManagerV2
The DataManagerV2 class is the central orchestrator for multi-omics data management with complete provenance tracking and modular backend support.
Class Definition
class DataManagerV2:
"""
Modular data manager for multi-omics analysis.
This class orchestrates modality adapters, storage backends,
and validation to provide a unified interface for managing
multi-modal biological data with complete provenance tracking.
"""Constructor
def __init__(
self,
default_backend: str = "h5ad",
workspace_path: Optional[Union[str, Path]] = None,
enable_provenance: bool = True,
console: Optional[Console] = None,
auto_scan: bool = True
) -> NoneParameters:
default_backend(str): Default storage backend to use. Options:"h5ad"(default),"mudata". Determines how AnnData objects are persisted to disk.workspace_path(Optional[Union[str, Path]]): Optional workspace directory for data storage. Resolution order: explicit path →LOBSTER_WORKSPACEenv var →cwd/.lobster_workspaceenable_provenance(bool): Whether to enable W3C-PROV compliant provenance tracking. Default:Trueconsole(Optional[Console]): Optional Rich Console instance for progress bars and formatted output. Default:Noneauto_scan(bool): Whether to automatically scan workspace directory for existing datasets on initialization. Default:True
Core Data Management Methods
Loading and Saving Data
def load_modality(
self,
name: str,
source: Union[str, Path, pd.DataFrame, anndata.AnnData],
adapter: str,
validate: bool = True,
**kwargs
) -> anndata.AnnDataLoad data for a specific modality using the specified adapter.
Parameters:
name(str): Name for the modalitysource(Union[str, Path, pd.DataFrame, anndata.AnnData]): Data sourceadapter(str): Name of adapter to usevalidate(bool): Whether to validate the loaded data**kwargs: Additional parameters passed to adapter
Returns:
anndata.AnnData: Loaded and validated data
Raises:
ValueError: If adapter is not registered or validation fails
def save_modality(
self,
name: str,
path: Union[str, Path],
backend: Optional[str] = None,
**kwargs
) -> strSave a modality using specified backend.
Parameters:
name(str): Name of modality to savepath(Union[str, Path]): Destination pathbackend(Optional[str]): Backend to use (default: default_backend)**kwargs: Additional parameters passed to backend
Returns:
str: Path where data was saved
Modality Management
def get_modality(self, name: str) -> anndata.AnnDataGet a specific modality.
Returns:
anndata.AnnData: The requested modality
Raises:
ValueError: If modality not found
def list_modalities(self) -> List[str]List all loaded modalities.
Returns:
List[str]: List of modality names
def remove_modality(self, name: str) -> NoneRemove a modality from memory.
Raises:
ValueError: If modality not found
Multi-Modal Data Integration
def to_mudata(self, modalities: Optional[List[str]] = None) -> AnyConvert modalities to MuData object.
Parameters:
modalities(Optional[List[str]]): List of modality names to include (default: all)
Returns:
mudata.MuData: MuData object containing specified modalities
Raises:
ImportError: If MuData is not availableValueError: If no modalities are loaded
Backend and Adapter Registration
def register_backend(self, name: str, backend: IDataBackend) -> NoneRegister a storage backend.
Parameters:
name(str): Name for the backendbackend(IDataBackend): Backend implementation
Raises:
ValueError: If backend name already exists
def register_adapter(self, name: str, adapter: IModalityAdapter) -> NoneRegister a modality adapter.
Parameters:
name(str): Name for the adapteradapter(IModalityAdapter): Adapter implementation
Raises:
ValueError: If adapter name already exists
Quality and Validation Methods
def get_quality_metrics(self, modality: Optional[str] = None) -> Dict[str, Any]Get quality metrics for modalities.
Parameters:
modality(Optional[str]): Specific modality name (default: all modalities)
Returns:
Dict[str, Any]: Quality metrics
def validate_modalities(self, strict: bool = False) -> Dict[str, ValidationResult]Validate all loaded modalities.
Parameters:
strict(bool): Whether to use strict validation
Returns:
Dict[str, ValidationResult]: Validation results for each modality
Workspace Management
def get_workspace_status(self) -> Dict[str, Any]Get comprehensive workspace status.
Returns:
Dict[str, Any]: Workspace status information including:workspace_path: Path to workspace directorymodalities_loaded: Number of loaded modalitiesmodality_names: List of modality namesregistered_backends: List of available backendsregistered_adapters: List of available adaptersmodality_details: Detailed information about each modality
def list_workspace_files(self) -> Dict[str, List[Dict[str, Any]]]List all files in the workspace organized by category.
Returns:
Dict[str, List[Dict[str, Any]]]: Files organized by category ("data", "exports", "cache")
Machine Learning Integration
def check_ml_readiness(self, modality: str = None) -> Dict[str, Any]Check if modalities are ready for machine learning workflows.
Parameters:
modality(str): Specific modality to check (default: all modalities)
Returns:
Dict[str, Any]: ML readiness assessment with scores and recommendations
def prepare_ml_features(
self,
modality: str,
feature_selection: str = "variance",
n_features: int = 2000,
normalization: str = "log1p",
scaling: str = "standard"
) -> Dict[str, Any]Prepare ML-ready feature matrices from biological data.
Parameters:
modality(str): Name of modality to processfeature_selection(str): Method for feature selection ('variance', 'correlation', 'chi2', 'mutual_info')n_features(int): Number of features to selectnormalization(str): Normalization method ('log1p', 'cpm', 'none')scaling(str): Scaling method ('standard', 'minmax', 'robust', 'none')
Returns:
Dict[str, Any]: Processed feature information and metadata
Plot Management
def add_plot(
self,
plot: go.Figure,
title: str = None,
source: str = None,
dataset_info: Dict[str, Any] = None,
analysis_params: Dict[str, Any] = None,
) -> strAdd a plot to the collection with comprehensive metadata.
Parameters:
plot(go.Figure): Plotly Figure objecttitle(str): Optional title for the plotsource(str): Optional source identifier (e.g., service name)dataset_info(Dict[str, Any]): Optional information about the dataset usedanalysis_params(Dict[str, Any]): Optional parameters used for the analysis
Returns:
str: The unique ID assigned to the plot
def get_latest_plots(self, n: int = None) -> List[Dict[str, Any]]Get the n most recent plots with their metadata.
Parameters:
n(int): Number of plots to return (None for all)
Returns:
List[Dict[str, Any]]: List of plot entries with metadata
Legacy Compatibility Methods
def log_tool_usage(
self,
tool_name: str,
parameters: Dict[str, Any],
description: str = None
) -> NoneLog tool usage for reproducibility tracking.
def has_data(self) -> boolCheck if any modalities are loaded.
Client Interfaces
BaseClient
Abstract base class defining the interface for all Lobster client implementations.
class BaseClient(ABC):
"""
Abstract base class defining the interface for all Lobster client implementations.
This ensures that both local (AgentClient) and cloud (CloudLobsterClient)
implementations provide the same interface to the CLI and other components.
"""Core Methods
@abstractmethod
def query(self, user_input: str, stream: bool = False) -> Dict[str, Any]Process a user query through the system.
Parameters:
user_input(str): The user's question or requeststream(bool): Whether to stream the response
Returns:
Dict[str, Any]: Dictionary containing:success: boolresponse: strerror: Optional[str]session_id: strhas_data: boolplots: List[Dict[str, Any]]duration: float (optional)last_agent: Optional[str] (optional)
@abstractmethod
def get_status(self) -> Dict[str, Any]Get the current status of the client/system.
Returns:
Dict[str, Any]: Status information
@abstractmethod
def list_workspace_files(self, pattern: str = "*") -> List[Dict[str, Any]]List files in the workspace.
Parameters:
pattern(str): Glob pattern for filtering files
Returns:
List[Dict[str, Any]]: List of file information dictionaries
AgentClient
Local client implementation using LangGraph multi-agent system.
class AgentClient(BaseClient):
def __init__(
self,
data_manager: Optional[DataManagerV2] = None,
session_id: str = None,
enable_reasoning: bool = True,
enable_langfuse: bool = False,
workspace_path: Optional[Path] = None,
custom_callbacks: Optional[List] = None,
manual_model_params: Optional[Dict[str, Any]] = None
)Parameters:
data_manager(Optional[DataManagerV2]): DataManagerV2 instance (creates new if None)session_id(str): Unique session identifierenable_reasoning(bool): Show agent reasoning/thinking processenable_langfuse(bool): Enable Langfuse debugging callbackworkspace_path(Optional[Path]): Path to workspace for file operationscustom_callbacks(Optional[List]): Additional callback handlersmanual_model_params(Optional[Dict[str, Any]]): Manual model parameter overrides
Implementation-Specific Methods
def reset(self) -> NoneReset the conversation state.
def export_session(self, export_path: Optional[Path] = None) -> PathExport the current session data.
Parameters:
export_path(Optional[Path]): Optional path for the export file
Returns:
Path: Path to the exported file
APIAgentClient
WebSocket streaming client for web services.
class APIAgentClient(BaseClient):
def __init__(
self,
websocket_url: str,
data_manager: Optional[DataManagerV2] = None,
session_id: str = None
)Parameters:
websocket_url(str): WebSocket server URLdata_manager(Optional[DataManagerV2]): Optional local data managersession_id(str): Session identifier
Provenance Tracking
ProvenanceTracker
W3C-PROV-like provenance tracking system.
class ProvenanceTracker:
"""
W3C-PROV-like provenance tracking system.
This class tracks data processing activities, entities, and agents
to provide a complete audit trail and enable reproducibility.
"""Constructor
def __init__(self, namespace: str = "lobster") -> NoneParameters:
namespace(str): Namespace for provenance identifiers
Core Methods
def create_activity(
self,
activity_type: str,
agent: str,
inputs: Optional[List[Dict[str, Any]]] = None,
outputs: Optional[List[Dict[str, Any]]] = None,
parameters: Optional[Dict[str, Any]] = None,
description: Optional[str] = None
) -> strCreate a provenance activity record.
Parameters:
activity_type(str): Type of activity performedagent(str): Agent responsible for the activityinputs(Optional[List[Dict[str, Any]]]): Input entitiesoutputs(Optional[List[Dict[str, Any]]]): Output entitiesparameters(Optional[Dict[str, Any]]): Activity parametersdescription(Optional[str]): Human-readable description
Returns:
str: Unique activity identifier
def create_entity(
self,
entity_type: str,
uri: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> strCreate a provenance entity record.
Parameters:
entity_type(str): Type of entityuri(Optional[str]): Entity URI or pathmetadata(Optional[Dict[str, Any]]): Entity metadata
Returns:
str: Unique entity identifier
def to_dict(self) -> Dict[str, Any]Export provenance information as dictionary.
Returns:
Dict[str, Any]: Complete provenance record
def add_to_anndata(self, adata: anndata.AnnData) -> anndata.AnnDataAdd provenance information to AnnData object.
Parameters:
adata(anndata.AnnData): AnnData object to annotate
Returns:
anndata.AnnData: Annotated AnnData object
Schema Validation
ValidationResult
Data class for validation results with flexible error handling.
@dataclass
class ValidationResult:
"""
Result of a validation operation.
This class encapsulates the results of validating biological data,
supporting both errors (critical issues) and warnings (non-critical
issues that don't prevent analysis).
"""
errors: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
info: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)Properties
@property
def has_errors(self) -> boolCheck if validation found any errors.
@property
def has_warnings(self) -> boolCheck if validation found any warnings.
@property
def is_valid(self) -> boolCheck if validation passed (no errors).
Methods
def add_error(self, message: str) -> NoneAdd an error message.
def add_warning(self, message: str) -> NoneAdd a warning message.
def merge(self, other: "ValidationResult") -> "ValidationResult"Merge another validation result into this one.
def summary(self) -> strGenerate a human-readable summary.
def format_messages(self, include_info: bool = True) -> strFormat all messages for display.
Usage Examples
Basic DataManagerV2 Usage
from lobster.core.data_manager_v2 import DataManagerV2
import pandas as pd
# Initialize data manager
data_manager = DataManagerV2(
workspace_path="./my_workspace",
enable_provenance=True
)
# Load data using adapter
adata = data_manager.load_modality(
name="my_dataset",
source="/path/to/data.csv",
adapter="transcriptomics_single_cell"
)
# Get quality metrics
metrics = data_manager.get_quality_metrics("my_dataset")
# Save processed data
data_manager.save_modality("my_dataset", "processed_data.h5ad")Client Usage
from lobster.core.client import AgentClient
# Create local client
client = AgentClient()
# Query the system
result = client.query("Load GSE194247 and perform quality assessment")
print(result['response'])
if result['plots']:
print(f"Generated {len(result['plots'])} plots")Provenance Tracking
from lobster.core.provenance import ProvenanceTracker
# Initialize tracker
provenance = ProvenanceTracker()
# Record an activity
activity_id = provenance.create_activity(
activity_type="data_preprocessing",
agent="preprocessing_service",
parameters={"method": "log1p_normalization"},
description="Applied log1p normalization to expression data"
)
# Export provenance
prov_data = provenance.to_dict()Error Handling
The Core API uses a hierarchical exception structure:
ValueError: For invalid parameters or missing dataFileNotFoundError: For missing files or pathsImportError: For missing optional dependenciesPermissionError: For access-denied scenarios
All methods provide detailed error messages and maintain system state consistency even when operations fail.
Thread Safety
The DataManagerV2 class is not thread-safe. For multi-threaded applications, use appropriate locking mechanisms or create separate instances per thread.
Memory Management
DataManagerV2 holds all modalities in memory. For large datasets:
- Use the
remove_modality()method to free memory - Monitor memory usage with
get_workspace_status() - Consider using the
auto_save_state()method for persistence
Lobster AI - API Documentation for Frontend Integration
The Lobster AI API provides a comprehensive bioinformatics analysis system with multi-agent capabilities, real-time streaming, and session-based isolation. T...
Interfaces API Reference
The Interfaces API defines the abstract contracts and protocols that ensure consistent behavior across different implementations in the Lobster AI system. Th...