Omics-OS Docs

Data Manager

Modality storage and workspace management with DataManagerV2

The DataManagerV2 is the central data orchestration layer in Lobster AI. It manages modalities (datasets), storage backends, provenance tracking, and workspace organization for multi-omics analysis.

Overview

DataManagerV2 provides:

  • Modality management - Load, store, and track datasets as AnnData objects
  • Backend abstraction - Pluggable storage backends (H5AD, MuData, S3)
  • Adapter system - Format-specific data loading (CSV, 10X, MTX)
  • Provenance integration - Automatic tracking of all operations
  • Workspace organization - Structured directories for data, exports, and caches

Basic Usage

Initialization

from pathlib import Path
from rich.console import Console
from lobster.core.data_manager_v2 import DataManagerV2

# Create data manager with workspace
dm = DataManagerV2(
    default_backend="h5ad",                # Storage backend: "h5ad" (default) or "mudata"
    workspace_path=Path("./my_analysis"),  # Optional: workspace directory
    enable_provenance=True,                # Enable W3C-PROV tracking
    console=None,                          # Optional: Rich Console for progress
    auto_scan=True                         # Auto-load existing datasets
)

# With custom console
console = Console()
dm = DataManagerV2(
    default_backend="h5ad",
    workspace_path=Path("./my_analysis"),
    console=console
)

Resolution order for workspace path:

  1. Explicit workspace_path parameter
  2. LOBSTER_WORKSPACE environment variable
  3. .lobster_workspace in the current working directory
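
The precedence above can be sketched as a small standalone helper. This is a hypothetical illustration of the documented resolution order — `resolve_workspace` is not part of the DataManagerV2 public API:

```python
import os
from pathlib import Path
from typing import Optional

def resolve_workspace(explicit: Optional[Path] = None) -> Path:
    """Resolve the workspace directory using the documented precedence."""
    # 1. An explicit workspace_path parameter wins
    if explicit is not None:
        return Path(explicit)
    # 2. Fall back to the LOBSTER_WORKSPACE environment variable
    env = os.environ.get("LOBSTER_WORKSPACE")
    if env:
        return Path(env)
    # 3. Default to .lobster_workspace in the current directory
    return Path.cwd() / ".lobster_workspace"
```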

Loading Data

# Load from H5AD file
adata = dm.load_modality(
    name="my_dataset",
    source="/path/to/data.h5ad",
    adapter="transcriptomics_single_cell"
)

# Load from CSV
adata = dm.load_modality(
    name="counts_matrix",
    source="/path/to/counts.csv",
    adapter="transcriptomics_bulk"
)

# Load from GEO (via download queue)
# Typically handled by research_agent + data_expert workflow

Accessing Modalities

# Get a specific modality
adata = dm.get_modality("my_dataset")

# List all loaded modalities
modalities = dm.list_modalities()
print(f"Loaded: {modalities}")

# Check shape
print(f"Cells: {adata.n_obs}, Genes: {adata.n_vars}")

Storing Processed Data

# Store with lineage tracking (recommended)
dm.store_modality(
    name="my_dataset_filtered",
    adata=filtered_adata,
    parent_name="my_dataset",
    step_summary="Filtered 15% low-quality cells"
)

# Manual storage (bypasses lineage and provenance tracking)
dm.modalities["my_dataset_processed"] = processed_adata

Saving to Disk

# Save to H5AD (default backend)
path = dm.save_modality("my_dataset", "my_dataset.h5ad")

# Save with specific backend
path = dm.save_modality(
    "my_dataset",
    "my_dataset.h5mu",
    backend="mudata"
)

Modality Operations

Loading from Different Sources

# From file path
dm.load_modality("dataset", "/path/to/file.h5ad", adapter="transcriptomics_single_cell")

# From pandas DataFrame
import pandas as pd
df = pd.read_csv("counts.csv", index_col=0)
dm.load_modality("from_csv", df, adapter="transcriptomics_bulk")

# From existing AnnData
import anndata
adata = anndata.read_h5ad("existing.h5ad")
dm.load_modality("imported", adata, adapter="transcriptomics_single_cell")

Modality Naming Convention

Lobster uses a structured naming convention:

geo_gse12345                      # Raw loaded data
├─ geo_gse12345_quality_assessed  # After QC metrics
├─ geo_gse12345_filtered          # After cell/gene filtering
├─ geo_gse12345_normalized        # After normalization
├─ geo_gse12345_clustered         # After clustering
└─ geo_gse12345_annotated         # After cell type annotation
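
The suffix chain can be generated programmatically. A minimal sketch — the helper and the `PIPELINE_STEPS` list below are illustrative, not part of DataManagerV2:

```python
# Standard processing-step suffixes, in typical pipeline order
PIPELINE_STEPS = [
    "quality_assessed", "filtered", "normalized", "clustered", "annotated",
]

def step_name(base: str, step: str) -> str:
    """Derive a modality name for a processing step, e.g. geo_gse12345_filtered."""
    return f"{base}_{step}"

names = [step_name("geo_gse12345", s) for s in PIPELINE_STEPS]
```

Using these derived names as the `name` argument to store_modality keeps the lineage tree readable.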

Listing with Lineage

# Get modalities with full lineage information
modalities = dm.list_modalities_with_lineage()

for mod in modalities:
    print(f"{mod['name']}: v{mod['version']}")
    print(f"  Parent: {mod.get('parent', 'None')}")
    print(f"  Step: {mod.get('processing_step', 'raw')}")

Backend System

DataManagerV2 uses pluggable backends for data storage.

Default Backends

Backend   Format   Use Case
h5ad      H5AD     Single-modality datasets
mudata    H5MU     Multi-modal integration

Registering Custom Backends

from lobster.core.interfaces.backend import IDataBackend

class MyS3Backend(IDataBackend):
    def save(self, adata, path, **kwargs): ...
    def load(self, path, **kwargs): ...

dm.register_backend("s3", MyS3Backend(bucket="my-bucket"))

Using Specific Backend

# Save with MuData backend
dm.save_modality("integrated", "integrated.h5mu", backend="mudata")

Adapter System

Adapters handle format-specific data loading and validation.

Default Adapters

Adapter                       Data Types
transcriptomics_single_cell   10X, H5AD, MTX
transcriptomics_bulk          CSV, TSV count matrices
proteomics_ms                 Mass spectrometry
proteomics_affinity           Olink, SomaScan
genomics_wgs                  VCF files
genomics_snp_array            PLINK format
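
When the adapter is not known in advance, a file-extension lookup can pick a default. The mapping below is a hypothetical convenience built from the table above, not a Lobster API:

```python
from pathlib import Path

# Extension-to-adapter mapping derived from the default adapter table
ADAPTER_BY_EXT = {
    ".h5ad": "transcriptomics_single_cell",
    ".mtx": "transcriptomics_single_cell",
    ".csv": "transcriptomics_bulk",
    ".tsv": "transcriptomics_bulk",
    ".vcf": "genomics_wgs",
}

def guess_adapter(path: str) -> str:
    """Pick a default adapter from the file extension; raise if unmapped."""
    ext = Path(path).suffix.lower()
    if ext not in ADAPTER_BY_EXT:
        raise ValueError(f"No default adapter for extension {ext!r}")
    return ADAPTER_BY_EXT[ext]
```

Usage would then look like dm.load_modality(name, path, adapter=guess_adapter(path)).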

Registering Custom Adapters

from lobster.core.interfaces.adapter import IModalityAdapter

class MyAdapter(IModalityAdapter):
    def from_source(self, source, **kwargs): ...
    def validate(self, adata, strict=False): ...
    def standardize(self, adata): ...

dm.register_adapter("my_format", MyAdapter())

Integration with Agents

Agents receive the data manager via factory injection:

def create_transcriptomics_expert(
    data_manager: DataManagerV2,
    callback_handler,
    agent_name: str = "transcriptomics_expert",
    delegation_tools: list = None,
    workspace_path = None
):
    """Factory creates agent with data manager access."""
    # Agent tools can access dm.modalities, dm.get_modality(), etc.

Tool Pattern

@tool
def filter_cells(modality_name: str, min_genes: int = 200) -> str:
    """Filter low-quality cells from dataset."""
    adata = data_manager.get_modality(modality_name)

    # Call service
    result, stats, ir = filtering_service.filter(adata, min_genes=min_genes)

    # Store with lineage
    output_name = f"{modality_name}_filtered"
    data_manager.store_modality(
        name=output_name,
        adata=result,
        parent_name=modality_name,
        step_summary=f"Filtered cells with <{min_genes} genes"
    )

    # Log to provenance
    data_manager.log_tool_usage(
        tool_name="filter_cells",
        parameters={"modality": modality_name, "min_genes": min_genes},
        ir=ir
    )

    return f"Filtered: {stats['cells_removed']} cells removed"

Workspace Structure

DataManagerV2 creates a structured workspace:

.lobster_workspace/
├── data/                    # Saved H5AD/H5MU files
│   ├── geo_gse12345.h5ad
│   └── geo_gse12345_filtered.h5ad
├── exports/                 # Exported results
│   └── markers.csv
├── cache/                   # Temporary cache
├── literature_cache/        # Publication data
│   ├── publications/
│   └── parsed_docs/
├── .lobster/
│   └── queues/             # Download/publication queues
│       ├── download_queue.jsonl
│       └── publication_queue.jsonl
├── plots/                   # Saved visualizations
├── notebooks/               # Exported pipelines
└── .session.json            # Session metadata
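
A quick sanity check for an existing workspace can verify the top-level layout. This is a hypothetical helper; the directory list simply mirrors the tree above:

```python
from pathlib import Path

# Top-level directories DataManagerV2 creates in a workspace
EXPECTED_DIRS = ["data", "exports", "cache", "literature_cache", "plots", "notebooks"]

def missing_workspace_dirs(workspace: Path) -> list:
    """Return the expected subdirectories that are absent from the workspace."""
    return [d for d in EXPECTED_DIRS if not (workspace / d).is_dir()]
```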

Session Persistence

DataManagerV2 supports session continuity:

# Session 1
dm = DataManagerV2(workspace_path="./my_analysis")
dm.load_modality("dataset", "data.h5ad", adapter="transcriptomics_single_cell")
dm.save_modality("dataset", "dataset.h5ad")
# Exit session

# Session 2 - dataset auto-loaded
dm = DataManagerV2(workspace_path="./my_analysis", auto_scan=True)
adata = dm.get_modality("dataset")  # Available immediately

Provenance Integration

All operations are tracked via ProvenanceTracker:

# Log tool usage with IR for reproducibility
dm.log_tool_usage(
    tool_name="normalize",
    parameters={"method": "log1p", "target_sum": 10000},
    description="Normalize counts per cell",
    ir=analysis_step  # AnalysisStep for notebook export
)

# Access provenance data
activities = dm.provenance.get_all_activities()

See Provenance Tracking for details.

Notebook Export

Export analysis as reproducible Jupyter notebook:

# Export current session
path = dm.export_notebook(
    name="qc_workflow",
    description="Quality control for 10X data",
    filter_strategy="successful"  # Only successful operations
)

# Run on new data
result = dm.run_notebook(
    notebook_path="qc_workflow.ipynb",
    input_modality="new_dataset",
    parameters={"min_genes": 300}
)

API Reference

Key Methods

Method                                      Description
load_modality(name, source, adapter)        Load data using an adapter
get_modality(name)                          Get a loaded modality
list_modalities()                           List all modality names
store_modality(name, adata, parent_name)    Store with lineage
save_modality(name, path, backend)          Persist to storage
log_tool_usage(tool, params, ir)            Log to provenance
export_notebook(name, description)          Export as notebook

Properties

Property            Description
modalities          Dict of loaded AnnData objects
provenance          ProvenanceTracker instance
workspace_path      Path to workspace directory
download_queue      DownloadQueue for dataset fetching
publication_queue   PublicationQueue (premium)
