sqrtspace-tools/distsys/README.md
2025-07-20 04:04:41 -04:00

8.2 KiB

Distributed Shuffle Optimizer

Optimize shuffle operations in distributed computing frameworks (Spark, MapReduce, etc.) using Williams' √n memory bounds for network-efficient data exchange.

Features

  • Buffer Sizing: Automatically calculates optimal buffer sizes per node using √n principle
  • Spill Strategy: Determines when to spill to disk based on memory pressure
  • Aggregation Trees: Builds √n-height trees for hierarchical aggregation
  • Network Awareness: Considers rack topology and bandwidth in optimization
  • Compression Selection: Chooses compression based on network/CPU tradeoffs
  • Skew Handling: Special strategies for skewed key distributions

Installation

# From sqrtspace-tools root directory
pip install -r requirements-minimal.txt

Quick Start

from distsys.shuffle_optimizer import ShuffleOptimizer, ShuffleTask, NodeInfo

# Define cluster
nodes = [
    NodeInfo("node1", "worker1.local", cpu_cores=16, memory_gb=64, 
             network_bandwidth_gbps=10.0, storage_type='ssd'),
    NodeInfo("node2", "worker2.local", cpu_cores=16, memory_gb=64,
             network_bandwidth_gbps=10.0, storage_type='ssd'),
    # ... more nodes
]

# Create optimizer
optimizer = ShuffleOptimizer(nodes, memory_limit_fraction=0.5)

# Define shuffle task
task = ShuffleTask(
    task_id="wordcount_shuffle",
    input_partitions=1000,
    output_partitions=100,
    data_size_gb=50,
    key_distribution='uniform',
    value_size_avg=100,
    combiner_function='sum'
)

# Optimize
plan = optimizer.optimize_shuffle(task)
print(plan.explanation)
# "Using combiner_based strategy because combiner function enables local aggregation.
#  Allocated 316MB buffers per node using √n principle to balance memory and I/O.
#  Applied snappy compression to reduce network traffic by ~50%.
#  Estimated completion: 12.3s with 25.0GB network transfer."

Shuffle Strategies

1. All-to-All

  • When: Small data (<1GB)
  • How: Every node exchanges with every other node
  • Pros: Simple, works well for small data
  • Cons: O(n²) network connections

2. Hash Partition

  • When: Uniform key distribution
  • How: Hash keys to determine target partition
  • Pros: Even data distribution
  • Cons: No locality, can't handle skew

3. Range Partition

  • When: Skewed data or ordered output needed
  • How: Assign key ranges to partitions
  • Pros: Handles skew, preserves order
  • Cons: Requires sampling for ranges

4. Tree Aggregation

  • When: Many nodes (>10) with aggregation
  • How: √n-height tree reduces data at each level
  • Pros: Log(n) network hops
  • Cons: More complex coordination

5. Combiner-Based

  • When: Associative aggregation functions
  • How: Local combining before shuffle
  • Pros: Reduces data volume significantly
  • Cons: Only for specific operations

Memory Management

√n Buffer Sizing

# For 100GB shuffle on node with 64GB RAM:
data_per_node = 100GB / num_nodes
if data_per_node > available_memory:
    buffer_size = (data_per_node)  # e.g., 316MB for 100GB
else:
    buffer_size = data_per_node      # Fit all in memory

Benefits:

  • Memory: O(√n) instead of O(n)
  • I/O: O(n/√n) = O(√n) passes
  • Total: O(n√n) time with O(√n) memory

Spill Management

spill_threshold = buffer_size * 0.8  # Spill at 80% full

# Multi-pass algorithm:
while has_more_data:
    fill_buffer_to_threshold()
    sort_buffer()  # or aggregate
    spill_to_disk()
merge_spilled_runs()

Network Optimization

Rack Awareness

# Topology-aware data placement
if source.rack_id == destination.rack_id:
    bandwidth = 10 Gbps  # In-rack
else:
    bandwidth = 5 Gbps   # Cross-rack

# Prefer in-rack transfers when possible

Compression Selection

Network Speed Data Type Recommended Reasoning
>10 Gbps Any None Network faster than compression
1-10 Gbps Small values Snappy Balanced CPU/network
1-10 Gbps Large values Zlib Worth CPU cost
<1 Gbps Any LZ4 Fast compression critical

Real-World Examples

1. Spark DataFrame Join

# 1TB join on 32-node cluster
task = ShuffleTask(
    task_id="customer_orders_join",
    input_partitions=10000,
    output_partitions=10000,
    data_size_gb=1000,
    key_distribution='skewed',  # Some customers have many orders
    value_size_avg=200
)

plan = optimizer.optimize_shuffle(task)
# Result: Range partition with √n buffers
# Memory: 1.8GB per node (vs 31GB naive)
# Time: 4.2 minutes (vs 6.5 minutes)

2. MapReduce Word Count

# Classic word count with combining
task = ShuffleTask(
    task_id="wordcount",
    input_partitions=1000,
    output_partitions=100,
    data_size_gb=100,
    key_distribution='skewed',  # Common words
    value_size_avg=8,  # Count values
    combiner_function='sum'
)

# Combiner reduces shuffle by 95%
# Network: 5GB instead of 100GB

3. Distributed Sort

# TeraSort benchmark
task = ShuffleTask(
    task_id="terasort",
    input_partitions=10000,
    output_partitions=10000,
    data_size_gb=1000,
    key_distribution='uniform',
    value_size_avg=100
)

# Uses range partitioning with sampling
# √n buffers enable sorting with limited memory

Performance Characteristics

Memory Savings

  • Naive approach: O(n) memory per node
  • √n optimization: O(√n) memory per node
  • Typical savings: 90-98% for large shuffles

Time Impact

  • Additional passes: √n instead of 1
  • But: Each pass is faster (fits in cache)
  • Network: Compression reduces transfer time
  • Overall: Usually 20-50% faster

Scaling

Cluster Size Tree Height Buffer Size (1TB) Network Hops
4 nodes 2 15.8GB 2
16 nodes 4 7.9GB 4
64 nodes 8 3.95GB 8
256 nodes 16 1.98GB 16

Integration Examples

Spark Integration

// Configure Spark with optimized settings
val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "48m")  // √n buffer
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")
  .set("spark.sql.adaptive.enabled", "true")

// Use optimizer recommendations
val plan = optimizer.optimizeShuffle(shuffleStats)
conf.set("spark.sql.shuffle.partitions", plan.outputPartitions.toString)

Custom Framework

# Use optimizer in custom distributed system
def execute_shuffle(data, optimizer):
    # Get optimization plan
    task = create_shuffle_task(data)
    plan = optimizer.optimize_shuffle(task)
    
    # Apply buffers
    for node in nodes:
        node.set_buffer_size(plan.buffer_sizes[node.id])
    
    # Execute with strategy
    if plan.strategy == ShuffleStrategy.TREE_AGGREGATE:
        return tree_shuffle(data, plan.aggregation_tree)
    else:
        return hash_shuffle(data, plan.partition_assignment)

Advanced Features

Adaptive Optimization

# Monitor and adjust during execution
def adaptive_shuffle(task, optimizer):
    plan = optimizer.optimize_shuffle(task)
    
    # Start execution
    metrics = start_shuffle(plan)
    
    # Adjust if needed
    if metrics.spill_rate > 0.5:
        # Increase compression
        plan.compression = CompressionType.ZLIB
    
    if metrics.network_congestion > 0.8:
        # Reduce parallelism
        plan.parallelism *= 0.8

Multi-Stage Optimization

# Optimize entire job DAG
job_stages = [
    ShuffleTask("map_output", 1000, 500, 100),
    ShuffleTask("reduce_output", 500, 100, 50),
    ShuffleTask("final_aggregate", 100, 1, 10)
]

plans = optimizer.optimize_pipeline(job_stages)
# Considers data flow between stages

Limitations

  • Assumes homogeneous clusters (same node specs)
  • Static optimization (no runtime adjustment yet)
  • Simplified network model (no congestion)
  • No GPU memory considerations

Future Enhancements

  • Runtime plan adjustment
  • Heterogeneous cluster support
  • GPU memory hierarchy
  • Learned cost models
  • Integration with schedulers

See Also