Distributed Shuffle Optimizer
Optimize shuffle operations in distributed computing frameworks (Spark, MapReduce, etc.) using Williams' √n memory bounds for network-efficient data exchange.
Features
- Buffer Sizing: Automatically calculates optimal buffer sizes per node using √n principle
- Spill Strategy: Determines when to spill to disk based on memory pressure
- Aggregation Trees: Builds √n-height trees for hierarchical aggregation
- Network Awareness: Considers rack topology and bandwidth in optimization
- Compression Selection: Chooses compression based on network/CPU tradeoffs
- Skew Handling: Special strategies for skewed key distributions
Installation
# From sqrtspace-tools root directory
pip install -r requirements-minimal.txt
Quick Start
from distsys.shuffle_optimizer import ShuffleOptimizer, ShuffleTask, NodeInfo
# Define cluster
nodes = [
NodeInfo("node1", "worker1.local", cpu_cores=16, memory_gb=64,
network_bandwidth_gbps=10.0, storage_type='ssd'),
NodeInfo("node2", "worker2.local", cpu_cores=16, memory_gb=64,
network_bandwidth_gbps=10.0, storage_type='ssd'),
# ... more nodes
]
# Create optimizer
optimizer = ShuffleOptimizer(nodes, memory_limit_fraction=0.5)
# Define shuffle task
task = ShuffleTask(
task_id="wordcount_shuffle",
input_partitions=1000,
output_partitions=100,
data_size_gb=50,
key_distribution='uniform',
value_size_avg=100,
combiner_function='sum'
)
# Optimize
plan = optimizer.optimize_shuffle(task)
print(plan.explanation)
# "Using combiner_based strategy because combiner function enables local aggregation.
# Allocated 316MB buffers per node using √n principle to balance memory and I/O.
# Applied snappy compression to reduce network traffic by ~50%.
# Estimated completion: 12.3s with 25.0GB network transfer."
Shuffle Strategies
1. All-to-All
- When: Small data (<1GB)
- How: Every node exchanges with every other node
- Pros: Simple, works well for small data
- Cons: O(n²) network connections
2. Hash Partition
- When: Uniform key distribution
- How: Hash keys to determine target partition
- Pros: Even data distribution
- Cons: No locality, can't handle skew
3. Range Partition
- When: Skewed data or ordered output needed
- How: Assign key ranges to partitions
- Pros: Handles skew, preserves order
- Cons: Requires sampling for ranges
4. Tree Aggregation
- When: Many nodes (>10) with aggregation
- How: √n-height tree reduces data at each level
- Pros: √n network hops with data reduced at every level (no n-way fan-in at a single reducer)
- Cons: More complex coordination
5. Combiner-Based
- When: Associative aggregation functions
- How: Local combining before shuffle
- Pros: Reduces data volume significantly
- Cons: Only for specific operations
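The choice among these five strategies can be expressed as a small decision function over the task's properties. The sketch below is illustrative only: the thresholds and the `choose_strategy` helper are assumptions, and the returned strings stand in for the optimizer's internal strategy names.
def choose_strategy(task, num_nodes):
    """Illustrative decision order; not the optimizer's actual rules."""
    if task.data_size_gb < 1:
        return "all_to_all"        # small data: full exchange is cheap
    if task.combiner_function:
        return "combiner_based"    # associative functions combine locally first
    if num_nodes > 10 and task.output_partitions < task.input_partitions:
        return "tree_aggregate"    # √n-height tree for wide aggregations
    if task.key_distribution == 'skewed':
        return "range_partition"   # sampled key ranges absorb hot keys
    return "hash_partition"        # default for uniform keys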
Memory Management
√n Buffer Sizing
# Example: the node's share of the shuffle is ~100 GB and the node has 64 GB RAM (sizes in MB)
import math

data_per_node_mb = total_shuffle_mb / num_nodes   # ≈100,000 MB in this example
if data_per_node_mb > available_memory_mb:
    buffer_size_mb = math.sqrt(data_per_node_mb)  # √n rule: ≈316 MB for a 100 GB share
else:
    buffer_size_mb = data_per_node_mb             # the whole share fits in memory
Benefits (see the worked check after this list):
- Memory: O(√n) instead of O(n)
- I/O: O(n/√n) = O(√n) passes
- Total: O(n√n) time with O(√n) memory
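As a worked check of these bounds, assume the node's share is 100 GB (100,000 MB):
import math

data_per_node_mb = 100_000                  # 100 GB share, in MB
buffer_mb = math.sqrt(data_per_node_mb)     # ≈316 MB buffer (the √n bound)
passes = data_per_node_mb / buffer_mb       # ≈316 passes over the data, i.e. O(√n)
print(f"buffer ≈ {buffer_mb:.0f} MB, passes ≈ {passes:.0f}")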
Spill Management
spill_threshold = buffer_size * 0.8   # spill when the √n buffer is 80% full

# Multi-pass algorithm:
while has_more_data():
    fill_buffer_to_threshold()
    sort_buffer()                     # or apply the combiner / aggregate in place
    spill_to_disk()
merge_spilled_runs()                  # final streaming merge of the sorted runs
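The final merge can be a streaming k-way merge, so memory stays bounded by the per-run read buffers rather than by the data size. A minimal sketch, assuming each spill file holds pickled, sorted (key, value) records (the file format and function name are assumptions):
import heapq
import pickle

def merge_spilled_runs(run_paths, output_path):
    """Streaming k-way merge of sorted spill files."""
    def read_run(path):
        with open(path, 'rb') as f:
            while True:
                try:
                    yield pickle.load(f)   # one (key, value) record at a time
                except EOFError:
                    return
    with open(output_path, 'wb') as out:
        for record in heapq.merge(*(read_run(p) for p in run_paths)):
            pickle.dump(record, out)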
Network Optimization
Rack Awareness
# Topology-aware data placement
if source.rack_id == destination.rack_id:
    bandwidth_gbps = 10.0   # in-rack link
else:
    bandwidth_gbps = 5.0    # cross-rack link
# Prefer in-rack transfers when possible
Compression Selection
| Network Speed | Data Type | Recommended | Reasoning |
|---|---|---|---|
| >10 Gbps | Any | None | Network faster than compression |
| 1-10 Gbps | Small values | Snappy | Balanced CPU/network |
| 1-10 Gbps | Large values | Zlib | Worth CPU cost |
| <1 Gbps | Any | LZ4 | Fast compression critical |
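The table translates into a few lines of selection logic. The sketch below is an assumption about how such a rule might look: the 1 KiB value-size threshold and the codec strings are illustrative, not the optimizer's exact API.
def select_compression(network_gbps, value_size_avg_bytes):
    """Illustrative thresholds; not the optimizer's exact rules."""
    if network_gbps > 10:
        return None       # network outruns compression throughput
    if network_gbps < 1:
        return 'lz4'      # slow link: keep compression cheap and fast
    return 'zlib' if value_size_avg_bytes > 1024 else 'snappy'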
Real-World Examples
1. Spark DataFrame Join
# 1TB join on 32-node cluster
task = ShuffleTask(
task_id="customer_orders_join",
input_partitions=10000,
output_partitions=10000,
data_size_gb=1000,
key_distribution='skewed', # Some customers have many orders
value_size_avg=200
)
plan = optimizer.optimize_shuffle(task)
# Result: Range partition with √n buffers
# Memory: 1.8GB per node (vs 31GB naive)
# Time: 4.2 minutes (vs 6.5 minutes)
2. MapReduce Word Count
# Classic word count with combining
task = ShuffleTask(
task_id="wordcount",
input_partitions=1000,
output_partitions=100,
data_size_gb=100,
key_distribution='skewed', # Common words
value_size_avg=8, # Count values
combiner_function='sum'
)
# Combiner reduces shuffle by 95%
# Network: 5GB instead of 100GB
3. Distributed Sort
# TeraSort benchmark
task = ShuffleTask(
task_id="terasort",
input_partitions=10000,
output_partitions=10000,
data_size_gb=1000,
key_distribution='uniform',
value_size_avg=100
)
# Uses range partitioning with sampling
# √n buffers enable sorting with limited memory
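A minimal sketch of how range boundaries can be derived from a key sample, in the spirit of TeraSort's partitioner; the sample rate and helper name are illustrative assumptions:
import random

def sample_range_boundaries(keys, num_partitions, sample_rate=0.001):
    """Pick num_partitions - 1 split points from a small random key sample."""
    sample = sorted(k for k in keys if random.random() < sample_rate)
    step = max(1, len(sample) // num_partitions)
    return sample[step::step][:num_partitions - 1]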
Performance Characteristics
Memory Savings
- Naive approach: O(n) memory per node
- √n optimization: O(√n) memory per node
- Typical savings: 90-98% for large shuffles
Time Impact
- Additional passes: √n instead of 1
- But: Each pass is faster (fits in cache)
- Network: Compression reduces transfer time
- Overall: Usually 20-50% faster
Scaling
| Cluster Size | Tree Height | Buffer Size (1TB) | Network Hops |
|---|---|---|---|
| 4 nodes | 2 | 15.8GB | 2 |
| 16 nodes | 4 | 7.9GB | 4 |
| 64 nodes | 8 | 3.95GB | 8 |
| 256 nodes | 16 | 1.98GB | 16 |
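The buffer-size column follows directly from applying the √n rule to each node's share of a 1 TB shuffle, and the tree height is √(number of nodes):
import math

total_gb = 1000                                  # 1 TB shuffle
for nodes in (4, 16, 64, 256):
    per_node_gb = total_gb / nodes
    buffer_gb = math.sqrt(per_node_gb)           # √n buffer per node
    tree_height = int(math.sqrt(nodes))          # √n-height aggregation tree
    print(f"{nodes:>3} nodes: buffer ≈ {buffer_gb:.2f} GB, tree height {tree_height}")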
Integration Examples
Spark Integration
// Configure Spark with optimized settings
val conf = new SparkConf()
.set("spark.reducer.maxSizeInFlight", "48m") // √n buffer
.set("spark.shuffle.compress", "true")
.set("spark.shuffle.spill.compress", "true")
.set("spark.sql.adaptive.enabled", "true")
// Use optimizer recommendations
val plan = optimizer.optimizeShuffle(shuffleStats)
conf.set("spark.sql.shuffle.partitions", plan.outputPartitions.toString)
Custom Framework
# Use optimizer in custom distributed system
def execute_shuffle(data, nodes, optimizer):
    # Get an optimization plan for this dataset
    task = create_shuffle_task(data)
    plan = optimizer.optimize_shuffle(task)
    # Apply the recommended per-node buffer sizes
    for node in nodes:
        node.set_buffer_size(plan.buffer_sizes[node.id])
    # Execute with the chosen strategy (ShuffleStrategy is assumed to come
    # from distsys.shuffle_optimizer)
    if plan.strategy == ShuffleStrategy.TREE_AGGREGATE:
        return tree_shuffle(data, plan.aggregation_tree)
    else:
        return hash_shuffle(data, plan.partition_assignment)
Advanced Features
Adaptive Optimization
# Monitor and adjust during execution
def adaptive_shuffle(task, optimizer):
    plan = optimizer.optimize_shuffle(task)
    # Start execution
    metrics = start_shuffle(plan)
    # Adjust if needed
    if metrics.spill_rate > 0.5:
        # Increase compression
        plan.compression = CompressionType.ZLIB
    if metrics.network_congestion > 0.8:
        # Reduce parallelism
        plan.parallelism *= 0.8
Multi-Stage Optimization
# Optimize entire job DAG
job_stages = [
ShuffleTask("map_output", 1000, 500, 100),
ShuffleTask("reduce_output", 500, 100, 50),
ShuffleTask("final_aggregate", 100, 1, 10)
]
plans = optimizer.optimize_pipeline(job_stages)
# Considers data flow between stages
Limitations
- Assumes homogeneous clusters (same node specs)
- Static optimization (no runtime adjustment yet)
- Simplified network model (no congestion)
- No GPU memory considerations
Future Enhancements
- Runtime plan adjustment
- Heterogeneous cluster support
- GPU memory hierarchy
- Learned cost models
- Integration with schedulers
See Also
- SpaceTimeCore: √n calculations
- Benchmark Suite: Performance comparisons