#!/usr/bin/env python3
"""
SpaceTime Configuration Advisor: Analyze systems and recommend optimal settings
Features:
- System Analysis: Profile hardware capabilities
- Workload Characterization: Understand access patterns
- Configuration Generation: Produce optimal settings
- A/B Testing: Compare configurations in production
- AI Explanations: Clear reasoning for recommendations
"""
import sys
import os
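# Allow running from a repo checkout: make the sibling `core` package importable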
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import psutil
import platform
import subprocess
import json
import time
import numpy as np
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Any, Tuple
from enum import Enum
import sqlite3
import re
# Import core components
from core.spacetime_core import (
MemoryHierarchy,
SqrtNCalculator,
OptimizationStrategy
)
class SystemType(Enum):
"""Types of systems to configure"""
DATABASE = "database"
JVM = "jvm"
KERNEL = "kernel"
CONTAINER = "container"
APPLICATION = "application"
class WorkloadType(Enum):
"""Common workload patterns"""
OLTP = "oltp" # Many small transactions
OLAP = "olap" # Large analytical queries
STREAMING = "streaming" # Continuous data flow
BATCH = "batch" # Periodic large jobs
MIXED = "mixed" # Combination
WEB = "web" # Web serving
ML_TRAINING = "ml_training" # Machine learning
ML_INFERENCE = "ml_inference" # Model serving
@dataclass
class SystemProfile:
"""Hardware and software profile"""
# Hardware
cpu_count: int
cpu_model: str
memory_gb: float
memory_speed_mhz: Optional[int]
storage_type: str # 'ssd', 'nvme', 'hdd'
storage_iops: Optional[int]
network_speed_gbps: float
# Software
os_type: str
os_version: str
kernel_version: Optional[str]
# Memory hierarchy
l1_cache_kb: int
l2_cache_kb: int
l3_cache_mb: float
numa_nodes: int
# Current usage
memory_used_percent: float
cpu_usage_percent: float
io_wait_percent: float
@dataclass
class WorkloadProfile:
"""Workload characteristics"""
type: WorkloadType
read_write_ratio: float # 0.0 = write-only, 1.0 = read-only
hot_data_size_gb: float # Working set size
total_data_size_gb: float # Total dataset
request_rate: float # Requests per second
avg_request_size_kb: float # Average request size
concurrency: int # Concurrent connections/threads
batch_size: Optional[int] # For batch workloads
latency_sla_ms: Optional[float] # Latency requirement
@dataclass
class Configuration:
"""System configuration recommendations"""
system_type: SystemType
settings: Dict[str, Any]
explanation: str
expected_improvement: Dict[str, float]
commands: List[str] # Commands to apply settings
validation_tests: List[str] # Tests to verify improvement
@dataclass
class TestResult:
"""A/B test results"""
config_name: str
metrics: Dict[str, float]
duration_seconds: float
samples: int
confidence: float
winner: bool
class SystemAnalyzer:
"""Analyze system hardware and software"""
def __init__(self):
self.hierarchy = MemoryHierarchy.detect_system()
def analyze_system(self) -> SystemProfile:
"""Comprehensive system analysis"""
# CPU information
        # Physical cores; psutil may return None in some VMs, so fall back
        cpu_count = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
cpu_model = self._get_cpu_model()
# Memory information
mem = psutil.virtual_memory()
memory_gb = mem.total / (1024**3)
memory_speed = self._get_memory_speed()
# Storage information
storage_type, storage_iops = self._analyze_storage()
# Network information
network_speed = self._estimate_network_speed()
# OS information
os_type = platform.system()
os_version = platform.version()
kernel_version = platform.release() if os_type == 'Linux' else None
# Cache sizes (from hierarchy)
l1_cache_kb = self.hierarchy.l1_size // 1024
l2_cache_kb = self.hierarchy.l2_size // 1024
        l3_cache_mb = self.hierarchy.l3_size / (1024 * 1024)  # float, matching the dataclass field
# NUMA nodes
numa_nodes = self._get_numa_nodes()
        # Current usage (stored as 0.0-1.0 fractions despite the *_percent field names)
        memory_used_percent = mem.percent / 100
        cpu_usage_percent = psutil.cpu_percent(interval=1) / 100
io_wait = self._get_io_wait()
return SystemProfile(
cpu_count=cpu_count,
cpu_model=cpu_model,
memory_gb=memory_gb,
memory_speed_mhz=memory_speed,
storage_type=storage_type,
storage_iops=storage_iops,
network_speed_gbps=network_speed,
os_type=os_type,
os_version=os_version,
kernel_version=kernel_version,
l1_cache_kb=l1_cache_kb,
l2_cache_kb=l2_cache_kb,
l3_cache_mb=l3_cache_mb,
numa_nodes=numa_nodes,
memory_used_percent=memory_used_percent,
cpu_usage_percent=cpu_usage_percent,
io_wait_percent=io_wait
)
def _get_cpu_model(self) -> str:
"""Get CPU model name"""
try:
if platform.system() == 'Linux':
with open('/proc/cpuinfo', 'r') as f:
for line in f:
if 'model name' in line:
return line.split(':')[1].strip()
elif platform.system() == 'Darwin':
result = subprocess.run(['sysctl', '-n', 'machdep.cpu.brand_string'],
capture_output=True, text=True)
return result.stdout.strip()
        except Exception:
            pass
return "Unknown CPU"
    def _get_memory_speed(self) -> Optional[int]:
        """Get memory speed in MHz"""
        # Real detection needs a platform-specific probe (see the dmidecode
        # sketch below); return a typical DDR4 speed as a safe default
        return 2666
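    # The default above is a common DDR4 speed. A hedged Linux alternative
    # parses `dmidecode -t memory`; assumptions: dmidecode is installed and
    # the process is privileged enough to run it.
    def _get_memory_speed_dmidecode(self) -> Optional[int]:
        """Sketch: read the first reported DIMM speed via dmidecode"""
        try:
            out = subprocess.run(['dmidecode', '-t', 'memory'],
                                 capture_output=True, text=True).stdout
            # Matches both "Speed: 2667 MT/s" and older "Speed: 1600 MHz"
            match = re.search(r'^\s*Speed:\s*(\d+)\s*M', out, re.MULTILINE)
            return int(match.group(1)) if match else None
        except (OSError, ValueError):
            return None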
def _analyze_storage(self) -> Tuple[str, Optional[int]]:
"""Analyze storage type and performance"""
        # Simplified name-based detection (see the /sys-based sketch below)
partitions = psutil.disk_partitions()
if partitions:
# Check for NVMe
device = partitions[0].device
if 'nvme' in device:
return 'nvme', 100000 # 100K IOPS typical
elif any(x in device for x in ['ssd', 'solid']):
return 'ssd', 50000 # 50K IOPS typical
return 'hdd', 200 # 200 IOPS typical
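    # Name matching above is fragile. On Linux the kernel exposes a per-disk
    # rotational flag; this sketch assumes /sys is available and uses a naive
    # partition-to-disk name mapping (sda1 -> sda, nvme0n1p1 -> nvme0n1).
    def _is_rotational(self, device: str) -> Optional[bool]:
        """Sketch: True for spinning disks, False for SSD/NVMe, None if unknown"""
        name = re.sub(r'p?\d+$', '', os.path.basename(device))
        try:
            with open(f"/sys/block/{name}/queue/rotational") as f:
                return f.read().strip() == '1'
        except OSError:
            return None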
def _estimate_network_speed(self) -> float:
"""Estimate network speed in Gbps"""
# Get network interface statistics
stats = psutil.net_if_stats()
speeds = []
for interface, stat in stats.items():
if stat.isup and stat.speed > 0:
speeds.append(stat.speed)
if speeds:
# Return max speed in Gbps
return max(speeds) / 1000
return 1.0 # Default 1 Gbps
def _get_numa_nodes(self) -> int:
"""Get number of NUMA nodes"""
try:
if platform.system() == 'Linux':
result = subprocess.run(['lscpu'], capture_output=True, text=True)
for line in result.stdout.split('\n'):
if 'NUMA node(s)' in line:
return int(line.split(':')[1].strip())
        except Exception:
            pass
return 1
    def _get_io_wait(self) -> float:
        """Get I/O wait as a fraction of CPU time (Linux exposes iowait; others fall back)"""
        try:
            cpu = psutil.cpu_times_percent(interval=0.5)
            return getattr(cpu, 'iowait', 5.0) / 100
        except Exception:
            return 0.05  # Typical 5% fallback
class WorkloadAnalyzer:
"""Analyze workload characteristics"""
def analyze_workload(self,
logs: Optional[List[str]] = None,
metrics: Optional[Dict[str, Any]] = None) -> WorkloadProfile:
"""Analyze workload from logs or metrics"""
# If no data provided, return default mixed workload
if not logs and not metrics:
return self._default_workload()
# Analyze from provided data
if metrics:
return self._analyze_from_metrics(metrics)
else:
return self._analyze_from_logs(logs)
def _default_workload(self) -> WorkloadProfile:
"""Default mixed workload profile"""
return WorkloadProfile(
type=WorkloadType.MIXED,
read_write_ratio=0.8,
hot_data_size_gb=10.0,
total_data_size_gb=100.0,
request_rate=1000.0,
avg_request_size_kb=10.0,
concurrency=100,
batch_size=None,
latency_sla_ms=100.0
)
def _analyze_from_metrics(self, metrics: Dict[str, Any]) -> WorkloadProfile:
"""Analyze from provided metrics"""
# Determine workload type
if metrics.get('batch_size'):
workload_type = WorkloadType.BATCH
elif metrics.get('streaming'):
workload_type = WorkloadType.STREAMING
elif metrics.get('analytics'):
workload_type = WorkloadType.OLAP
else:
workload_type = WorkloadType.OLTP
return WorkloadProfile(
type=workload_type,
read_write_ratio=metrics.get('read_ratio', 0.8),
hot_data_size_gb=metrics.get('working_set_gb', 10.0),
total_data_size_gb=metrics.get('total_data_gb', 100.0),
request_rate=metrics.get('qps', 1000.0),
avg_request_size_kb=metrics.get('avg_request_kb', 10.0),
concurrency=metrics.get('connections', 100),
batch_size=metrics.get('batch_size'),
latency_sla_ms=metrics.get('latency_sla_ms', 100.0)
)
def _analyze_from_logs(self, logs: List[str]) -> WorkloadProfile:
"""Analyze from log entries"""
# Simple pattern matching
        reads = sum(1 for log in logs if 'SELECT' in log or 'GET' in log)
        writes = sum(1 for log in logs
                     if any(op in log for op in ('INSERT', 'UPDATE', 'DELETE')))
total = reads + writes
read_ratio = reads / total if total > 0 else 0.8
return WorkloadProfile(
type=WorkloadType.OLTP if read_ratio > 0.5 else WorkloadType.BATCH,
read_write_ratio=read_ratio,
hot_data_size_gb=10.0,
total_data_size_gb=100.0,
request_rate=len(logs),
avg_request_size_kb=10.0,
concurrency=100,
batch_size=None,
latency_sla_ms=100.0
)
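# Example: characterizing a workload from a captured query log (hypothetical
# path; the simple parser above only recognizes SELECT/GET/INSERT/UPDATE/DELETE):
#   with open('queries.log') as f:
#       profile = WorkloadAnalyzer().analyze_workload(logs=f.readlines())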
class ConfigurationGenerator:
"""Generate optimal configurations"""
def __init__(self):
self.sqrt_calc = SqrtNCalculator()
def generate_config(self,
system: SystemProfile,
workload: WorkloadProfile,
target: SystemType) -> Configuration:
"""Generate configuration for target system"""
if target == SystemType.DATABASE:
return self._generate_database_config(system, workload)
elif target == SystemType.JVM:
return self._generate_jvm_config(system, workload)
elif target == SystemType.KERNEL:
return self._generate_kernel_config(system, workload)
elif target == SystemType.CONTAINER:
return self._generate_container_config(system, workload)
else:
return self._generate_application_config(system, workload)
def _generate_database_config(self, system: SystemProfile,
workload: WorkloadProfile) -> Configuration:
"""Generate database configuration"""
settings = {}
commands = []
# Shared buffers (PostgreSQL) or buffer pool (MySQL)
# Use 25% of RAM for database, but apply √n if data is large
available_memory = system.memory_gb * 0.25
if workload.total_data_size_gb > available_memory:
# Use √n sizing
sqrt_size_gb = np.sqrt(workload.total_data_size_gb)
buffer_size_gb = min(sqrt_size_gb, available_memory)
else:
buffer_size_gb = min(workload.hot_data_size_gb, available_memory)
settings['shared_buffers'] = f"{int(buffer_size_gb * 1024)}MB"
        # Work memory per operation (floor at 1MB so high concurrency can't round it to zero)
        work_mem_mb = max(1, int(available_memory * 1024 / workload.concurrency / 4))
        settings['work_mem'] = f"{work_mem_mb}MB"
        # WAL/checkpoint settings (checkpoint_segments was removed in
        # PostgreSQL 9.5; max_wal_size is the modern equivalent)
        if workload.read_write_ratio < 0.5:  # Write-heavy
            settings['max_wal_size'] = '4GB'
            settings['checkpoint_completion_target'] = 0.9
        else:
            settings['max_wal_size'] = '1GB'
            settings['checkpoint_completion_target'] = 0.5
# Connection pool
settings['max_connections'] = workload.concurrency * 2
        # Generate postgresql.conf lines
        commands = [
            "# PostgreSQL configuration",
            f"shared_buffers = {settings['shared_buffers']}",
            f"work_mem = {settings['work_mem']}",
            f"max_wal_size = {settings['max_wal_size']}",
            f"checkpoint_completion_target = {settings['checkpoint_completion_target']}",
            f"max_connections = {settings['max_connections']}"
        ]
explanation = (
f"Database configured with {buffer_size_gb:.1f}GB buffer pool "
f"({'√n' if workload.total_data_size_gb > available_memory else 'full'} sizing), "
f"{work_mem_mb}MB work memory per operation, and "
f"{'aggressive' if workload.read_write_ratio < 0.5 else 'standard'} checkpointing."
)
expected_improvement = {
'throughput': 1.5 if buffer_size_gb >= workload.hot_data_size_gb else 1.2,
'latency': 0.7 if buffer_size_gb >= workload.hot_data_size_gb else 0.9,
'memory_efficiency': 1.0 - (buffer_size_gb / system.memory_gb)
}
        validation_tests = [
            "pgbench -c 10 -t 1000",
            "SELECT * FROM pg_stat_database_conflicts",
            "SELECT * FROM pg_stat_bgwriter"
        ]
return Configuration(
system_type=SystemType.DATABASE,
settings=settings,
explanation=explanation,
expected_improvement=expected_improvement,
commands=commands,
validation_tests=validation_tests
)
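    # Worked example of the √n sizing above: with 64GB RAM the 25% budget is
    # 16GB. A 500GB dataset exceeds it, so √500 ≈ 22.4GB applies and is then
    # capped at the 16GB budget; a 100GB dataset would get √100 = 10GB outright.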
def _generate_jvm_config(self, system: SystemProfile,
workload: WorkloadProfile) -> Configuration:
"""Generate JVM configuration"""
settings = {}
# Heap size - use 50% of available memory
heap_size_gb = system.memory_gb * 0.5
settings['-Xmx'] = f"{int(heap_size_gb)}g"
settings['-Xms'] = f"{int(heap_size_gb)}g" # Same as max to avoid resizing
# Young generation - √n of heap for balanced GC
young_gen_size = int(np.sqrt(heap_size_gb * 1024))
settings['-Xmn'] = f"{young_gen_size}m"
# GC algorithm
if workload.latency_sla_ms and workload.latency_sla_ms < 100:
settings['-XX:+UseG1GC'] = ''
settings['-XX:MaxGCPauseMillis'] = int(workload.latency_sla_ms)
else:
settings['-XX:+UseParallelGC'] = ''
# Thread settings
settings['-XX:ParallelGCThreads'] = system.cpu_count
settings['-XX:ConcGCThreads'] = max(1, system.cpu_count // 4)
commands = ["java"] + [f"{k}{v}" if not k.startswith('-XX:+') else k
for k, v in settings.items()]
explanation = (
f"JVM configured with {heap_size_gb:.0f}GB heap, "
f"{young_gen_size}MB young generation (√n sizing), and "
f"{'G1GC for low latency' if '-XX:+UseG1GC' in settings else 'ParallelGC for throughput'}."
)
return Configuration(
system_type=SystemType.JVM,
settings=settings,
explanation=explanation,
expected_improvement={'gc_time': 0.5, 'throughput': 1.3},
commands=commands,
validation_tests=["jstat -gcutil <pid> 1000 10"]
)
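    # Illustrative output of the generator above on a hypothetical 32GB,
    # 8-core host with a 50ms latency SLA:
    #   java -Xmx16g -Xms16g -Xmn128m -XX:+UseG1GC \
    #        -XX:MaxGCPauseMillis=50 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=2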
def _generate_kernel_config(self, system: SystemProfile,
workload: WorkloadProfile) -> Configuration:
"""Generate kernel configuration"""
settings = {}
commands = []
# Page cache settings
if workload.hot_data_size_gb > system.memory_gb * 0.5:
            # Aggressive writeback: keep dirty pages low so the cache favors hot reads
settings['vm.dirty_ratio'] = 5
settings['vm.dirty_background_ratio'] = 2
else:
settings['vm.dirty_ratio'] = 20
settings['vm.dirty_background_ratio'] = 10
# Swappiness
settings['vm.swappiness'] = 10 if workload.type in [WorkloadType.OLTP, WorkloadType.OLAP] else 60
# Network settings for high throughput
if workload.request_rate > 10000:
settings['net.core.somaxconn'] = 65535
settings['net.ipv4.tcp_max_syn_backlog'] = 65535
# Generate sysctl commands
commands = [f"sysctl -w {k}={v}" for k, v in settings.items()]
explanation = (
f"Kernel tuned for {'low' if settings['vm.swappiness'] == 10 else 'normal'} swappiness, "
f"{'aggressive' if settings['vm.dirty_ratio'] == 5 else 'standard'} page cache, "
f"and {'high' if 'net.core.somaxconn' in settings else 'normal'} network throughput."
)
return Configuration(
system_type=SystemType.KERNEL,
settings=settings,
explanation=explanation,
expected_improvement={'io_throughput': 1.2, 'latency': 0.9},
commands=commands,
validation_tests=["sysctl -a | grep vm.dirty"]
)
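    # Note: `sysctl -w` changes are lost on reboot. A sketch for persisting
    # them (assumes root and a standard /etc/sysctl.d layout):
    #   with open('/etc/sysctl.d/99-spacetime.conf', 'w') as f:
    #       f.writelines(f"{k} = {v}\n" for k, v in settings.items())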
def _generate_container_config(self, system: SystemProfile,
workload: WorkloadProfile) -> Configuration:
"""Generate container configuration"""
settings = {}
# Memory limits
container_memory_gb = min(workload.hot_data_size_gb * 1.5, system.memory_gb * 0.8)
settings['memory'] = f"{container_memory_gb:.1f}g"
# CPU limits
settings['cpus'] = min(workload.concurrency, system.cpu_count)
# Shared memory for databases
if workload.type in [WorkloadType.OLTP, WorkloadType.OLAP]:
settings['shm_size'] = f"{int(container_memory_gb * 0.25)}g"
        run_cmd = f"docker run --memory={settings['memory']} --cpus={settings['cpus']}"
        if 'shm_size' in settings:
            run_cmd += f" --shm-size={settings['shm_size']}"
        commands = [run_cmd + " <image>"]  # image name left as a placeholder
explanation = (
f"Container limited to {container_memory_gb:.1f}GB memory and "
f"{settings['cpus']} CPUs based on workload requirements."
)
return Configuration(
system_type=SystemType.CONTAINER,
settings=settings,
explanation=explanation,
expected_improvement={'resource_efficiency': 1.5},
commands=commands,
validation_tests=["docker stats"]
)
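    # For compose-managed deployments the same limits map onto standard
    # docker-compose keys (hand-written sketch; this tool only emits the
    # `docker run` form above):
    #   services:
    #     app:
    #       mem_limit: <settings['memory']>
    #       cpus: <settings['cpus']>
    #       shm_size: <settings['shm_size']>   # only when set above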
def _generate_application_config(self, system: SystemProfile,
workload: WorkloadProfile) -> Configuration:
"""Generate application-level configuration"""
settings = {}
# Thread pool sizing
settings['thread_pool_size'] = min(workload.concurrency, system.cpu_count * 2)
# Connection pool
settings['connection_pool_size'] = workload.concurrency
        # Cache sizing using the √n principle: entries ≈ √(hot set size in KB)
        cache_entries = int(np.sqrt(workload.hot_data_size_gb * 1024 * 1024))
settings['cache_size'] = cache_entries
# Batch size for processing
if workload.batch_size:
settings['batch_size'] = workload.batch_size
        else:
            # Optimal batch size: budget 10% of RAM, divided by item size (both in KB)
            memory_per_item_kb = max(workload.avg_request_size_kb, 0.001)
            available_memory_kb = system.memory_gb * 1024 * 1024 * 0.1
            settings['batch_size'] = max(1, int(available_memory_kb / memory_per_item_kb))
explanation = (
f"Application configured with {settings['thread_pool_size']} threads, "
f"{cache_entries:,} cache entries (√n sizing), and "
f"batch size of {settings.get('batch_size', 'N/A')}."
)
return Configuration(
system_type=SystemType.APPLICATION,
settings=settings,
explanation=explanation,
expected_improvement={'throughput': 1.4, 'memory_usage': 0.7},
commands=[],
validation_tests=[]
)
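    # Worked example of the sizing above: a 50GB hot set is ~52.4M KB, so the
    # cache gets √52,428,800 ≈ 7,240 entries; with 64GB RAM and 10KB requests
    # the 10% batching budget is ~6.7M KB, giving a batch size of ~671,000.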
class ConfigurationAdvisor:
"""Main configuration advisor"""
def __init__(self):
self.system_analyzer = SystemAnalyzer()
self.workload_analyzer = WorkloadAnalyzer()
self.config_generator = ConfigurationGenerator()
def analyze(self,
workload_data: Optional[Dict[str, Any]] = None,
target: SystemType = SystemType.DATABASE) -> Configuration:
"""Analyze system and generate configuration"""
# Analyze system
print("Analyzing system hardware...")
system_profile = self.system_analyzer.analyze_system()
# Analyze workload
print("Analyzing workload characteristics...")
workload_profile = self.workload_analyzer.analyze_workload(
metrics=workload_data
)
# Generate configuration
print(f"Generating {target.value} configuration...")
config = self.config_generator.generate_config(
system_profile, workload_profile, target
)
return config
def compare_configs(self,
configs: List[Configuration],
test_duration: int = 300) -> List[TestResult]:
"""A/B test multiple configurations"""
results = []
for config in configs:
print(f"\nTesting configuration: {config.system_type.value}")
# Simulate test (in practice would apply config and measure)
metrics = self._run_test(config, test_duration)
result = TestResult(
config_name=config.system_type.value,
metrics=metrics,
duration_seconds=test_duration,
samples=test_duration * 10,
confidence=0.95,
winner=False
)
results.append(result)
# Determine winner
best_throughput = max(r.metrics.get('throughput', 0) for r in results)
for result in results:
if result.metrics.get('throughput', 0) == best_throughput:
result.winner = True
break
return results
def _run_test(self, config: Configuration, duration: int) -> Dict[str, float]:
"""Simulate running a test (would be real measurement in practice)"""
# Simulate metrics based on expected improvement
base_throughput = 1000.0
base_latency = 50.0
improvement = config.expected_improvement
return {
'throughput': base_throughput * improvement.get('throughput', 1.0),
'latency': base_latency * improvement.get('latency', 1.0),
'cpu_usage': 0.5 / improvement.get('throughput', 1.0),
'memory_usage': improvement.get('memory_efficiency', 0.8)
}
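    # A real measurement for the database target might shell out to pgbench
    # (sketch; assumes pgbench is on PATH and a configured server is reachable):
    #   out = subprocess.run(['pgbench', '-c', '10', '-T', str(duration)],
    #                        capture_output=True, text=True).stdout
    #   match = re.search(r'tps = ([\d.]+)', out)
    #   throughput = float(match.group(1)) if match else 0.0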
def export_config(self, config: Configuration, filename: str):
"""Export configuration to file"""
with open(filename, 'w') as f:
if config.system_type == SystemType.DATABASE:
f.write("# PostgreSQL Configuration\n")
f.write("# Generated by SpaceTime Configuration Advisor\n\n")
for cmd in config.commands:
f.write(cmd + "\n")
elif config.system_type == SystemType.JVM:
f.write("#!/bin/bash\n")
f.write("# JVM Configuration\n")
f.write("# Generated by SpaceTime Configuration Advisor\n\n")
f.write(" ".join(config.commands) + " $@\n")
else:
json.dump(asdict(config), f, indent=2)
print(f"Configuration exported to {filename}")
# Example usage
if __name__ == "__main__":
print("SpaceTime Configuration Advisor")
print("="*60)
advisor = ConfigurationAdvisor()
# Example 1: Database configuration
print("\nExample 1: Database Configuration")
print("-"*40)
db_workload = {
'read_ratio': 0.8,
'working_set_gb': 50,
'total_data_gb': 500,
'qps': 10000,
'connections': 200
}
db_config = advisor.analyze(
workload_data=db_workload,
target=SystemType.DATABASE
)
print(f"\nRecommendation: {db_config.explanation}")
print("\nSettings:")
for k, v in db_config.settings.items():
print(f" {k}: {v}")
# Example 2: JVM configuration
print("\n\nExample 2: JVM Configuration")
print("-"*40)
jvm_workload = {
'latency_sla_ms': 50,
'working_set_gb': 20,
'connections': 1000
}
jvm_config = advisor.analyze(
workload_data=jvm_workload,
target=SystemType.JVM
)
print(f"\nRecommendation: {jvm_config.explanation}")
print("\nJVM flags:")
for cmd in jvm_config.commands[1:]: # Skip 'java'
print(f" {cmd}")
# Example 3: A/B testing
print("\n\nExample 3: A/B Testing Configurations")
print("-"*40)
configs = [
advisor.analyze(workload_data=db_workload, target=SystemType.DATABASE),
advisor.analyze(workload_data={'read_ratio': 0.5}, target=SystemType.DATABASE)
]
results = advisor.compare_configs(configs, test_duration=60)
print("\nTest Results:")
for result in results:
print(f"\n{result.config_name}:")
print(f" Throughput: {result.metrics['throughput']:.0f} QPS")
print(f" Latency: {result.metrics['latency']:.1f} ms")
print(f" Winner: {'' if result.winner else ''}")
# Export configuration
advisor.export_config(db_config, "postgresql.conf")
advisor.export_config(jvm_config, "jvm_startup.sh")
print("\n" + "="*60)
print("Configuration advisor complete!")