486 lines
15 KiB
C#
486 lines
15 KiB
C#
using Microsoft.AspNetCore.Mvc;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using SqrtSpace.SpaceTime.AspNetCore;
|
|
using SqrtSpace.SpaceTime.Caching;
|
|
using SqrtSpace.SpaceTime.Configuration;
|
|
using SqrtSpace.SpaceTime.Core;
|
|
using SqrtSpace.SpaceTime.Diagnostics;
|
|
using SqrtSpace.SpaceTime.Distributed;
|
|
using SqrtSpace.SpaceTime.EntityFramework;
|
|
using SqrtSpace.SpaceTime.Linq;
|
|
using SqrtSpace.SpaceTime.MemoryManagement;
|
|
using SqrtSpace.SpaceTime.Pipeline;
|
|
using SqrtSpace.SpaceTime.Scheduling;
|
|
using System.Reactive.Linq;
|
|
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Configure all SpaceTime services with best practices
|
|
var spaceTimeConfig = new SpaceTimeConfiguration();
|
|
builder.Configuration.GetSection("SpaceTime").Bind(spaceTimeConfig);
|
|
builder.Services.AddSingleton(spaceTimeConfig);
|
|
|
|
// Configure memory limits based on environment
|
|
builder.Services.Configure<SpaceTimeConfiguration>(options =>
|
|
{
|
|
var environment = builder.Environment;
|
|
|
|
// Set memory limits based on deployment environment
|
|
options.Memory.MaxMemory = environment.IsDevelopment()
|
|
? 256 * 1024 * 1024 // 256MB for dev
|
|
: 1024 * 1024 * 1024; // 1GB for production
|
|
|
|
// Enable adaptive features
|
|
options.Algorithms.EnableAdaptiveSelection = true;
|
|
options.Features.EnableAdaptiveDataStructures = true;
|
|
|
|
// Configure based on container limits if available
|
|
var memoryLimit = Environment.GetEnvironmentVariable("MEMORY_LIMIT");
|
|
if (long.TryParse(memoryLimit, out var limit))
|
|
{
|
|
options.Memory.MaxMemory = (long)(limit * 0.8); // Use 80% of container limit
|
|
}
|
|
});
|
|
|
|
// Add all SpaceTime services
|
|
builder.Services.AddSpaceTime(options =>
|
|
{
|
|
options.EnableCheckpointing = true;
|
|
options.EnableStreaming = true;
|
|
});
|
|
|
|
// Add caching with proper configuration
|
|
builder.Services.AddSpaceTimeCaching();
|
|
builder.Services.AddSpaceTimeCache<string, object>("main", options =>
|
|
{
|
|
options.MaxHotCacheSize = 50 * 1024 * 1024; // 50MB hot cache
|
|
options.Strategy = MemoryStrategy.SqrtN;
|
|
});
|
|
|
|
// Add distributed processing if Redis is available
|
|
var redisConnection = builder.Configuration.GetConnectionString("Redis");
|
|
if (!string.IsNullOrEmpty(redisConnection))
|
|
{
|
|
// Add Redis services manually
|
|
builder.Services.AddSingleton<StackExchange.Redis.IConnectionMultiplexer>(sp =>
|
|
StackExchange.Redis.ConnectionMultiplexer.Connect(redisConnection));
|
|
builder.Services.AddSingleton<ISpaceTimeCoordinator, SpaceTimeCoordinator>();
|
|
}
|
|
|
|
// Add diagnostics
|
|
builder.Services.AddSingleton<ISpaceTimeDiagnostics, SpaceTimeDiagnostics>();
|
|
|
|
// Add memory management
|
|
builder.Services.AddSingleton<IMemoryPressureMonitor, MemoryPressureMonitor>();
|
|
|
|
// Add pipeline support
|
|
builder.Services.AddSingleton<IPipelineFactory, PipelineFactory>();
|
|
|
|
// Add Entity Framework with SpaceTime optimizations
|
|
builder.Services.AddDbContext<ApplicationDbContext>(options =>
|
|
{
|
|
options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"))
|
|
.UseSpaceTimeOptimizer(opt =>
|
|
{
|
|
opt.EnableSqrtNChangeTracking = true;
|
|
opt.BufferPoolStrategy = BufferPoolStrategy.SqrtN;
|
|
});
|
|
});
|
|
|
|
// Add controllers and other services
|
|
builder.Services.AddControllers();
|
|
builder.Services.AddEndpointsApiExplorer();
|
|
builder.Services.AddSwaggerGen();
|
|
|
|
// Register application services
|
|
builder.Services.AddScoped<IOrderService, OrderService>();
|
|
builder.Services.AddHostedService<DataProcessingBackgroundService>();
|
|
|
|
var app = builder.Build();
|
|
|
|
// Configure the HTTP request pipeline
|
|
if (app.Environment.IsDevelopment())
|
|
{
|
|
app.UseSwagger();
|
|
app.UseSwaggerUI();
|
|
}
|
|
|
|
app.UseHttpsRedirection();
|
|
|
|
// Add SpaceTime middleware
|
|
app.UseSpaceTime();
|
|
|
|
app.UseAuthorization();
|
|
app.MapControllers();
|
|
|
|
// Map health check endpoint
|
|
app.MapGet("/health", async (IMemoryPressureMonitor monitor) =>
|
|
{
|
|
var stats = monitor.CurrentStatistics;
|
|
return Results.Ok(new
|
|
{
|
|
Status = "Healthy",
|
|
MemoryPressure = monitor.CurrentPressureLevel.ToString(),
|
|
MemoryUsage = new
|
|
{
|
|
ManagedMemoryMB = stats.ManagedMemory / (1024.0 * 1024.0),
|
|
WorkingSetMB = stats.WorkingSet / (1024.0 * 1024.0),
|
|
AvailablePhysicalMemoryMB = stats.AvailablePhysicalMemory / (1024.0 * 1024.0)
|
|
}
|
|
});
|
|
});
|
|
|
|
app.Run();
|
|
|
|
// Application services demonstrating best practices
|
|
|
|
public interface IOrderService
|
|
{
|
|
Task<IEnumerable<Order>> GetLargeOrderSetAsync(OrderFilter filter);
|
|
Task<OrderProcessingResult> ProcessOrderBatchAsync(IEnumerable<Order> orders);
|
|
}
|
|
|
|
public class OrderService : IOrderService
|
|
{
|
|
private readonly ApplicationDbContext _context;
|
|
private readonly ICacheManager _cacheManager;
|
|
private readonly ISpaceTimeDiagnostics _diagnostics;
|
|
private readonly IPipelineFactory _pipelineFactory;
|
|
private readonly ILogger<OrderService> _logger;
|
|
|
|
public OrderService(
|
|
ApplicationDbContext context,
|
|
ICacheManager cacheManager,
|
|
ISpaceTimeDiagnostics diagnostics,
|
|
IPipelineFactory pipelineFactory,
|
|
ILogger<OrderService> logger)
|
|
{
|
|
_context = context;
|
|
_cacheManager = cacheManager;
|
|
_diagnostics = diagnostics;
|
|
_pipelineFactory = pipelineFactory;
|
|
_logger = logger;
|
|
}
|
|
|
|
public async Task<IEnumerable<Order>> GetLargeOrderSetAsync(OrderFilter filter)
|
|
{
|
|
using var operation = _diagnostics.StartOperation("GetLargeOrderSet", OperationType.Custom);
|
|
|
|
try
|
|
{
|
|
// Use SpaceTime LINQ for memory-efficient query
|
|
var query = _context.Orders
|
|
.Where(o => o.CreatedDate >= filter.StartDate && o.CreatedDate <= filter.EndDate);
|
|
|
|
if (!string.IsNullOrEmpty(filter.Status))
|
|
query = query.Where(o => o.Status == filter.Status);
|
|
|
|
// Use standard LINQ for now
|
|
var orders = await query
|
|
.OrderBy(o => o.CreatedDate)
|
|
.ToListAsync();
|
|
|
|
operation.AddTag("order.count", orders.Count);
|
|
return orders;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
operation.AddTag("error", ex.Message);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
public async Task<OrderProcessingResult> ProcessOrderBatchAsync(IEnumerable<Order> orders)
|
|
{
|
|
var processedCount = 0;
|
|
var startTime = DateTime.UtcNow;
|
|
var errors = new List<Exception>();
|
|
|
|
try
|
|
{
|
|
// Simple processing without complex pipeline for now
|
|
var orderList = orders.ToList();
|
|
|
|
// Validate orders
|
|
foreach (var order in orderList)
|
|
{
|
|
if (order.TotalAmount <= 0)
|
|
throw new ValidationException($"Invalid order amount: {order.Id}");
|
|
}
|
|
|
|
// Batch load customer data
|
|
var customerIds = orderList.Select(o => o.CustomerId).Distinct();
|
|
var customers = await _context.Customers
|
|
.Where(c => customerIds.Contains(c.Id))
|
|
.ToDictionaryAsync(c => c.Id);
|
|
|
|
// Process orders in parallel
|
|
var tasks = orderList.Select(async order =>
|
|
{
|
|
try
|
|
{
|
|
var customer = customers.GetValueOrDefault(order.CustomerId);
|
|
var enriched = new EnrichedOrder { Order = order, Customer = customer };
|
|
var tax = await CalculateTaxAsync(enriched);
|
|
|
|
var processed = new ProcessedOrder
|
|
{
|
|
Id = order.Id,
|
|
CustomerId = order.CustomerId,
|
|
TotalAmount = order.TotalAmount,
|
|
TotalWithTax = order.TotalAmount + tax,
|
|
ProcessedAt = DateTime.UtcNow
|
|
};
|
|
|
|
Interlocked.Increment(ref processedCount);
|
|
return processed;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
errors.Add(ex);
|
|
return null;
|
|
}
|
|
});
|
|
|
|
await Task.WhenAll(tasks);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error processing order batch");
|
|
errors.Add(ex);
|
|
}
|
|
|
|
return new OrderProcessingResult
|
|
{
|
|
ProcessedCount = processedCount,
|
|
Duration = DateTime.UtcNow - startTime,
|
|
Success = errors.Count == 0
|
|
};
|
|
}
|
|
|
|
private async Task<decimal> CalculateTaxAsync(EnrichedOrder order)
|
|
{
|
|
// Simulate tax calculation
|
|
await Task.Delay(10);
|
|
return order.Order.TotalAmount * 0.08m; // 8% tax
|
|
}
|
|
}
|
|
|
|
// Background service demonstrating memory-aware processing
|
|
public class DataProcessingBackgroundService : BackgroundService
|
|
{
|
|
private readonly IServiceProvider _serviceProvider;
|
|
private readonly IMemoryPressureMonitor _memoryMonitor;
|
|
private readonly TaskScheduler _scheduler;
|
|
private readonly ILogger<DataProcessingBackgroundService> _logger;
|
|
|
|
public DataProcessingBackgroundService(
|
|
IServiceProvider serviceProvider,
|
|
IMemoryPressureMonitor memoryMonitor,
|
|
TaskScheduler scheduler,
|
|
ILogger<DataProcessingBackgroundService> logger)
|
|
{
|
|
_serviceProvider = serviceProvider;
|
|
_memoryMonitor = memoryMonitor;
|
|
_scheduler = scheduler;
|
|
_logger = logger;
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
// Subscribe to memory pressure events
|
|
_memoryMonitor.PressureEvents
|
|
.Where(e => e.CurrentLevel >= SqrtSpace.SpaceTime.MemoryManagement.MemoryPressureLevel.High)
|
|
.Subscribe(e =>
|
|
{
|
|
_logger.LogWarning("High memory pressure detected, pausing processing");
|
|
// Implement backpressure
|
|
});
|
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
// Schedule work based on memory availability
|
|
await Task.Factory.StartNew(
|
|
async () => await ProcessNextBatchAsync(stoppingToken),
|
|
stoppingToken,
|
|
TaskCreationOptions.None,
|
|
_scheduler).Unwrap();
|
|
|
|
// Wait before next iteration
|
|
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error in background processing");
|
|
await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
|
|
}
|
|
}
|
|
}
|
|
|
|
private async Task ProcessNextBatchAsync(CancellationToken cancellationToken)
|
|
{
|
|
using var scope = _serviceProvider.CreateScope();
|
|
var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
|
|
|
|
// Get unprocessed orders in memory-efficient batches
|
|
await foreach (var batch in context.Orders
|
|
.Where(o => o.Status == "Pending")
|
|
.BatchBySqrtNAsync())
|
|
{
|
|
if (cancellationToken.IsCancellationRequested)
|
|
break;
|
|
|
|
_logger.LogInformation("Processing batch of {Count} orders", batch.Count);
|
|
|
|
// Process batch
|
|
foreach (var order in batch)
|
|
{
|
|
order.Status = "Processed";
|
|
order.ProcessedDate = DateTime.UtcNow;
|
|
}
|
|
|
|
await context.SaveChangesAsync(cancellationToken);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Controller demonstrating SpaceTime features
|
|
[ApiController]
|
|
[Route("api/[controller]")]
|
|
public class OrdersController : ControllerBase
|
|
{
|
|
private readonly IOrderService _orderService;
|
|
private readonly ISpaceTimeCoordinator _coordinator;
|
|
private readonly ILogger<OrdersController> _logger;
|
|
|
|
public OrdersController(
|
|
IOrderService orderService,
|
|
ISpaceTimeCoordinator coordinator,
|
|
ILogger<OrdersController> logger)
|
|
{
|
|
_orderService = orderService;
|
|
_coordinator = coordinator;
|
|
_logger = logger;
|
|
}
|
|
|
|
[HttpGet("export")]
|
|
[SpaceTimeStreaming(ChunkStrategy = ChunkStrategy.SqrtN)]
|
|
public async IAsyncEnumerable<OrderExportDto> ExportOrders([FromQuery] OrderFilter filter)
|
|
{
|
|
var orders = await _orderService.GetLargeOrderSetAsync(filter);
|
|
|
|
await foreach (var batch in orders.BatchBySqrtNAsync())
|
|
{
|
|
foreach (var order in batch)
|
|
{
|
|
yield return new OrderExportDto
|
|
{
|
|
Id = order.Id,
|
|
CustomerName = order.CustomerName,
|
|
TotalAmount = order.TotalAmount,
|
|
Status = order.Status,
|
|
CreatedDate = order.CreatedDate
|
|
};
|
|
}
|
|
}
|
|
}
|
|
|
|
[HttpPost("process-distributed")]
|
|
[HttpPost("process-distributed")]
|
|
public async Task<IActionResult> ProcessDistributed([FromBody] ProcessRequest request)
|
|
{
|
|
// For now, process without distributed coordination
|
|
// TODO: Implement proper distributed processing when coordinator API is finalized
|
|
var filter = new OrderFilter
|
|
{
|
|
StartDate = DateTime.UtcNow.AddDays(-30),
|
|
EndDate = DateTime.UtcNow
|
|
};
|
|
|
|
var orders = await _orderService.GetLargeOrderSetAsync(filter);
|
|
var result = await _orderService.ProcessOrderBatchAsync(orders);
|
|
|
|
return Ok(result);
|
|
}
|
|
}
|
|
|
|
// Data models
|
|
public class ApplicationDbContext : DbContext
|
|
{
|
|
public DbSet<Order> Orders { get; set; }
|
|
public DbSet<Customer> Customers { get; set; }
|
|
|
|
public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
|
|
: base(options)
|
|
{
|
|
}
|
|
}
|
|
|
|
public class Order
|
|
{
|
|
public string Id { get; set; } = "";
|
|
public string CustomerId { get; set; } = "";
|
|
public string CustomerName { get; set; } = "";
|
|
public decimal TotalAmount { get; set; }
|
|
public string Status { get; set; } = "";
|
|
public DateTime CreatedDate { get; set; }
|
|
public DateTime? ProcessedDate { get; set; }
|
|
}
|
|
|
|
public class Customer
|
|
{
|
|
public string Id { get; set; } = "";
|
|
public string Name { get; set; } = "";
|
|
public string Email { get; set; } = "";
|
|
}
|
|
|
|
public class OrderFilter
|
|
{
|
|
public DateTime StartDate { get; set; }
|
|
public DateTime EndDate { get; set; }
|
|
public string? Status { get; set; }
|
|
}
|
|
|
|
public class OrderExportDto
|
|
{
|
|
public string Id { get; set; } = "";
|
|
public string CustomerName { get; set; } = "";
|
|
public decimal TotalAmount { get; set; }
|
|
public string Status { get; set; } = "";
|
|
public DateTime CreatedDate { get; set; }
|
|
}
|
|
|
|
public class ProcessRequest
|
|
{
|
|
public string WorkloadId { get; set; } = "";
|
|
public long EstimatedSize { get; set; }
|
|
}
|
|
|
|
public class OrderProcessingResult
|
|
{
|
|
public int ProcessedCount { get; set; }
|
|
public TimeSpan Duration { get; set; }
|
|
public bool Success { get; set; }
|
|
}
|
|
|
|
public class EnrichedOrder
|
|
{
|
|
public Order Order { get; set; } = null!;
|
|
public Customer? Customer { get; set; }
|
|
}
|
|
|
|
public class ProcessedOrder
|
|
{
|
|
public string Id { get; set; } = "";
|
|
public string CustomerId { get; set; } = "";
|
|
public decimal TotalAmount { get; set; }
|
|
public decimal TotalWithTax { get; set; }
|
|
public DateTime ProcessedAt { get; set; }
|
|
}
|
|
|
|
public class ValidationException : Exception
|
|
{
|
|
public ValidationException(string message) : base(message) { }
|
|
} |