978 lines
30 KiB
C#
978 lines
30 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Net.Http;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using System.Threading.Tasks;
|
|
using FluentAssertions;
|
|
using Microsoft.AspNetCore.Builder;
|
|
using Microsoft.AspNetCore.Hosting;
|
|
using Microsoft.AspNetCore.Mvc;
|
|
using Microsoft.AspNetCore.TestHost;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using SqrtSpace.SpaceTime.AspNetCore;
|
|
using SqrtSpace.SpaceTime.Collections;
|
|
using SqrtSpace.SpaceTime.Core;
|
|
using SqrtSpace.SpaceTime.EntityFramework;
|
|
using SqrtSpace.SpaceTime.Linq;
|
|
using Xunit;
|
|
|
|
namespace SqrtSpace.SpaceTime.Tests.Integration;
|
|
|
|
/// <summary>
/// End-to-end scenarios that drive the test controllers declared in this file through an
/// in-process <see cref="TestServer"/> backed by an EF Core in-memory database.
/// </summary>
public class EndToEndScenarioTests : IDisposable
{
    // ASP.NET Core controllers write camelCase JSON by default, while JsonSerializer's default
    // options are case-sensitive. Web defaults make response parsing match the server output.
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);

    // Fixed "now" for seed data. Dates are generated 0..364 days before this value, so every
    // seeded date falls inside calendar year 2023, which is the range the analytics test queries.
    private static readonly DateTime SeedReferenceDate = new(2023, 12, 31);

    private readonly TestServer _server;
    private readonly HttpClient _client;
    private readonly string _dataDirectory;

    /// <summary>
    /// Creates a private temp directory, builds the test host and seeds the database.
    /// </summary>
    public EndToEndScenarioTests()
    {
        _dataDirectory = Path.Combine(Path.GetTempPath(), "spacetime_e2e_tests", Guid.NewGuid().ToString());
        Directory.CreateDirectory(_dataDirectory);

        // The name must be computed once, outside the options callback: the callback can run
        // again for later scopes, and a fresh Guid each time would hand every request its own
        // empty in-memory database instead of the seeded one.
        var databaseName = $"TestDb_{Guid.NewGuid()}";

        var builder = new WebHostBuilder()
            .ConfigureServices(services =>
            {
                // Configure SpaceTime
                services.AddSpaceTime(options =>
                {
                    options.EnableCheckpointing = true;
                    options.EnableStreaming = true;
                    options.CheckpointDirectory = Path.Combine(_dataDirectory, "checkpoints");
                    options.ExternalStorageDirectory = Path.Combine(_dataDirectory, "external");
                    options.DefaultStrategy = SpaceTimeStrategy.SqrtN;
                });

                // Configure Entity Framework with SpaceTime
                services.AddDbContext<TestDbContext>(options =>
                {
                    options.UseInMemoryDatabase(databaseName);
                    options.UseSpaceTimeOptimizer(opt =>
                    {
                        opt.EnableSqrtNChangeTracking = true;
                        opt.EnableQueryCheckpointing = true;
                        opt.BufferPoolStrategy = BufferPoolStrategy.SqrtN;
                    });
                });

                services.AddControllers();
                services.AddScoped<IDataProcessingService, DataProcessingService>();
            })
            .Configure(app =>
            {
                // TestServer rethrows unhandled application exceptions to the calling HttpClient
                // instead of producing a response. Translate them to 500 here (outermost, so the
                // rest of the pipeline still observes the exception) so tests can assert on the
                // status code.
                app.Use(async (context, next) =>
                {
                    try
                    {
                        await next();
                    }
                    catch (Exception) when (!context.Response.HasStarted)
                    {
                        context.Response.StatusCode = (int)System.Net.HttpStatusCode.InternalServerError;
                    }
                });

                app.UseSpaceTime();
                app.UseRouting();
                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapControllers();
                });
            });

        _server = new TestServer(builder);
        _client = _server.CreateClient();

        try
        {
            // xUnit constructors cannot be async. GetAwaiter().GetResult() surfaces the original
            // exception rather than an AggregateException wrapper.
            SeedDatabase().GetAwaiter().GetResult();
        }
        catch
        {
            // xUnit does not call Dispose when the constructor throws; clean up explicitly.
            Dispose();
            throw;
        }
    }

    /// <summary>
    /// Disposes the client and server and removes the temp directory (best effort).
    /// </summary>
    public void Dispose()
    {
        _client?.Dispose();
        _server?.Dispose();

        try
        {
            if (Directory.Exists(_dataDirectory))
            {
                Directory.Delete(_dataDirectory, recursive: true);
            }
        }
        catch (IOException)
        {
            // Best-effort cleanup: a locked file must not turn a passing test into a failure.
        }
        catch (UnauthorizedAccessException)
        {
            // Same as above.
        }
    }

    /// <summary>
    /// A failed import followed by a retry is expected to report that it resumed from a checkpoint.
    /// </summary>
    [Fact]
    public async Task CompleteDataImportWorkflow_WithCheckpointing()
    {
        // Arrange
        var importData = new ImportRequest
        {
            FileName = "large_dataset.csv",
            TotalRecords = 10000,
            SimulateFailureAt = 5000
        };

        // Act - First attempt (will fail)
        var response1 = await _client.PostAsync("/api/import/csv", JsonBody(importData));

        response1.StatusCode.Should().Be(System.Net.HttpStatusCode.InternalServerError);

        // Act - Retry (should resume from checkpoint)
        importData.SimulateFailureAt = null;
        var response2 = await _client.PostAsync("/api/import/csv", JsonBody(importData));

        // Assert
        response2.StatusCode.Should().Be(System.Net.HttpStatusCode.OK);
        var result = await ReadJsonAsync<ImportResult>(response2);

        result.TotalProcessed.Should().Be(10000);
        result.ResumedFromCheckpoint.Should().BeTrue();
        result.ProcessingTime.Should().BeGreaterThan(TimeSpan.Zero);
    }

    /// <summary>
    /// Streams the order export and checks count, ordering and amounts.
    /// </summary>
    [Fact]
    public async Task LargeDataExport_WithStreaming()
    {
        // Arrange - the endpoint returns at most `count` orders, and the seed creates fewer
        // than 5000, so the expected count is derived from what is actually stored.
        const int requestedCount = 5000;
        int expectedCount;
        using (var scope = _server.Services.CreateScope())
        {
            var context = scope.ServiceProvider.GetRequiredService<TestDbContext>();
            expectedCount = Math.Min(requestedCount, await context.Orders.CountAsync());
        }

        expectedCount.Should().BeGreaterThan(0);

        // Act
        var stream = await _client.GetStreamAsync($"/api/export/orders?count={requestedCount}");

        // Read streamed data
        // NOTE(review): this parser assumes one JSON object per line; confirm that is what the
        // SpaceTimeStreaming attribute emits (plain MVC would write a single JSON array).
        var orders = new List<OrderExport>();
        using var reader = new StreamReader(stream);
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (string.IsNullOrWhiteSpace(line) || !line.StartsWith('{'))
            {
                continue;
            }

            var order = JsonSerializer.Deserialize<OrderExport>(line, JsonOptions);
            if (order != null)
            {
                orders.Add(order);
            }
        }

        // Assert
        orders.Should().HaveCount(expectedCount);
        orders.Should().BeInAscendingOrder(o => o.OrderDate);
        orders.All(o => o.TotalAmount > 0).Should().BeTrue();
    }

    /// <summary>
    /// Requests the 2023 sales summary and checks the aggregates and reported memory use.
    /// </summary>
    [Fact]
    public async Task ComplexAnalytics_WithMemoryOptimization()
    {
        // Act
        var response = await _client.GetAsync("/api/analytics/sales-summary?startDate=2023-01-01&endDate=2023-12-31");

        // Assert
        response.StatusCode.Should().Be(System.Net.HttpStatusCode.OK);

        var result = await ReadJsonAsync<SalesSummary>(response);

        result.TotalRevenue.Should().BeGreaterThan(0);
        result.OrderCount.Should().BeGreaterThan(0);
        result.TopProducts.Should().HaveCount(10);
        result.MonthlySales.Should().HaveCount(12);
        result.ProcessingStats.MemoryUsedMB.Should().BeLessThan(50); // Should use < 50MB even for large dataset
    }

    /// <summary>
    /// Posts 1000 operations to the batch endpoint and checks the processed count and memory stats.
    /// </summary>
    [Fact]
    public async Task BatchProcessing_WithAdaptiveCollections()
    {
        // Arrange
        var batchRequest = new BatchProcessRequest
        {
            Operations = Enumerable.Range(1, 1000).Select(i => new Operation
            {
                Type = i % 3 == 0 ? "Update" : "Create",
                Data = new { Id = i, Value = $"Item{i}" }
            }).ToList()
        };

        // Act
        var response = await _client.PostAsync("/api/batch/process", JsonBody(batchRequest));

        // Assert
        response.StatusCode.Should().Be(System.Net.HttpStatusCode.OK);

        var result = await ReadJsonAsync<BatchProcessResult>(response);

        result.ProcessedCount.Should().Be(1000);
        result.Errors.Should().BeEmpty();
        result.MemoryStats.PeakUsageMB.Should().BeLessThan(20); // Adaptive collections should minimize memory
    }

    /// <summary>
    /// Sends five concurrent stream requests and compares their reported durations.
    /// </summary>
    [Fact]
    public async Task RealtimeDataProcessing_WithBackpressure()
    {
        // Arrange
        var processingTasks = new List<Task<HttpResponseMessage>>();

        // Act - Send multiple concurrent requests
        for (var i = 0; i < 5; i++)
        {
            // The body must be sent as application/json; [FromBody] on an [ApiController]
            // rejects the StringContent default of text/plain with 415.
            processingTasks.Add(_client.PostAsync(
                $"/api/realtime/process?streamId={i}",
                JsonBody(new { DataPoints = 1000 })));
        }

        var responses = await Task.WhenAll(processingTasks);

        // Assert
        responses.Should().AllSatisfy(r => r.StatusCode.Should().Be(System.Net.HttpStatusCode.OK));

        // Verify backpressure worked
        var processingTimes = new List<TimeSpan>();
        foreach (var response in responses)
        {
            var result = await ReadJsonAsync<ProcessingResult>(response);
            processingTimes.Add(result.Duration);
        }

        // Processing times should vary due to backpressure
        // NOTE(review): every request uses a distinct streamId, so the controller's per-stream
        // delay never triggers here; the spread below depends on scheduling only - confirm intent.
        processingTimes.Max().Subtract(processingTimes.Min()).TotalMilliseconds.Should().BeGreaterThan(100);
    }

    /// <summary>
    /// Starts a background migration and polls its status until it completes.
    /// </summary>
    [Fact]
    public async Task DataMigration_WithCheckpointRecovery()
    {
        // Arrange
        // The controller awaits Task.Delay(1) per record, so the record count and the polling
        // window have to be sized together; 50000 records cannot finish within a 5 s window.
        const int recordCount = 1000;
        var pollInterval = TimeSpan.FromMilliseconds(250);
        var timeout = TimeSpan.FromSeconds(60);
        var migrationId = Guid.NewGuid().ToString();

        // Act - Start migration
        var startResponse = await _client.PostAsync(
            $"/api/migration/start?id={migrationId}",
            JsonBody(new { SourceTable = "LegacyData", RecordCount = recordCount }));

        startResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.Accepted);

        // Check status periodically
        MigrationStatus? status = null;
        var waited = System.Diagnostics.Stopwatch.StartNew();
        while (waited.Elapsed < timeout)
        {
            await Task.Delay(pollInterval);
            var statusResponse = await _client.GetAsync($"/api/migration/status?id={migrationId}");
            statusResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.OK);
            status = await ReadJsonAsync<MigrationStatus>(statusResponse);

            if (status.IsComplete)
            {
                break;
            }
        }

        // Assert
        status.Should().NotBeNull();
        status!.IsComplete.Should().BeTrue();
        status.RecordsProcessed.Should().Be(recordCount);
        // NOTE(review): assumes the SqrtN checkpoint strategy fires at least once within
        // `recordCount` iterations - confirm against CheckpointManager.
        status.CheckpointsSaved.Should().BeGreaterThan(0);
    }

    /// <summary>
    /// Indexes 1000 generated documents and searches them for the term "important".
    /// </summary>
    [Fact]
    public async Task FullTextSearch_WithExternalIndex()
    {
        // Arrange - Index documents
        var documents = Enumerable.Range(1, 1000).Select(i => new Document
        {
            Id = i,
            Title = $"Document {i}",
            Content = GenerateLargeText(i),
            Tags = GenerateTags(i)
        }).ToList();

        var indexResponse = await _client.PostAsync("/api/search/index", JsonBody(documents));

        indexResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.OK);

        // Act - Search
        var searchResponse = await _client.GetAsync("/api/search?query=important&limit=10");

        // Assert
        searchResponse.StatusCode.Should().Be(System.Net.HttpStatusCode.OK);

        var results = await ReadJsonAsync<SearchResults>(searchResponse);

        results.Items.Should().HaveCount(10);
        results.TotalMatches.Should().BeGreaterThan(10);
        results.SearchTime.Should().BeLessThan(TimeSpan.FromSeconds(1));
        results.MemoryUsedMB.Should().BeLessThan(10); // External index should use minimal memory
    }

    /// <summary>
    /// Serializes <paramref name="payload"/> as a UTF-8 <c>application/json</c> request body.
    /// </summary>
    private static StringContent JsonBody<T>(T payload) =>
        new(JsonSerializer.Serialize(payload), Encoding.UTF8, "application/json");

    /// <summary>
    /// Reads the response body and deserializes it using <see cref="JsonOptions"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The body deserialized to <c>null</c>.</exception>
    private static async Task<T> ReadJsonAsync<T>(HttpResponseMessage response)
        where T : class
    {
        var json = await response.Content.ReadAsStringAsync();
        return JsonSerializer.Deserialize<T>(json, JsonOptions)
            ?? throw new InvalidOperationException($"Response body did not contain a {typeof(T).Name}.");
    }

    /// <summary>
    /// Inserts 100 customers, 50 products and a random number of orders for the first 50 customers.
    /// </summary>
    private async Task SeedDatabase()
    {
        using var scope = _server.Services.CreateScope();
        var context = scope.ServiceProvider.GetRequiredService<TestDbContext>();

        // Create test data
        var customers = Enumerable.Range(1, 100).Select(i => new Customer
        {
            Name = $"Customer {i}",
            Email = $"customer{i}@example.com",
            CreatedDate = SeedReferenceDate.AddDays(-Random.Shared.Next(365))
        }).ToList();

        var products = Enumerable.Range(1, 50).Select(i => new Product
        {
            Name = $"Product {i}",
            Price = Random.Shared.Next(10, 1000),
            Category = $"Category{i % 5}"
        }).ToList();

        context.Customers.AddRange(customers);
        context.Products.AddRange(products);
        await context.SaveChangesAsync();

        // Generate orders
        var orders = new List<Order>();
        foreach (var customer in customers.Take(50))
        {
            // Roll the order count once per customer; rolling it inside the loop condition
            // would change the bound on every iteration.
            var orderCount = Random.Shared.Next(5, 20);
            for (var orderIndex = 0; orderIndex < orderCount; orderIndex++)
            {
                var order = new Order
                {
                    CustomerId = customer.Id,
                    OrderDate = SeedReferenceDate.AddDays(-Random.Shared.Next(365)),
                    Status = "Completed",
                    Items = new List<OrderItem>()
                };

                var itemCount = Random.Shared.Next(1, 10);
                for (var itemIndex = 0; itemIndex < itemCount; itemIndex++)
                {
                    var product = products[Random.Shared.Next(products.Count)];
                    order.Items.Add(new OrderItem
                    {
                        ProductId = product.Id,
                        Quantity = Random.Shared.Next(1, 5),
                        UnitPrice = product.Price
                    });
                }

                order.TotalAmount = order.Items.Sum(item => item.Quantity * item.UnitPrice);
                orders.Add(order);
            }
        }

        context.Orders.AddRange(orders);
        await context.SaveChangesAsync();
    }

    /// <summary>
    /// Builds a space-separated string of 100 words picked from a fixed vocabulary using a
    /// <see cref="Random"/> seeded with <paramref name="seed"/>.
    /// </summary>
    private static string GenerateLargeText(int seed)
    {
        var words = new[] { "important", "critical", "data", "analysis", "report", "summary", "detail", "information" };
        var sb = new StringBuilder();
        var random = new Random(seed);

        for (var i = 0; i < 100; i++)
        {
            sb.Append(words[random.Next(words.Length)]);
            sb.Append(' ');
        }

        return sb.ToString();
    }

    /// <summary>
    /// Picks one to three tags from a fixed list using a <see cref="Random"/> seeded with
    /// <paramref name="seed"/>.
    /// </summary>
    private static List<string> GenerateTags(int seed)
    {
        var allTags = new[] { "urgent", "review", "approved", "pending", "archived" };
        var random = new Random(seed);
        var tagCount = random.Next(1, 4);

        return allTags.OrderBy(_ => random.Next()).Take(tagCount).ToList();
    }
}
|
|
|
|
// Controllers for integration tests
|
|
/// <summary>
/// Test controller that simulates a resumable CSV import using the request's checkpoint feature.
/// </summary>
[ApiController]
[Route("api/import")]
public class ImportController : ControllerBase
{
    // Key under which import progress is loaded and saved.
    private const string StateKey = "import-state";

    private readonly IDataProcessingService _processingService;

    public ImportController(IDataProcessingService processingService)
    {
        _processingService = processingService;
    }

    /// <summary>
    /// Processes records from the last saved position up to <c>request.TotalRecords</c>,
    /// saving progress whenever the checkpoint feature asks for it.
    /// </summary>
    /// <param name="request">Import parameters, including an optional record index at which to fail.</param>
    /// <returns>200 with an <see cref="ImportResult"/> once all records are processed.</returns>
    /// <exception cref="InvalidOperationException">
    /// The checkpoint feature is not available on the request, or the simulated failure index was reached.
    /// </exception>
    [HttpPost("csv")]
    [EnableCheckpoint(Strategy = CheckpointStrategy.SqrtN)]
    public async Task<IActionResult> ImportCsv([FromBody] ImportRequest request)
    {
        // Fail with a clear message instead of a NullReferenceException further down.
        var checkpoint = HttpContext.Features.Get<ICheckpointFeature>()
            ?? throw new InvalidOperationException("ICheckpointFeature is not available for this request.");

        var state = await checkpoint.LoadStateAsync<ImportState>(StateKey);
        var startFrom = state?.ProcessedCount ?? 0;
        var processedCount = startFrom;
        var resumed = startFrom > 0;

        var sw = System.Diagnostics.Stopwatch.StartNew();

        for (var i = startFrom; i < request.TotalRecords; i++)
        {
            if (request.SimulateFailureAt.HasValue && i == request.SimulateFailureAt.Value)
            {
                // Deliberately left unhandled so the request fails; progress saved so far is kept.
                throw new InvalidOperationException("Simulated import failure");
            }

            // Simulate processing
            await _processingService.ProcessRecord(i);
            processedCount++;

            if (checkpoint.ShouldCheckpoint(processedCount))
            {
                await checkpoint.SaveStateAsync(StateKey, new ImportState { ProcessedCount = processedCount });
            }
        }

        return Ok(new ImportResult
        {
            TotalProcessed = processedCount,
            ResumedFromCheckpoint = resumed,
            ProcessingTime = sw.Elapsed
        });
    }
}
|
|
|
|
/// <summary>
/// Test controller that streams orders as <see cref="OrderExport"/> rows.
/// </summary>
[ApiController]
[Route("api/export")]
public class ExportController : ControllerBase
{
    private readonly TestDbContext _context;

    public ExportController(TestDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Yields up to <paramref name="count"/> orders, oldest first, read in batches.
    /// </summary>
    /// <param name="count">Maximum number of orders to export.</param>
    [HttpGet("orders")]
    [SpaceTimeStreaming(ChunkStrategy = ChunkStrategy.SqrtN)]
    public async IAsyncEnumerable<OrderExport> ExportOrders([FromQuery] int count = 1000)
    {
        // Customer and Items are read below. Without eager loading they are not populated on a
        // fresh context, which would make Customer.Name a null dereference and ItemCount zero.
        var query = _context.Orders
            .Include(o => o.Customer)
            .Include(o => o.Items)
            .OrderBy(o => o.OrderDate)
            .Take(count);

        await foreach (var batch in query.BatchBySqrtNAsync())
        {
            foreach (var order in batch)
            {
                yield return new OrderExport
                {
                    OrderId = order.Id,
                    CustomerName = order.Customer.Name,
                    OrderDate = order.OrderDate,
                    TotalAmount = order.TotalAmount,
                    ItemCount = order.Items.Count
                };
            }
        }
    }
}
|
|
|
|
/// <summary>
/// Test controller that aggregates order data into a <see cref="SalesSummary"/>.
/// </summary>
[ApiController]
[Route("api/analytics")]
public class AnalyticsController : ControllerBase
{
    private const int TopProductCount = 10;
    private const double BytesPerMegabyte = 1024.0 * 1024.0;

    private readonly TestDbContext _context;

    public AnalyticsController(TestDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Summarizes orders whose date lies in the inclusive range
    /// [<paramref name="startDate"/>, <paramref name="endDate"/>].
    /// </summary>
    /// <returns>
    /// 200 with totals, top products, per-month totals and the change in
    /// <see cref="GC.GetTotalMemory(bool)"/> observed while building the summary.
    /// </returns>
    [HttpGet("sales-summary")]
    public async Task<IActionResult> GetSalesSummary([FromQuery] DateTime startDate, [FromQuery] DateTime endDate)
    {
        var memoryBefore = GC.GetTotalMemory(false);

        // Use SpaceTime optimizations for large aggregations.
        // Items must be loaded eagerly: the top-product ranking reads order.Items.
        var orders = await _context.Orders
            .Include(o => o.Items)
            .Where(o => o.OrderDate >= startDate && o.OrderDate <= endDate)
            .ToListWithSqrtNMemoryAsync();

        var summary = new SalesSummary
        {
            TotalRevenue = orders.Sum(o => o.TotalAmount),
            OrderCount = orders.Count,
            // Average throws on an empty sequence; an empty range is reported as 0 instead.
            AverageOrderValue = orders.Count == 0 ? 0m : orders.Average(o => o.TotalAmount),
            TopProducts = GetTopProducts(orders),
            MonthlySales = GetMonthlySales(orders),
            ProcessingStats = new ProcessingStats
            {
                MemoryUsedMB = (GC.GetTotalMemory(false) - memoryBefore) / BytesPerMegabyte,
                RecordsProcessed = orders.Count
            }
        };

        return Ok(summary);
    }

    /// <summary>
    /// Sums quantity * unit price per product and returns the highest totals, largest first.
    /// </summary>
    private static List<ProductSummary> GetTopProducts(List<Order> orders)
    {
        var productSales = new AdaptiveDictionary<int, decimal>();

        foreach (var order in orders)
        {
            foreach (var item in order.Items)
            {
                var lineTotal = item.Quantity * item.UnitPrice;

                // Single lookup instead of ContainsKey followed by the indexer.
                productSales[item.ProductId] = productSales.TryGetValue(item.ProductId, out var runningTotal)
                    ? runningTotal + lineTotal
                    : lineTotal;
            }
        }

        return productSales
            .OrderByDescending(kvp => kvp.Value)
            .Take(TopProductCount)
            .Select(kvp => new ProductSummary
            {
                ProductId = kvp.Key,
                TotalSales = kvp.Value
            })
            .ToList();
    }

    /// <summary>
    /// Groups orders by calendar year and month and totals each group, in chronological order.
    /// </summary>
    private static List<MonthlySale> GetMonthlySales(List<Order> orders)
    {
        return orders
            .GroupByExternal(o => new { o.OrderDate.Year, o.OrderDate.Month })
            .Select(g => new MonthlySale
            {
                Year = g.Key.Year,
                Month = g.Key.Month,
                Total = g.Sum(o => o.TotalAmount)
            })
            .OrderBy(m => m.Year)
            .ThenBy(m => m.Month)
            .ToList();
    }
}
|
|
|
|
/// <summary>
/// Test controller that runs a list of operations batch by batch and reports heap usage.
/// </summary>
[ApiController]
[Route("api/batch")]
public class BatchController : ControllerBase
{
    /// <summary>
    /// Runs every operation in the request, collecting per-operation failures as messages
    /// rather than aborting.
    /// </summary>
    /// <returns>
    /// 200 with the number of successful operations, the failure messages, and the
    /// <see cref="GC.GetTotalMemory(bool)"/> readings taken at the start and the highest one
    /// seen after any batch, both in MB.
    /// </returns>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessBatch([FromBody] BatchProcessRequest request)
    {
        const double bytesPerMegabyte = 1024.0 * 1024.0;

        var completed = new AdaptiveList<object>();
        var failures = new List<string>();
        var startBytes = GC.GetTotalMemory(false);
        var peakBytes = startBytes;

        foreach (var chunk in request.Operations.BatchBySqrtN())
        {
            foreach (var op in chunk)
            {
                try
                {
                    completed.Add(await ProcessOperation(op));
                }
                catch (Exception ex)
                {
                    failures.Add($"Operation failed: {ex.Message}");
                }
            }

            // Sample the heap once per batch and keep the largest reading.
            peakBytes = Math.Max(peakBytes, GC.GetTotalMemory(false));
        }

        var memoryStats = new MemoryStats
        {
            StartUsageMB = startBytes / bytesPerMegabyte,
            PeakUsageMB = peakBytes / bytesPerMegabyte
        };

        return Ok(new BatchProcessResult
        {
            ProcessedCount = completed.Count,
            Errors = failures,
            MemoryStats = memoryStats
        });
    }

    /// <summary>
    /// Stand-in for real work: waits 1 ms and returns a success marker with a new id.
    /// </summary>
    private async Task<object> ProcessOperation(Operation operation)
    {
        await Task.Delay(1); // Simulate processing
        return new { Success = true, Id = Guid.NewGuid() };
    }
}
|
|
|
|
/// <summary>
/// Test controller that processes data points for a stream and spaces out requests that
/// arrive for the same stream id in quick succession.
/// </summary>
[ApiController]
[Route("api/realtime")]
public class RealtimeController : ControllerBase
{
    // Minimum spacing enforced between two requests for the same stream id.
    private static readonly TimeSpan MinRequestInterval = TimeSpan.FromMilliseconds(100);

    // Time of the most recent request per stream id; shared by all controller instances.
    private static readonly AdaptiveDictionary<string, DateTime> _streamTimestamps = new();

    /// <summary>
    /// Processes <c>data.DataPoints</c> points for <paramref name="streamId"/>.
    /// </summary>
    /// <returns>
    /// 200 with the processed count and the elapsed time (including any backpressure delay),
    /// or 400 when <c>data.DataPoints</c> is negative.
    /// </returns>
    [HttpPost("process")]
    public async Task<IActionResult> ProcessStream([FromQuery] string streamId, [FromBody] StreamData data)
    {
        if (data.DataPoints < 0)
        {
            return BadRequest("DataPoints must not be negative.");
        }

        // Stopwatch is monotonic; wall-clock subtraction can be skewed by clock adjustments.
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();

        // Apply backpressure based on stream rate
        if (_streamTimestamps.TryGetValue(streamId, out var lastTime))
        {
            var sinceLast = DateTime.UtcNow - lastTime;
            if (sinceLast < MinRequestInterval)
            {
                await Task.Delay(MinRequestInterval - sinceLast);
            }
        }

        _streamTimestamps[streamId] = DateTime.UtcNow;

        // Process data points
        var processed = 0;
        for (var point = 1; point <= data.DataPoints; point++)
        {
            await ProcessDataPoint(point);
            processed++;
        }

        return Ok(new ProcessingResult
        {
            ProcessedCount = processed,
            Duration = stopwatch.Elapsed
        });
    }

    /// <summary>
    /// Stand-in for real work: waits 1 ms.
    /// </summary>
    private static Task ProcessDataPoint(int point)
    {
        return Task.Delay(1);
    }
}
|
|
|
|
/// <summary>
/// Test controller that runs a simulated migration in the background and exposes its progress.
/// </summary>
[ApiController]
[Route("api/migration")]
public class MigrationController : ControllerBase
{
    // Progress per migration id; shared by all controller instances and the background workers.
    private static readonly AdaptiveDictionary<string, MigrationStatus> _migrations = new();

    /// <summary>
    /// Registers a new status entry under <paramref name="id"/> and starts the migration on the
    /// thread pool without waiting for it.
    /// </summary>
    /// <returns>202 with the migration id.</returns>
    [HttpPost("start")]
    public Task<IActionResult> StartMigration([FromQuery] string id, [FromBody] MigrationRequest request)
    {
        var status = new MigrationStatus
        {
            Id = id,
            TotalRecords = request.RecordCount,
            StartTime = DateTime.UtcNow
        };

        _migrations[id] = status;

        // Start migration in background. The worker is static and receives everything it
        // needs, so it does not touch this controller instance after the request has ended.
        _ = Task.Run(() => RunMigration(id, status, request));

        // Nothing is awaited here, so the method is not marked async; the Task-returning
        // signature is kept for callers.
        return Task.FromResult<IActionResult>(Accepted(new { MigrationId = id }));
    }

    /// <summary>
    /// Returns the current status for <paramref name="id"/>, or 404 when the id is unknown.
    /// </summary>
    [HttpGet("status")]
    public IActionResult GetStatus([FromQuery] string id)
    {
        return _migrations.TryGetValue(id, out var status)
            ? Ok(status)
            : NotFound();
    }

    /// <summary>
    /// Simulates migrating <c>request.RecordCount</c> records (1 ms each), updating
    /// <paramref name="status"/> as it goes and creating checkpoints when the manager asks for one.
    /// </summary>
    private static async Task RunMigration(string id, MigrationStatus status, MigrationRequest request)
    {
        // NOTE(review): `id` comes straight from the query string and becomes a directory name
        // under the temp path - validate it if this is ever used outside tests.
        var checkpointManager = new CheckpointManager(
            Path.Combine(Path.GetTempPath(), "migrations", id),
            strategy: CheckpointStrategy.SqrtN);

        for (var i = 0; i < request.RecordCount; i++)
        {
            // Simulate migration work
            await Task.Delay(1);

            status.RecordsProcessed++;

            if (checkpointManager.ShouldCheckpoint())
            {
                await checkpointManager.CreateCheckpointAsync(new { Processed = i });
                status.CheckpointsSaved++;
            }
        }

        status.IsComplete = true;
        status.EndTime = DateTime.UtcNow;
    }
}
|
|
|
|
/// <summary>
/// Test controller that stores documents in external storage and searches them through an
/// inverted index (lower-cased term to document ids).
/// </summary>
[ApiController]
[Route("api/search")]
public class SearchController : ControllerBase
{
    private const double BytesPerMegabyte = 1024.0 * 1024.0;

    private static readonly ExternalStorage<Document> _documentStorage =
        new(Path.Combine(Path.GetTempPath(), "search_index"));

    private static readonly AdaptiveDictionary<string, List<int>> _invertedIndex = new();

    /// <summary>
    /// Stores each document and adds its id to the index entry of every distinct term found in
    /// its content, title and tags.
    /// </summary>
    /// <returns>200 with the number of documents received.</returns>
    [HttpPost("index")]
    public async Task<IActionResult> IndexDocuments([FromBody] List<Document> documents)
    {
        foreach (var doc in documents)
        {
            // Store document
            await _documentStorage.WriteAsync($"doc_{doc.Id}", doc);

            // Build inverted index.
            // Lower-case before de-duplicating: index keys are lower-case, so "Data" and "data"
            // must count as one term or the document id would be posted twice under that key.
            var terms = doc.Content.Split(' ', StringSplitOptions.RemoveEmptyEntries)
                .Concat(doc.Title.Split(' ', StringSplitOptions.RemoveEmptyEntries))
                .Concat(doc.Tags)
                .Select(term => term.ToLowerInvariant())
                .Distinct();

            foreach (var term in terms)
            {
                if (!_invertedIndex.TryGetValue(term, out var postings))
                {
                    postings = new List<int>();
                    _invertedIndex[term] = postings;
                }

                postings.Add(doc.Id);
            }
        }

        return Ok(new { IndexedCount = documents.Count });
    }

    /// <summary>
    /// Returns documents that contain at least one of the space-separated terms in
    /// <paramref name="query"/>.
    /// </summary>
    /// <param name="query">Search terms; matching is case-insensitive.</param>
    /// <param name="limit">Maximum number of documents to load and return.</param>
    /// <returns>
    /// 200 with the loaded documents, the number of distinct matching document ids, the elapsed
    /// time and the change in <see cref="GC.GetTotalMemory(bool)"/> during the search.
    /// </returns>
    [HttpGet]
    public async Task<IActionResult> Search([FromQuery] string query, [FromQuery] int limit = 10)
    {
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        var memoryBefore = GC.GetTotalMemory(false);

        var searchTerms = query.ToLowerInvariant().Split(' ', StringSplitOptions.RemoveEmptyEntries);
        var matchingIds = new AdaptiveList<int>();

        foreach (var term in searchTerms)
        {
            if (_invertedIndex.TryGetValue(term, out var ids))
            {
                matchingIds.AddRange(ids);
            }
        }

        // De-duplicate once so the reported total counts each document a single time even when
        // it matches several terms.
        var uniqueIds = matchingIds.DistinctExternal().ToList();
        var results = new List<Document>();

        foreach (var id in uniqueIds.Take(limit))
        {
            var doc = await _documentStorage.ReadAsync($"doc_{id}");
            if (doc != null)
            {
                results.Add(doc);
            }
        }

        return Ok(new SearchResults
        {
            Items = results,
            TotalMatches = uniqueIds.Count,
            SearchTime = stopwatch.Elapsed,
            MemoryUsedMB = (GC.GetTotalMemory(false) - memoryBefore) / BytesPerMegabyte
        });
    }
}
|
|
|
|
// Service interfaces and implementations
|
|
/// <summary>
/// Per-record processing step injected into <c>ImportController</c>.
/// </summary>
public interface IDataProcessingService
{
    /// <summary>
    /// Processes the record identified by <paramref name="recordId"/>.
    /// </summary>
    /// <returns>A task that completes when the record has been processed.</returns>
    Task ProcessRecord(int recordId);
}
|
|
|
|
/// <summary>
/// Test implementation of <see cref="IDataProcessingService"/> that only waits.
/// </summary>
public class DataProcessingService : IDataProcessingService
{
    /// <summary>
    /// Simulates processing by completing after a 1 ms delay; <paramref name="recordId"/> is not used.
    /// </summary>
    public Task ProcessRecord(int recordId) => Task.Delay(1);
}
|
|
|
|
// DTOs
|
|
/// <summary>
/// Request body for the CSV import endpoint.
/// </summary>
public class ImportRequest
{
    /// <summary>Name of the file being imported.</summary>
    public string FileName { get; set; } = string.Empty;

    /// <summary>Number of records the import loop runs up to.</summary>
    public int TotalRecords { get; set; }

    /// <summary>Record index at which the import throws; <c>null</c> disables the failure.</summary>
    public int? SimulateFailureAt { get; set; }
}
|
|
|
|
/// <summary>
/// Response body of the CSV import endpoint.
/// </summary>
public class ImportResult
{
    /// <summary>Processed-record counter at the end of the import, including records from a resumed run.</summary>
    public int TotalProcessed { get; set; }

    /// <summary><c>true</c> when the import started from a saved position greater than zero.</summary>
    public bool ResumedFromCheckpoint { get; set; }

    /// <summary>Elapsed time of the request's processing loop.</summary>
    public TimeSpan ProcessingTime { get; set; }
}
|
|
|
|
/// <summary>
/// Import progress saved to and loaded from the checkpoint store.
/// </summary>
public class ImportState
{
    /// <summary>Number of records processed when the state was saved.</summary>
    public int ProcessedCount { get; set; }
}
|
|
|
|
/// <summary>
/// One exported order row produced by the order export endpoint.
/// </summary>
public class OrderExport
{
    /// <summary>Identifier of the order.</summary>
    public int OrderId { get; set; }

    /// <summary>Name of the order's customer.</summary>
    public string CustomerName { get; set; } = string.Empty;

    /// <summary>Date of the order.</summary>
    public DateTime OrderDate { get; set; }

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Number of items on the order.</summary>
    public int ItemCount { get; set; }
}
|
|
|
|
/// <summary>
/// Response body of the sales-summary endpoint.
/// </summary>
public class SalesSummary
{
    /// <summary>Sum of the total amounts of all orders in the range.</summary>
    public decimal TotalRevenue { get; set; }

    /// <summary>Number of orders in the range.</summary>
    public int OrderCount { get; set; }

    /// <summary>Average total amount per order.</summary>
    public decimal AverageOrderValue { get; set; }

    /// <summary>Products with the highest sales totals.</summary>
    public List<ProductSummary> TopProducts { get; set; } = new List<ProductSummary>();

    /// <summary>Sales totals per calendar month.</summary>
    public List<MonthlySale> MonthlySales { get; set; } = new List<MonthlySale>();

    /// <summary>Memory and record counters captured while building the summary.</summary>
    public ProcessingStats ProcessingStats { get; set; } = new ProcessingStats();
}
|
|
|
|
/// <summary>
/// Sales total for a single product.
/// </summary>
public class ProductSummary
{
    /// <summary>Identifier of the product.</summary>
    public int ProductId { get; set; }

    /// <summary>Sum of quantity times unit price over the product's order items.</summary>
    public decimal TotalSales { get; set; }
}
|
|
|
|
/// <summary>
/// Sales total for one calendar month.
/// </summary>
public class MonthlySale
{
    /// <summary>Calendar year of the month.</summary>
    public int Year { get; set; }

    /// <summary>Month number taken from the order date.</summary>
    public int Month { get; set; }

    /// <summary>Sum of order totals in the month.</summary>
    public decimal Total { get; set; }
}
|
|
|
|
/// <summary>
/// Counters reported alongside a sales summary.
/// </summary>
public class ProcessingStats
{
    /// <summary>Change in managed heap size during processing, in megabytes.</summary>
    public double MemoryUsedMB { get; set; }

    /// <summary>Number of records that were processed.</summary>
    public int RecordsProcessed { get; set; }
}
|
|
|
|
/// <summary>
/// Request body for the batch processing endpoint.
/// </summary>
public class BatchProcessRequest
{
    /// <summary>Operations to run; defaults to an empty list.</summary>
    public List<Operation> Operations { get; set; } = new List<Operation>();
}
|
|
|
|
/// <summary>
/// A single operation inside a <c>BatchProcessRequest</c>.
/// </summary>
public class Operation
{
    /// <summary>Operation kind, for example "Create" or "Update".</summary>
    public string Type { get; set; } = string.Empty;

    /// <summary>Free-form payload of the operation; defaults to a plain object instance.</summary>
    public object Data { get; set; } = new object();
}
|
|
|
|
/// <summary>
/// Response body of the batch processing endpoint.
/// </summary>
public class BatchProcessResult
{
    /// <summary>Number of operations that completed without throwing.</summary>
    public int ProcessedCount { get; set; }

    /// <summary>One message per failed operation.</summary>
    public List<string> Errors { get; set; } = new List<string>();

    /// <summary>Heap readings captured during the batch run.</summary>
    public MemoryStats MemoryStats { get; set; } = new MemoryStats();
}
|
|
|
|
/// <summary>
/// Managed heap readings, in megabytes.
/// </summary>
public class MemoryStats
{
    /// <summary>Heap size when processing started.</summary>
    public double StartUsageMB { get; set; }

    /// <summary>Largest heap size observed during processing.</summary>
    public double PeakUsageMB { get; set; }
}
|
|
|
|
/// <summary>
/// Request body for the realtime processing endpoint.
/// </summary>
public class StreamData
{
    /// <summary>Number of data points to process.</summary>
    public int DataPoints { get; set; }
}
|
|
|
|
/// <summary>
/// Response body of the realtime processing endpoint.
/// </summary>
public class ProcessingResult
{
    /// <summary>Number of data points that were processed.</summary>
    public int ProcessedCount { get; set; }

    /// <summary>Time the request took to handle.</summary>
    public TimeSpan Duration { get; set; }
}
|
|
|
|
/// <summary>
/// Request body for starting a migration.
/// </summary>
public class MigrationRequest
{
    /// <summary>Name of the table to migrate from.</summary>
    public string SourceTable { get; set; } = string.Empty;

    /// <summary>Number of records the migration loop iterates over.</summary>
    public int RecordCount { get; set; }
}
|
|
|
|
/// <summary>
/// Progress of a migration as reported by the status endpoint.
/// </summary>
public class MigrationStatus
{
    /// <summary>Identifier the migration was started with.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Record count taken from the start request.</summary>
    public int TotalRecords { get; set; }

    /// <summary>Number of records processed so far.</summary>
    public int RecordsProcessed { get; set; }

    /// <summary>Number of checkpoints created so far.</summary>
    public int CheckpointsSaved { get; set; }

    /// <summary><c>true</c> once the migration loop has finished.</summary>
    public bool IsComplete { get; set; }

    /// <summary>UTC time at which the migration was started.</summary>
    public DateTime StartTime { get; set; }

    /// <summary>UTC time at which the migration finished; <c>null</c> until then.</summary>
    public DateTime? EndTime { get; set; }
}
|
|
|
|
/// <summary>
/// A searchable document handled by the search endpoints.
/// </summary>
public class Document
{
    /// <summary>Identifier of the document.</summary>
    public int Id { get; set; }

    /// <summary>Title of the document.</summary>
    public string Title { get; set; } = string.Empty;

    /// <summary>Body text of the document.</summary>
    public string Content { get; set; } = string.Empty;

    /// <summary>Tags attached to the document; defaults to an empty list.</summary>
    public List<string> Tags { get; set; } = new List<string>();
}
|
|
|
|
/// <summary>
/// Response body of the search endpoint.
/// </summary>
public class SearchResults
{
    /// <summary>Documents returned for the query; defaults to an empty list.</summary>
    public List<Document> Items { get; set; } = new List<Document>();

    /// <summary>Number of matches found for the query.</summary>
    public int TotalMatches { get; set; }

    /// <summary>Time the search took.</summary>
    public TimeSpan SearchTime { get; set; }

    /// <summary>Change in managed heap size during the search, in megabytes.</summary>
    public double MemoryUsedMB { get; set; }
}
|
|
|
|
// Test DB Context
|
|
/// <summary>
/// EF Core context holding the customers, products, orders and order items used by the tests.
/// </summary>
public class TestDbContext : DbContext
{
    public TestDbContext(DbContextOptions<TestDbContext> options)
        : base(options)
    {
    }

    public DbSet<Customer> Customers { get; set; }

    public DbSet<Product> Products { get; set; }

    public DbSet<Order> Orders { get; set; }

    public DbSet<OrderItem> OrderItems { get; set; }

    /// <summary>
    /// Configures the Order-to-Customer and OrderItem-to-Product relationships with their
    /// foreign keys; neither principal exposes a collection navigation.
    /// </summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        var order = modelBuilder.Entity<Order>();
        order.HasOne(o => o.Customer)
            .WithMany()
            .HasForeignKey(o => o.CustomerId);

        var orderItem = modelBuilder.Entity<OrderItem>();
        orderItem.HasOne(oi => oi.Product)
            .WithMany()
            .HasForeignKey(oi => oi.ProductId);
    }
}
|
|
|
|
/// <summary>
/// Customer entity.
/// </summary>
public class Customer
{
    /// <summary>Primary key.</summary>
    public int Id { get; set; }

    /// <summary>Display name of the customer.</summary>
    public string Name { get; set; } = string.Empty;

    /// <summary>Email address of the customer.</summary>
    public string Email { get; set; } = string.Empty;

    /// <summary>Date the customer record was created.</summary>
    public DateTime CreatedDate { get; set; }
}
|
|
|
|
/// <summary>
/// Product entity.
/// </summary>
public class Product
{
    /// <summary>Primary key.</summary>
    public int Id { get; set; }

    /// <summary>Display name of the product.</summary>
    public string Name { get; set; } = string.Empty;

    /// <summary>Unit price of the product.</summary>
    public decimal Price { get; set; }

    /// <summary>Category label of the product.</summary>
    public string Category { get; set; } = string.Empty;
}
|
|
|
|
/// <summary>
/// Order entity with its customer reference and line items.
/// </summary>
public class Order
{
    /// <summary>Primary key.</summary>
    public int Id { get; set; }

    /// <summary>Foreign key to the customer that placed the order.</summary>
    public int CustomerId { get; set; }

    /// <summary>Navigation to the customer; not initialized by this class.</summary>
    public Customer Customer { get; set; } = null!;

    /// <summary>Date of the order.</summary>
    public DateTime OrderDate { get; set; }

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Status label of the order.</summary>
    public string Status { get; set; } = string.Empty;

    /// <summary>Line items of the order; defaults to an empty list.</summary>
    public List<OrderItem> Items { get; set; } = new List<OrderItem>();
}
|
|
|
|
/// <summary>
/// Line item of an order.
/// </summary>
public class OrderItem
{
    /// <summary>Primary key.</summary>
    public int Id { get; set; }

    /// <summary>Identifier of the owning order.</summary>
    public int OrderId { get; set; }

    /// <summary>Foreign key to the product.</summary>
    public int ProductId { get; set; }

    /// <summary>Navigation to the product; not initialized by this class.</summary>
    public Product Product { get; set; } = null!;

    /// <summary>Number of units ordered.</summary>
    public int Quantity { get; set; }

    /// <summary>Price per unit on this line.</summary>
    public decimal UnitPrice { get; set; }
}