Interfaces API Reference
Overview
The Interfaces API defines the abstract contracts and protocols that ensure consistent behavior across different implementations in the Lobster AI system. These interfaces enable modularity, extensibility, and maintainability by providing clear contracts for data backends, modality adapters, validators, and client implementations.
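As a minimal sketch of what "contract" means here, the following uses Python's standard `abc` module. The names `Greeter`, `IncompleteGreeter`, `EnglishGreeter`, and `try_instantiate` are hypothetical and exist only for this illustration; they are not part of Lobster AI. The point is that a subclass which omits an abstract method cannot be instantiated.

```python
from abc import ABC, abstractmethod


class Greeter(ABC):
    """Hypothetical interface, used only to illustrate the contract mechanism."""

    @abstractmethod
    def greet(self, name: str) -> str:
        """Return a greeting for the given name."""


class IncompleteGreeter(Greeter):
    """Does not implement greet(), so the class stays abstract."""


class EnglishGreeter(Greeter):
    """Implements every abstract method, so it can be instantiated."""

    def greet(self, name: str) -> str:
        return f"Hello, {name}"


def try_instantiate(cls) -> str:
    """Return 'ok' if cls can be instantiated, else the name of the error raised."""
    try:
        cls()
        return "ok"
    except TypeError as exc:
        return type(exc).__name__
```

The same mechanism applies to each interface on this page: an implementation that skips a method marked `@abstractmethod` fails at construction time rather than later, in the middle of an analysis.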
Client Interface
BaseClient
Abstract base class defining the interface for all Lobster client implementations.
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional

class BaseClient(ABC):
    """
    Abstract base class defining the interface for all Lobster client implementations.

    This ensures that both local (AgentClient) and cloud (CloudLobsterClient)
    implementations provide the same interface to the CLI and other components.
    """
Core Abstract Methods
@abstractmethod
def __init__(self, *args, **kwargs):
    """Initialize the client with necessary configuration."""
    pass

@abstractmethod
def query(self, user_input: str, stream: bool = False) -> Dict[str, Any]:
    """
    Process a user query through the system.

    Args:
        user_input: The user's question or request
        stream: Whether to stream the response

    Returns:
        Dictionary containing:
        - success: bool
        - response: str
        - error: Optional[str]
        - session_id: str
        - has_data: bool
        - plots: List[Dict[str, Any]]
        - duration: float (optional)
        - last_agent: Optional[str] (optional)
    """
    pass

@abstractmethod
def get_status(self) -> Dict[str, Any]:
    """
    Get the current status of the client/system.

    Returns:
        Dictionary containing status information including:
        - session_id: str
        - message_count: int (for local) or status: str (for cloud)
        - has_data: bool
        - workspace: str
        - data_summary: Optional[Dict] (if data is loaded)
    """
    pass

@abstractmethod
def list_workspace_files(self, pattern: str = "*") -> List[Dict[str, Any]]:
    """
    List files in the workspace.

    Args:
        pattern: Glob pattern for filtering files

    Returns:
        List of dictionaries containing file information:
        - name: str
        - path: str
        - size: int
        - modified: str (ISO format timestamp)
    """
    pass

@abstractmethod
def reset(self) -> None:
    """Reset the conversation state."""
    pass

@abstractmethod
def export_session(self, export_path: Optional[Path] = None) -> Path:
    """
    Export the current session data.

    Args:
        export_path: Optional path for the export file

    Returns:
        Path to the exported file
    """
    pass
Optional Methods
def get_usage(self) -> Dict[str, Any]:
    """
    Get usage statistics (primarily for cloud clients).

    Returns:
        Dictionary with usage information or error
    """
    return {"error": "Usage tracking not available for this client type", "success": False}

def list_models(self) -> Dict[str, Any]:
    """
    List available models (primarily for cloud clients).

    Returns:
        Dictionary with model list or error
    """
    return {"error": "Model listing not available for this client type", "success": False}
Data Storage Interfaces
IDataBackend
Abstract interface for data storage backends enabling support for different storage systems.
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Union
import anndata

class IDataBackend(ABC):
    """
    Abstract interface for data storage backends.

    This interface defines the contract for storing and retrieving
    bioinformatics data in various formats and storage systems.
    All backends must implement these core operations to ensure
    consistent behavior across different storage solutions.
    """
Core Abstract Methods
@abstractmethod
def load(self, path: Union[str, Path], **kwargs) -> anndata.AnnData:
    """
    Load data from storage.

    Args:
        path: Path to the data file (local path or URI)
        **kwargs: Backend-specific loading parameters

    Returns:
        anndata.AnnData: Loaded data object

    Raises:
        FileNotFoundError: If the file doesn't exist
        ValueError: If the file format is unsupported or corrupted
        PermissionError: If access is denied
    """
    pass

@abstractmethod
def save(self, adata: anndata.AnnData, path: Union[str, Path], **kwargs) -> None:
    """
    Save data to storage.

    Args:
        adata: AnnData object to save
        path: Destination path (local path or URI)
        **kwargs: Backend-specific saving parameters

    Raises:
        ValueError: If the data cannot be serialized
        PermissionError: If write access is denied
        OSError: If storage operation fails
    """
    pass

@abstractmethod
def exists(self, path: Union[str, Path]) -> bool:
    """
    Check if data exists at the specified path.

    Args:
        path: Path to check (local path or URI)

    Returns:
        bool: True if data exists, False otherwise
    """
    pass

@abstractmethod
def delete(self, path: Union[str, Path]) -> None:
    """
    Delete data at the specified path.

    Args:
        path: Path to delete (local path or URI)

    Raises:
        FileNotFoundError: If the file doesn't exist
        PermissionError: If delete access is denied
    """
    pass

@abstractmethod
def list_files(self, directory: Union[str, Path], pattern: str = "*") -> list[str]:
    """
    List files in a directory matching the given pattern.

    Args:
        directory: Directory to search (local path or URI)
        pattern: File pattern to match (glob-style)

    Returns:
        list[str]: List of file paths matching the pattern

    Raises:
        FileNotFoundError: If the directory doesn't exist
        PermissionError: If read access is denied
    """
    pass

@abstractmethod
def get_metadata(self, path: Union[str, Path]) -> Dict[str, Any]:
    """
    Get metadata about a file.

    Args:
        path: Path to the file (local path or URI)

    Returns:
        Dict[str, Any]: Metadata dictionary containing:
        - size: File size in bytes
        - modified: Last modification timestamp
        - checksum: File checksum (if available)
        - format: Detected file format

    Raises:
        FileNotFoundError: If the file doesn't exist
    """
    pass
Default Implementation Methods
def get_storage_info(self) -> Dict[str, Any]:
    """
    Get information about the storage backend.

    Returns:
        Dict[str, Any]: Storage backend information including:
        - backend_type: Type of backend (e.g., 'local', 's3')
        - capabilities: List of supported operations
        - configuration: Backend configuration details
    """
    return {
        "backend_type": self.__class__.__name__,
        "capabilities": ["load", "save", "exists", "delete", "list_files", "get_metadata"],
        "configuration": {}
    }

def validate_path(self, path: Union[str, Path]) -> Union[str, Path]:
    """
    Validate and normalize a path for this backend.

    Args:
        path: Path to validate

    Returns:
        Union[str, Path]: Validated and normalized path

    Raises:
        ValueError: If the path is invalid for this backend
    """
    return path

def supports_format(self, format_name: str) -> bool:
    """
    Check if the backend supports a specific file format.

    Args:
        format_name: Format to check (e.g., 'h5ad', 'csv', 'h5mu')

    Returns:
        bool: True if format is supported, False otherwise
    """
    return format_name.lower() in ['h5ad', 'csv']
Data Adapter Interfaces
IModalityAdapter
Abstract interface for modality-specific data adapters enabling support for different biological data modalities.
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import anndata
import pandas as pd

class IModalityAdapter(ABC):
    """
    Abstract interface for modality-specific data adapters.

    This interface defines the contract for converting raw data from various
    sources into standardized AnnData objects with modality-specific schemas.
    Each adapter handles the specific requirements and conventions of its
    biological data modality.
    """
Core Abstract Methods
@abstractmethod
def from_source(
    self,
    source: Union[str, Path, pd.DataFrame],
    **kwargs
) -> anndata.AnnData:
    """
    Convert source data to AnnData with appropriate schema.

    Args:
        source: Data source (file path, DataFrame, or other format)
        **kwargs: Modality-specific conversion parameters

    Returns:
        anndata.AnnData: Standardized data object with proper schema

    Raises:
        ValueError: If source data is invalid or cannot be converted
        FileNotFoundError: If source file doesn't exist
        TypeError: If source format is not supported
    """
    pass

@abstractmethod
def validate(
    self,
    adata: anndata.AnnData,
    strict: bool = False
) -> "ValidationResult":
    """
    Validate AnnData against modality schema.

    Args:
        adata: AnnData object to validate
        strict: If True, treat warnings as errors

    Returns:
        ValidationResult: Validation results with errors/warnings

    Raises:
        ValueError: If strict=True and validation fails
    """
    pass

@abstractmethod
def get_schema(self) -> Dict[str, Any]:
    """
    Return the expected schema for this modality.

    Returns:
        Dict[str, Any]: Schema definition containing:
        - required_obs: Required observation (cell/sample) metadata
        - required_var: Required variable (gene/protein) metadata
        - optional_obs: Optional observation metadata
        - optional_var: Optional variable metadata
        - layers: Expected data layers
        - obsm: Expected multi-dimensional observations
        - uns: Expected unstructured metadata
    """
    pass

@abstractmethod
def get_supported_formats(self) -> List[str]:
    """
    Get list of supported input formats.

    Returns:
        List[str]: List of supported file extensions or format names
    """
    pass
Default Implementation Methods
def get_modality_name(self) -> str:
    """
    Get the name of this modality.

    Returns:
        str: Modality name (e.g., 'transcriptomics', 'proteomics')
    """
    return self.__class__.__name__.lower().replace('adapter', '')

def detect_format(self, source: Union[str, Path]) -> Optional[str]:
    """
    Detect the format of a source file.

    Args:
        source: Path to the source file

    Returns:
        Optional[str]: Detected format name, None if unknown
    """
    if isinstance(source, (str, Path)):
        path = Path(source)
        extension = path.suffix.lower()
        format_mapping = {
            '.csv': 'csv',
            '.tsv': 'tsv',
            '.txt': 'txt',
            '.h5ad': 'h5ad',
            '.h5': 'h5',
            '.xlsx': 'excel',
            '.xls': 'excel',
            '.mtx': 'mtx',
            '.h5mu': 'h5mu'
        }
        return format_mapping.get(extension)
    return None

def preprocess_data(
    self,
    adata: anndata.AnnData,
    **kwargs
) -> anndata.AnnData:
    """
    Apply modality-specific preprocessing steps.

    Args:
        adata: Input AnnData object
        **kwargs: Preprocessing parameters

    Returns:
        anndata.AnnData: Preprocessed data object
    """
    return adata
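def get_quality_metrics(self, adata: anndata.AnnData) -> Dict[str, Any]:
    """
    Calculate modality-specific quality metrics.

    Args:
        adata: AnnData object to analyze

    Returns:
        Dict[str, Any]: Quality metrics dictionary
    """
    # Requires `import numpy as np`.
    X = adata.X
    total = adata.n_obs * adata.n_vars
    if X is None or total == 0:
        sparsity = 0.0
    elif hasattr(X, "count_nonzero"):
        # SciPy sparse matrices: .size counts only stored entries, so divide by the full shape
        sparsity = 1.0 - X.count_nonzero() / total
    else:
        sparsity = 1.0 - np.count_nonzero(X) / total
    return {
        "n_obs": adata.n_obs,
        "n_vars": adata.n_vars,
        "sparsity": sparsity,
        "memory_usage": X.nbytes if hasattr(X, 'nbytes') else 0
    }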
def add_provenance(
    self,
    adata: anndata.AnnData,
    source_info: Dict[str, Any],
    processing_params: Optional[Dict[str, Any]] = None
) -> anndata.AnnData:
    """
    Add provenance information to AnnData object.

    Args:
        adata: AnnData object to annotate
        source_info: Information about data source
        processing_params: Parameters used in processing

    Returns:
        anndata.AnnData: AnnData with provenance information
    """
    import datetime

    provenance = {
        "adapter": self.__class__.__name__,
        "modality": self.get_modality_name(),
        "source": source_info,
        "processing_params": processing_params or {},
        "timestamp": datetime.datetime.now().isoformat(),
        "version": "1.0.0"
    }
    if "provenance" not in adata.uns:
        adata.uns["provenance"] = []
    adata.uns["provenance"].append(provenance)
    return adata
Validation Interfaces
IValidator
Abstract interface for data validators with flexible error handling.
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
import anndata

class IValidator(ABC):
    """
    Abstract interface for data validators.

    This interface defines the contract for validating biological data
    against schemas with flexible error handling that supports both
    strict validation (errors cause failures) and permissive validation
    (warnings allow continued analysis).
    """
Core Abstract Methods
@abstractmethod
def validate(
    self,
    adata: anndata.AnnData,
    strict: bool = False,
    check_types: bool = True,
    check_ranges: bool = True,
    check_completeness: bool = True
) -> ValidationResult:
    """
    Validate AnnData object against schema.

    Args:
        adata: AnnData object to validate
        strict: If True, treat warnings as errors
        check_types: Whether to validate data types
        check_ranges: Whether to validate value ranges
        check_completeness: Whether to check for required fields

    Returns:
        ValidationResult: Validation results with errors/warnings
    """
    pass

@abstractmethod
def validate_schema_compliance(
    self,
    adata: anndata.AnnData,
    schema: Dict[str, Any]
) -> ValidationResult:
    """
    Validate against a specific schema definition.

    Args:
        adata: AnnData object to validate
        schema: Schema definition to validate against

    Returns:
        ValidationResult: Schema validation results
    """
    pass
Default Implementation Methods
def validate_obs_metadata(
    self,
    adata: anndata.AnnData,
    required_columns: Optional[List[str]] = None,
    optional_columns: Optional[List[str]] = None
) -> ValidationResult:
    """
    Validate observation (cell/sample) metadata.

    Args:
        adata: AnnData object to validate
        required_columns: List of required obs columns
        optional_columns: List of optional obs columns

    Returns:
        ValidationResult: Obs metadata validation results
    """
    result = ValidationResult()

    if required_columns:
        for col in required_columns:
            if col not in adata.obs.columns:
                result.add_error(f"Required obs column '{col}' is missing")
            elif adata.obs[col].isna().all():
                result.add_warning(f"Required obs column '{col}' contains only NaN values")

    # Check for unexpected columns
    expected_columns = set((required_columns or []) + (optional_columns or []))
    actual_columns = set(adata.obs.columns)
    unexpected = actual_columns - expected_columns
    if unexpected:
        result.add_info(f"Unexpected obs columns found: {list(unexpected)}")

    return result
def validate_data_quality(self, adata: anndata.AnnData) -> ValidationResult:
    """
    Perform basic data quality checks.

    Args:
        adata: AnnData object to validate

    Returns:
        ValidationResult: Data quality validation results
    """
    result = ValidationResult()

    # Check for empty data
    if adata.n_obs == 0:
        result.add_error("No observations (cells/samples) in dataset")
    if adata.n_vars == 0:
        result.add_error("No variables (genes/proteins) in dataset")

    # Check for NaN values in X matrix.
    # NumPy arrays have no .isnan() method, so use np.isnan
    # (requires `import numpy as np` and `from scipy import sparse`).
    X = adata.X
    total = adata.n_obs * adata.n_vars
    if X is not None and total > 0:
        # Sparse matrices keep their stored values in .data
        values = X.data if sparse.issparse(X) else np.asarray(X)
        if np.issubdtype(values.dtype, np.floating):
            nan_count = int(np.isnan(values).sum())
            if nan_count > 0:
                nan_percentage = (nan_count / total) * 100
                if nan_percentage > 50:
                    result.add_warning(f"High proportion of NaN values: {nan_percentage:.1f}%")
                else:
                    result.add_info(f"NaN values in X matrix: {nan_percentage:.1f}%")

    return result
ValidationResult
Data class for validation results with comprehensive error handling.
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class ValidationResult:
    """
    Result of a validation operation.

    This class encapsulates the results of validating biological data,
    supporting both errors (critical issues) and warnings (non-critical
    issues that don't prevent analysis).
    """
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    info: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
Properties
@property
def has_errors(self) -> bool:
    """Check if validation found any errors."""
    return len(self.errors) > 0

@property
def has_warnings(self) -> bool:
    """Check if validation found any warnings."""
    return len(self.warnings) > 0

@property
def is_valid(self) -> bool:
    """Check if validation passed (no errors)."""
    return not self.has_errors
Methods
def add_error(self, message: str) -> None:
    """Add an error message."""
    self.errors.append(message)

def add_warning(self, message: str) -> None:
    """Add a warning message."""
    self.warnings.append(message)

def add_info(self, message: str) -> None:
    """Add an informational message."""
    self.info.append(message)

def merge(self, other: "ValidationResult") -> "ValidationResult":
    """
    Merge another validation result into this one.

    Args:
        other: Another ValidationResult to merge

    Returns:
        ValidationResult: New merged result
    """
    return ValidationResult(
        errors=self.errors + other.errors,
        warnings=self.warnings + other.warnings,
        info=self.info + other.info,
        metadata={**self.metadata, **other.metadata}
    )

def to_dict(self) -> Dict[str, Any]:
    """Convert to dictionary representation."""
    return {
        "errors": self.errors,
        "warnings": self.warnings,
        "info": self.info,
        "metadata": self.metadata,
        "has_errors": self.has_errors,
        "has_warnings": self.has_warnings,
        "is_valid": self.is_valid
    }

def summary(self) -> str:
    """Generate a human-readable summary."""
    parts = []
    if self.has_errors:
        parts.append(f"{len(self.errors)} error(s)")
    if self.has_warnings:
        parts.append(f"{len(self.warnings)} warning(s)")
    if self.info:
        parts.append(f"{len(self.info)} info message(s)")
    if not parts:
        return "Validation passed with no issues"
    return f"Validation completed with {', '.join(parts)}"

def format_messages(self, include_info: bool = True) -> str:
    """Format all messages for display."""
    lines = []
    if self.errors:
        lines.append("ERRORS:")
        for error in self.errors:
            lines.append(f" ❌ {error}")
    if self.warnings:
        if lines:
            lines.append("")
        lines.append("WARNINGS:")
        for warning in self.warnings:
            lines.append(f" ⚠️ {warning}")
    if self.info and include_info:
        if lines:
            lines.append("")
        lines.append("INFO:")
        for info_msg in self.info:
            lines.append(f" ℹ️ {info_msg}")
    return "\n".join(lines)
Implementation Examples
Custom Backend Implementation
import tempfile
import anndata
import boto3

class S3Backend(IDataBackend):
    """Example S3 backend implementation."""

    def __init__(self, bucket_name: str, aws_credentials: Dict[str, str]):
        self.bucket_name = bucket_name
        self.s3_client = boto3.client('s3', **aws_credentials)

    def load(self, path: Union[str, Path], **kwargs) -> anndata.AnnData:
        """Load data from S3."""
        # Download from S3 to a temporary file
        with tempfile.NamedTemporaryFile(suffix=".h5ad") as tmp:
            self.s3_client.download_fileobj(
                self.bucket_name, str(path), tmp
            )
            # Flush buffered bytes to disk before reopening the file by name
            tmp.flush()
            return anndata.read_h5ad(tmp.name)

    def save(self, adata: anndata.AnnData, path: Union[str, Path], **kwargs) -> None:
        """Save data to S3."""
        with tempfile.NamedTemporaryFile(suffix=".h5ad") as tmp:
            # write_h5ad writes through the file name, not through the tmp handle
            adata.write_h5ad(tmp.name)
            tmp.seek(0)
            self.s3_client.upload_fileobj(
                tmp, self.bucket_name, str(path)
            )

    # Implement other required methods...
Custom Adapter Implementation
class CustomOmicsAdapter(IModalityAdapter):
    """Example custom omics adapter."""

    def from_source(
        self,
        source: Union[str, Path, pd.DataFrame],
        **kwargs
    ) -> anndata.AnnData:
        """Convert custom format to AnnData."""
        if isinstance(source, pd.DataFrame):
            # Convert DataFrame to AnnData
            adata = anndata.AnnData(X=source.values)
            adata.obs_names = source.index.astype(str)
            adata.var_names = source.columns.astype(str)
            # Record a short label rather than the stringified table
            source_label = "DataFrame"
        else:
            # Load from file; the first column holds the observation names
            df = pd.read_csv(source)
            adata = anndata.AnnData(X=df.iloc[:, 1:].values)
            adata.obs_names = df.iloc[:, 0].astype(str)
            adata.var_names = df.columns[1:]
            source_label = str(source)
        return self.add_provenance(adata, {"source": source_label})

    def validate(
        self,
        adata: anndata.AnnData,
        strict: bool = False
    ) -> ValidationResult:
        """Validate custom omics data."""
        result = ValidationResult()
        # Custom validation logic
        if adata.n_vars < 100:
            result.add_warning("Low number of features detected")
        return result

    def get_schema(self) -> Dict[str, Any]:
        """Return expected schema."""
        return {
            "required_obs": ["sample_id"],
            "optional_obs": ["batch", "condition"],
            "required_var": [],
            "optional_var": ["gene_biotype"],
            "layers": ["raw"],
            "obsm": [],
            "uns": ["processing_info"]
        }

    def get_supported_formats(self) -> List[str]:
        """Return supported formats."""
        return ["csv", "tsv", "xlsx"]
Custom Validator Implementation
import numpy as np

class BioinformaticsValidator(IValidator):
    """Example bioinformatics data validator."""

    def validate(
        self,
        adata: anndata.AnnData,
        strict: bool = False,
        check_types: bool = True,
        check_ranges: bool = True,
        check_completeness: bool = True
    ) -> ValidationResult:
        """Validate bioinformatics data."""
        result = ValidationResult()

        # Basic structure validation
        quality_result = self.validate_data_quality(adata)
        result = result.merge(quality_result)

        # Data type validation
        if check_types:
            if not np.issubdtype(adata.X.dtype, np.number):
                result.add_error("Expression matrix must contain numeric data")

        # Range validation
        if check_ranges:
            if hasattr(adata.X, 'min') and adata.X.min() < 0:
                result.add_warning("Negative values detected in expression data")

        # Convert warnings to errors if strict mode
        if strict and result.has_warnings:
            result.errors.extend(result.warnings)
            result.warnings = []

        return result

    def validate_schema_compliance(
        self,
        adata: anndata.AnnData,
        schema: Dict[str, Any]
    ) -> ValidationResult:
        """Validate against specific schema."""
        result = ValidationResult()

        # Validate required columns
        obs_result = self.validate_obs_metadata(
            adata,
            schema.get('required_obs', []),
            schema.get('optional_obs', [])
        )
        result = result.merge(obs_result)

        # Validate expected layers
        expected_layers = schema.get('layers', [])
        for layer_name in expected_layers:
            if layer_name not in adata.layers:
                result.add_warning(f"Expected layer '{layer_name}' not found")

        return result
Interface Integration
Registration with DataManagerV2
# Register custom implementations
data_manager = DataManagerV2()

# Register backend (creds is assumed to be a dict of boto3 client arguments defined elsewhere)
s3_backend = S3Backend(bucket_name="my-bucket", aws_credentials=creds)
data_manager.register_backend("s3", s3_backend)

# Register adapter
custom_adapter = CustomOmicsAdapter()
data_manager.register_adapter("custom_omics", custom_adapter)

# Use registered implementations
adata = data_manager.load_modality(
    name="my_data",
    source="s3://my-bucket/data.csv",
    adapter="custom_omics"
)
Validation Pipeline
# Create validation pipeline
validator = BioinformaticsValidator()

# Validate with different strictness levels
result = validator.validate(adata, strict=False)
if result.has_errors:
    print("Validation failed:", result.format_messages())
elif result.has_warnings:
    print("Validation passed with warnings:", result.format_messages())
else:
    print("Validation passed successfully")
Interface Benefits
Modularity
- Pluggable Components: Easily swap implementations without changing core logic
- Separation of Concerns: Clear boundaries between different system layers
- Testability: Mock implementations for unit testing
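The testability point can be sketched with a test double. The example below assumes a trimmed-down copy of `BaseClient` (only three of the documented abstract methods) so that it runs standalone; `FakeClient` and `ask` are hypothetical names introduced for illustration and are not part of Lobster AI.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseClient(ABC):
    """Trimmed-down copy of the documented interface, so the sketch runs standalone."""

    @abstractmethod
    def query(self, user_input: str, stream: bool = False) -> Dict[str, Any]: ...

    @abstractmethod
    def get_status(self) -> Dict[str, Any]: ...

    @abstractmethod
    def reset(self) -> None: ...


class FakeClient(BaseClient):
    """Hypothetical test double: returns a canned response and records each query."""

    def __init__(self, canned_response: str = "ok") -> None:
        self.canned_response = canned_response
        self.queries: List[str] = []

    def query(self, user_input: str, stream: bool = False) -> Dict[str, Any]:
        self.queries.append(user_input)
        return {
            "success": True,
            "response": self.canned_response,
            "error": None,
            "session_id": "test-session",
            "has_data": False,
            "plots": [],
        }

    def get_status(self) -> Dict[str, Any]:
        return {
            "session_id": "test-session",
            "message_count": len(self.queries),
            "has_data": False,
            "workspace": ".",
        }

    def reset(self) -> None:
        self.queries.clear()


def ask(client: BaseClient, question: str) -> str:
    """Code under test: depends only on the BaseClient contract."""
    result = client.query(question)
    return result["response"] if result["success"] else f"Error: {result['error']}"
```

Because `ask` is written against the interface rather than a concrete client, a unit test can pass in `FakeClient` and check both the returned text and the recorded queries without starting an agent or contacting a cloud service.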
Extensibility
- Custom Backends: Support new storage systems (S3, GCS, databases)
- New Modalities: Add support for emerging data types
- Flexible Validation: Implement domain-specific validation rules
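As a rough illustration of adding a new storage system, the sketch below implements an in-memory backend. It is written under two assumptions: the interface shown is a reduced stand-in for `IDataBackend` (no `get_metadata`, and the payload is typed as `Any` to avoid depending on `anndata`), and `InMemoryBackend` is a hypothetical class, not something shipped with Lobster AI.

```python
import fnmatch
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Union

PathLike = Union[str, Path]


class IDataBackend(ABC):
    """Reduced stand-in for the documented interface (payload typed as Any)."""

    @abstractmethod
    def load(self, path: PathLike, **kwargs) -> Any: ...

    @abstractmethod
    def save(self, adata: Any, path: PathLike, **kwargs) -> None: ...

    @abstractmethod
    def exists(self, path: PathLike) -> bool: ...

    @abstractmethod
    def delete(self, path: PathLike) -> None: ...

    @abstractmethod
    def list_files(self, directory: PathLike, pattern: str = "*") -> List[str]: ...


class InMemoryBackend(IDataBackend):
    """Hypothetical backend that keeps objects in a dict keyed by path string."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def load(self, path: PathLike, **kwargs) -> Any:
        key = str(path)
        if key not in self._store:
            raise FileNotFoundError(key)
        return self._store[key]

    def save(self, adata: Any, path: PathLike, **kwargs) -> None:
        self._store[str(path)] = adata

    def exists(self, path: PathLike) -> bool:
        return str(path) in self._store

    def delete(self, path: PathLike) -> None:
        key = str(path)
        if key not in self._store:
            raise FileNotFoundError(key)
        del self._store[key]

    def list_files(self, directory: PathLike, pattern: str = "*") -> List[str]:
        # Treat keys as "directory/name" strings and match the part after the prefix
        prefix = str(directory).rstrip("/") + "/"
        return sorted(
            key for key in self._store
            if key.startswith(prefix) and fnmatch.fnmatch(key[len(prefix):], pattern)
        )
```

The backend raises the same `FileNotFoundError` that the interface documents for `load` and `delete`, so calling code can handle a missing object the same way regardless of where the data lives.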
Consistency
- Uniform APIs: Same interface regardless of underlying implementation
- Error Handling: Consistent exception hierarchy and error reporting
- Documentation: Self-documenting through interface contracts
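Consistent error reporting can be sketched by combining several independent checks into one `ValidationResult`. The class below is a condensed copy of the documented one (fields, `is_valid`, and `merge` only). The `Record` alias, the two check functions, and `run_checks` are hypothetical, and a plain dictionary stands in for an AnnData object so the example needs only the standard library.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, List


@dataclass
class ValidationResult:
    """Condensed copy of the documented class (only what the sketch needs)."""
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    info: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_valid(self) -> bool:
        return not self.errors

    def merge(self, other: "ValidationResult") -> "ValidationResult":
        return ValidationResult(
            errors=self.errors + other.errors,
            warnings=self.warnings + other.warnings,
            info=self.info + other.info,
            metadata={**self.metadata, **other.metadata},
        )


Record = Dict[str, Any]  # hypothetical stand-in for an AnnData object


def check_not_empty(record: Record) -> ValidationResult:
    """Report an error when the record holds no values."""
    result = ValidationResult()
    if not record.get("values"):
        result.errors.append("No observations in dataset")
    return result


def check_sample_id(record: Record) -> ValidationResult:
    """Report a warning when the sample_id field is absent."""
    result = ValidationResult()
    if "sample_id" not in record:
        result.warnings.append("Column 'sample_id' is missing")
    return result


def run_checks(
    record: Record,
    checks: Iterable[Callable[[Record], ValidationResult]],
) -> ValidationResult:
    """Run every check and fold the individual results into one report."""
    combined = ValidationResult()
    for check in checks:
        combined = combined.merge(check(record))
    return combined
```

Since `merge` returns a new object instead of mutating either input, each check can be written and tested in isolation, and callers inspect a single combined report at the end.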
The Interfaces API provides the foundation for Lobster AI's modular architecture, enabling seamless integration of new components while maintaining backward compatibility and system reliability.