# Data Manager

Modality storage and workspace management with `DataManagerV2`.

`DataManagerV2` is the central data orchestration layer in Lobster AI. It manages modalities (datasets), storage backends, provenance tracking, and workspace organization for multi-omics analysis.
## Overview

DataManagerV2 provides:

- **Modality management** - Load, store, and track datasets as AnnData objects
- **Backend abstraction** - Pluggable storage backends (H5AD, MuData, S3)
- **Adapter system** - Format-specific data loading (CSV, 10X, MTX)
- **Provenance integration** - Automatic tracking of all operations
- **Workspace organization** - Structured directories for data, exports, and caches
## Basic Usage

### Initialization

```python
from pathlib import Path
from rich.console import Console
from lobster.core.data_manager_v2 import DataManagerV2

# Create data manager with workspace
dm = DataManagerV2(
    default_backend="h5ad",                # Storage backend: "h5ad" (default) or "mudata"
    workspace_path=Path("./my_analysis"),  # Optional: workspace directory
    enable_provenance=True,                # Enable W3C-PROV tracking
    console=None,                          # Optional: Rich Console for progress
    auto_scan=True                         # Auto-load existing datasets
)

# With a custom console
console = Console()
dm = DataManagerV2(
    default_backend="h5ad",
    workspace_path=Path("./my_analysis"),
    console=console
)
```

Resolution order for the workspace path:

1. Explicit `workspace_path` parameter
2. `LOBSTER_WORKSPACE` environment variable
3. `.lobster_workspace` in the current directory
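The resolution order above can be sketched as a small helper (a hypothetical illustration; the actual `DataManagerV2` logic may differ in details):

```python
import os
from pathlib import Path

def resolve_workspace(workspace_path=None):
    """Sketch of the workspace-path resolution order (hypothetical helper,
    not the real DataManagerV2 implementation)."""
    if workspace_path is not None:
        # 1. An explicit parameter always wins
        return Path(workspace_path)
    env = os.environ.get("LOBSTER_WORKSPACE")
    if env:
        # 2. Fall back to the environment variable
        return Path(env)
    # 3. Default to .lobster_workspace in the current directory
    return Path.cwd() / ".lobster_workspace"
```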
### Loading Data

```python
# Load from H5AD file
adata = dm.load_modality(
    name="my_dataset",
    source="/path/to/data.h5ad",
    adapter="transcriptomics_single_cell"
)

# Load from CSV
adata = dm.load_modality(
    name="counts_matrix",
    source="/path/to/counts.csv",
    adapter="transcriptomics_bulk"
)

# Load from GEO (via download queue)
# Typically handled by the research_agent + data_expert workflow
```

### Accessing Modalities
```python
# Get a specific modality
adata = dm.get_modality("my_dataset")

# List all loaded modalities
modalities = dm.list_modalities()
print(f"Loaded: {modalities}")

# Check shape
print(f"Cells: {adata.n_obs}, Genes: {adata.n_vars}")
```

### Storing Processed Data
```python
# Store with lineage tracking (recommended)
dm.store_modality(
    name="my_dataset_filtered",
    adata=filtered_adata,
    parent_name="my_dataset",
    step_summary="Filtered 15% low-quality cells"
)

# Manual storage (less tracking)
dm.modalities["my_dataset_processed"] = processed_adata
```

### Saving to Disk
```python
# Save to H5AD (default backend)
path = dm.save_modality("my_dataset", "my_dataset.h5ad")

# Save with a specific backend
path = dm.save_modality(
    "my_dataset",
    "my_dataset.h5mu",
    backend="mudata"
)
```

## Modality Operations
### Loading from Different Sources

```python
# From file path
dm.load_modality("dataset", "/path/to/file.h5ad", adapter="transcriptomics_single_cell")

# From pandas DataFrame
import pandas as pd
df = pd.read_csv("counts.csv", index_col=0)
dm.load_modality("from_csv", df, adapter="transcriptomics_bulk")

# From existing AnnData
import anndata
adata = anndata.read_h5ad("existing.h5ad")
dm.load_modality("imported", adata, adapter="transcriptomics_single_cell")
```

### Modality Naming Convention
Lobster uses a structured naming convention:

```text
geo_gse12345                         # Raw loaded data
├─ geo_gse12345_quality_assessed     # After QC metrics
├─ geo_gse12345_filtered             # After cell/gene filtering
├─ geo_gse12345_normalized           # After normalization
├─ geo_gse12345_clustered            # After clustering
└─ geo_gse12345_annotated            # After cell type annotation
```

### Listing with Lineage
```python
# Get modalities with full lineage information
modalities = dm.list_modalities_with_lineage()
for mod in modalities:
    print(f"{mod['name']}: v{mod['version']}")
    print(f"  Parent: {mod.get('parent', 'None')}")
    print(f"  Step: {mod.get('processing_step', 'raw')}")
```

## Backend System
DataManagerV2 uses pluggable backends for data storage.
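Conceptually, the backend layer is a name-to-implementation lookup: each backend exposes `save`/`load`, and the manager dispatches by registered name. A minimal sketch with a toy in-memory backend (hypothetical classes, not Lobster's internals):

```python
class BackendRegistry:
    """Toy name -> backend lookup illustrating the dispatch idea (sketch)."""

    def __init__(self):
        self._backends = {}

    def register(self, name, backend):
        self._backends[name] = backend

    def get(self, name):
        try:
            return self._backends[name]
        except KeyError:
            raise ValueError(f"Unknown backend: {name!r}") from None


class InMemoryBackend:
    """Toy backend that 'saves' into a dict instead of a file (sketch)."""

    def __init__(self):
        self.store = {}

    def save(self, adata, path, **kwargs):
        self.store[str(path)] = adata
        return path

    def load(self, path, **kwargs):
        return self.store[str(path)]
```

The same shape explains why `save_modality(..., backend="mudata")` works: the name simply selects a different registered implementation.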
### Default Backends

| Backend | Format | Use Case |
|---|---|---|
| `h5ad` | H5AD | Single-modality datasets |
| `mudata` | H5MU | Multi-modal integration |
### Registering Custom Backends

```python
from lobster.core.interfaces.backend import IDataBackend

class MyS3Backend(IDataBackend):
    def save(self, adata, path, **kwargs): ...
    def load(self, path, **kwargs): ...

dm.register_backend("s3", MyS3Backend(bucket="my-bucket"))
```

### Using a Specific Backend

```python
# Save with the MuData backend
dm.save_modality("integrated", "integrated.h5mu", backend="mudata")
```

## Adapter System
Adapters handle format-specific data loading and validation.
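The adapter contract amounts to a three-step pipeline: read the source, validate the result, then standardize it. A toy sketch using a plain DataFrame in place of AnnData (hypothetical class; real Lobster adapters implement `IModalityAdapter` and return AnnData objects):

```python
import pandas as pd

class ToyCsvAdapter:
    """Toy adapter sketch; real adapters operate on AnnData."""

    def from_source(self, source, **kwargs):
        # Accept a file path or an in-memory DataFrame
        if isinstance(source, pd.DataFrame):
            return source
        return pd.read_csv(source, index_col=0)

    def validate(self, df, strict=False):
        if df.empty:
            raise ValueError("empty count matrix")
        return True

    def standardize(self, df):
        # Example standardization: upper-case gene names
        df.columns = [c.upper() for c in df.columns]
        return df

def load_with_adapter(adapter, source):
    """Conceptual load pipeline: from_source -> validate -> standardize."""
    data = adapter.from_source(source)
    adapter.validate(data)
    return adapter.standardize(data)
```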
### Default Adapters

| Adapter | Data Types |
|---|---|
| `transcriptomics_single_cell` | 10X, H5AD, MTX |
| `transcriptomics_bulk` | CSV, TSV count matrices |
| `proteomics_ms` | Mass spectrometry |
| `proteomics_affinity` | Olink, SomaScan |
| `genomics_wgs` | VCF files |
| `genomics_snp_array` | PLINK format |
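For illustration, a caller could map file suffixes onto the default adapter names above (a hypothetical convenience helper, not part of the DataManagerV2 API):

```python
from pathlib import Path

# Hypothetical suffix -> adapter-name mapping based on the table above
ADAPTER_BY_SUFFIX = {
    ".h5ad": "transcriptomics_single_cell",
    ".mtx": "transcriptomics_single_cell",
    ".csv": "transcriptomics_bulk",
    ".tsv": "transcriptomics_bulk",
    ".vcf": "genomics_wgs",
}

def guess_adapter(path):
    """Guess an adapter name from a file extension (sketch; verify before use)."""
    suffix = Path(path).suffix.lower()
    if suffix not in ADAPTER_BY_SUFFIX:
        raise ValueError(f"No default adapter known for {suffix!r}")
    return ADAPTER_BY_SUFFIX[suffix]
```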
### Registering Custom Adapters

```python
from lobster.core.interfaces.adapter import IModalityAdapter

class MyAdapter(IModalityAdapter):
    def from_source(self, source, **kwargs): ...
    def validate(self, adata, strict=False): ...
    def standardize(self, adata): ...

dm.register_adapter("my_format", MyAdapter())
```

## Integration with Agents
Agents receive the data manager via factory injection:

```python
def create_transcriptomics_expert(
    data_manager: DataManagerV2,
    callback_handler,
    agent_name: str = "transcriptomics_expert",
    delegation_tools: list = None,
    workspace_path=None,
):
    """Factory creates agent with data manager access."""
    # Agent tools can access dm.modalities, dm.get_modality(), etc.
```

### Tool Pattern
```python
@tool
def filter_cells(modality_name: str, min_genes: int = 200) -> str:
    """Filter low-quality cells from dataset."""
    adata = data_manager.get_modality(modality_name)

    # Call service
    result, stats, ir = filtering_service.filter(adata, min_genes=min_genes)

    # Store with lineage
    output_name = f"{modality_name}_filtered"
    data_manager.store_modality(
        name=output_name,
        adata=result,
        parent_name=modality_name,
        step_summary=f"Filtered cells with <{min_genes} genes"
    )

    # Log to provenance
    data_manager.log_tool_usage(
        tool_name="filter_cells",
        parameters={"modality": modality_name, "min_genes": min_genes},
        ir=ir
    )
    return f"Filtered: {stats['cells_removed']} cells removed"
```

## Workspace Structure
DataManagerV2 creates a structured workspace:

```text
.lobster_workspace/
├── data/                   # Saved H5AD/H5MU files
│   ├── geo_gse12345.h5ad
│   └── geo_gse12345_filtered.h5ad
├── exports/                # Exported results
│   └── markers.csv
├── cache/                  # Temporary cache
├── literature_cache/       # Publication data
│   ├── publications/
│   └── parsed_docs/
├── .lobster/
│   └── queues/             # Download/publication queues
│       ├── download_queue.jsonl
│       └── publication_queue.jsonl
├── plots/                  # Saved visualizations
├── notebooks/              # Exported pipelines
└── .session.json           # Session metadata
```

## Session Persistence
DataManagerV2 supports session continuity:

```python
# Session 1
dm = DataManagerV2(workspace_path="./my_analysis")
dm.load_modality("dataset", "data.h5ad", adapter="transcriptomics_single_cell")
dm.save_modality("dataset", "dataset.h5ad")
# Exit session

# Session 2 - dataset auto-loaded
dm = DataManagerV2(workspace_path="./my_analysis", auto_scan=True)
adata = dm.get_modality("dataset")  # Available immediately
```

## Provenance Integration
All operations are tracked via ProvenanceTracker:

```python
# Log tool usage with IR for reproducibility
dm.log_tool_usage(
    tool_name="normalize",
    parameters={"method": "log1p", "target_sum": 10000},
    description="Normalize counts per cell",
    ir=analysis_step  # AnalysisStep for notebook export
)

# Access provenance data
activities = dm.provenance.get_all_activities()
```

See Provenance Tracking for details.
## Notebook Export

Export an analysis as a reproducible Jupyter notebook:

```python
# Export current session
path = dm.export_notebook(
    name="qc_workflow",
    description="Quality control for 10X data",
    filter_strategy="successful"  # Only successful operations
)

# Run on new data
result = dm.run_notebook(
    notebook_path="qc_workflow.ipynb",
    input_modality="new_dataset",
    parameters={"min_genes": 300}
)
```

## API Reference
### Key Methods

| Method | Description |
|---|---|
| `load_modality(name, source, adapter)` | Load data using an adapter |
| `get_modality(name)` | Get a loaded modality |
| `list_modalities()` | List all modality names |
| `store_modality(name, adata, parent_name)` | Store with lineage |
| `save_modality(name, path, backend)` | Persist to storage |
| `log_tool_usage(tool, params, ir)` | Log to provenance |
| `export_notebook(name, description)` | Export as a notebook |
### Properties

| Property | Description |
|---|---|
| `modalities` | Dict of loaded AnnData objects |
| `provenance` | ProvenanceTracker instance |
| `workspace_path` | Path to workspace directory |
| `download_queue` | DownloadQueue for dataset fetching |
| `publication_queue` | PublicationQueue (premium) |