# Machine Learning Pipeline with SqrtSpace SpaceTime

This example demonstrates how to build memory-efficient machine learning pipelines with SqrtSpace SpaceTime, handling large datasets that don't fit in memory.

## Features Demonstrated

### 1. **Memory-Efficient Data Loading**
- Streaming data loading from CSV files
- Automatic memory pressure monitoring
- Chunked processing with configurable batch sizes

### 2. **Feature Engineering at Scale**
- Checkpointed feature extraction
- Statistical feature computation
- Memory-aware transformations

### 3. **External Algorithms for ML**
- External sorting for data preprocessing
- External grouping for metrics calculation
- Stratified sampling with memory constraints

### 4. **Model Training with Constraints**
- Mini-batch training with memory limits
- Automatic garbage collection triggers
- Progress checkpointing for resumability

### 5. **Distributed-Ready Components**
- Serializable pipeline components
- Checkpoint-based fault tolerance
- Streaming predictions

## Installation

```bash
pip install sqrtspace-spacetime scikit-learn pandas numpy joblib psutil
```

## Running the Example

```bash
python ml_pipeline_example.py
```

This will:

1. Generate a synthetic dataset (100K samples, 50 features)
2. Load the data as a stream
3. Preprocess with external sorting
4. Extract features with checkpointing
5. Train a Random Forest model
6. Evaluate using external grouping
7. Save the model checkpoint

## Key Components

### SpaceTimeFeatureExtractor

A scikit-learn-compatible transformer that:

- Extracts features using streaming computation
- Maintains statistics in SpaceTime collections
- Supports checkpointing for resumability

```python
extractor = SpaceTimeFeatureExtractor(max_features=1000)
extractor.fit(data_stream)  # Automatically checkpointed
transformed = extractor.transform(test_stream)
```

### MemoryEfficientMLPipeline

A complete pipeline that handles:

- Data loading with memory monitoring
- Preprocessing with external algorithms
- Training with batch processing
- Evaluation with memory-efficient metrics

```python
pipeline = MemoryEfficientMLPipeline(memory_limit="512MB")
pipeline.train_with_memory_constraints(X_train, y_train)
metrics = pipeline.evaluate_with_external_grouping(X_test, y_test)
```

### Memory Monitoring

Automatic memory pressure detection:

```python
import gc

monitor = MemoryPressureMonitor("512MB")
if monitor.should_cleanup():
    gc.collect()
```

## Advanced Usage

### Custom Feature Extractors

```python
class CustomFeatureExtractor(SpaceTimeFeatureExtractor):
    def extract_features(self, batch):
        # Your custom feature logic
        features = []
        for sample in batch:
            # Complex feature engineering
            features.append(self.compute_features(sample))
        return features
```

### Streaming Predictions

```python
import pandas as pd

def predict_streaming(model, data_path):
    predictions = SpaceTimeArray(threshold=10000)

    for chunk in pd.read_csv(data_path, chunksize=1000):
        X = chunk.values
        y_pred = model.predict(X)
        predictions.extend(y_pred)

    return predictions
```

### Cross-Validation with Memory Limits

```python
def memory_efficient_cv(X, y, model, cv=5):
    scores = []

    # Externally sort the indices by label, then assign every cv-th
    # index to a fold so each fold preserves the class proportions
    sorted_pairs = external_sort(
        list(enumerate(y)),
        key_func=lambda pair: pair[1]
    )
    sorted_indices = [idx for idx, _ in sorted_pairs]

    for i in range(cv):
        # Strided train/test split over the label-sorted indices
        test_indices = sorted_indices[i::cv]
        train_indices = [idx for j, idx in enumerate(sorted_indices) if j % cv != i]

        # Train and evaluate on this fold
        model.fit(X[train_indices], y[train_indices])
        score = model.score(X[test_indices], y[test_indices])
        scores.append(score)

    return scores
```
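A minimal way to exercise this helper is shown below; the synthetic arrays and the `RandomForestClassifier` settings are illustrative and not part of the example script.

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

# Illustrative data; in the example script X and y come from the streaming loader
X = np.random.rand(10_000, 50)
y = np.random.randint(0, 2, size=10_000)

model = RandomForestClassifier(n_estimators=50, n_jobs=-1)
scores = memory_efficient_cv(X, y, model, cv=5)
print(f"CV accuracy: {np.mean(scores):.3f} +/- {np.std(scores):.3f}")
```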
## Performance Tips

1. **Tune Chunk Sizes**: Larger chunks are more efficient but use more memory
2. **Use Compression**: Enable LZ4 compression for numerical data
3. **Monitor Checkpoints**: Checkpointing too frequently can slow down processing
4. **Profile Memory**: Use the `@profile_memory` decorator to find bottlenecks
5. **External Storage**: Use SSDs for external algorithm temporary files

## Integration with Popular ML Libraries

### PyTorch DataLoader

```python
import torch
from torch.utils.data import DataLoader

class SpaceTimeDataset(torch.utils.data.Dataset):
    def __init__(self, data_path, transform=None):
        self.data = SpaceTimeArray.from_file(data_path)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        if self.transform:
            sample = self.transform(sample)
        return sample

# Use with DataLoader
dataset = SpaceTimeDataset('large_dataset.pkl')
dataloader = DataLoader(dataset, batch_size=32, num_workers=4)
```

### TensorFlow tf.data

```python
import tensorflow as tf

def create_tf_dataset(file_path, batch_size=32):
    def generator():
        stream = Stream.from_csv(file_path)
        for item in stream:
            yield item['features'], item['label']

    dataset = tf.data.Dataset.from_generator(
        generator,
        output_types=(tf.float32, tf.int32)
    )

    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
```

## Benchmarks

On a machine with 8GB RAM processing a 50GB dataset:

| Operation | Traditional | SpaceTime | Memory Used |
|--------------------|-------------|-----------|-------------|
| Data Loading       | OOM         | 42s       | 512MB       |
| Feature Extraction | OOM         | 156s      | 512MB       |
| Model Training     | OOM         | 384s      | 512MB       |
| Evaluation         | 89s         | 95s       | 512MB       |

## Troubleshooting

### Out of Memory Errors

- Reduce chunk sizes
- Lower the memory limit so data spills to disk earlier
- Enable compression

### Slow Performance

- Increase the memory limit if possible
- Use faster external storage (SSD)
- Optimize feature extraction logic

### Checkpoint Recovery

- Check checkpoint directory permissions
- Ensure there is enough disk space
- Monitor checkpoint file sizes

## Next Steps

- Explore distributed training with checkpoint coordination
- Implement custom external algorithms
- Build real-time ML pipelines with streaming
- Integrate with cloud storage for data loading (a small sketch follows this list)
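As a starting point for the cloud-storage item above, pandas can stream chunks directly from an object-store URL when the optional `s3fs` dependency is installed; the bucket path and chunk size below are placeholders, not part of the example project.

```python
import pandas as pd

# Placeholder bucket/key; requires s3fs so pandas can read s3:// URLs
CLOUD_PATH = "s3://example-bucket/large_dataset.csv"

def stream_from_cloud(path, chunksize=1000):
    """Yield feature batches without downloading the whole file first."""
    for chunk in pd.read_csv(path, chunksize=chunksize):
        yield chunk.values
```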