# Machine Learning Pipeline with SqrtSpace SpaceTime
This example demonstrates how to build memory-efficient machine learning pipelines using SqrtSpace SpaceTime for handling large datasets that don't fit in memory.
## Features Demonstrated

1. **Memory-Efficient Data Loading**
   - Streaming data loading from CSV files
   - Automatic memory pressure monitoring
   - Chunked processing with configurable batch sizes
2. **Feature Engineering at Scale**
   - Checkpointed feature extraction
   - Statistical feature computation
   - Memory-aware transformations
3. **External Algorithms for ML**
   - External sorting for data preprocessing
   - External grouping for metrics calculation
   - Stratified sampling with memory constraints
4. **Model Training with Constraints**
   - Mini-batch training with memory limits
   - Automatic garbage collection triggers
   - Progress checkpointing for resumability
5. **Distributed-Ready Components**
   - Serializable pipeline components
   - Checkpoint-based fault tolerance
   - Streaming predictions
## Installation

```bash
pip install sqrtspace-spacetime scikit-learn pandas numpy joblib psutil
```
## Running the Example

```bash
python ml_pipeline_example.py
```
This will:
- Generate a synthetic dataset (100K samples, 50 features; see the sketch below)
- Load data using streaming
- Preprocess with external sorting
- Extract features with checkpointing
- Train a Random Forest model
- Evaluate using external grouping
- Save the model checkpoint
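The synthetic dataset from the first step can be reproduced with scikit-learn. A minimal sketch, assuming a CSV layout with a `label` column; the bundled script may build its dataset differently:

```python
# Sketch: build a 100K x 50 synthetic dataset and write it to CSV.
import pandas as pd
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=100_000, n_features=50, random_state=42)
df = pd.DataFrame(X, columns=[f"f{i}" for i in range(50)])
df["label"] = y
df.to_csv("synthetic_dataset.csv", index=False)
```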
## Key Components

### SpaceTimeFeatureExtractor
A scikit-learn compatible transformer that:
- Extracts features using streaming computation
- Maintains statistics in SpaceTime collections
- Supports checkpointing for resumability
```python
extractor = SpaceTimeFeatureExtractor(max_features=1000)
extractor.fit(data_stream)  # automatically checkpointed
transformed = extractor.transform(test_stream)
```
### MemoryEfficientMLPipeline

A complete pipeline that handles:
- Data loading with memory monitoring
- Preprocessing with external algorithms
- Training with batch processing
- Evaluation with memory-efficient metrics
```python
pipeline = MemoryEfficientMLPipeline(memory_limit="512MB")
pipeline.train_with_memory_constraints(X_train, y_train)
metrics = pipeline.evaluate_with_external_grouping(X_test, y_test)
```
### Memory Monitoring
Automatic memory pressure detection:
```python
import gc

monitor = MemoryPressureMonitor("512MB")
if monitor.should_cleanup():
    gc.collect()
```
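In a chunked pipeline, the check typically runs once per batch. A minimal sketch using the monitor above; `process_batch` is a placeholder for your per-chunk work:

```python
import gc

import pandas as pd

monitor = MemoryPressureMonitor("512MB")
for chunk in pd.read_csv("synthetic_dataset.csv", chunksize=1000):
    process_batch(chunk)  # placeholder: your per-chunk work
    if monitor.should_cleanup():
        gc.collect()  # release freed objects while under memory pressure
```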
## Advanced Usage

### Custom Feature Extractors
```python
class CustomFeatureExtractor(SpaceTimeFeatureExtractor):
    def extract_features(self, batch):
        # Your custom feature logic
        features = []
        for sample in batch:
            # Complex feature engineering
            features.append(self.compute_features(sample))
        return features
```
### Streaming Predictions
```python
import pandas as pd
from sqrtspace_spacetime import SpaceTimeArray  # import path may vary by version

def predict_streaming(model, data_path):
    # SpaceTimeArray spills to disk once the in-memory threshold is exceeded
    predictions = SpaceTimeArray(threshold=10000)
    for chunk in pd.read_csv(data_path, chunksize=1000):
        X = chunk.values
        y_pred = model.predict(X)
        predictions.extend(y_pred)
    return predictions
```
### Cross-Validation with Memory Limits
```python
from sqrtspace_spacetime import external_sort  # import path may vary by version

def memory_efficient_cv(X, y, model, cv=5):
    scores = []
    # External sort by label orders the index list by class
    # without holding everything in memory
    sorted_pairs = external_sort(
        list(enumerate(y)),
        key_func=lambda pair: pair[1]
    )
    sorted_indices = [idx for idx, _ in sorted_pairs]
    for i in range(cv):
        # Every cv-th index from the label-sorted order keeps each
        # fold approximately stratified
        test_indices = sorted_indices[i::cv]
        test_set = set(test_indices)
        train_indices = [idx for idx in sorted_indices if idx not in test_set]
        # Train and evaluate on this fold
        model.fit(X[train_indices], y[train_indices])
        scores.append(model.score(X[test_indices], y[test_indices]))
    return scores
```
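As a usage sketch, with the Random Forest the main script trains; random data here stands in for a real dataset:

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

# Stand-in data; any NumPy-indexable X and y work
X = np.random.rand(10_000, 50)
y = np.random.randint(0, 2, size=10_000)

scores = memory_efficient_cv(X, y, RandomForestClassifier(n_estimators=50), cv=5)
print(f"mean accuracy: {np.mean(scores):.3f}")
```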
## Performance Tips

- **Tune Chunk Sizes**: Larger chunks are more efficient but use more memory
- **Use Compression**: Enable LZ4 compression for numerical data
- **Monitor Checkpoints**: Too frequent checkpointing can slow down processing
- **Profile Memory**: Use the `@profile_memory` decorator to find bottlenecks (see the sketch below)
- **External Storage**: Use SSDs for external algorithm temporary files
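A minimal sketch of `@profile_memory` in use; the import path is an assumption and may differ in your installed version, and `compute_features` is a placeholder:

```python
from sqrtspace_spacetime import profile_memory  # assumed import path

@profile_memory  # reports the memory footprint of each call
def extract_batch_features(batch):
    # Placeholder feature engineering to profile
    return [compute_features(sample) for sample in batch]
```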
## Integration with Popular ML Libraries

### PyTorch DataLoader
```python
import torch
from torch.utils.data import DataLoader
from sqrtspace_spacetime import SpaceTimeArray  # import path may vary by version

class SpaceTimeDataset(torch.utils.data.Dataset):
    def __init__(self, data_path, transform=None):
        # Backing array pages data in from disk on demand
        self.data = SpaceTimeArray.from_file(data_path)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        if self.transform:
            sample = self.transform(sample)
        return sample

# Use with DataLoader
dataset = SpaceTimeDataset('large_dataset.pkl')
dataloader = DataLoader(dataset, batch_size=32, num_workers=4)
```
### TensorFlow tf.data
```python
import tensorflow as tf
from sqrtspace_spacetime import Stream  # import path may vary by version

def create_tf_dataset(file_path, batch_size=32):
    def generator():
        # Stream rows from disk instead of loading the whole CSV
        stream = Stream.from_csv(file_path)
        for item in stream:
            yield item['features'], item['label']

    dataset = tf.data.Dataset.from_generator(
        generator,
        output_types=(tf.float32, tf.int32)
    )
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
```
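The resulting dataset drops into a standard Keras training loop; `model` here is assumed to be any compiled `tf.keras` model:

```python
dataset = create_tf_dataset('train.csv', batch_size=64)
model.fit(dataset, epochs=3)  # model: any compiled tf.keras model (assumed)
```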
## Benchmarks
On a machine with 8GB RAM processing a 50GB dataset:
| Operation | Traditional | SpaceTime | Memory Used (SpaceTime) |
|---|---|---|---|
| Data Loading | OOM | 42s | 512MB |
| Feature Extraction | OOM | 156s | 512MB |
| Model Training | OOM | 384s | 512MB |
| Evaluation | 89s | 95s | 512MB |
## Troubleshooting

### Out of Memory Errors
- Reduce chunk sizes (see the sketch below)
- Lower the memory limit so data spills to disk earlier
- Enable compression
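The first two knobs map onto the constructors shown earlier. A minimal sketch with illustrative values; `MemoryEfficientMLPipeline` is the class defined in `ml_pipeline_example.py`:

```python
import pandas as pd
from sqrtspace_spacetime import SpaceTimeArray  # import path may vary by version

# Illustrative values: a tighter memory budget makes spillover happen earlier
pipeline = MemoryEfficientMLPipeline(memory_limit="256MB")  # lower memory limit
predictions = SpaceTimeArray(threshold=1000)                # spill to disk sooner

# Smaller chunks reduce peak memory per batch
for chunk in pd.read_csv("synthetic_dataset.csv", chunksize=250):
    ...
```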
### Slow Performance
- Increase memory limit if possible
- Use faster external storage (SSD)
- Optimize feature extraction logic
### Checkpoint Recovery
- Check checkpoint directory permissions
- Ensure enough disk space
- Monitor checkpoint file sizes
## Next Steps
- Explore distributed training with checkpoint coordination
- Implement custom external algorithms
- Build real-time ML pipelines with streaming
- Integrate with cloud storage for data loading