Files
2025-07-20 03:41:39 -04:00

486 lines
15 KiB
C#

using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using SqrtSpace.SpaceTime.AspNetCore;
using SqrtSpace.SpaceTime.Caching;
using SqrtSpace.SpaceTime.Configuration;
using SqrtSpace.SpaceTime.Core;
using SqrtSpace.SpaceTime.Diagnostics;
using SqrtSpace.SpaceTime.Distributed;
using SqrtSpace.SpaceTime.EntityFramework;
using SqrtSpace.SpaceTime.Linq;
using SqrtSpace.SpaceTime.MemoryManagement;
using SqrtSpace.SpaceTime.Pipeline;
using SqrtSpace.SpaceTime.Scheduling;
using System.Reactive.Linq;
var builder = WebApplication.CreateBuilder(args);

// Bind the "SpaceTime" configuration section once and register the bound instance as a singleton.
var spaceTimeConfig = new SpaceTimeConfiguration();
builder.Configuration.GetSection("SpaceTime").Bind(spaceTimeConfig);
builder.Services.AddSingleton(spaceTimeConfig);

// Apply environment-specific overrides through the options pipeline.
// NOTE(review): Configure<T> only affects IOptions<SpaceTimeConfiguration> consumers; the singleton
// instance registered above is a separate object and does not receive these overrides. Confirm
// which of the two the SpaceTime services actually read.
builder.Services.Configure<SpaceTimeConfiguration>(options =>
{
    // Default ceiling: 256MB in Development, 1GB everywhere else.
    options.Memory.MaxMemory = builder.Environment.IsDevelopment()
        ? 256 * 1024 * 1024
        : 1024 * 1024 * 1024;

    // Enable adaptive features.
    options.Algorithms.EnableAdaptiveSelection = true;
    options.Features.EnableAdaptiveDataStructures = true;

    // A container-supplied limit overrides the default; only 80% of it is used.
    // The value is machine-generated, so it is parsed with the invariant culture, and
    // zero/negative values are ignored rather than turned into a non-positive ceiling.
    // NOTE(review): MEMORY_LIMIT is presumably expressed in bytes — confirm with the deployment manifests.
    var memoryLimit = Environment.GetEnvironmentVariable("MEMORY_LIMIT");
    var parsed = long.TryParse(
        memoryLimit,
        System.Globalization.NumberStyles.Integer,
        System.Globalization.CultureInfo.InvariantCulture,
        out var limit);
    if (parsed && limit > 0)
    {
        options.Memory.MaxMemory = (long)(limit * 0.8);
    }
});
// Shorthand for the application's service collection; every registration below targets it.
var services = builder.Services;

// Core SpaceTime services, with checkpointing and streaming switched on.
services.AddSpaceTime(spaceTime =>
{
    spaceTime.EnableCheckpointing = true;
    spaceTime.EnableStreaming = true;
});

// Caching infrastructure plus the named "main" cache.
services.AddSpaceTimeCaching();
services.AddSpaceTimeCache<string, object>("main", cache =>
{
    cache.MaxHotCacheSize = 50 * 1024 * 1024; // 50MB hot cache
    cache.Strategy = MemoryStrategy.SqrtN;
});

// Distributed processing is only wired up when a non-empty Redis connection string is configured.
if (builder.Configuration.GetConnectionString("Redis") is { Length: > 0 } redisConnection)
{
    services
        .AddSingleton<StackExchange.Redis.IConnectionMultiplexer>(
            _ => StackExchange.Redis.ConnectionMultiplexer.Connect(redisConnection))
        .AddSingleton<ISpaceTimeCoordinator, SpaceTimeCoordinator>();
}

// Diagnostics, memory-pressure monitoring and pipeline support, all as singletons.
services
    .AddSingleton<ISpaceTimeDiagnostics, SpaceTimeDiagnostics>()
    .AddSingleton<IMemoryPressureMonitor, MemoryPressureMonitor>()
    .AddSingleton<IPipelineFactory, PipelineFactory>();

// Entity Framework on SQL Server with the SpaceTime optimizer attached.
services.AddDbContext<ApplicationDbContext>(dbOptions =>
{
    var connectionString = builder.Configuration.GetConnectionString("DefaultConnection");
    dbOptions
        .UseSqlServer(connectionString)
        .UseSpaceTimeOptimizer(optimizer =>
        {
            optimizer.EnableSqrtNChangeTracking = true;
            optimizer.BufferPoolStrategy = BufferPoolStrategy.SqrtN;
        });
});

// MVC controllers and Swagger/OpenAPI metadata.
services.AddControllers();
services
    .AddEndpointsApiExplorer()
    .AddSwaggerGen();

// Application-level services.
services
    .AddScoped<IOrderService, OrderService>()
    .AddHostedService<DataProcessingBackgroundService>();
var app = builder.Build();

// Configure the HTTP request pipeline. Swagger is exposed in Development only.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

// Add SpaceTime middleware.
app.UseSpaceTime();
app.UseAuthorization();
app.MapControllers();

// Health endpoint: reports the monitor's current pressure level and memory figures converted to MB.
// The handler performs no asynchronous work, so it is a plain synchronous lambda
// (the previous `async` lambda contained no await).
app.MapGet("/health", (IMemoryPressureMonitor monitor) =>
{
    const double bytesPerMegabyte = 1024.0 * 1024.0;
    var stats = monitor.CurrentStatistics;

    return Results.Ok(new
    {
        Status = "Healthy",
        MemoryPressure = monitor.CurrentPressureLevel.ToString(),
        MemoryUsage = new
        {
            ManagedMemoryMB = stats.ManagedMemory / bytesPerMegabyte,
            WorkingSetMB = stats.WorkingSet / bytesPerMegabyte,
            AvailablePhysicalMemoryMB = stats.AvailablePhysicalMemory / bytesPerMegabyte
        }
    });
});

app.Run();
// Application services demonstrating best practices
/// <summary>
/// Order operations consumed by the API controllers.
/// </summary>
public interface IOrderService
{
/// <summary>Asynchronously returns the orders selected by <paramref name="filter"/>.</summary>
Task<IEnumerable<Order>> GetLargeOrderSetAsync(OrderFilter filter);
/// <summary>Asynchronously processes <paramref name="orders"/> and returns a summary of the run.</summary>
Task<OrderProcessingResult> ProcessOrderBatchAsync(IEnumerable<Order> orders);
}
/// <summary>
/// Default <see cref="IOrderService"/> implementation backed by <see cref="ApplicationDbContext"/>.
/// </summary>
public class OrderService : IOrderService
{
    private readonly ApplicationDbContext _context;
    private readonly ICacheManager _cacheManager;       // injected but not used by the current methods
    private readonly ISpaceTimeDiagnostics _diagnostics;
    private readonly IPipelineFactory _pipelineFactory; // injected but not used by the current methods
    private readonly ILogger<OrderService> _logger;

    public OrderService(
        ApplicationDbContext context,
        ICacheManager cacheManager,
        ISpaceTimeDiagnostics diagnostics,
        IPipelineFactory pipelineFactory,
        ILogger<OrderService> logger)
    {
        _context = context;
        _cacheManager = cacheManager;
        _diagnostics = diagnostics;
        _pipelineFactory = pipelineFactory;
        _logger = logger;
    }

    /// <summary>
    /// Loads every order whose <c>CreatedDate</c> lies within the filter's inclusive date range,
    /// optionally restricted to a single status, ordered by creation date.
    /// </summary>
    /// <param name="filter">Date range and optional status; a null or empty status disables the status filter.</param>
    /// <returns>The matching orders, fully materialized in memory.</returns>
    public async Task<IEnumerable<Order>> GetLargeOrderSetAsync(OrderFilter filter)
    {
        using var operation = _diagnostics.StartOperation("GetLargeOrderSet", OperationType.Custom);
        try
        {
            // Plain EF Core LINQ for now; the whole result set is materialized by ToListAsync.
            var query = _context.Orders
                .Where(o => o.CreatedDate >= filter.StartDate && o.CreatedDate <= filter.EndDate);

            if (!string.IsNullOrEmpty(filter.Status))
            {
                query = query.Where(o => o.Status == filter.Status);
            }

            var orders = await query
                .OrderBy(o => o.CreatedDate)
                .ToListAsync();

            operation.AddTag("order.count", orders.Count);
            return orders;
        }
        catch (Exception ex)
        {
            // Record the failure on the diagnostics operation, then let the caller see the original exception.
            operation.AddTag("error", ex.Message);
            throw;
        }
    }

    /// <summary>
    /// Validates the batch, loads the referenced customers in one query and computes tax for
    /// every order concurrently.
    /// </summary>
    /// <param name="orders">Orders to process; enumerated exactly once.</param>
    /// <returns>
    /// A summary whose <c>Success</c> flag is false when validation, the customer lookup or any
    /// individual order failed. Failures are logged and never propagate to the caller.
    /// </returns>
    public async Task<OrderProcessingResult> ProcessOrderBatchAsync(IEnumerable<Order> orders)
    {
        // Monotonic timer: unlike DateTime.UtcNow differences it is immune to wall-clock adjustments.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        var processedCount = 0;
        var failureCount = 0;

        try
        {
            var orderList = orders.ToList();

            // Validate everything up front: one invalid order rejects the whole batch.
            foreach (var order in orderList)
            {
                if (order.TotalAmount <= 0)
                {
                    throw new ValidationException($"Invalid order amount: {order.Id}");
                }
            }

            // Batch load customer data.
            var customerIds = orderList.Select(o => o.CustomerId).Distinct().ToList();
            var customers = await _context.Customers
                .Where(c => customerIds.Contains(c.Id))
                .ToDictionaryAsync(c => c.Id);

            // Process orders concurrently. Each task reports its outcome through its return value,
            // so no collection is mutated from multiple threads (the previous shared List<Exception>
            // was written from concurrently running continuations).
            var results = await Task.WhenAll(
                orderList.Select(order => TryProcessOrderAsync(order, customers)));

            processedCount = results.Count(r => r is not null);
            failureCount = results.Length - processedCount;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing order batch");
            failureCount++;
        }

        return new OrderProcessingResult
        {
            ProcessedCount = processedCount,
            Duration = stopwatch.Elapsed,
            Success = failureCount == 0
        };
    }

    // Processes one order; returns null (after logging) instead of throwing so a single failure
    // does not abort the rest of the batch. The dictionary is only read here, never modified.
    private async Task<ProcessedOrder?> TryProcessOrderAsync(
        Order order,
        IReadOnlyDictionary<string, Customer> customers)
    {
        try
        {
            // A missing customer is tolerated: the enriched order simply carries a null customer.
            customers.TryGetValue(order.CustomerId, out var customer);
            var enriched = new EnrichedOrder { Order = order, Customer = customer };
            var tax = await CalculateTaxAsync(enriched);

            return new ProcessedOrder
            {
                Id = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                TotalWithTax = order.TotalAmount + tax,
                ProcessedAt = DateTime.UtcNow
            };
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing order {OrderId}", order.Id);
            return null;
        }
    }

    // Simulated tax calculation: waits 10 ms, then returns a flat 8% of the order total.
    private async Task<decimal> CalculateTaxAsync(EnrichedOrder order)
    {
        await Task.Delay(10);
        return order.Order.TotalAmount * 0.08m; // 8% tax
    }
}
// Background service demonstrating memory-aware processing
/// <summary>
/// Hosted service that, once a minute, marks pending orders as processed while observing
/// memory-pressure notifications.
/// </summary>
public class DataProcessingBackgroundService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IMemoryPressureMonitor _memoryMonitor;
    private readonly TaskScheduler _scheduler;
    private readonly ILogger<DataProcessingBackgroundService> _logger;

    // NOTE(review): no TaskScheduler registration is visible in this file's startup code;
    // confirm that one of the SpaceTime registration helpers provides it, otherwise the host
    // cannot construct this service.
    public DataProcessingBackgroundService(
        IServiceProvider serviceProvider,
        IMemoryPressureMonitor memoryMonitor,
        TaskScheduler scheduler,
        ILogger<DataProcessingBackgroundService> logger)
    {
        _serviceProvider = serviceProvider;
        _memoryMonitor = memoryMonitor;
        _scheduler = scheduler;
        _logger = logger;
    }

    /// <summary>
    /// Runs the processing loop until <paramref name="stoppingToken"/> is cancelled:
    /// one batch run, a one-minute pause, and a five-minute back-off after a failure.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Keep the subscription handle so it is released when the loop exits;
        // previously the subscription was never disposed.
        using var pressureSubscription = _memoryMonitor.PressureEvents
            .Where(e => e.CurrentLevel >= SqrtSpace.SpaceTime.MemoryManagement.MemoryPressureLevel.High)
            .Subscribe(_ =>
            {
                _logger.LogWarning("High memory pressure detected, pausing processing");
                // Implement backpressure
            });

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Run the batch on the injected scheduler; Unwrap surfaces the inner async task.
                await Task.Factory.StartNew(
                    async () => await ProcessNextBatchAsync(stoppingToken),
                    stoppingToken,
                    TaskCreationOptions.None,
                    _scheduler).Unwrap();

                // Wait before next iteration.
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown: not an error, so do not log it as one or start a back-off delay.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in background processing");

                try
                {
                    await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    // Shutdown requested during the back-off.
                    break;
                }
            }
        }
    }

    // Marks every order with status "Pending" as "Processed", saving after each batch.
    // A fresh DI scope is created per run because ApplicationDbContext is a scoped service.
    // NOTE(review): SaveChangesAsync runs while the batched query is still being enumerated;
    // verify that BatchBySqrtNAsync and the database provider support that.
    private async Task ProcessNextBatchAsync(CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

        await foreach (var batch in context.Orders
            .Where(o => o.Status == "Pending")
            .BatchBySqrtNAsync())
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }

            _logger.LogInformation("Processing batch of {Count} orders", batch.Count);

            foreach (var order in batch)
            {
                order.Status = "Processed";
                order.ProcessedDate = DateTime.UtcNow;
            }

            await context.SaveChangesAsync(cancellationToken);
        }
    }
}
// Controller demonstrating SpaceTime features
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IOrderService _orderService;
    private readonly ISpaceTimeCoordinator _coordinator; // not used yet, see ProcessDistributed
    private readonly ILogger<OrdersController> _logger;

    // NOTE(review): the startup code visible in this file registers ISpaceTimeCoordinator only when
    // a Redis connection string is configured. Unless another registration exists elsewhere,
    // this controller cannot be activated without Redis — confirm before deploying without it.
    public OrdersController(
        IOrderService orderService,
        ISpaceTimeCoordinator coordinator,
        ILogger<OrdersController> logger)
    {
        _orderService = orderService;
        _coordinator = coordinator;
        _logger = logger;
    }

    /// <summary>
    /// Streams the orders matching <paramref name="filter"/> as export DTOs.
    /// The filtered set is loaded first and then yielded batch by batch.
    /// </summary>
    [HttpGet("export")]
    [SpaceTimeStreaming(ChunkStrategy = ChunkStrategy.SqrtN)]
    public async IAsyncEnumerable<OrderExportDto> ExportOrders([FromQuery] OrderFilter filter)
    {
        var orders = await _orderService.GetLargeOrderSetAsync(filter);

        await foreach (var batch in orders.BatchBySqrtNAsync())
        {
            foreach (var order in batch)
            {
                yield return new OrderExportDto
                {
                    Id = order.Id,
                    CustomerName = order.CustomerName,
                    TotalAmount = order.TotalAmount,
                    Status = order.Status,
                    CreatedDate = order.CreatedDate
                };
            }
        }
    }

    /// <summary>
    /// Processes all orders created during the last 30 days and returns the processing summary.
    /// </summary>
    /// <param name="request">Workload description; currently not consulted.</param>
    // The route attribute is declared once: a second identical [HttpPost] registers the same
    // route twice for this action, which can make requests match ambiguously.
    [HttpPost("process-distributed")]
    public async Task<IActionResult> ProcessDistributed([FromBody] ProcessRequest request)
    {
        // For now, process without distributed coordination
        // TODO: Implement proper distributed processing when coordinator API is finalized
        var filter = new OrderFilter
        {
            StartDate = DateTime.UtcNow.AddDays(-30),
            EndDate = DateTime.UtcNow
        };

        var orders = await _orderService.GetLargeOrderSetAsync(filter);
        var result = await _orderService.ProcessOrderBatchAsync(orders);
        return Ok(result);
    }
}
// Data models
/// <summary>
/// EF Core context exposing the order and customer sets used by the services in this file.
/// </summary>
public class ApplicationDbContext : DbContext
{
// Orders queried by OrderService and updated by DataProcessingBackgroundService.
public DbSet<Order> Orders { get; set; }
// Customers looked up by id during order batch processing.
public DbSet<Customer> Customers { get; set; }
// Forwards the configured options to the DbContext base class.
public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
: base(options)
{
}
}
/// <summary>
/// Order entity stored in <see cref="ApplicationDbContext.Orders"/>.
/// </summary>
public class Order
{
public string Id { get; set; } = "";
// Id of the customer the order belongs to; matched against Customer.Id during batch processing.
public string CustomerId { get; set; } = "";
public string CustomerName { get; set; } = "";
// Batch processing rejects orders whose total is zero or negative.
public decimal TotalAmount { get; set; }
// The background service moves orders from "Pending" to "Processed".
public string Status { get; set; } = "";
public DateTime CreatedDate { get; set; }
// Set to the UTC time at which the background service marked the order as processed.
public DateTime? ProcessedDate { get; set; }
}
/// <summary>
/// Customer entity stored in <see cref="ApplicationDbContext.Customers"/>.
/// </summary>
public class Customer
{
// Key used to match customers to orders (Order.CustomerId).
public string Id { get; set; } = "";
public string Name { get; set; } = "";
public string Email { get; set; } = "";
}
/// <summary>
/// Criteria used to select orders by creation date and, optionally, status.
/// </summary>
public class OrderFilter
{
// Inclusive lower bound applied to Order.CreatedDate.
public DateTime StartDate { get; set; }
// Inclusive upper bound applied to Order.CreatedDate.
public DateTime EndDate { get; set; }
// Optional exact status match; null or empty means no status filtering.
public string? Status { get; set; }
}
/// <summary>
/// Shape of a single order as returned by the export endpoint; each value is copied
/// from the same-named <see cref="Order"/> property.
/// </summary>
public class OrderExportDto
{
public string Id { get; set; } = "";
public string CustomerName { get; set; } = "";
public decimal TotalAmount { get; set; }
public string Status { get; set; } = "";
public DateTime CreatedDate { get; set; }
}
/// <summary>
/// Request body of the process-distributed endpoint.
/// </summary>
// NOTE(review): neither property is read by the code visible in this file; presumably they are
// reserved for the planned distributed coordination — confirm.
public class ProcessRequest
{
public string WorkloadId { get; set; } = "";
public long EstimatedSize { get; set; }
}
/// <summary>
/// Summary returned by <see cref="IOrderService.ProcessOrderBatchAsync"/>.
/// </summary>
public class OrderProcessingResult
{
// Number of orders that were processed without error.
public int ProcessedCount { get; set; }
// Elapsed time of the batch run.
public TimeSpan Duration { get; set; }
// True only when no error occurred anywhere in the batch.
public bool Success { get; set; }
}
/// <summary>
/// An order paired with its customer record, used as input to the tax calculation.
/// </summary>
public class EnrichedOrder
{
// Always assigned by the creator; null! only silences the nullable-initialization warning.
public Order Order { get; set; } = null!;
// Null when no customer with the order's CustomerId was found.
public Customer? Customer { get; set; }
}
/// <summary>
/// Result of processing a single order.
/// </summary>
public class ProcessedOrder
{
public string Id { get; set; } = "";
public string CustomerId { get; set; } = "";
// Order total before tax.
public decimal TotalAmount { get; set; }
// Order total plus the calculated tax.
public decimal TotalWithTax { get; set; }
// UTC timestamp taken when the order finished processing.
public DateTime ProcessedAt { get; set; }
}
/// <summary>
/// Thrown when an order fails validation before batch processing.
/// Provides the standard exception constructors so callers can wrap an underlying cause.
/// </summary>
public class ValidationException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    public ValidationException()
    {
    }

    /// <summary>Creates the exception with a description of the validation failure.</summary>
    public ValidationException(string message) : base(message)
    {
    }

    /// <summary>Creates the exception with a description and the exception that caused it.</summary>
    public ValidationException(string message, Exception innerException) : base(message, innerException)
    {
    }
}