Files
replicate.maui/Replicate.Maui/PredictionTracker.cs
2025-12-10 21:59:12 -05:00

441 lines
16 KiB
C#

using System.Collections.Concurrent;
namespace MarketAlly.Replicate.Maui
{
/// <summary>
/// Contract for a component that keeps a history of predictions and watches the
/// ones that have not finished yet, raising events as their status changes.
/// </summary>
public interface IPredictionTracker : IDisposable
{
/// <summary>
/// Interval between status checks for pending predictions, in milliseconds.
/// </summary>
int PollingIntervalMs { get; set; }
/// <summary>
/// Maximum number of predictions to keep in history.
/// </summary>
int MaxHistorySize { get; set; }
/// <summary>
/// Raised when a prediction's status changes.
/// </summary>
event EventHandler<PredictionStatusChangedEventArgs>? PredictionStatusChanged;
/// <summary>
/// Raised when a prediction completes (succeeded, failed, or canceled).
/// </summary>
event EventHandler<PredictionCompletedEventArgs>? PredictionCompleted;
/// <summary>
/// Gets all tracked predictions ordered by creation time (newest first).
/// </summary>
IReadOnlyList<TrackedPrediction> History { get; }
/// <summary>
/// Gets predictions that are still pending (starting or processing),
/// ordered by creation time (newest first).
/// </summary>
IReadOnlyList<TrackedPrediction> PendingPredictions { get; }
/// <summary>
/// Start tracking a prediction. Automatically begins polling if not already running.
/// </summary>
/// <param name="result">The prediction result whose ID and current state are recorded.</param>
/// <param name="type">Transformation type to record for the prediction.</param>
/// <param name="sourceImageBytes">Optional source image bytes to store with the tracked entry.</param>
/// <returns>The tracked entry for the prediction.</returns>
TrackedPrediction Track(PredictionResult result, TransformationType type, byte[]? sourceImageBytes = null);
/// <summary>
/// Get a tracked prediction by ID.
/// </summary>
/// <param name="predictionId">ID of the prediction to look up.</param>
/// <returns>The tracked entry, or <c>null</c> when no prediction with that ID is tracked.</returns>
TrackedPrediction? Get(string predictionId);
/// <summary>
/// Manually refresh a prediction's status.
/// </summary>
/// <param name="predictionId">ID of the prediction to refresh.</param>
/// <param name="cancellationToken">Token used to cancel the refresh request.</param>
/// <returns>The tracked entry, or <c>null</c> when no prediction with that ID is tracked.</returns>
Task<TrackedPrediction?> RefreshAsync(string predictionId, CancellationToken cancellationToken = default);
/// <summary>
/// Clear all completed predictions from history. Pending predictions are kept.
/// </summary>
void ClearCompleted();
/// <summary>
/// Clear all predictions from history, whether completed or pending.
/// </summary>
void ClearAll();
}
/// <summary>
/// Tracks prediction history and monitors pending predictions for completion.
/// </summary>
public class PredictionTracker : IPredictionTracker
{
// Every tracked prediction, keyed by prediction ID.
private readonly ConcurrentDictionary<string, TrackedPrediction> _predictions = new();
// Raises PredictionCreated (used for auto-tracking) and is queried for the current state of a prediction.
private readonly IReplicateTransformer _transformer;
// Cancelled in Dispose to stop the background polling loop.
private readonly CancellationTokenSource _pollingCts = new();
// The most recently started background polling loop, if any.
private Task? _pollingTask;
// Set by Dispose so that repeated Dispose calls are no-ops.
private bool _disposed;
/// <summary>
/// Interval between status checks for pending predictions (default 3 seconds).
/// </summary>
public int PollingIntervalMs { get; set; } = 3000;
/// <summary>
/// Maximum number of predictions to keep in history (default 50).
/// Only completed predictions are removed when the limit is exceeded.
/// </summary>
public int MaxHistorySize { get; set; } = 50;
/// <summary>
/// Raised when a prediction's status changes. Also raised once when a prediction is first
/// tracked, with an empty previous status.
/// </summary>
public event EventHandler<PredictionStatusChangedEventArgs>? PredictionStatusChanged;
/// <summary>
/// Raised when a prediction completes (succeeded, failed, or canceled).
/// </summary>
public event EventHandler<PredictionCompletedEventArgs>? PredictionCompleted;
/// <summary>
/// Snapshot of every tracked prediction, newest first by creation time.
/// A new list is built on each access.
/// </summary>
public IReadOnlyList<TrackedPrediction> History
{
    get
    {
        var newestFirst =
            from entry in _predictions.Values
            orderby entry.CreatedAt descending
            select entry;

        return newestFirst.ToList();
    }
}
/// <summary>
/// Snapshot of the tracked predictions that have not completed yet, newest first by creation time.
/// A new list is built on each access.
/// </summary>
public IReadOnlyList<TrackedPrediction> PendingPredictions
{
    get
    {
        var unfinished =
            from entry in _predictions.Values
            where !entry.IsCompleted
            orderby entry.CreatedAt descending
            select entry;

        return unfinished.ToList();
    }
}
/// <summary>
/// Creates a tracker bound to <paramref name="transformer"/> and subscribes to its
/// <c>PredictionCreated</c> event so new predictions are tracked as soon as they are created.
/// </summary>
/// <param name="transformer">Transformer that announces new predictions and is queried for their status.</param>
/// <exception cref="ArgumentNullException"><paramref name="transformer"/> is <c>null</c>.</exception>
public PredictionTracker(IReplicateTransformer transformer)
{
    // Fail with a descriptive exception instead of a NullReferenceException on the subscription below.
    _transformer = transformer ?? throw new ArgumentNullException(nameof(transformer));

    // Subscribe to prediction creation to track immediately; Dispose removes the subscription.
    _transformer.PredictionCreated += OnPredictionCreated;
}
/// <summary>
/// Handles the transformer's <c>PredictionCreated</c> event by auto-tracking the new prediction,
/// so it appears in history immediately with whatever status it was created with.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event data carrying the newly created prediction.</param>
private void OnPredictionCreated(object? sender, PredictionCreatedEventArgs e)
{
    var prediction = e.Prediction;

    var tracked = new TrackedPrediction
    {
        Id = prediction.Id,
        Type = TransformationType.Image, // Will be updated by explicit Track() call
        Status = prediction.Status,
        Output = prediction.Output,
        Outputs = prediction.Outputs,
        Error = prediction.Error,
        Metrics = prediction.Metrics,
        CreatedAt = prediction.CreatedAt ?? DateTimeOffset.Now,
        StartedAt = prediction.StartedAt,
        CompletedAt = prediction.CompletedAt,
        SourceImageBytes = null // Will be updated by explicit Track() call
    };

    // TryAdd checks and inserts atomically. A separate ContainsKey + indexer write could
    // overwrite an entry that Track() (or a duplicate event) added in between.
    if (!_predictions.TryAdd(prediction.Id, tracked))
    {
        // Already tracked - nothing to do (avoid duplicates).
        return;
    }

    TrimHistory();

    // Fire status event immediately; an empty previous status marks the initial notification.
    PredictionStatusChanged?.Invoke(this, new PredictionStatusChangedEventArgs(
        tracked, string.Empty, prediction.Status));

    // Start polling for this prediction unless it is already finished.
    if (!tracked.IsCompleted)
    {
        StartPolling();
    }
}
/// <summary>
/// Start tracking a prediction. Automatically begins polling if not already running.
/// If the prediction was already auto-tracked via PredictionCreated event, updates it with type and source image.
/// </summary>
/// <remarks>
/// <see cref="PredictionCompleted"/> is raised whenever the entry is in a completed state after
/// this call, so it can be raised again for a prediction whose completion was already reported.
/// </remarks>
/// <param name="result">The prediction result whose ID and current state are recorded.</param>
/// <param name="type">Transformation type to store on the tracked entry.</param>
/// <param name="sourceImageBytes">Optional source image bytes to store on the tracked entry.</param>
/// <returns>The tracked entry: the existing one when the ID was already tracked, otherwise a new one.</returns>
/// <exception cref="ArgumentNullException"><paramref name="result"/> is <c>null</c>.</exception>
public TrackedPrediction Track(PredictionResult result, TransformationType type, byte[]? sourceImageBytes = null)
{
    if (result is null)
    {
        throw new ArgumentNullException(nameof(result));
    }

    if (!_predictions.TryGetValue(result.Id, out var existing))
    {
        // Not yet tracked - create new entry
        var created = new TrackedPrediction
        {
            Id = result.Id,
            Type = type,
            Status = result.Status,
            Output = result.Output,
            Outputs = result.Outputs,
            Error = result.Error,
            Metrics = result.Metrics,
            CreatedAt = result.CreatedAt ?? DateTimeOffset.Now,
            StartedAt = result.StartedAt,
            CompletedAt = result.CompletedAt,
            SourceImageBytes = sourceImageBytes
        };

        // GetOrAdd inserts atomically. If the prediction was auto-tracked between the lookup
        // above and this call, the already-stored entry is returned and updated below rather
        // than being overwritten.
        existing = _predictions.GetOrAdd(result.Id, created);
        if (ReferenceEquals(existing, created))
        {
            TrimHistory();

            // Fire initial status event so listeners can track from the start
            PredictionStatusChanged?.Invoke(this, new PredictionStatusChangedEventArgs(
                created, string.Empty, result.Status));

            if (created.IsCompleted)
            {
                // Fire completed event if already done
                PredictionCompleted?.Invoke(this, new PredictionCompletedEventArgs(created));
            }
            else
            {
                // Start polling if we have pending predictions
                StartPolling();
            }

            return created;
        }
    }

    // Already tracked (auto-tracked via PredictionCreated event):
    // update with additional info that wasn't available at creation time
    existing.Type = type;
    existing.SourceImageBytes = sourceImageBytes;

    // Update with latest result data
    var oldStatus = existing.Status;
    existing.Status = result.Status;
    existing.Output = result.Output;
    existing.Outputs = result.Outputs;
    existing.Error = result.Error;
    existing.Metrics = result.Metrics;
    existing.StartedAt = result.StartedAt;
    existing.CompletedAt = result.CompletedAt;

    // Fire status changed if status actually changed
    if (oldStatus != result.Status)
    {
        PredictionStatusChanged?.Invoke(this, new PredictionStatusChangedEventArgs(
            existing, oldStatus, result.Status));
    }

    if (existing.IsCompleted)
    {
        // Fire completed event if now done
        PredictionCompleted?.Invoke(this, new PredictionCompletedEventArgs(existing));
    }
    else
    {
        // Still pending: make sure a polling loop is alive (no-op when one is already running).
        StartPolling();
    }

    return existing;
}
/// <summary>
/// Looks up a tracked prediction by its ID.
/// </summary>
/// <param name="predictionId">ID of the prediction to find.</param>
/// <returns>The tracked entry, or <c>null</c> when the ID is not tracked.</returns>
public TrackedPrediction? Get(string predictionId)
{
    if (_predictions.TryGetValue(predictionId, out var match))
    {
        return match;
    }

    return null;
}
/// <summary>
/// Fetches the current state of a tracked prediction from the transformer and applies it
/// to the tracked entry.
/// </summary>
/// <param name="predictionId">ID of the prediction to refresh.</param>
/// <param name="cancellationToken">Token passed to the transformer's fetch call.</param>
/// <returns>
/// <c>null</c> when the ID is not tracked; otherwise the tracked entry. If fetching or applying
/// the update throws, the exception is swallowed and the entry is returned as it stands.
/// </returns>
public async Task<TrackedPrediction?> RefreshAsync(string predictionId, CancellationToken cancellationToken = default)
{
    if (!_predictions.TryGetValue(predictionId, out var entry))
    {
        return null;
    }

    try
    {
        var latest = await _transformer.GetPredictionAsync(predictionId, cancellationToken);
        UpdateTrackedPrediction(entry, latest);
    }
    catch
    {
        // Best effort: on any failure the caller still gets the last known state.
    }

    return entry;
}
/// <summary>
/// Removes every completed prediction from history; pending predictions stay tracked.
/// </summary>
public void ClearCompleted()
{
    // Collect the IDs first, then remove them one by one.
    var finishedIds =
        (from entry in _predictions.Values
         where entry.IsCompleted
         select entry.Id).ToList();

    finishedIds.ForEach(id => _predictions.TryRemove(id, out _));
}
/// <summary>
/// Removes every prediction from history, completed or not.
/// </summary>
public void ClearAll() => _predictions.Clear();
// Guards _pollingTask so that the "is a loop running?" check in StartPolling and the loop's
// "nothing left to poll" exit cannot interleave.
private readonly object _pollingGate = new();

/// <summary>
/// Ensures a background polling loop is running. Does nothing when a loop is already active
/// or the tracker has been disposed.
/// </summary>
private void StartPolling()
{
    if (_disposed)
    {
        return;
    }

    CancellationToken token;
    try
    {
        // Read the token once: CancellationTokenSource.Token throws after the source is
        // disposed, so the loop must not go back to _pollingCts on every iteration.
        token = _pollingCts.Token;
    }
    catch (ObjectDisposedException)
    {
        // Disposed concurrently - there is nothing left to poll for.
        return;
    }

    lock (_pollingGate)
    {
        if (_pollingTask is { IsCompleted: false })
        {
            return;
        }

        _pollingTask = Task.Run(() => PollPendingAsync(token), token);
    }
}

/// <summary>
/// Polling loop: waits <see cref="PollingIntervalMs"/>, refreshes every pending prediction,
/// and repeats until nothing is pending or <paramref name="token"/> is cancelled.
/// </summary>
/// <param name="token">Token that stops the loop; cancelled when the tracker is disposed.</param>
private async Task PollPendingAsync(CancellationToken token)
{
    try
    {
        while (!token.IsCancellationRequested)
        {
            await Task.Delay(PollingIntervalMs, token);

            IReadOnlyList<TrackedPrediction> pending;
            lock (_pollingGate)
            {
                pending = PendingPredictions;
                if (pending.Count == 0)
                {
                    // Clear the slot under the lock so a StartPolling call that arrives after
                    // this decision starts a fresh loop instead of relying on this one.
                    _pollingTask = null;
                    return;
                }
            }

            foreach (var prediction in pending)
            {
                if (token.IsCancellationRequested)
                {
                    break;
                }

                try
                {
                    var result = await _transformer.GetPredictionAsync(prediction.Id, token);
                    UpdateTrackedPrediction(prediction, result);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                catch
                {
                    // Continue polling other predictions
                }
            }
        }
    }
    catch (OperationCanceledException)
    {
        // The delay was cancelled; end the loop quietly.
    }
}
/// <summary>
/// Copies the latest state from <paramref name="result"/> onto <paramref name="tracked"/>,
/// stamps the check time, and raises the status-changed and completed events when they apply.
/// </summary>
/// <param name="tracked">The tracked entry to update in place.</param>
/// <param name="result">The freshly fetched prediction state.</param>
private void UpdateTrackedPrediction(TrackedPrediction tracked, PredictionResult result)
{
    // Remember the state before it is overwritten so transitions can be detected.
    var statusBefore = tracked.Status;
    var alreadyDone = tracked.IsCompleted;

    tracked.Status = result.Status;
    tracked.Output = result.Output;
    tracked.Outputs = result.Outputs;
    tracked.Error = result.Error;
    tracked.Metrics = result.Metrics;
    tracked.StartedAt = result.StartedAt;
    tracked.CompletedAt = result.CompletedAt;
    tracked.LastCheckedAt = DateTimeOffset.Now;

    // Status changed: notify with the old and new values.
    if (statusBefore != result.Status && PredictionStatusChanged is { } onStatusChanged)
    {
        onStatusChanged(this, new PredictionStatusChangedEventArgs(tracked, statusBefore, result.Status));
    }

    // Completion is reported only on the transition from not-completed to completed.
    if (alreadyDone || !tracked.IsCompleted)
    {
        return;
    }

    PredictionCompleted?.Invoke(this, new PredictionCompletedEventArgs(tracked));
}
/// <summary>
/// Brings the history back towards <see cref="MaxHistorySize"/> by removing the oldest
/// completed predictions. Pending predictions are never removed here, so the history can
/// stay above the limit while many predictions are in flight.
/// </summary>
private void TrimHistory()
{
    var overflow = _predictions.Count - MaxHistorySize;
    if (overflow <= 0)
    {
        return;
    }

    // Oldest completed entries go first, up to the number of entries over the limit.
    var evictIds =
        (from entry in _predictions.Values
         where entry.IsCompleted
         orderby entry.CreatedAt
         select entry.Id)
        .Take(overflow)
        .ToList();

    evictIds.ForEach(id => _predictions.TryRemove(id, out _));
}
/// <summary>
/// Stops tracking: unsubscribes from the transformer, cancels the polling loop and releases
/// the cancellation source. Calling it more than once has no further effect.
/// </summary>
public void Dispose()
{
    if (_disposed)
    {
        return;
    }

    _disposed = true;

    // Unsubscribe from transformer events
    _transformer.PredictionCreated -= OnPredictionCreated;

    _pollingCts.Cancel();

    // The polling loop may still be unwinding and can touch the cancellation source on its
    // way out; disposing the source underneath it would fault the background task with an
    // ObjectDisposedException. Dispose right away only when no loop is running, otherwise
    // once the loop has finished.
    var polling = _pollingTask;
    if (polling is null || polling.IsCompleted)
    {
        _pollingCts.Dispose();
    }
    else
    {
        polling.ContinueWith(_ => _pollingCts.Dispose(), TaskScheduler.Default);
    }
}
}
/// <summary>
/// Mutable record of one prediction as held by the tracker: its latest known state plus
/// bookkeeping such as when it was last checked.
/// </summary>
public class TrackedPrediction
{
    /// <summary>Prediction ID; also the key under which the tracker stores this entry.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Transformation type recorded for this prediction.</summary>
    public TransformationType Type { get; set; }

    /// <summary>Latest known status, as a raw string.</summary>
    public string Status { get; set; } = string.Empty;

    /// <summary><see cref="Status"/> converted with <c>ToPredictionStatus()</c>.</summary>
    public PredictionStatus StatusEnum => Status.ToPredictionStatus();

    /// <summary>Latest known single output value, if any.</summary>
    public string? Output { get; set; }

    /// <summary>Latest known output values, if any.</summary>
    public string[]? Outputs { get; set; }

    /// <summary>Latest known error text, if any.</summary>
    public string? Error { get; set; }

    /// <summary>Latest known metrics, if any.</summary>
    public PredictionMetrics? Metrics { get; set; }

    /// <summary>Creation time; the tracker falls back to the local current time when the result carries none.</summary>
    public DateTimeOffset CreatedAt { get; set; }

    /// <summary>Start time copied from the latest result, if any.</summary>
    public DateTimeOffset? StartedAt { get; set; }

    /// <summary>Completion time copied from the latest result, if any.</summary>
    public DateTimeOffset? CompletedAt { get; set; }

    /// <summary>When the tracker last applied a fetched result to this entry; <c>null</c> until then.</summary>
    public DateTimeOffset? LastCheckedAt { get; set; }

    /// <summary>Source image bytes supplied when the prediction was tracked, if any.</summary>
    public byte[]? SourceImageBytes { get; set; }

    /// <summary>Whether <see cref="StatusEnum"/> counts as completed according to <c>IsCompleted()</c>.</summary>
    public bool IsCompleted => StatusEnum.IsCompleted();

    /// <summary>Whether the status is <see cref="PredictionStatus.Succeeded"/>.</summary>
    public bool IsSucceeded => StatusEnum is PredictionStatus.Succeeded;

    /// <summary>Whether the status is <see cref="PredictionStatus.Failed"/>.</summary>
    public bool IsFailed => StatusEnum is PredictionStatus.Failed;

    /// <summary>Whether the status is <see cref="PredictionStatus.Canceled"/>.</summary>
    public bool IsCanceled => StatusEnum is PredictionStatus.Canceled;

    /// <summary>Whether <see cref="StatusEnum"/> counts as pending according to <c>IsPending()</c>.</summary>
    public bool IsPending => StatusEnum.IsPending();

    /// <summary>
    /// Time from creation to completion, or from creation to now while no completion time is set.
    /// </summary>
    public TimeSpan Elapsed
    {
        get
        {
            var end = CompletedAt ?? DateTimeOffset.Now;
            return end - CreatedAt;
        }
    }
}
/// <summary>
/// Event data describing a status transition of a tracked prediction.
/// </summary>
public class PredictionStatusChangedEventArgs : EventArgs
{
    /// <summary>
    /// Captures the prediction together with the status it moved from and to.
    /// </summary>
    /// <param name="prediction">The tracked prediction whose status changed.</param>
    /// <param name="previousStatus">Status before the change; the tracker passes an empty string for the initial notification.</param>
    /// <param name="newStatus">Status after the change.</param>
    public PredictionStatusChangedEventArgs(TrackedPrediction prediction, string previousStatus, string newStatus)
        => (Prediction, PreviousStatus, NewStatus) = (prediction, previousStatus, newStatus);

    /// <summary>The tracked prediction whose status changed.</summary>
    public TrackedPrediction Prediction { get; }

    /// <summary>Raw status string before the change.</summary>
    public string PreviousStatus { get; }

    /// <summary>Raw status string after the change.</summary>
    public string NewStatus { get; }

    /// <summary><see cref="PreviousStatus"/> converted with <c>ToPredictionStatus()</c>.</summary>
    public PredictionStatus PreviousStatusEnum => PreviousStatus.ToPredictionStatus();

    /// <summary><see cref="NewStatus"/> converted with <c>ToPredictionStatus()</c>.</summary>
    public PredictionStatus NewStatusEnum => NewStatus.ToPredictionStatus();
}
/// <summary>
/// Event data for a tracked prediction that has reached a completed state.
/// </summary>
public class PredictionCompletedEventArgs : EventArgs
{
    /// <summary>
    /// Wraps the completed prediction.
    /// </summary>
    /// <param name="prediction">The tracked prediction that completed.</param>
    public PredictionCompletedEventArgs(TrackedPrediction prediction) => Prediction = prediction;

    /// <summary>The tracked prediction that completed.</summary>
    public TrackedPrediction Prediction { get; }

    /// <summary>Mirrors <see cref="TrackedPrediction.IsSucceeded"/> of <see cref="Prediction"/>.</summary>
    public bool Succeeded => Prediction.IsSucceeded;

    /// <summary>Mirrors <see cref="TrackedPrediction.IsFailed"/> of <see cref="Prediction"/>.</summary>
    public bool Failed => Prediction.IsFailed;

    /// <summary>Mirrors <see cref="TrackedPrediction.IsCanceled"/> of <see cref="Prediction"/>.</summary>
    public bool Canceled => Prediction.IsCanceled;
}
}