Creating Adapters - Lobster AI Adapter Development Guide
🎯 Overview
This guide covers how to create adapters in the Lobster AI system. Adapters serve two primary purposes: Modality Adapters convert raw data from various sources into standardized AnnData objects with proper schemas, and Backend Adapters handle data storage and retrieval across different storage systems (local files, S3, etc.).
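The division of labor between the two adapter types can be sketched with a minimal, self-contained toy (class names here are illustrative stand-ins, not the real Lobster classes; `AnnData` is replaced by a plain dict for brevity):

```python
# Hypothetical sketch of the two adapter roles: a modality adapter
# standardizes raw input, a backend adapter persists the result.
class ToyModalityAdapter:
    """Convert raw rows into a standardized record (stand-in for AnnData)."""
    def from_source(self, rows):
        return {"n_obs": len(rows), "n_vars": len(rows[0]), "X": rows}

class ToyBackend:
    """Store standardized records in memory (stand-in for H5AD/S3 storage)."""
    def __init__(self):
        self._store = {}
    def save(self, data, path):
        self._store[str(path)] = data
    def load(self, path):
        return self._store[str(path)]

adapter = ToyModalityAdapter()
backend = ToyBackend()
data = adapter.from_source([[1.0, 2.0], [3.0, 4.0]])
backend.save(data, "experiment1.h5ad")
print(backend.load("experiment1.h5ad")["n_obs"])  # → 2
```

The real adapters follow the same flow: conversion and storage are separate concerns, so either side can be swapped independently.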
🏗️ Adapter Architecture
Adapter Types
1. Modality Adapters
- Convert raw biological data to standardized AnnData format
- Enforce modality-specific schemas (transcriptomics, proteomics, etc.)
- Handle data validation and quality control
- Located in:
lobster/core/adapters/
2. Backend Adapters
- Handle data storage and retrieval operations
- Support multiple storage systems (H5AD, MuData, S3-ready)
- Provide consistent API across storage backends
- Located in:
lobster/core/backends/
Design Principles
- Interface Compliance: All adapters implement abstract base interfaces
- Schema Validation: Enforce data quality and structure standards
- Format Flexibility: Support multiple input/output formats
- Future-Proof: S3-ready architecture for cloud scaling
- Error Handling: Comprehensive validation and error reporting
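The interface-compliance principle is what lets calling code stay storage-agnostic. A minimal sketch (the `IDataBackend` name matches the interface defined below, but the concrete backend and helper here are hypothetical):

```python
# Illustrative sketch: because every backend implements the same abstract
# interface, calling code can swap storage systems without changing logic.
from abc import ABC, abstractmethod

class IDataBackend(ABC):
    @abstractmethod
    def exists(self, path: str) -> bool: ...

class InMemoryBackend(IDataBackend):
    """Toy backend backed by a set of known object paths."""
    def __init__(self, objects):
        self.objects = set(objects)
    def exists(self, path: str) -> bool:
        return path in self.objects

def needs_download(backend: IDataBackend, path: str) -> bool:
    # Works against any IDataBackend implementation: local, S3, or in-memory.
    return not backend.exists(path)

backend = InMemoryBackend({"cached.h5ad"})
print(needs_download(backend, "cached.h5ad"))   # False
print(needs_download(backend, "missing.h5ad"))  # True
```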
📋 Interface Definitions
Modality Adapter Interface
# lobster/core/interfaces/adapter.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Union
from pathlib import Path
import anndata
import pandas as pd
class IModalityAdapter(ABC):
"""Abstract interface for modality-specific data adapters."""
@abstractmethod
def from_source(
self,
source: Union[str, Path, pd.DataFrame],
**kwargs
) -> anndata.AnnData:
"""Convert source data to AnnData with appropriate schema."""
pass
@abstractmethod
def validate(
self,
adata: anndata.AnnData,
strict: bool = False
) -> "ValidationResult":
"""Validate AnnData against modality schema."""
pass
@abstractmethod
def get_schema_info(self) -> Dict[str, Any]:
"""Get information about the expected schema."""
pass
@abstractmethod
def supported_formats(self) -> List[str]:
"""Return list of supported input formats."""
pass
Backend Interface
# lobster/core/interfaces/backend.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Union
from pathlib import Path
import anndata
class IDataBackend(ABC):
"""Abstract interface for data storage backends."""
@abstractmethod
def load(self, path: Union[str, Path], **kwargs) -> anndata.AnnData:
"""Load data from storage."""
pass
@abstractmethod
def save(self, adata: anndata.AnnData, path: Union[str, Path], **kwargs) -> None:
"""Save data to storage."""
pass
@abstractmethod
def exists(self, path: Union[str, Path]) -> bool:
"""Check if data exists at path."""
pass
@abstractmethod
def list_objects(self, prefix: str = "") -> List[str]:
"""List available data objects."""
pass
@abstractmethod
def delete(self, path: Union[str, Path]) -> None:
"""Delete data at path."""
pass
🔬 Creating Modality Adapters
Step 1: Create Base Adapter Class
# lobster/core/adapters/your_modality_adapter.py
"""
Your Modality Adapter for handling specific biological data type.
This adapter converts raw [modality type] data into standardized AnnData
format with appropriate schema validation and quality control.
"""
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import anndata
import numpy as np
import pandas as pd
from lobster.core.adapters.base import BaseAdapter
from lobster.core.interfaces.validator import ValidationResult
from lobster.core.schemas.your_schema import YourModalitySchema
from lobster.utils.logger import get_logger
logger = get_logger(__name__)
class YourModalityAdapter(BaseAdapter):
"""
Adapter for your specific modality data type.
This adapter handles loading, validation, and preprocessing of
[specific modality] data with appropriate schema enforcement.
"""
def __init__(
self,
data_subtype: str = "default",
strict_validation: bool = False,
**kwargs
):
"""
Initialize the modality adapter.
Args:
data_subtype: Subtype of the modality (e.g., 'single_cell', 'bulk')
strict_validation: Whether to use strict validation
**kwargs: Additional configuration parameters
"""
super().__init__(name="YourModalityAdapter")
self.data_subtype = data_subtype
self.strict_validation = strict_validation
# Initialize validator
self.validator = YourModalitySchema.create_validator(
schema_type=data_subtype,
strict=strict_validation
)
# Get recommended parameters
self.recommended_params = YourModalitySchema.get_recommended_params(data_subtype)
logger.info(f"Initialized {self.name} for {data_subtype} data")
def from_source(
self,
source: Union[str, Path, pd.DataFrame],
**kwargs
) -> anndata.AnnData:
"""
Convert source data to AnnData with modality schema.
Args:
source: Data source (file path, DataFrame, or existing AnnData)
**kwargs: Conversion parameters:
- transpose: Whether to transpose the data matrix
- obs_metadata: Sample/observation metadata
- var_metadata: Feature/variable metadata
- delimiter: File delimiter for text files
- sheet_name: Excel sheet name
- [modality-specific parameters]
Returns:
anndata.AnnData: Standardized data object with proper schema
Raises:
ValueError: If source data is invalid
FileNotFoundError: If source file doesn't exist
TypeError: If source format is not supported
"""
try:
logger.info(f"Converting source data: {type(source).__name__}")
# Handle different source types
if isinstance(source, (str, Path)):
adata = self._load_from_file(Path(source), **kwargs)
elif isinstance(source, pd.DataFrame):
adata = self._load_from_dataframe(source, **kwargs)
elif isinstance(source, anndata.AnnData):
adata = self._load_from_anndata(source, **kwargs)
else:
raise TypeError(f"Unsupported source type: {type(source)}")
# Apply modality-specific preprocessing
adata = self._apply_modality_preprocessing(adata, **kwargs)
# Validate the result
validation_result = self.validate(adata, strict=self.strict_validation)
if not validation_result.is_valid and self.strict_validation:
raise ValueError(f"Validation failed: {validation_result.error_summary}")
if validation_result.warnings:
for warning in validation_result.warnings:
logger.warning(f"Schema validation warning: {warning}")
logger.info(f"Successfully converted to AnnData: {adata.n_obs} obs, {adata.n_vars} vars")
return adata
except Exception as e:
logger.error(f"Failed to convert source data: {str(e)}")
raise
def _load_from_file(self, file_path: Path, **kwargs) -> anndata.AnnData:
"""Load data from file based on extension."""
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
suffix = file_path.suffix.lower()
if suffix == '.h5ad':
return anndata.read_h5ad(file_path)
elif suffix in ['.csv', '.tsv', '.txt']:
delimiter = kwargs.get('delimiter', '\t' if suffix == '.tsv' else ',')
df = pd.read_csv(file_path, delimiter=delimiter, index_col=0)
return self._dataframe_to_anndata(df, **kwargs)
elif suffix in ['.xlsx', '.xls']:
sheet_name = kwargs.get('sheet_name', 0)
df = pd.read_excel(file_path, sheet_name=sheet_name, index_col=0)
return self._dataframe_to_anndata(df, **kwargs)
elif suffix in ['.h5', '.hdf5']:
# Handle HDF5 files (implement based on your format)
return self._load_hdf5(file_path, **kwargs)
else:
raise ValueError(f"Unsupported file format: {suffix}")
def _load_from_dataframe(self, df: pd.DataFrame, **kwargs) -> anndata.AnnData:
"""Convert DataFrame to AnnData."""
return self._dataframe_to_anndata(df, **kwargs)
def _load_from_anndata(self, adata: anndata.AnnData, **kwargs) -> anndata.AnnData:
"""Process existing AnnData to ensure schema compliance."""
# Create a copy to avoid modifying the original
result_adata = adata.copy()
# Apply any necessary transformations
return self._apply_modality_preprocessing(result_adata, **kwargs)
def _dataframe_to_anndata(self, df: pd.DataFrame, **kwargs) -> anndata.AnnData:
"""Convert DataFrame to AnnData with proper structure."""
# Handle transposition if needed
transpose = kwargs.get('transpose', True)
if transpose:
df = df.T
# Create AnnData object
adata = anndata.AnnData(X=df.values)
adata.obs.index = df.index
adata.var.index = df.columns
# Add observation metadata if provided
obs_metadata = kwargs.get('obs_metadata')
if obs_metadata is not None:
if isinstance(obs_metadata, pd.DataFrame):
# Align metadata with observations
common_indices = adata.obs.index.intersection(obs_metadata.index)
adata.obs = adata.obs.join(obs_metadata.loc[common_indices])
else:
logger.warning("obs_metadata should be a DataFrame")
# Add variable metadata if provided
var_metadata = kwargs.get('var_metadata')
if var_metadata is not None:
if isinstance(var_metadata, pd.DataFrame):
common_indices = adata.var.index.intersection(var_metadata.index)
adata.var = adata.var.join(var_metadata.loc[common_indices])
return adata
def _apply_modality_preprocessing(self, adata: anndata.AnnData, **kwargs) -> anndata.AnnData:
"""Apply modality-specific preprocessing and annotations."""
# Add modality type
adata.uns['modality'] = self.data_subtype
adata.uns['adapter'] = self.name
adata.uns['processing_timestamp'] = pd.Timestamp.now().isoformat()
# Apply modality-specific processing
if self.data_subtype == 'mass_spectrometry':
adata = self._apply_ms_preprocessing(adata, **kwargs)
elif self.data_subtype == 'affinity_proteomics':
adata = self._apply_affinity_preprocessing(adata, **kwargs)
# Add more subtypes as needed
return adata
def _apply_ms_preprocessing(self, adata: anndata.AnnData, **kwargs) -> anndata.AnnData:
"""Apply mass spectrometry specific preprocessing."""
# Handle missing values typical in MS data
missing_threshold = kwargs.get('missing_threshold', 0.5)
# Calculate missing value percentages
missing_per_protein = (adata.X == 0).sum(axis=0) / adata.n_obs
missing_per_sample = (adata.X == 0).sum(axis=1) / adata.n_vars
# Store in AnnData
adata.var['missing_pct'] = missing_per_protein
adata.obs['missing_pct'] = missing_per_sample
# Log transform if requested
if kwargs.get('log_transform', True):
adata.layers['log2_intensities'] = np.log2(adata.X + 1)
adata.X = adata.layers['log2_intensities']
return adata
def validate(
self,
adata: anndata.AnnData,
strict: bool = False
) -> ValidationResult:
"""
Validate AnnData against modality schema.
Args:
adata: AnnData object to validate
strict: Whether to use strict validation
Returns:
ValidationResult: Validation results with errors/warnings
"""
return self.validator.validate(adata, strict=strict)
def get_schema_info(self) -> Dict[str, Any]:
"""Get information about the expected schema."""
return {
'name': self.name,
'modality': self.data_subtype,
'required_obs': self.validator.required_obs_columns,
'required_var': self.validator.required_var_columns,
'recommended_params': self.recommended_params,
'supported_formats': self.supported_formats()
}
def supported_formats(self) -> List[str]:
"""Return list of supported input formats."""
return ['.csv', '.tsv', '.txt', '.xlsx', '.xls', '.h5ad', '.h5', '.hdf5']
def get_conversion_examples(self) -> Dict[str, str]:
"""Provide usage examples for common scenarios."""
return {
'csv_file': "adapter.from_source('data.csv', transpose=True)",
'excel_file': "adapter.from_source('data.xlsx', sheet_name='Sheet1')",
'with_metadata': "adapter.from_source(df, obs_metadata=sample_info)",
'dataframe': "adapter.from_source(expression_df, transpose=False)"
}
Step 2: Create Schema Validation
# lobster/core/schemas/your_schema.py
"""Schema definitions for your modality."""
from typing import Dict, List, Any, Optional
from lobster.core.schemas.base import BaseSchema
from lobster.core.interfaces.validator import ValidationResult
class YourModalitySchema(BaseSchema):
"""Schema for your specific modality data."""
@classmethod
def get_required_obs_columns(cls, schema_type: str = "default") -> List[str]:
"""Required observation (sample) columns."""
base_columns = ['sample_id']
if schema_type == 'mass_spectrometry':
return base_columns + ['sample_type', 'batch']
elif schema_type == 'affinity_proteomics':
return base_columns + ['panel_type', 'dilution_factor']
return base_columns
@classmethod
def get_required_var_columns(cls, schema_type: str = "default") -> List[str]:
"""Required variable (feature) columns."""
base_columns = ['feature_id']
if schema_type == 'mass_spectrometry':
return base_columns + ['protein_group', 'gene_name']
elif schema_type == 'affinity_proteomics':
return base_columns + ['antibody_id', 'target_protein']
return base_columns
@classmethod
def get_recommended_qc_thresholds(cls, schema_type: str) -> Dict[str, float]:
"""Get QC thresholds for the modality."""
if schema_type == 'mass_spectrometry':
return {
'min_peptides_per_protein': 2,
'max_missing_per_protein': 0.5,
'min_intensity': 1.0
}
elif schema_type == 'affinity_proteomics':
return {
'max_cv_within_group': 0.2,
'min_signal_to_noise': 3.0,
'max_missing_per_sample': 0.3
}
return {}
🗄️ Creating Backend Adapters
Backend Template
# lobster/core/backends/your_backend.py
"""
Your Backend for handling data storage and retrieval.
This backend provides data persistence for [specific storage system]
with S3-ready architecture for future cloud scaling.
"""
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse
import anndata
from lobster.core.backends.base import BaseBackend
from lobster.utils.logger import get_logger
logger = get_logger(__name__)
class YourBackend(BaseBackend):
"""
Backend for [storage system] with S3-ready path handling.
This backend handles data storage and retrieval for [specific format]
with path parsing ready for future S3 integration.
"""
def __init__(
self,
base_path: Optional[Union[str, Path]] = None,
**storage_config
):
"""
Initialize the backend.
Args:
base_path: Optional base path for all operations
**storage_config: Storage-specific configuration
"""
super().__init__(base_path=base_path)
# Storage-specific configuration
self.config = storage_config
# S3-ready configuration (for future use)
self.s3_config = {
"bucket": storage_config.get("s3_bucket"),
"region": storage_config.get("s3_region"),
"prefix": storage_config.get("s3_prefix", "")
}
logger.info(f"Initialized {self.__class__.__name__}")
def load(self, path: Union[str, Path], **kwargs) -> anndata.AnnData:
"""
Load data from storage.
Args:
path: Path to the data (local path or future S3 URI)
**kwargs: Loading parameters
Returns:
anndata.AnnData: Loaded data object
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If file format is unsupported
"""
try:
resolved_path = self._resolve_path(path)
if self._is_s3_path(path):
return self._load_from_s3(resolved_path, **kwargs)
else:
return self._load_from_local(resolved_path, **kwargs)
except Exception as e:
logger.error(f"Failed to load data from {path}: {str(e)}")
raise
def save(
self,
adata: anndata.AnnData,
path: Union[str, Path],
**kwargs
) -> None:
"""
Save data to storage.
Args:
adata: AnnData object to save
path: Destination path
**kwargs: Saving parameters
Raises:
ValueError: If data cannot be serialized
PermissionError: If write access is denied
"""
try:
resolved_path = self._resolve_path(path)
if self._is_s3_path(path):
self._save_to_s3(adata, resolved_path, **kwargs)
else:
self._save_to_local(adata, resolved_path, **kwargs)
logger.info(f"Successfully saved data to {path}")
except Exception as e:
logger.error(f"Failed to save data to {path}: {str(e)}")
raise
def exists(self, path: Union[str, Path]) -> bool:
"""Check if data exists at path."""
try:
resolved_path = self._resolve_path(path)
if self._is_s3_path(path):
return self._exists_in_s3(resolved_path)
else:
return Path(resolved_path).exists()
except Exception as e:
logger.warning(f"Error checking existence of {path}: {str(e)}")
return False
def list_objects(self, prefix: str = "") -> List[str]:
"""List available data objects."""
try:
if self.s3_config.get("bucket"):
return self._list_s3_objects(prefix)
else:
return self._list_local_objects(prefix)
except Exception as e:
logger.error(f"Failed to list objects with prefix {prefix}: {str(e)}")
return []
def delete(self, path: Union[str, Path]) -> None:
"""Delete data at path."""
try:
resolved_path = self._resolve_path(path)
if self._is_s3_path(path):
self._delete_from_s3(resolved_path)
else:
Path(resolved_path).unlink()
logger.info(f"Successfully deleted {path}")
except Exception as e:
logger.error(f"Failed to delete {path}: {str(e)}")
raise
def _load_from_local(self, path: Path, **kwargs) -> anndata.AnnData:
"""Load data from local file system."""
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
# Implement format-specific loading
suffix = path.suffix.lower()
if suffix == '.h5ad':
return anndata.read_h5ad(path)
elif suffix in ['.h5', '.hdf5']:
return self._load_hdf5(path, **kwargs)
else:
raise ValueError(f"Unsupported format: {suffix}")
def _save_to_local(self, adata: anndata.AnnData, path: Path, **kwargs) -> None:
"""Save data to local file system."""
# Create parent directories if needed
path.parent.mkdir(parents=True, exist_ok=True)
# Implement format-specific saving
suffix = path.suffix.lower()
if suffix == '.h5ad':
compression = kwargs.get('compression', 'gzip')
adata.write_h5ad(path, compression=compression)
elif suffix in ['.h5', '.hdf5']:
self._save_hdf5(adata, path, **kwargs)
else:
raise ValueError(f"Unsupported format: {suffix}")
def _is_s3_path(self, path: Union[str, Path]) -> bool:
"""Check if path is an S3 URI."""
return str(path).startswith(('s3://', 's3a://', 's3n://'))
def _resolve_path(self, path: Union[str, Path]) -> Path:
"""Resolve path with base_path if needed."""
path_obj = Path(path)
if not path_obj.is_absolute() and self.base_path:
return self.base_path / path_obj
return path_obj
# S3 methods (implement when S3 support is added)
def _load_from_s3(self, path: str, **kwargs) -> anndata.AnnData:
"""Load data from S3 (future implementation)."""
raise NotImplementedError("S3 support not yet implemented")
def _save_to_s3(self, adata: anndata.AnnData, path: str, **kwargs) -> None:
"""Save data to S3 (future implementation)."""
raise NotImplementedError("S3 support not yet implemented")
def _exists_in_s3(self, path: str) -> bool:
"""Check if object exists in S3 (future implementation)."""
raise NotImplementedError("S3 support not yet implemented")
def _list_s3_objects(self, prefix: str) -> List[str]:
"""List S3 objects (future implementation)."""
raise NotImplementedError("S3 support not yet implemented")
def _delete_from_s3(self, path: str) -> None:
"""Delete from S3 (future implementation)."""
raise NotImplementedError("S3 support not yet implemented")
📊 Testing Adapters
Unit Test Template
# tests/unit/adapters/test_your_adapter.py
import pytest
import pandas as pd
import numpy as np
from pathlib import Path
from unittest.mock import Mock, patch
from lobster.core.adapters.your_adapter import YourModalityAdapter
from tests.mock_data.generators import generate_mock_data
class TestYourModalityAdapter:
@pytest.fixture
def adapter(self):
return YourModalityAdapter(data_subtype="default")
@pytest.fixture
def sample_dataframe(self):
return generate_mock_data(n_samples=10, n_features=100)
def test_adapter_initialization(self, adapter):
"""Test adapter initializes correctly."""
assert adapter.name == "YourModalityAdapter"
assert adapter.data_subtype == "default"
assert adapter.validator is not None
def test_from_dataframe(self, adapter, sample_dataframe):
"""Test conversion from DataFrame."""
adata = adapter.from_source(sample_dataframe)
assert adata.n_obs == sample_dataframe.shape[1] # transposed
assert adata.n_vars == sample_dataframe.shape[0]
assert 'modality' in adata.uns
def test_validation(self, adapter, sample_dataframe):
"""Test schema validation."""
adata = adapter.from_source(sample_dataframe)
validation_result = adapter.validate(adata)
assert validation_result is not None
# Add specific validation checks
def test_file_loading(self, adapter, tmp_path):
"""Test loading from various file formats."""
# Create test CSV file
test_data = pd.DataFrame(np.random.randn(10, 20))
csv_path = tmp_path / "test.csv"
test_data.to_csv(csv_path)
adata = adapter.from_source(csv_path)
assert adata.n_obs > 0
assert adata.n_vars > 0
def test_error_handling(self, adapter):
"""Test error handling for invalid inputs."""
with pytest.raises(FileNotFoundError):
adapter.from_source("nonexistent_file.csv")
with pytest.raises(TypeError):
adapter.from_source(12345)  # Invalid type
🎯 Best Practices
1. Schema Design
- Flexible Validation: Support both strict and permissive validation modes
- Biological Relevance: Include biologically meaningful QC thresholds
- Extensible: Design schemas to accommodate future data types
- Documentation: Provide clear schema documentation and examples
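The strict/permissive dual mode described above can be sketched as follows (`ValidationResult` here is a simplified stand-in for the real class, and `validate_obs_columns` is a hypothetical helper):

```python
# Minimal sketch of flexible validation: the same schema check either
# hard-fails (strict) or records warnings (permissive).
from dataclasses import dataclass, field
from typing import List

@dataclass
class ValidationResult:
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    @property
    def is_valid(self) -> bool:
        return not self.errors

def validate_obs_columns(obs_columns, required, strict=False):
    result = ValidationResult()
    for col in required:
        if col not in obs_columns:
            msg = f"missing required obs column: {col}"
            # Strict mode promotes schema gaps to hard errors;
            # permissive mode records them as warnings only.
            (result.errors if strict else result.warnings).append(msg)
    return result

permissive = validate_obs_columns(["sample_id"], ["sample_id", "batch"])
strict = validate_obs_columns(["sample_id"], ["sample_id", "batch"], strict=True)
print(permissive.is_valid, strict.is_valid)  # True False
```

This pattern lets exploratory workflows proceed on imperfect data while production pipelines can enforce the full schema.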
2. Error Handling
def robust_conversion(self, source, **kwargs):
"""Example of robust error handling."""
try:
# Attempt conversion
result = self._convert_data(source, **kwargs)
# Validate result
validation = self.validate(result)
if not validation.is_valid:
if self.strict_validation:
raise ValueError(f"Validation failed: {validation.errors}")
else:
logger.warning(f"Validation warnings: {validation.warnings}")
return result
except FileNotFoundError as e:
raise FileNotFoundError(f"Input file not found: {e}")
except pd.errors.EmptyDataError:
raise ValueError("Input file is empty or contains no valid data")
except Exception as e:
logger.error(f"Unexpected error during conversion: {e}")
raise
3. Performance Optimization
def memory_efficient_loading(self, path, **kwargs):
"""Handle large files efficiently."""
# Check file size first
file_size = Path(path).stat().st_size
if file_size > 1e9: # 1GB threshold
logger.info("Large file detected, using chunked reading")
return self._load_in_chunks(path, **kwargs)
else:
return self._load_standard(path, **kwargs)
def _load_in_chunks(self, path, chunk_size=10000):
"""Load large files in chunks."""
chunks = []
for chunk in pd.read_csv(path, chunksize=chunk_size):
chunks.append(chunk)
return pd.concat(chunks, ignore_index=True)
4. S3-Ready Design
def s3_ready_path_handling(self, path):
"""Parse paths for current and future S3 support."""
if isinstance(path, str) and path.startswith('s3://'):
# Parse S3 URI
parsed = urlparse(path)
return {
'type': 's3',
'bucket': parsed.netloc,
'key': parsed.path.lstrip('/'),
'region': self.s3_config.get('region')
}
else:
# Local path
return {
'type': 'local',
'path': Path(path),
'absolute': Path(path).resolve()
}
🔍 Integration with DataManagerV2
Adapters integrate with the DataManagerV2 through the adapter registry:
# Register your adapter
from lobster.core.data_manager_v2 import DataManagerV2
data_manager = DataManagerV2()
# Register modality adapter
data_manager.register_modality_adapter(
'your_modality',
YourModalityAdapter
)
# Register backend adapter
data_manager.register_backend_adapter(
'your_format',
YourBackend
)
# Use in loading
adata = data_manager.load_data(
'data.csv',
modality_type='your_modality',
adapter_params={'data_subtype': 'specific_type'}
)
📚 Complete Example Usage
# Example: Creating a metabolomics adapter
from lobster.core.adapters.metabolomics_adapter import MetabolomicsAdapter
# Initialize adapter
adapter = MetabolomicsAdapter(
data_subtype='targeted_metabolomics',
strict_validation=False
)
# Convert from various sources
adata_from_csv = adapter.from_source(
'metabolite_data.csv',
transpose=True,
obs_metadata=sample_metadata
)
adata_from_excel = adapter.from_source(
'data.xlsx',
sheet_name='Concentrations',
log_transform=True
)
# Validate data
validation_result = adapter.validate(adata_from_csv)
if validation_result.warnings:
print("Schema warnings:", validation_result.warnings)
# Get schema information
schema_info = adapter.get_schema_info()
print("Required columns:", schema_info['required_obs'])
This comprehensive guide provides everything needed to create robust, extensible adapters that handle data conversion, validation, and storage while maintaining compatibility with the Lobster AI platform's architecture and future cloud scaling requirements.