sqrtspace-experiments/experiments/measurement_framework.py
2025-07-20 03:56:21 -04:00

233 lines
7.9 KiB
Python

"""
Standardized measurement framework for space-time tradeoff experiments.
Provides consistent metrics and visualization tools.
"""
import time
import psutil
import os
import json
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, asdict
from typing import Callable, Any, List, Dict
from datetime import datetime
@dataclass
class Measurement:
    """Single measurement point"""
    timestamp: float    # wall-clock time of the sample, as returned by time.time()
    memory_bytes: int   # resident set size (RSS) of the process at sample time
    cpu_percent: float  # process CPU utilisation as reported by psutil.Process.cpu_percent
@dataclass
class ExperimentResult:
    """Results from a single experiment run"""
    algorithm: str                   # display name of the algorithm (defaults to func.__name__)
    input_size: int                  # problem size n for this run (0 if not supplied)
    elapsed_time: float              # wall-clock runtime of the profiled call, in seconds
    peak_memory: int                 # maximum RSS observed across samples, in bytes
    average_memory: int              # mean RSS across samples, in bytes (truncated to int)
    measurements: List[Measurement]  # raw background samples collected while the call ran
    output: Any                      # return value of the profiled function
    metadata: Dict[str, Any]         # caller-supplied extra info ('metadata' kwarg, if any)
class SpaceTimeProfiler:
    """Profile space and time usage of algorithms.

    A background thread samples this process's RSS and CPU usage every
    ``sample_interval`` seconds while the target function runs.
    """

    def __init__(self, sample_interval: float = 0.01):
        self.sample_interval = sample_interval
        self.process = psutil.Process(os.getpid())
        # Prime psutil's CPU counter so subsequent non-blocking
        # cpu_percent(interval=None) calls report utilisation since the
        # previous call instead of a meaningless 0.0 on first use.
        self.process.cpu_percent(interval=None)

    def profile(self, func: Callable, *args, **kwargs) -> ExperimentResult:
        """Profile a function's execution.

        Runs ``func(*args, **kwargs)`` while sampling memory/CPU in a
        background thread, and returns an ExperimentResult.  The
        ``input_size`` and ``metadata`` keyword arguments, when present, are
        forwarded to the function and also recorded on the result.
        Exceptions raised by ``func`` propagate after the sampler is stopped.
        """
        measurements: List[Measurement] = []

        # Start monitoring in background
        import threading
        stop_monitoring = threading.Event()

        def monitor():
            # interval=None is non-blocking (it reports usage since the last
            # call), so the sample rate is governed by sample_interval alone;
            # the previous interval=0.01 blocked an extra 10 ms per sample.
            while not stop_monitoring.is_set():
                measurements.append(Measurement(
                    timestamp=time.time(),
                    memory_bytes=self.process.memory_info().rss,
                    cpu_percent=self.process.cpu_percent(interval=None)
                ))
                time.sleep(self.sample_interval)

        # daemon=True: the interpreter can still exit if join() is somehow
        # never reached (e.g. KeyboardInterrupt during teardown).
        monitor_thread = threading.Thread(target=monitor, daemon=True)
        monitor_thread.start()

        # perf_counter is monotonic, so the elapsed measurement cannot be
        # skewed by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        try:
            output = func(*args, **kwargs)
        finally:
            # Always stop the sampler, even if func raised.
            stop_monitoring.set()
            monitor_thread.join()
        elapsed_time = time.perf_counter() - start_time

        # Calculate statistics (guard against an empty sample list when the
        # call finished before the first sample was taken)
        memory_values = [m.memory_bytes for m in measurements]
        peak_memory = max(memory_values) if memory_values else 0
        average_memory = sum(memory_values) / len(memory_values) if memory_values else 0

        return ExperimentResult(
            algorithm=func.__name__,
            input_size=kwargs.get('input_size', 0),
            elapsed_time=elapsed_time,
            peak_memory=peak_memory,
            average_memory=int(average_memory),
            measurements=measurements,
            output=output,
            metadata=kwargs.get('metadata', {})
        )
class ExperimentRunner:
    """Run and compare multiple algorithms across a range of input sizes."""

    def __init__(self, experiment_name: str):
        self.experiment_name = experiment_name
        self.results: List[ExperimentResult] = []
        self.profiler = SpaceTimeProfiler()

    def add_algorithm(self, func: Callable, input_sizes: List[int],
                      name: str = None, **kwargs):
        """Run algorithm on multiple input sizes.

        The display name defaults to ``func.__name__``; extra keyword
        arguments are forwarded to the function on every run.
        """
        name = name or func.__name__
        for size in input_sizes:
            print(f"Running {name} with input size {size}...")
            result = self.profiler.profile(func, input_size=size, **kwargs)
            # Overwrite what the profiler recorded so reports use the
            # caller-supplied display name and the size for this run.
            result.algorithm = name
            result.input_size = size
            self.results.append(result)

    def save_results(self, filename: str = None):
        """Save results to JSON file.

        Measurements are truncated to the first 100 samples per run to keep
        the file small.  Non-JSON-native values (ExperimentResult.output can
        be any object) are stringified instead of raising TypeError.
        """
        filename = filename or f"{self.experiment_name}_results.json"
        # Convert results to serializable format
        data = {
            'experiment': self.experiment_name,
            'timestamp': datetime.now().isoformat(),
            'results': [
                {
                    **asdict(r),
                    'measurements': [asdict(m) for m in r.measurements[:100]]  # Limit measurements
                }
                for r in self.results
            ]
        }
        with open(filename, 'w') as f:
            # default=str: without it the dump crashed whenever an
            # algorithm's output was not JSON-serializable.
            json.dump(data, f, indent=2, default=str)

    def plot_space_time_curves(self, save_path: str = None):
        """Generate space-time tradeoff visualization (log-log, two panels)."""
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # Group by algorithm
        algorithms: Dict[str, Dict[str, list]] = {}
        for r in self.results:
            if r.algorithm not in algorithms:
                algorithms[r.algorithm] = {'sizes': [], 'times': [], 'memory': []}
            algorithms[r.algorithm]['sizes'].append(r.input_size)
            algorithms[r.algorithm]['times'].append(r.elapsed_time)
            algorithms[r.algorithm]['memory'].append(r.peak_memory / 1024 / 1024)  # MB

        # Plot time complexity
        for alg, data in algorithms.items():
            ax1.plot(data['sizes'], data['times'], 'o-', label=alg, markersize=8)
        ax1.set_xlabel('Input Size (n)')
        ax1.set_ylabel('Time (seconds)')
        ax1.set_title('Time Complexity')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        ax1.set_xscale('log')
        ax1.set_yscale('log')

        # Plot space complexity
        for alg, data in algorithms.items():
            ax2.plot(data['sizes'], data['memory'], 's-', label=alg, markersize=8)
        ax2.set_xlabel('Input Size (n)')
        ax2.set_ylabel('Peak Memory (MB)')
        ax2.set_title('Space Complexity')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        ax2.set_xscale('log')
        ax2.set_yscale('log')

        # Only add theoretical bounds if they make sense for the experiment
        # (removed inappropriate √n bound for sorting algorithms that use O(1) space)
        plt.suptitle(f'{self.experiment_name}: Space-Time Tradeoff Analysis')
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=150)
        else:
            plt.savefig(f"{self.experiment_name}_analysis.png", dpi=150)
        # Close this specific figure rather than the implicit "current" one.
        plt.close(fig)

    def print_summary(self):
        """Print summary statistics"""
        print(f"\n=== {self.experiment_name} Results Summary ===\n")

        # Group repeated runs by (algorithm, size)
        summary: Dict[tuple, List[ExperimentResult]] = {}
        for r in self.results:
            key = (r.algorithm, r.input_size)
            if key not in summary:
                summary[key] = []
            summary[key].append(r)

        # Print table
        print(f"{'Algorithm':<20} {'Size':<10} {'Time (s)':<12} {'Memory (MB)':<12} {'Time Ratio':<12}")
        print("-" * 70)

        baseline_times = {}
        for (alg, size), results in sorted(summary.items()):
            avg_time = sum(r.elapsed_time for r in results) / len(results)
            avg_memory = sum(r.peak_memory for r in results) / len(results) / 1024 / 1024
            # The first algorithm (in sorted order) seen at each size is the
            # baseline for the ratio column.
            if size not in baseline_times:
                baseline_times[size] = avg_time
            baseline = baseline_times[size]
            # Guard: trivially fast runs can record an elapsed time of 0.0.
            time_ratio = avg_time / baseline if baseline else float('inf')
            print(f"{alg:<20} {size:<10} {avg_time:<12.4f} {avg_memory:<12.2f} {time_ratio:<12.2f}x")
# Example usage for testing
if __name__ == "__main__":
# Test with simple sorting algorithms
import random
def bubble_sort(input_size: int, **kwargs):
    """Generate input_size random floats and bubble-sort them in place.

    Returns the sorted list.  Each pass shrinks the unsorted region by one,
    since the largest remaining element bubbles to its final position.
    """
    data = [random.random() for _ in range(input_size)]
    unsorted_len = len(data)
    while unsorted_len > 1:
        for idx in range(unsorted_len - 1):
            if data[idx] > data[idx + 1]:
                data[idx], data[idx + 1] = data[idx + 1], data[idx]
        unsorted_len -= 1
    return data
def python_sort(input_size: int, **kwargs):
    """Generate input_size random floats and return them sorted with sorted()."""
    return sorted(random.random() for _ in range(input_size))
# Compare the built-in sort against bubble sort on the same sizes.
demo_sizes = [100, 500, 1000]
runner = ExperimentRunner("Sorting Comparison")
for algorithm, label in [(python_sort, "Built-in Sort"),
                         (bubble_sort, "Bubble Sort")]:
    runner.add_algorithm(algorithm, demo_sizes, name=label)
runner.print_summary()
runner.plot_space_time_curves()
runner.save_results()