SqrtSpace SpaceTime FastAPI Sample Application
This sample demonstrates how to build memory-efficient, high-performance APIs using FastAPI and SqrtSpace SpaceTime.
Features Demonstrated
1. Streaming Endpoints
- Server-Sent Events (SSE) for real-time data
- Streaming file downloads without memory bloat
- Chunked JSON responses for large datasets
2. Background Tasks
- Memory-aware task processing
- Checkpointed long-running operations
- Progress tracking with resumable state
3. Data Processing
- External sorting for large datasets
- Memory-efficient aggregations
- Streaming ETL pipelines
4. Machine Learning Integration
- Batch prediction with memory limits
- Model training with checkpoints
- Feature extraction pipelines
Installation
- Create virtual environment:
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
- Install dependencies:
pip install -r requirements.txt
- Configure environment:
cp .env.example .env
Edit .env:
SPACETIME_MEMORY_LIMIT=512MB
SPACETIME_EXTERNAL_STORAGE=/tmp/spacetime
SPACETIME_CHUNK_STRATEGY=sqrt_n
SPACETIME_COMPRESSION=gzip
DATABASE_URL=sqlite:///./app.db
- Initialize database:
python init_db.py
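A minimal sketch of what init_db.py might contain, assuming app/database.py defines a SQLAlchemy engine and declarative Base (the exact names are illustrative):

# init_db.py (illustrative sketch)
from app.database import engine, Base   # assumed names from app/database.py
from app import models                  # import so all models register on Base

def main() -> None:
    # Create every table defined by the SQLAlchemy models
    Base.metadata.create_all(bind=engine)
    print("Database initialized")

if __name__ == "__main__":
    main()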
Project Structure
fastapi-app/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app
│ ├── config.py # Configuration
│ ├── models.py # Pydantic models
│ ├── database.py # Database setup
│ ├── routers/
│ │ ├── products.py # Product endpoints
│ │ ├── analytics.py # Analytics endpoints
│ │ ├── ml.py # ML endpoints
│ │ └── reports.py # Report generation
│ ├── services/
│ │ ├── product_service.py # Business logic
│ │ ├── analytics_service.py # Analytics processing
│ │ ├── ml_service.py # ML operations
│ │ └── cache_service.py # SpaceTime caching
│ ├── workers/
│ │ ├── background_tasks.py # Task workers
│ │ └── checkpointed_jobs.py # Resumable jobs
│ └── utils/
│ ├── streaming.py # Streaming helpers
│ └── memory.py # Memory monitoring
├── requirements.txt
├── Dockerfile
└── docker-compose.yml
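The routers above are wired together in app/main.py. A minimal sketch, assuming each router module exposes a module-level router object as in the examples below:

# app/main.py (illustrative sketch)
from fastapi import FastAPI

from app.routers import products, analytics, ml, reports

app = FastAPI(title="SqrtSpace SpaceTime sample")

# Routes declare their full paths, so no prefixes are added here
app.include_router(products.router)
app.include_router(analytics.router)
app.include_router(ml.router)
app.include_router(reports.router)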
Usage Examples
1. Streaming Large Datasets
# app/routers/products.py
from typing import Optional

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from sqrtspace_spacetime import Stream
import json

from app.database import db      # assumed location of the query/session helper
from app.models import Product   # assumed location of the Product model

router = APIRouter()

@router.get("/products/stream")
async def stream_products(category: Optional[str] = None):
    """Stream products as newline-delimited JSON."""
    async def generate():
        query = db.query(Product)
        if category:
            query = query.filter(Product.category == category)

        # Use a SpaceTime stream so only one chunk is held in memory at a time
        stream = Stream.from_query(query, chunk_size=100)
        for product in stream:
            yield json.dumps(product.dict()) + "\n"

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson",
        headers={"X-Accel-Buffering": "no"},  # disable proxy buffering
    )
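A hypothetical client for this endpoint, using httpx to consume the NDJSON stream one line at a time (httpx and the product field names are illustrative, not part of the sample):

# Consume the NDJSON stream without buffering the whole response
import json
import httpx

with httpx.stream(
    "GET",
    "http://localhost:8000/products/stream",
    params={"category": "books"},
    timeout=None,
) as response:
    for line in response.iter_lines():
        if line:
            product = json.loads(line)
            print(product)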
2. Server-Sent Events for Real-Time Data
# app/routers/analytics.py
import asyncio
import json

from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
# MemoryPressureLevel is assumed to live alongside the monitor
from sqrtspace_spacetime.memory import MemoryPressureMonitor, MemoryPressureLevel

from app.services.analytics_service import analytics_service  # assumed module-level service instance

router = APIRouter()

@router.get("/analytics/realtime")
async def realtime_analytics():
    """Stream real-time analytics using SSE."""
    monitor = MemoryPressureMonitor("100MB")

    async def event_generator():
        while True:
            # Get current stats
            stats = await analytics_service.get_current_stats()

            # Compact the cache when memory pressure is detected
            if monitor.check() != MemoryPressureLevel.NONE:
                await analytics_service.compact_cache()

            yield {
                "event": "update",
                "data": json.dumps(stats),
            }
            await asyncio.sleep(1)

    return EventSourceResponse(event_generator())
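A rough client sketch that reads the SSE stream with httpx and prints each data: payload. A dedicated SSE client library would handle reconnects and multi-line events more robustly; this is only illustrative:

import json
import httpx

with httpx.stream("GET", "http://localhost:8000/analytics/realtime", timeout=None) as response:
    for line in response.iter_lines():
        # SSE frames arrive as "event: ..." / "data: ..." lines separated by blank lines
        if line.startswith("data:"):
            stats = json.loads(line[len("data:"):].strip())
            print(stats)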
3. Memory-Efficient CSV Export
# app/routers/reports.py
from datetime import date
import io

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
from sqrtspace_spacetime.file import CsvWriter

from app.services.analytics_service import analytics_service  # assumed module-level service instance

router = APIRouter()

@router.get("/reports/export/csv")
async def export_csv(start_date: date, end_date: date):
    """Export a large dataset as CSV with streaming."""
    async def generate():
        # Reusable in-memory buffer: it is flushed after every batch,
        # so it never grows beyond one batch of rows
        output = io.StringIO()
        writer = CsvWriter(output)

        # Write headers
        writer.writerow(["Date", "Orders", "Revenue", "Customers"])

        # Stream data in batches
        async for batch in analytics_service.get_daily_stats_batched(
            start_date, end_date, batch_size=100
        ):
            for row in batch:
                writer.writerow([
                    row.date,
                    row.order_count,
                    row.total_revenue,
                    row.unique_customers,
                ])

            # Yield whatever has accumulated, then reset the buffer
            output.seek(0)
            data = output.read()
            output.seek(0)
            output.truncate()
            yield data

    return StreamingResponse(
        generate(),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=report_{start_date}_{end_date}.csv"
        },
    )
4. Checkpointed Background Tasks
# app/workers/checkpointed_jobs.py
from sqrtspace_spacetime.checkpoint import CheckpointManager, auto_checkpoint
from sqrtspace_spacetime.collections import SpaceTimeArray

class DataProcessor:
    def __init__(self):
        self.checkpoint_manager = CheckpointManager()

    @auto_checkpoint(total_iterations=10000)
    async def process_large_dataset(self, dataset_id: str):
        """Process a dataset with automatic checkpointing."""
        # Initialize or restore state
        results = SpaceTimeArray(threshold=1000)
        processed_count = 0

        # Get data in batches
        async for batch in self.get_data_batches(dataset_id):
            for item in batch:
                # Process item
                result = await self.process_item(item)
                results.append(result)
                processed_count += 1

                # Yield state for checkpointing
                if processed_count % 100 == 0:
                    yield {
                        'processed': processed_count,
                        'results': results,
                        'last_item_id': item.id,
                    }

        # Async generators cannot return a value, so emit the final state instead
        yield {
            'processed': processed_count,
            'results': results,
            'done': True,
        }
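One way this processor could be exposed through the API, using FastAPI's BackgroundTasks and an in-memory progress registry. This is a rough sketch: it assumes the @auto_checkpoint-decorated method can still be iterated for its yielded progress states, and a real deployment would keep progress in Redis or the database rather than a dict:

# Illustrative wiring of the checkpointed job into report endpoints
import uuid

from fastapi import APIRouter, BackgroundTasks

from app.workers.checkpointed_jobs import DataProcessor

router = APIRouter()
jobs: dict[str, dict] = {}  # in-memory progress registry, illustrative only

async def run_job(job_id: str, dataset_id: str) -> None:
    processor = DataProcessor()
    # Each yielded state doubles as a progress update
    async for state in processor.process_large_dataset(dataset_id):
        jobs[job_id] = {"status": "running", "processed": state["processed"]}
    jobs[job_id]["status"] = "completed"

@router.post("/reports/generate")
async def generate_report(dataset_id: str, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "queued", "processed": 0}
    background_tasks.add_task(run_job, job_id, dataset_id)
    return {"job_id": job_id}

@router.get("/reports/{job_id}/progress")
async def report_progress(job_id: str):
    return jobs.get(job_id, {"status": "unknown"})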
5. Machine Learning with Memory Constraints
# app/services/ml_service.py
import numpy as np

from sqrtspace_spacetime.ml import SpaceTimeOptimizer
from sqrtspace_spacetime.streams import Stream
from sqrtspace_spacetime.collections import SpaceTimeArray

class MLService:
    def __init__(self):
        self.optimizer = SpaceTimeOptimizer(
            memory_limit="256MB",
            checkpoint_frequency=100,
        )

    async def train_model(self, training_data_path: str):
        """Train a model with memory-efficient data loading."""
        # Stream training data from disk instead of loading it all at once
        data_stream = Stream.from_csv(
            training_data_path,
            chunk_size=1000,
        )

        # Process in mini-batches
        for epoch in range(10):
            for batch in data_stream.batch(32):
                X = np.array([item.features for item in batch])
                y = np.array([item.label for item in batch])

                # Train step with automatic checkpointing
                loss = self.optimizer.step(
                    self.model,
                    X, y,
                    epoch=epoch,
                )

                if self.optimizer.should_checkpoint():
                    await self.save_checkpoint(epoch)

    async def batch_predict(self, input_data):
        """Memory-efficient batch prediction."""
        results = SpaceTimeArray(threshold=1000)

        # Process in chunks to avoid memory spikes
        for chunk in Stream.from_iterable(input_data).chunk(100):
            predictions = self.model.predict(chunk)
            results.extend(predictions)

        return results
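A sketch of how batch_predict might be exposed from app/routers/ml.py. The request model and its field names are illustrative, and it assumes the returned SpaceTimeArray can be iterated into a plain list for the JSON response:

# app/routers/ml.py (illustrative sketch)
from fastapi import APIRouter
from pydantic import BaseModel

from app.services.ml_service import MLService

router = APIRouter()
ml_service = MLService()

class BatchPredictRequest(BaseModel):
    rows: list[list[float]]  # illustrative feature matrix

@router.post("/ml/predict/batch")
async def batch_predict(request: BatchPredictRequest):
    predictions = await ml_service.batch_predict(request.rows)
    # Convert the spillable array into a plain list for serialization
    return {"predictions": list(predictions)}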
6. Advanced Caching with SpaceTime
# app/services/cache_service.py
from sqrtspace_spacetime.collections import SpaceTimeDict
from sqrtspace_spacetime.memory import MemoryPressureMonitor

class SpaceTimeCache:
    def __init__(self):
        self.hot_cache = SpaceTimeDict(threshold=1000)
        self.monitor = MemoryPressureMonitor("128MB")
        self.stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
        }

    async def get(self, key: str):
        """Get with automatic tier management."""
        if key in self.hot_cache:
            self.stats['hits'] += 1
            return self.hot_cache[key]

        self.stats['misses'] += 1

        # Load from database
        value = await self.load_from_db(key)

        # Add to cache only if memory allows
        if self.monitor.can_allocate(len(str(value))):
            self.hot_cache[key] = value
        else:
            # Record how many items are about to be evicted, then clean up
            self.stats['evictions'] += len(self.hot_cache) // 2
            self.cleanup()

        return value

    def cleanup(self):
        """Remove the least recently used items."""
        # SpaceTimeDict tracks access order, so eviction is LRU
        self.hot_cache.evict_cold_items(0.5)
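A sketch of how a single SpaceTimeCache instance might be shared across requests via a FastAPI dependency (the endpoint and key format are illustrative):

# Illustrative usage: one cache instance shared through a dependency
from fastapi import APIRouter, Depends

from app.services.cache_service import SpaceTimeCache

router = APIRouter()
cache = SpaceTimeCache()

def get_cache() -> SpaceTimeCache:
    return cache

@router.get("/products/{product_id}")
async def get_product(product_id: str, cache: SpaceTimeCache = Depends(get_cache)):
    # Hot entries come from memory; cold entries fall back to the database
    return await cache.get(f"product:{product_id}")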
API Endpoints
Products API
- GET /products - Paginated list
- GET /products/stream - Stream all products (NDJSON)
- GET /products/search - Memory-efficient search
- POST /products/bulk-update - Checkpointed bulk updates
- GET /products/export/csv - Streaming CSV export
Analytics API
- GET /analytics/summary - Current statistics
- GET /analytics/realtime - SSE stream of live data
- GET /analytics/trends - Historical trends
- POST /analytics/aggregate - Custom aggregations
ML API
- POST /ml/train - Train model (async with progress)
- POST /ml/predict/batch - Batch predictions
- GET /ml/models/{id}/status - Training status
- POST /ml/features/extract - Feature extraction pipeline
Reports API
- POST /reports/generate - Generate large report
- GET /reports/{id}/progress - Check progress
- GET /reports/{id}/download - Download completed report
Running the Application
Development
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
Production
gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 300 \
--max-requests 1000 \
--max-requests-jitter 50
With Docker
docker-compose up
Performance Configuration
1. Nginx Configuration
location /products/stream {
    proxy_pass http://backend;
    proxy_buffering off;
    proxy_read_timeout 3600;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
}

location /analytics/realtime {
    proxy_pass http://backend;
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 86400;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
}
2. Worker Configuration
# app/config.py
import os

WORKER_CONFIG = {
    'memory_limit': os.getenv('WORKER_MEMORY_LIMIT', '512MB'),
    'checkpoint_interval': 100,
    'batch_size': 1000,
    'external_storage': '/tmp/spacetime-workers',
}
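The SPACETIME_* variables from the Installation section can be surfaced the same way. A sketch of how app/config.py might expose them (only the variable names from .env are taken from the sample; the rest is illustrative):

# app/config.py (illustrative sketch of SpaceTime settings)
import os

SPACETIME_CONFIG = {
    'memory_limit': os.getenv('SPACETIME_MEMORY_LIMIT', '512MB'),
    'external_storage': os.getenv('SPACETIME_EXTERNAL_STORAGE', '/tmp/spacetime'),
    'chunk_strategy': os.getenv('SPACETIME_CHUNK_STRATEGY', 'sqrt_n'),
    'compression': os.getenv('SPACETIME_COMPRESSION', 'gzip'),
}
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///./app.db')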
Monitoring
Memory Usage Endpoint
@router.get("/system/memory")
async def memory_stats():
    """Get current memory statistics"""
    return {
        "current_usage_mb": memory_monitor.current_usage_mb,
        "peak_usage_mb": memory_monitor.peak_usage_mb,
        "available_mb": memory_monitor.available_mb,
        "pressure_level": memory_monitor.pressure_level,
        "cache_stats": cache_service.get_stats(),
        "external_files": len(os.listdir(EXTERNAL_STORAGE)),
    }
Prometheus Metrics
from prometheus_client import Counter, Histogram, Gauge
stream_requests = Counter('spacetime_stream_requests_total', 'Total streaming requests')
memory_usage = Gauge('spacetime_memory_usage_bytes', 'Current memory usage')
processing_time = Histogram('spacetime_processing_seconds', 'Processing time')
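To actually expose these, mount a /metrics endpoint and update the gauges where it makes sense. A minimal sketch; the middleware hook and the use of psutil for process memory are assumptions, not part of the sample:

# Illustrative: expose the metrics above and keep the memory gauge current
from fastapi import FastAPI, Request, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
import psutil  # assumed available for process memory readings

app = FastAPI()

@app.middleware("http")
async def record_metrics(request: Request, call_next):
    # stream_requests and memory_usage are the metric objects defined above
    if request.url.path.endswith("/stream"):
        stream_requests.inc()
    response = await call_next(request)
    memory_usage.set(psutil.Process().memory_info().rss)
    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)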
Testing
Unit Tests
pytest tests/unit -v
Integration Tests
pytest tests/integration -v
Load Testing
locust -f tests/load/locustfile.py --host http://localhost:8000
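A minimal locustfile sketch for that command, hitting both the paginated and streaming endpoints (the task mix is illustrative):

# tests/load/locustfile.py (illustrative sketch)
from locust import HttpUser, task, between

class ApiUser(HttpUser):
    wait_time = between(1, 3)

    @task(3)
    def list_products(self):
        self.client.get("/products")

    @task(1)
    def stream_products(self):
        # stream=True so Locust does not buffer the whole NDJSON response
        with self.client.get("/products/stream", stream=True, catch_response=True) as response:
            for _ in response.iter_lines():
                pass
            response.success()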
Best Practices
- Always use streaming for large responses
- Configure memory limits based on container size
- Enable checkpointing for long-running tasks
- Monitor memory pressure in production
- Use external storage on fast SSDs
- Set appropriate timeouts for streaming endpoints
- Implement circuit breakers for memory protection
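For the last point, one option is a middleware that sheds load while memory pressure is high, reusing the MemoryPressureMonitor shown earlier. A rough sketch; the limit, status code, and the choice to trip on any non-NONE pressure level are illustrative:

# Illustrative memory circuit breaker middleware
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from sqrtspace_spacetime.memory import MemoryPressureMonitor, MemoryPressureLevel

app = FastAPI()
monitor = MemoryPressureMonitor("512MB")

@app.middleware("http")
async def memory_circuit_breaker(request: Request, call_next):
    # Reject new work under memory pressure instead of risking an OOM kill
    if monitor.check() != MemoryPressureLevel.NONE:
        return JSONResponse(
            status_code=503,
            content={"detail": "Service under memory pressure, retry later"},
            headers={"Retry-After": "5"},
        )
    return await call_next(request)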
Troubleshooting
High Memory Usage
- Reduce chunk sizes
- Enable more aggressive spillover
- Check for memory leaks in custom code
Slow Streaming
- Ensure proxy buffering is disabled
- Check network latency
- Optimize chunk sizes
Failed Checkpoints
- Verify storage permissions
- Check disk space
- Monitor checkpoint frequency