gitea/modules/operations/progress.go
logikonline 74c6389454
Some checks failed
Build and Release / Build Binaries (amd64, darwin) (push) Blocked by required conditions
Build and Release / Build Binaries (amd64, linux) (push) Blocked by required conditions
Build and Release / Build Binaries (amd64, windows) (push) Blocked by required conditions
Build and Release / Build Binaries (arm64, darwin) (push) Blocked by required conditions
Build and Release / Build Binaries (arm64, linux) (push) Blocked by required conditions
Build and Release / Build Docker Image (push) Blocked by required conditions
Build and Release / Create Release (push) Blocked by required conditions
Build and Release / Lint and Test (push) Has been cancelled
style: fix gofmt formatting across codebase
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 15:51:36 -05:00

480 lines
12 KiB
Go

// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// Package operations provides tracking for long-running operations
// with real-time progress updates via Server-Sent Events (SSE).
package operations
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
// OperationType represents the type of long-running operation
type OperationType string
const (
OpChunkedUpload OperationType = "chunked_upload"
OpRepoMirrorSync OperationType = "mirror_sync"
OpRepoPush OperationType = "repo_push"
OpRepoClone OperationType = "repo_clone"
OpWikiGenerate OperationType = "wiki_generate"
OpBatchOperation OperationType = "batch_operation"
)
// OperationStatus represents the current status of an operation
type OperationStatus string
const (
StatusPending OperationStatus = "pending"
StatusRunning OperationStatus = "running"
StatusComplete OperationStatus = "complete"
StatusFailed OperationStatus = "failed"
StatusCancelled OperationStatus = "cancelled"
)
// Phase represents a phase within an operation
type Phase struct {
Name string `json:"name"`
Status OperationStatus `json:"status"`
Progress int `json:"progress"` // 0-100
Message string `json:"message,omitempty"`
}
// ProgressUpdate represents a progress update event
type ProgressUpdate struct {
OperationID string `json:"operation_id"`
Type OperationType `json:"type"`
Status OperationStatus `json:"status"`
CurrentPhase string `json:"current_phase,omitempty"`
Phases []Phase `json:"phases,omitempty"`
Progress int `json:"progress"` // Overall progress 0-100
BytesTotal int64 `json:"bytes_total,omitempty"`
BytesDone int64 `json:"bytes_done,omitempty"`
ItemsTotal int `json:"items_total,omitempty"`
ItemsDone int `json:"items_done,omitempty"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
UpdatedAt time.Time `json:"updated_at"`
EstimatedETA *time.Time `json:"estimated_eta,omitempty"`
SpeedBPS int64 `json:"speed_bps,omitempty"` // bytes per second
Metadata map[string]any `json:"metadata,omitempty"`
}
// Operation tracks a long-running operation
type Operation struct {
mu sync.RWMutex
id string
opType OperationType
userID int64
status OperationStatus
phases []Phase
currentPhase int
progress int
bytesTotal int64
bytesDone int64
itemsTotal int
itemsDone int
message string
errorMsg string
startedAt time.Time
updatedAt time.Time
metadata map[string]any
subscribers map[chan ProgressUpdate]struct{}
ctx context.Context
cancel context.CancelFunc
}
// Manager handles operation tracking
type Manager struct {
mu sync.RWMutex
operations map[string]*Operation
ttl time.Duration
}
var (
defaultManager *Manager
once sync.Once
)
// GetManager returns the global operation manager
func GetManager() *Manager {
once.Do(func() {
defaultManager = &Manager{
operations: make(map[string]*Operation),
ttl: 30 * time.Minute,
}
go defaultManager.cleanup()
})
return defaultManager
}
// cleanup periodically removes completed operations
func (m *Manager) cleanup() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
m.mu.Lock()
now := time.Now()
for id, op := range m.operations {
op.mu.RLock()
isComplete := op.status == StatusComplete || op.status == StatusFailed || op.status == StatusCancelled
age := now.Sub(op.updatedAt)
op.mu.RUnlock()
if isComplete && age > m.ttl {
delete(m.operations, id)
log.Debug("Cleaned up operation %s", id)
}
}
m.mu.Unlock()
}
}
// StartOperation creates and registers a new operation
func (m *Manager) StartOperation(ctx context.Context, id string, opType OperationType, userID int64, phases []string) *Operation {
opCtx, cancel := context.WithCancel(ctx)
phaseList := make([]Phase, len(phases))
for i, name := range phases {
status := StatusPending
if i == 0 {
status = StatusRunning
}
phaseList[i] = Phase{
Name: name,
Status: status,
}
}
op := &Operation{
id: id,
opType: opType,
userID: userID,
status: StatusRunning,
phases: phaseList,
currentPhase: 0,
startedAt: time.Now(),
updatedAt: time.Now(),
metadata: make(map[string]any),
subscribers: make(map[chan ProgressUpdate]struct{}),
ctx: opCtx,
cancel: cancel,
}
m.mu.Lock()
m.operations[id] = op
m.mu.Unlock()
op.broadcast()
return op
}
// GetOperation retrieves an operation by ID
func (m *Manager) GetOperation(id string) (*Operation, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
op, ok := m.operations[id]
return op, ok
}
// Subscribe to operation progress updates
func (op *Operation) Subscribe() <-chan ProgressUpdate {
ch := make(chan ProgressUpdate, 10)
op.mu.Lock()
op.subscribers[ch] = struct{}{}
op.mu.Unlock()
// Send current state immediately
go func() {
select {
case ch <- op.GetProgress():
case <-op.ctx.Done():
}
}()
return ch
}
// Unsubscribe from operation updates
func (op *Operation) Unsubscribe(ch <-chan ProgressUpdate) {
op.mu.Lock()
defer op.mu.Unlock()
for sub := range op.subscribers {
if sub == ch {
delete(op.subscribers, sub)
close(sub)
break
}
}
}
// broadcast sends update to all subscribers
func (op *Operation) broadcast() {
op.mu.RLock()
update := op.buildUpdate()
subscribers := make([]chan ProgressUpdate, 0, len(op.subscribers))
for ch := range op.subscribers {
subscribers = append(subscribers, ch)
}
op.mu.RUnlock()
for _, ch := range subscribers {
select {
case ch <- update:
default:
// Skip if channel is full
}
}
}
func (op *Operation) buildUpdate() ProgressUpdate {
update := ProgressUpdate{
OperationID: op.id,
Type: op.opType,
Status: op.status,
Phases: op.phases,
Progress: op.progress,
BytesTotal: op.bytesTotal,
BytesDone: op.bytesDone,
ItemsTotal: op.itemsTotal,
ItemsDone: op.itemsDone,
Message: op.message,
Error: op.errorMsg,
StartedAt: op.startedAt,
UpdatedAt: op.updatedAt,
Metadata: op.metadata,
}
if op.currentPhase < len(op.phases) {
update.CurrentPhase = op.phases[op.currentPhase].Name
}
// Calculate speed and ETA
elapsed := time.Since(op.startedAt).Seconds()
if elapsed > 0 && op.bytesDone > 0 {
update.SpeedBPS = int64(float64(op.bytesDone) / elapsed)
if op.bytesTotal > 0 && update.SpeedBPS > 0 {
remaining := op.bytesTotal - op.bytesDone
etaSeconds := float64(remaining) / float64(update.SpeedBPS)
eta := time.Now().Add(time.Duration(etaSeconds) * time.Second)
update.EstimatedETA = &eta
}
}
return update
}
// GetProgress returns current progress state
func (op *Operation) GetProgress() ProgressUpdate {
op.mu.RLock()
defer op.mu.RUnlock()
return op.buildUpdate()
}
// UpdateProgress updates the operation progress
func (op *Operation) UpdateProgress(progress int, message string) {
op.mu.Lock()
op.progress = progress
op.message = message
op.updatedAt = time.Now()
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Progress = progress
op.phases[op.currentPhase].Message = message
}
op.mu.Unlock()
op.broadcast()
}
// UpdateBytes updates byte progress
func (op *Operation) UpdateBytes(done, total int64) {
op.mu.Lock()
op.bytesDone = done
op.bytesTotal = total
if total > 0 {
op.progress = int(float64(done) / float64(total) * 100)
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
}
// UpdateItems updates item progress
func (op *Operation) UpdateItems(done, total int) {
op.mu.Lock()
op.itemsDone = done
op.itemsTotal = total
if total > 0 {
op.progress = done * 100 / total
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
}
// NextPhase moves to the next phase
func (op *Operation) NextPhase() {
op.mu.Lock()
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Status = StatusComplete
op.phases[op.currentPhase].Progress = 100
}
op.currentPhase++
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Status = StatusRunning
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
}
// SetMetadata sets operation metadata
func (op *Operation) SetMetadata(key string, value any) {
op.mu.Lock()
op.metadata[key] = value
op.mu.Unlock()
}
// Complete marks the operation as complete
func (op *Operation) Complete(result map[string]any) {
op.mu.Lock()
op.status = StatusComplete
op.progress = 100
for i := range op.phases {
op.phases[i].Status = StatusComplete
op.phases[i].Progress = 100
}
if result != nil {
for k, v := range result {
op.metadata[k] = v
}
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
op.closeSubscribers()
}
// Fail marks the operation as failed
func (op *Operation) Fail(err error) {
op.mu.Lock()
op.status = StatusFailed
op.errorMsg = err.Error()
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Status = StatusFailed
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
op.closeSubscribers()
}
// Cancel cancels the operation
func (op *Operation) Cancel() {
op.mu.Lock()
op.status = StatusCancelled
op.updatedAt = time.Now()
op.mu.Unlock()
op.cancel()
op.broadcast()
op.closeSubscribers()
}
func (op *Operation) closeSubscribers() {
op.mu.Lock()
for ch := range op.subscribers {
close(ch)
}
op.subscribers = make(map[chan ProgressUpdate]struct{})
op.mu.Unlock()
}
// Context returns the operation's context
func (op *Operation) Context() context.Context {
return op.ctx
}
// ID returns the operation ID
func (op *Operation) ID() string {
return op.id
}
// ServeSSE serves Server-Sent Events for operation progress
func ServeSSE(w http.ResponseWriter, r *http.Request, opID string) {
manager := GetManager()
op, ok := manager.GetOperation(opID)
if !ok {
http.Error(w, "Operation not found", http.StatusNotFound)
return
}
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
return
}
// Subscribe to updates
updates := op.Subscribe()
defer op.Unsubscribe(updates)
// Send initial connection event
fmt.Fprintf(w, "event: connected\ndata: {\"operation_id\":\"%s\"}\n\n", opID)
flusher.Flush()
for {
select {
case update, ok := <-updates:
if !ok {
// Channel closed, operation complete
fmt.Fprintf(w, "event: close\ndata: {}\n\n")
flusher.Flush()
return
}
data, err := json.Marshal(update)
if err != nil {
log.Error("Failed to marshal progress update: %v", err)
continue
}
eventType := "progress"
switch update.Status {
case StatusComplete:
eventType = "complete"
case StatusFailed:
eventType = "error"
case StatusCancelled:
eventType = "cancelled"
}
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, data)
flusher.Flush()
// Close connection after terminal events
if update.Status == StatusComplete || update.Status == StatusFailed || update.Status == StatusCancelled {
return
}
case <-r.Context().Done():
return
}
}
}