Creating Services - Lobster AI Service Development Guide
This guide covers how to create stateless analysis services in the Lobster AI system. Services handle the core computational work for bioinformatics analyses...
🎯 Overview
This guide covers how to create stateless analysis services in the Lobster AI system. Services handle the core computational work for bioinformatics analyses, while agents orchestrate their usage. Services follow a strict stateless design pattern that promotes reusability and testability.
🏗️ Service Architecture
Service Responsibilities
- Computational Logic: Implement bioinformatics algorithms and analyses
- Data Processing: Transform AnnData objects following scientific standards
- Statistical Analysis: Provide rigorous statistical methods and metrics
- Error Handling: Robust error handling with specific exceptions
- Progress Reporting: Optional progress callbacks for long-running operations
Service Design Principles
- Stateless: No instance state between method calls
- Pure Functions: Deterministic outputs for given inputs
- AnnData-Centric: Work with AnnData objects as primary data structure
- Return Tuples: Always return
(processed_adata, statistics_dict, ir)whereiris AnalysisStep - Scientific Rigor: Follow established bioinformatics best practices
📋 Service Pattern Template
Basic Service Structure
# lobster/tools/your_service.py
"""
Your analysis service for specific bioinformatics workflow.
This service provides stateless methods for [describe specific analysis type].
"""
from typing import Dict, List, Tuple, Any, Optional, Callable
import anndata
import numpy as np
import pandas as pd
from lobster.utils.logger import get_logger
from lobster.core.provenance import AnalysisStep
logger = get_logger(__name__)
class YourAnalysisError(Exception):
"""Base exception for your analysis operations."""
pass
class YourService:
"""
Stateless service for your specific analysis workflow.
This class provides methods to perform [specific analysis] on biological data
following established scientific protocols and best practices.
"""
def __init__(self):
"""
Initialize the service.
This service is stateless and doesn't require external dependencies.
"""
logger.debug("Initializing stateless YourService")
self.progress_callback = None
self.current_progress = 0
self.total_steps = 0
logger.debug("YourService initialized successfully")
def set_progress_callback(self, callback: Callable[[int, str], None]) -> None:
"""
Set a callback function to report progress.
Args:
callback: Function accepting (progress_percent: int, message: str)
"""
self.progress_callback = callback
logger.info("Progress callback set for YourService")
def _update_progress(self, step_name: str) -> None:
"""Update progress and call callback if set."""
self.current_progress += 1
if self.progress_callback is not None:
progress_percent = int((self.current_progress / self.total_steps) * 100)
self.progress_callback(progress_percent, step_name)
logger.debug(f"Progress: {progress_percent}% - {step_name}")
def main_analysis_method(
self,
adata: anndata.AnnData,
parameter1: float = 1.0,
parameter2: str = "default",
parameter3: Optional[List[str]] = None,
progress_callback: Optional[Callable] = None
) -> Tuple[anndata.AnnData, Dict[str, Any], AnalysisStep]:
"""
Main analysis method following the service pattern.
Args:
adata: Input AnnData object to analyze
parameter1: Numerical parameter with scientific meaning
parameter2: Categorical parameter
parameter3: Optional list parameter
progress_callback: Optional progress reporting function
Returns:
Tuple of (processed_adata, statistics_dict, ir) where ir is AnalysisStep for provenance
Raises:
YourAnalysisError: If analysis fails
ValueError: If parameters are invalid
"""
try:
# Set up progress tracking
if progress_callback:
self.set_progress_callback(progress_callback)
self.total_steps = 4 # Adjust based on actual steps
self.current_progress = 0
# Validate inputs
self._validate_inputs(adata, parameter1, parameter2, parameter3)
self._update_progress("Input validation")
# Create working copy
adata_result = adata.copy()
# Step 1: Preprocessing
adata_result = self._preprocess_data(adata_result, parameter1)
self._update_progress("Data preprocessing")
# Step 2: Core analysis
analysis_results = self._perform_core_analysis(
adata_result, parameter2, parameter3
)
self._update_progress("Core analysis")
# Step 3: Post-processing and statistics
statistics = self._calculate_statistics(adata_result, analysis_results)
self._update_progress("Statistical analysis")
# Step 4: Store results in AnnData
self._store_results(adata_result, analysis_results)
self._update_progress("Storing results")
# Create provenance IR
ir = AnalysisStep(
name="your_analysis",
description=f"Performed analysis with parameter1={parameter1}, parameter2={parameter2}",
inputs=[{"adata": "input_data"}],
outputs=[{"adata": "processed_data"}],
parameters={"parameter1": parameter1, "parameter2": parameter2, "parameter3": parameter3}
)
logger.info(f"Analysis completed successfully with {len(statistics)} metrics")
return adata_result, statistics, ir
except Exception as e:
logger.error(f"Analysis failed: {str(e)}")
if isinstance(e, (YourAnalysisError, ValueError)):
raise
else:
raise YourAnalysisError(f"Unexpected error during analysis: {str(e)}")
def _validate_inputs(
self,
adata: anndata.AnnData,
parameter1: float,
parameter2: str,
parameter3: Optional[List[str]]
) -> None:
"""Validate input parameters and data."""
if adata.n_obs == 0:
raise ValueError("Input data is empty (no observations)")
if adata.n_vars == 0:
raise ValueError("Input data has no features")
if parameter1 <= 0:
raise ValueError("Parameter1 must be positive")
if parameter2 not in ["option1", "option2", "default"]:
raise ValueError(f"Invalid parameter2: {parameter2}")
if parameter3 is not None and len(parameter3) == 0:
raise ValueError("Parameter3 cannot be empty list")
logger.debug("Input validation passed")
def _preprocess_data(
self,
adata: anndata.AnnData,
parameter1: float
) -> anndata.AnnData:
"""Perform data preprocessing steps."""
# Implement preprocessing logic
# Example: normalization, filtering, etc.
logger.debug(f"Preprocessing with parameter1={parameter1}")
# Store preprocessing parameters
adata.uns['preprocessing_params'] = {
'parameter1': parameter1,
'method': 'your_preprocessing_method'
}
return adata
def _perform_core_analysis(
self,
adata: anndata.AnnData,
parameter2: str,
parameter3: Optional[List[str]]
) -> Dict[str, Any]:
"""Perform the core analysis computation."""
results = {}
# Implement your core analysis logic here
logger.debug(f"Core analysis with parameter2={parameter2}")
# Example analysis results
results['analysis_method'] = parameter2
results['features_analyzed'] = parameter3 or []
results['n_observations'] = adata.n_obs
results['n_features'] = adata.n_vars
return results
def _calculate_statistics(
self,
adata: anndata.AnnData,
analysis_results: Dict[str, Any]
) -> Dict[str, Any]:
"""Calculate analysis statistics and metrics."""
statistics = {
'n_observations': adata.n_obs,
'n_features': adata.n_vars,
'analysis_timestamp': pd.Timestamp.now().isoformat(),
'analysis_method': analysis_results.get('analysis_method', 'unknown')
}
# Add scientific metrics
if 'X' in adata.layers:
statistics['mean_expression'] = np.mean(adata.layers['X'])
statistics['std_expression'] = np.std(adata.layers['X'])
# Add analysis-specific metrics
statistics.update(self._compute_domain_specific_metrics(adata, analysis_results))
logger.debug(f"Calculated {len(statistics)} statistical metrics")
return statistics
def _compute_domain_specific_metrics(
self,
adata: anndata.AnnData,
analysis_results: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute metrics specific to your analysis domain."""
metrics = {}
# Implement domain-specific statistical calculations
# Examples: clustering metrics, differential expression stats, etc.
return metrics
def _store_results(
self,
adata: anndata.AnnData,
analysis_results: Dict[str, Any]
) -> None:
"""Store analysis results in appropriate AnnData slots."""
# Store in .obs (cell-level annotations)
# adata.obs['new_annotation'] = analysis_results['cell_annotations']
# Store in .var (feature-level annotations)
# adata.var['new_feature_info'] = analysis_results['feature_info']
# Store in .obsm (embeddings/matrices)
# adata.obsm['X_your_embedding'] = analysis_results['embedding']
# Store in .uns (unstructured metadata)
adata.uns['your_analysis'] = {
'method': analysis_results['analysis_method'],
'parameters': analysis_results.get('parameters', {}),
'timestamp': pd.Timestamp.now().isoformat()
}
logger.debug("Results stored in AnnData object")
def auxiliary_method(
self,
adata: anndata.AnnData,
specific_parameter: str
) -> Tuple[anndata.AnnData, Dict[str, Any], AnalysisStep]:
"""
Auxiliary analysis method for specific sub-tasks.
Following the same pattern as main analysis method.
"""
try:
# Validate inputs
if specific_parameter not in ["valid_option1", "valid_option2"]:
raise ValueError(f"Invalid specific_parameter: {specific_parameter}")
# Create working copy
adata_result = adata.copy()
# Perform specific analysis
# ... implementation ...
# Calculate statistics
statistics = {
'method': 'auxiliary_analysis',
'parameter': specific_parameter,
'timestamp': pd.Timestamp.now().isoformat()
}
# Create provenance IR
ir = AnalysisStep(
name="auxiliary_analysis",
description=f"Auxiliary analysis with {specific_parameter}",
inputs=[{"adata": "input_data"}],
outputs=[{"adata": "processed_data"}],
parameters={"specific_parameter": specific_parameter}
)
return adata_result, statistics, ir
except Exception as e:
logger.error(f"Auxiliary analysis failed: {str(e)}")
raise YourAnalysisError(f"Auxiliary analysis error: {str(e)}")🔬 Scientific Service Patterns
Single-Cell RNA-seq Service Example
class SingleCellQualityService:
"""Service for single-cell RNA-seq quality control."""
def assess_quality(
self,
adata: anndata.AnnData,
min_genes: int = 200,
max_mt_pct: float = 20.0,
max_ribo_pct: float = 50.0
) -> Tuple[anndata.AnnData, Dict[str, Any], AnalysisStep]:
"""Assess single-cell data quality with standard metrics."""
adata_result = adata.copy()
# Calculate QC metrics
adata_result.var['mt'] = adata_result.var_names.str.startswith('MT-')
adata_result.var['ribo'] = adata_result.var_names.str.startswith(('RPS', 'RPL'))
sc.pp.calculate_qc_metrics(
adata_result,
percent_top=None,
log1p=False,
inplace=True
)
# Add mitochondrial and ribosomal percentages
adata_result.obs['pct_counts_mt'] = (
adata_result.obs['total_counts_mt'] / adata_result.obs['total_counts'] * 100
)
adata_result.obs['pct_counts_ribo'] = (
adata_result.obs['total_counts_ribo'] / adata_result.obs['total_counts'] * 100
)
# Calculate statistics
statistics = {
'n_cells': adata_result.n_obs,
'n_genes': adata_result.n_vars,
'median_genes_per_cell': np.median(adata_result.obs['n_genes_by_counts']),
'median_counts_per_cell': np.median(adata_result.obs['total_counts']),
'mean_mt_pct': np.mean(adata_result.obs['pct_counts_mt']),
'mean_ribo_pct': np.mean(adata_result.obs['pct_counts_ribo']),
}
# Create provenance IR
ir = AnalysisStep(
name="quality_assessment",
description=f"QC with min_genes={min_genes}, max_mt_pct={max_mt_pct}%",
inputs=[{"adata": "input_data"}],
outputs=[{"adata": "qc_data"}],
parameters={"min_genes": min_genes, "max_mt_pct": max_mt_pct, "max_ribo_pct": max_ribo_pct}
)
return adata_result, statistics, irProteomics Service Example
class ProteomicsPreprocessingService:
"""Service for proteomics data preprocessing."""
def normalize_intensities(
self,
adata: anndata.AnnData,
method: str = "log2",
handle_missing: str = "remove"
) -> Tuple[anndata.AnnData, Dict[str, Any], AnalysisStep]:
"""Normalize protein intensity data."""
adata_result = adata.copy()
# Handle missing values
if handle_missing == "remove":
# Remove proteins with >50% missing values
missing_pct = (adata_result.X == 0).sum(axis=0) / adata_result.n_obs
keep_proteins = missing_pct < 0.5
adata_result = adata_result[:, keep_proteins]
# Apply normalization
if method == "log2":
adata_result.X = np.log2(adata_result.X + 1)
adata_result.layers['log2_normalized'] = adata_result.X
elif method == "quantile":
# Implement quantile normalization
pass
# Calculate statistics
statistics = {
'normalization_method': method,
'proteins_before': adata.n_vars,
'proteins_after': adata_result.n_vars,
'missing_value_handling': handle_missing,
'mean_intensity': np.mean(adata_result.X[adata_result.X > 0])
}
# Create provenance IR
ir = AnalysisStep(
name="normalize_intensities",
description=f"{method} normalization with {handle_missing} missing value handling",
inputs=[{"adata": "raw_proteomics"}],
outputs=[{"adata": "normalized_proteomics"}],
parameters={"method": method, "handle_missing": handle_missing}
)
return adata_result, statistics, ir🔧 Integration with DataManagerV2
Services are called by agent tools that handle DataManagerV2 integration:
# In agent tool
@tool
def perform_quality_assessment(modality_name: str, **params) -> str:
"""Agent tool that uses the service."""
# 1. Get data from DataManagerV2
adata = data_manager.get_modality(modality_name)
# 2. Call stateless service
service = QualityService()
result_adata, statistics = service.assess_quality(adata, **params)
# 3. Store result in DataManagerV2
result_modality = f"{modality_name}_quality_assessed"
data_manager.modalities[result_modality] = result_adata
# 4. Log for provenance
data_manager.log_tool_usage("quality_assessment", params, statistics)
# 5. Return formatted response
return format_quality_response(statistics, result_modality)🧪 Testing Services
Unit Test Template
# tests/unit/tools/test_your_service.py
import pytest
import numpy as np
import pandas as pd
from unittest.mock import Mock
from lobster.tools.your_service import YourService, YourAnalysisError
from tests.mock_data.generators import generate_adata
class TestYourService:
@pytest.fixture
def service(self):
return YourService()
@pytest.fixture
def sample_adata(self):
return generate_adata(n_obs=100, n_vars=50)
def test_service_initialization(self, service):
"""Test service initializes correctly."""
assert service is not None
assert service.progress_callback is None
assert service.current_progress == 0
def test_progress_callback(self, service):
"""Test progress callback functionality."""
callback_calls = []
def mock_callback(progress, message):
callback_calls.append((progress, message))
service.set_progress_callback(mock_callback)
assert service.progress_callback is not None
def test_main_analysis_success(self, service, sample_adata):
"""Test successful analysis execution."""
result_adata, statistics = service.main_analysis_method(
sample_adata,
parameter1=1.5,
parameter2="option1"
)
# Validate results
assert result_adata is not None
assert isinstance(statistics, dict)
assert 'n_observations' in statistics
assert statistics['n_observations'] == sample_adata.n_obs
def test_input_validation(self, service, sample_adata):
"""Test input validation."""
# Test invalid parameter1
with pytest.raises(ValueError, match="Parameter1 must be positive"):
service.main_analysis_method(sample_adata, parameter1=-1.0)
# Test invalid parameter2
with pytest.raises(ValueError, match="Invalid parameter2"):
service.main_analysis_method(sample_adata, parameter2="invalid")
def test_empty_data_handling(self, service):
"""Test handling of empty data."""
empty_adata = generate_adata(n_obs=0, n_vars=10)
with pytest.raises(ValueError, match="Input data is empty"):
service.main_analysis_method(empty_adata)
def test_analysis_error_handling(self, service, sample_adata, monkeypatch):
"""Test error handling during analysis."""
# Mock a method to raise an exception
def mock_preprocess(*args, **kwargs):
raise RuntimeError("Preprocessing failed")
monkeypatch.setattr(service, '_preprocess_data', mock_preprocess)
with pytest.raises(YourAnalysisError, match="Unexpected error"):
service.main_analysis_method(sample_adata)
def test_statistics_calculation(self, service, sample_adata):
"""Test statistical calculations are correct."""
result_adata, statistics = service.main_analysis_method(sample_adata)
# Verify statistical accuracy
assert statistics['n_observations'] == result_adata.n_obs
assert statistics['n_features'] == result_adata.n_vars
assert 'analysis_timestamp' in statisticsIntegration Test Template
# tests/integration/test_service_integration.py
def test_service_with_real_data(real_dataset_fixture):
"""Test service with realistic biological data."""
service = YourService()
adata = real_dataset_fixture # Use real data fixture
result_adata, statistics = service.main_analysis_method(adata)
# Validate biological relevance
assert result_adata.n_obs > 0
assert 'your_analysis' in result_adata.uns
assert statistics['mean_expression'] > 0
def test_service_performance(large_dataset_fixture):
"""Test service performance with large datasets."""
import time
service = YourService()
start_time = time.time()
result_adata, statistics = service.main_analysis_method(large_dataset_fixture)
duration = time.time() - start_time
assert duration < 60 # Should complete within 60 seconds
assert result_adata.n_obs == large_dataset_fixture.n_obs📊 Best Practices
1. Performance Optimization
# Use efficient NumPy operations
def efficient_computation(data):
# Good: vectorized operations
result = np.mean(data, axis=1)
# Avoid: loops when possible
# result = [np.mean(row) for row in data]
# Memory management for large datasets
def memory_efficient_analysis(adata):
# Process in chunks if memory-constrained
if adata.n_obs > 100000:
# Implement chunked processing
pass
else:
# Standard processing
pass2. Scientific Rigor
# Proper statistical methods
def calculate_pvalues(data, method='benjamini-hochberg'):
from scipy import stats
from statsmodels.stats.multitest import multipletests
# Calculate p-values
pvalues = stats.ttest_1samp(data, 0).pvalue
# Multiple testing correction
rejected, pvals_corrected, _, _ = multipletests(
pvalues, method=method
)
return pvals_corrected
# Quality control checks
def validate_scientific_parameters(min_cells=3, min_genes=200):
if min_cells < 1:
raise ValueError("Minimum cells must be at least 1")
if min_genes < 50:
logger.warning("Very low gene threshold may affect analysis quality")3. Error Handling
class ServiceError(Exception):
"""Base service exception."""
pass
class DataValidationError(ServiceError):
"""Data validation failed."""
pass
class ComputationError(ServiceError):
"""Computation failed."""
pass
def robust_computation(data):
try:
result = complex_algorithm(data)
except np.linalg.LinAlgError as e:
raise ComputationError(f"Linear algebra error: {e}")
except ValueError as e:
raise DataValidationError(f"Invalid data: {e}")
except Exception as e:
raise ServiceError(f"Unexpected error: {e}")
return result4. Documentation
def well_documented_method(
adata: anndata.AnnData,
threshold: float = 0.05,
method: str = "wilcoxon"
) -> Tuple[anndata.AnnData, Dict[str, Any], AnalysisStep]:
"""
Perform differential expression analysis.
This method implements [specific algorithm] following the methodology
described in [reference]. The analysis identifies differentially expressed
genes between conditions using [statistical test].
Args:
adata: AnnData object with expression data and group annotations
threshold: Significance threshold for adjusted p-values (default: 0.05)
method: Statistical test method ('wilcoxon', 'ttest', 'deseq2')
Returns:
Tuple containing:
- AnnData object with differential expression results in .var
- Dictionary with summary statistics:
* 'n_significant': Number of significant genes
* 'method_used': Statistical method applied
* 'threshold_used': Significance threshold
* 'total_genes_tested': Total number of genes tested
- AnalysisStep for provenance tracking and notebook export
Raises:
ValueError: If method is not supported or data lacks required annotations
ComputationError: If statistical computation fails
Example:
>>> service = DifferentialExpressionService()
>>> result_adata, stats, ir = service.find_markers(adata, threshold=0.01)
>>> print(f"Found {stats['n_significant']} significant genes")
Notes:
- Requires 'group' column in adata.obs for group comparisons
- Results stored in adata.var['pvals'] and adata.var['pvals_adj']
- For single-cell data, considers only genes expressed in >3 cells per group
"""🎯 Service Categories
1. Data Processing Services
- PreprocessingService: Filtering, normalization, batch correction
- QualityService: Quality control metrics and filtering recommendations
- ConcatenationService: Sample merging and batch handling
2. Analysis Services
- ClusteringService: Cell clustering and dimensionality reduction
- DifferentialExpressionService: Statistical comparison between groups
- EnrichmentService: Pathway and gene set enrichment analysis
3. Visualization Services
- VisualizationService: Interactive plotting with Plotly
- NetworkService: Network analysis and visualization
- ReportService: Automated report generation
4. Proteomics-Specific Services
- ProteomicsPreprocessingService: Intensity normalization and missing value handling
- ProteomicsQualityService: CV analysis and batch detection
- ProteomicsDifferentialService: Linear models with empirical Bayes
5. Data Access Services
Data access services follow a different pattern from analysis services. They wrap external REST APIs and return structured data rather than AnnData objects.
Key differences from analysis services:
- No AnnData: Input and output are dictionaries, strings, or Pydantic models — not AnnData objects
- No 3-tuple return: Return API response data directly (not
(adata, stats, ir)) - HTTP clients: Use
requestsorhttpxfor REST API communication - Rate limiting: Must respect external API rate limits
- Location:
lobster/services/data_access/(separate from domain-specific analysis services)
Examples:
- EnsemblService (
services/data_access/ensembl_service.py, 371 lines) — Gene lookup, VEP variant consequence prediction, sequence retrieval (genomic/cDNA/CDS/protein), cross-database references. Used bygenomics_expert. - UniProtService (
services/data_access/uniprot_service.py, 303 lines) — Protein information, search, and async ID mapping. Used bygenomics_expert.
Pattern:
class EnsemblService:
"""REST API client for Ensembl genome browser."""
BASE_URL = "https://rest.ensembl.org"
def get_gene_info(self, gene_symbol: str) -> dict:
"""Look up gene by symbol."""
response = requests.get(
f"{self.BASE_URL}/lookup/symbol/homo_sapiens/{gene_symbol}",
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
return response.json()
def get_vep_consequences(self, variant_id: str) -> dict:
"""Predict variant consequences via Ensembl VEP."""
response = requests.get(
f"{self.BASE_URL}/vep/human/id/{variant_id}",
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
return response.json()This comprehensive guide provides everything needed to create robust, scientifically rigorous services that integrate seamlessly with the Lobster AI platform while maintaining high performance and reliability standards.
Creating Agent Packages
Comprehensive guide to building Lobster AI agent packages with step-by-step instructions and real examples
44. Custom Provider Development Guide
Custom Providers enable Lobster AI to integrate with external data sources, APIs, and databases beyond the built-in PubMed, GEO, and PMC providers. This ...