diff --git a/modules/circuitbreaker/circuitbreaker.go b/modules/circuitbreaker/circuitbreaker.go new file mode 100644 index 0000000000..80d3d7c343 --- /dev/null +++ b/modules/circuitbreaker/circuitbreaker.go @@ -0,0 +1,392 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Package circuitbreaker implements the circuit breaker pattern for external service calls. +// This prevents cascading failures by temporarily blocking requests to failing services. +package circuitbreaker + +import ( + "context" + "errors" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// State represents the circuit breaker state +type State int + +const ( + // StateClosed means the circuit is closed and requests flow through + StateClosed State = iota + + // StateOpen means the circuit is open and requests are blocked + StateOpen + + // StateHalfOpen means the circuit is testing if the service has recovered + StateHalfOpen +) + +func (s State) String() string { + switch s { + case StateClosed: + return "closed" + case StateOpen: + return "open" + case StateHalfOpen: + return "half-open" + default: + return "unknown" + } +} + +// Common errors +var ( + ErrCircuitOpen = errors.New("circuit breaker is open") + ErrTooManyRequests = errors.New("too many requests in half-open state") +) + +// Config configures the circuit breaker behavior +type Config struct { + // Name identifies this circuit breaker + Name string + + // MaxFailures is the number of failures before opening the circuit + MaxFailures int + + // SuccessThreshold is the number of successes needed to close from half-open + SuccessThreshold int + + // Timeout is how long the circuit stays open before testing + Timeout time.Duration + + // MaxConcurrentInHalfOpen is the max concurrent requests when half-open + MaxConcurrentInHalfOpen int + + // OnStateChange is called when the state changes + OnStateChange func(name string, from, to State) +} + +// DefaultConfig returns a sensible default configuration +func DefaultConfig(name string) Config { + return Config{ + Name: name, + MaxFailures: 5, + SuccessThreshold: 2, + Timeout: 30 * time.Second, + MaxConcurrentInHalfOpen: 1, + } +} + +// CircuitBreaker implements the circuit breaker pattern +type CircuitBreaker struct { + mu sync.RWMutex + config Config + state State + failures int + successes int + lastFailure time.Time + lastStateChange time.Time + halfOpenRequests int + generation uint64 // prevents counting old requests +} + +// New creates a new circuit breaker +func New(config Config) *CircuitBreaker { + if config.MaxFailures <= 0 { + config.MaxFailures = 5 + } + if config.SuccessThreshold <= 0 { + config.SuccessThreshold = 2 + } + if config.Timeout <= 0 { + config.Timeout = 30 * time.Second + } + if config.MaxConcurrentInHalfOpen <= 0 { + config.MaxConcurrentInHalfOpen = 1 + } + + return &CircuitBreaker{ + config: config, + state: StateClosed, + lastStateChange: time.Now(), + } +} + +// Execute runs the given function if the circuit allows +func (cb *CircuitBreaker) Execute(ctx context.Context, fn func(context.Context) error) error { + generation, err := cb.beforeRequest() + if err != nil { + return err + } + + // Check context + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Execute the function + err = fn(ctx) + + cb.afterRequest(generation, err == nil) + return err +} + +// beforeRequest checks if the request should proceed +func (cb *CircuitBreaker) beforeRequest() (uint64, error) { + cb.mu.Lock() + defer 
cb.mu.Unlock() + + now := time.Now() + + switch cb.state { + case StateClosed: + return cb.generation, nil + + case StateOpen: + // Check if timeout has elapsed + if now.Sub(cb.lastStateChange) >= cb.config.Timeout { + cb.toHalfOpen() + cb.halfOpenRequests++ + return cb.generation, nil + } + return 0, ErrCircuitOpen + + case StateHalfOpen: + if cb.halfOpenRequests >= cb.config.MaxConcurrentInHalfOpen { + return 0, ErrTooManyRequests + } + cb.halfOpenRequests++ + return cb.generation, nil + } + + return cb.generation, nil +} + +// afterRequest processes the result of a request +func (cb *CircuitBreaker) afterRequest(generation uint64, success bool) { + cb.mu.Lock() + defer cb.mu.Unlock() + + // Ignore results from old generations + if generation != cb.generation { + return + } + + if success { + cb.onSuccess() + } else { + cb.onFailure() + } +} + +func (cb *CircuitBreaker) onSuccess() { + switch cb.state { + case StateClosed: + cb.failures = 0 + + case StateHalfOpen: + cb.halfOpenRequests-- + cb.successes++ + if cb.successes >= cb.config.SuccessThreshold { + cb.toClosed() + } + } +} + +func (cb *CircuitBreaker) onFailure() { + switch cb.state { + case StateClosed: + cb.failures++ + cb.lastFailure = time.Now() + if cb.failures >= cb.config.MaxFailures { + cb.toOpen() + } + + case StateHalfOpen: + cb.halfOpenRequests-- + cb.toOpen() + } +} + +func (cb *CircuitBreaker) toOpen() { + if cb.state == StateOpen { + return + } + + from := cb.state + cb.state = StateOpen + cb.lastStateChange = time.Now() + cb.generation++ + + log.Warn("Circuit breaker %s: %s -> open (failures: %d)", cb.config.Name, from, cb.failures) + + if cb.config.OnStateChange != nil { + go cb.config.OnStateChange(cb.config.Name, from, StateOpen) + } +} + +func (cb *CircuitBreaker) toClosed() { + if cb.state == StateClosed { + return + } + + from := cb.state + cb.state = StateClosed + cb.failures = 0 + cb.successes = 0 + cb.halfOpenRequests = 0 + cb.lastStateChange = time.Now() + cb.generation++ + + log.Info("Circuit breaker %s: %s -> closed", cb.config.Name, from) + + if cb.config.OnStateChange != nil { + go cb.config.OnStateChange(cb.config.Name, from, StateClosed) + } +} + +func (cb *CircuitBreaker) toHalfOpen() { + if cb.state == StateHalfOpen { + return + } + + from := cb.state + cb.state = StateHalfOpen + cb.successes = 0 + cb.halfOpenRequests = 0 + cb.lastStateChange = time.Now() + cb.generation++ + + log.Debug("Circuit breaker %s: %s -> half-open", cb.config.Name, from) + + if cb.config.OnStateChange != nil { + go cb.config.OnStateChange(cb.config.Name, from, StateHalfOpen) + } +} + +// State returns the current state +func (cb *CircuitBreaker) State() State { + cb.mu.RLock() + defer cb.mu.RUnlock() + return cb.state +} + +// Stats returns statistics about the circuit breaker +type Stats struct { + Name string `json:"name"` + State string `json:"state"` + Failures int `json:"failures"` + Successes int `json:"successes"` + LastFailure time.Time `json:"last_failure,omitempty"` + LastStateChange time.Time `json:"last_state_change"` + Generation uint64 `json:"generation"` +} + +// Stats returns the current statistics +func (cb *CircuitBreaker) Stats() Stats { + cb.mu.RLock() + defer cb.mu.RUnlock() + + return Stats{ + Name: cb.config.Name, + State: cb.state.String(), + Failures: cb.failures, + Successes: cb.successes, + LastFailure: cb.lastFailure, + LastStateChange: cb.lastStateChange, + Generation: cb.generation, + } +} + +// Reset forces the circuit breaker back to closed state +func (cb *CircuitBreaker) Reset() { + 
cb.mu.Lock() + defer cb.mu.Unlock() + + cb.state = StateClosed + cb.failures = 0 + cb.successes = 0 + cb.halfOpenRequests = 0 + cb.lastStateChange = time.Now() + cb.generation++ + + log.Info("Circuit breaker %s: manually reset to closed", cb.config.Name) +} + +// Registry manages multiple circuit breakers +type Registry struct { + mu sync.RWMutex + breakers map[string]*CircuitBreaker +} + +var ( + defaultRegistry *Registry + registryOnce sync.Once +) + +// GetRegistry returns the default circuit breaker registry +func GetRegistry() *Registry { + registryOnce.Do(func() { + defaultRegistry = &Registry{ + breakers: make(map[string]*CircuitBreaker), + } + }) + return defaultRegistry +} + +// Get returns a circuit breaker by name, creating it if necessary +func (r *Registry) Get(name string, config ...Config) *CircuitBreaker { + r.mu.RLock() + cb, ok := r.breakers[name] + r.mu.RUnlock() + + if ok { + return cb + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Double-check after acquiring write lock + if cb, ok := r.breakers[name]; ok { + return cb + } + + var cfg Config + if len(config) > 0 { + cfg = config[0] + cfg.Name = name + } else { + cfg = DefaultConfig(name) + } + + cb = New(cfg) + r.breakers[name] = cb + return cb +} + +// Stats returns stats for all circuit breakers +func (r *Registry) Stats() map[string]Stats { + r.mu.RLock() + defer r.mu.RUnlock() + + stats := make(map[string]Stats) + for name, cb := range r.breakers { + stats[name] = cb.Stats() + } + return stats +} + +// ResetAll resets all circuit breakers +func (r *Registry) ResetAll() { + r.mu.RLock() + defer r.mu.RUnlock() + + for _, cb := range r.breakers { + cb.Reset() + } +} diff --git a/modules/health/health.go b/modules/health/health.go new file mode 100644 index 0000000000..e1217c3953 --- /dev/null +++ b/modules/health/health.go @@ -0,0 +1,377 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Package health provides comprehensive health checking for Gitea services. 
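+// Checkers are registered with the Manager, run concurrently with a short per-check timeout, and results are cached for a few seconds to keep health probes cheap.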
+package health + +import ( + "context" + "runtime" + "sync" + "time" + + "code.gitea.io/gitea/modules/circuitbreaker" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" +) + +// Status represents the health status of a component +type Status string + +const ( + StatusHealthy Status = "healthy" + StatusDegraded Status = "degraded" + StatusUnhealthy Status = "unhealthy" + StatusUnknown Status = "unknown" +) + +// ComponentCheck represents a health check for a single component +type ComponentCheck struct { + Name string `json:"name"` + Status Status `json:"status"` + Message string `json:"message,omitempty"` + Duration time.Duration `json:"duration_ms"` + LastChecked time.Time `json:"last_checked"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +// HealthResponse represents the complete health check response +type HealthResponse struct { + Status Status `json:"status"` + Version string `json:"version"` + Uptime time.Duration `json:"uptime_seconds"` + Timestamp time.Time `json:"timestamp"` + Components map[string]*ComponentCheck `json:"components"` + System *SystemInfo `json:"system,omitempty"` + Circuits map[string]circuitbreaker.Stats `json:"circuit_breakers,omitempty"` +} + +// SystemInfo contains system-level health information +type SystemInfo struct { + GoVersion string `json:"go_version"` + NumGoroutines int `json:"goroutines"` + MemoryAllocMB float64 `json:"memory_alloc_mb"` + MemorySysMB float64 `json:"memory_sys_mb"` + NumCPU int `json:"num_cpu"` + GOMAXPROCS int `json:"gomaxprocs"` +} + +// Checker is a function that performs a health check +type Checker func(ctx context.Context) *ComponentCheck + +// Manager manages health checks +type Manager struct { + mu sync.RWMutex + checkers map[string]Checker + cache map[string]*ComponentCheck + cacheTTL time.Duration + startTime time.Time + version string +} + +var ( + defaultManager *Manager + managerOnce sync.Once +) + +// GetManager returns the default health manager +func GetManager() *Manager { + managerOnce.Do(func() { + defaultManager = &Manager{ + checkers: make(map[string]Checker), + cache: make(map[string]*ComponentCheck), + cacheTTL: 5 * time.Second, + startTime: time.Now(), + version: setting.AppVer, + } + }) + return defaultManager +} + +// RegisterChecker registers a new health checker +func (m *Manager) RegisterChecker(name string, checker Checker) { + m.mu.Lock() + defer m.mu.Unlock() + m.checkers[name] = checker + log.Debug("Registered health checker: %s", name) +} + +// UnregisterChecker removes a health checker +func (m *Manager) UnregisterChecker(name string) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.checkers, name) + delete(m.cache, name) +} + +// Check performs all health checks +func (m *Manager) Check(ctx context.Context, includeSystem bool) *HealthResponse { + m.mu.RLock() + checkers := make(map[string]Checker) + for k, v := range m.checkers { + checkers[k] = v + } + m.mu.RUnlock() + + response := &HealthResponse{ + Status: StatusHealthy, + Version: m.version, + Uptime: time.Since(m.startTime), + Timestamp: time.Now(), + Components: make(map[string]*ComponentCheck), + } + + // Run checks concurrently + var wg sync.WaitGroup + var mu sync.Mutex + + for name, checker := range checkers { + wg.Add(1) + go func(name string, checker Checker) { + defer wg.Done() + + result := m.runCheck(ctx, name, checker) + + mu.Lock() + response.Components[name] = result + mu.Unlock() + }(name, checker) + } + + wg.Wait() + + // Calculate overall status + response.Status = 
m.calculateOverallStatus(response.Components) + + // Include system info if requested + if includeSystem { + response.System = getSystemInfo() + response.Circuits = circuitbreaker.GetRegistry().Stats() + } + + return response +} + +func (m *Manager) runCheck(ctx context.Context, name string, checker Checker) *ComponentCheck { + // Check cache first + m.mu.RLock() + cached, ok := m.cache[name] + m.mu.RUnlock() + + if ok && time.Since(cached.LastChecked) < m.cacheTTL { + return cached + } + + // Run the check with timeout + checkCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + start := time.Now() + result := checker(checkCtx) + result.Duration = time.Since(start) + result.LastChecked = time.Now() + + // Cache the result + m.mu.Lock() + m.cache[name] = result + m.mu.Unlock() + + return result +} + +func (m *Manager) calculateOverallStatus(components map[string]*ComponentCheck) Status { + unhealthyCount := 0 + degradedCount := 0 + + for _, check := range components { + switch check.Status { + case StatusUnhealthy: + unhealthyCount++ + case StatusDegraded: + degradedCount++ + } + } + + // If any critical component is unhealthy, overall is unhealthy + if unhealthyCount > 0 { + return StatusUnhealthy + } + + // If any component is degraded, overall is degraded + if degradedCount > 0 { + return StatusDegraded + } + + return StatusHealthy +} + +func getSystemInfo() *SystemInfo { + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + + return &SystemInfo{ + GoVersion: runtime.Version(), + NumGoroutines: runtime.NumGoroutine(), + MemoryAllocMB: float64(memStats.Alloc) / 1024 / 1024, + MemorySysMB: float64(memStats.Sys) / 1024 / 1024, + NumCPU: runtime.NumCPU(), + GOMAXPROCS: runtime.GOMAXPROCS(0), + } +} + +// CheckSingle performs a single component check +func (m *Manager) CheckSingle(ctx context.Context, name string) (*ComponentCheck, bool) { + m.mu.RLock() + checker, ok := m.checkers[name] + m.mu.RUnlock() + + if !ok { + return nil, false + } + + return m.runCheck(ctx, name, checker), true +} + +// LivenessCheck performs a quick liveness check (is the service running?) +func (m *Manager) LivenessCheck() *ComponentCheck { + return &ComponentCheck{ + Name: "liveness", + Status: StatusHealthy, + Message: "service is running", + LastChecked: time.Now(), + } +} + +// ReadinessCheck performs a readiness check (can the service handle requests?) 
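+// It runs the same component checks as Check but omits system metrics and circuit breaker stats.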
+func (m *Manager) ReadinessCheck(ctx context.Context) *HealthResponse { + // For readiness, we only check critical components + return m.Check(ctx, false) +} + +// NewDatabaseChecker creates a database health checker +func NewDatabaseChecker(pingFunc func(ctx context.Context) error) Checker { + return func(ctx context.Context) *ComponentCheck { + check := &ComponentCheck{ + Name: "database", + } + + if err := pingFunc(ctx); err != nil { + check.Status = StatusUnhealthy + check.Message = err.Error() + } else { + check.Status = StatusHealthy + check.Message = "connected" + } + + return check + } +} + +// NewCacheChecker creates a cache health checker +func NewCacheChecker(pingFunc func(ctx context.Context) error) Checker { + return func(ctx context.Context) *ComponentCheck { + check := &ComponentCheck{ + Name: "cache", + } + + if err := pingFunc(ctx); err != nil { + check.Status = StatusDegraded // Cache is usually not critical + check.Message = err.Error() + } else { + check.Status = StatusHealthy + check.Message = "connected" + } + + return check + } +} + +// NewGitChecker creates a git service health checker +func NewGitChecker(checkFunc func(ctx context.Context) (string, error)) Checker { + return func(ctx context.Context) *ComponentCheck { + check := &ComponentCheck{ + Name: "git", + } + + version, err := checkFunc(ctx) + if err != nil { + check.Status = StatusUnhealthy + check.Message = err.Error() + } else { + check.Status = StatusHealthy + check.Message = "available" + check.Metadata = map[string]any{ + "version": version, + } + } + + return check + } +} + +// NewSSHChecker creates an SSH service health checker +func NewSSHChecker(isEnabled bool, port int) Checker { + return func(ctx context.Context) *ComponentCheck { + check := &ComponentCheck{ + Name: "ssh", + } + + if !isEnabled { + check.Status = StatusHealthy + check.Message = "disabled" + } else { + check.Status = StatusHealthy + check.Message = "listening" + check.Metadata = map[string]any{ + "port": port, + } + } + + return check + } +} + +// NewExternalServiceChecker creates a checker for external services +func NewExternalServiceChecker(name string, checkFunc func(ctx context.Context) error) Checker { + return func(ctx context.Context) *ComponentCheck { + check := &ComponentCheck{ + Name: name, + } + + if err := checkFunc(ctx); err != nil { + check.Status = StatusDegraded + check.Message = err.Error() + } else { + check.Status = StatusHealthy + check.Message = "connected" + } + + return check + } +} + +// NewQueueChecker creates a checker for the task queue +func NewQueueChecker(getQueueStats func() (pending, processing int)) Checker { + return func(ctx context.Context) *ComponentCheck { + check := &ComponentCheck{ + Name: "queue", + } + + pending, processing := getQueueStats() + + check.Status = StatusHealthy + check.Message = "running" + check.Metadata = map[string]any{ + "pending": pending, + "processing": processing, + } + + // Mark as degraded if queue is backing up + if pending > 10000 { + check.Status = StatusDegraded + check.Message = "queue backlog is high" + } + + return check + } +} diff --git a/modules/idempotency/idempotency.go b/modules/idempotency/idempotency.go new file mode 100644 index 0000000000..07f3f64948 --- /dev/null +++ b/modules/idempotency/idempotency.go @@ -0,0 +1,311 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Package idempotency provides middleware for idempotent POST request handling. 
+// Clients can send an Idempotency-Key header to ensure requests are processed +// exactly once, with subsequent requests returning the cached response. +package idempotency + +import ( + "bytes" + "encoding/json" + "net/http" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +const ( + // HeaderIdempotencyKey is the header name for idempotency keys + HeaderIdempotencyKey = "Idempotency-Key" + + // HeaderIdempotencyReplayed indicates the response was replayed from cache + HeaderIdempotencyReplayed = "Idempotency-Replayed" + + // DefaultTTL is the default time-to-live for cached responses + DefaultTTL = 24 * time.Hour + + // MaxKeyLength is the maximum allowed length for an idempotency key + MaxKeyLength = 256 +) + +// CachedResponse stores a cached API response +type CachedResponse struct { + StatusCode int `json:"status_code"` + Headers map[string]string `json:"headers"` + Body []byte `json:"body"` + CreatedAt time.Time `json:"created_at"` + ExpiresAt time.Time `json:"expires_at"` +} + +// Store defines the interface for idempotency key storage +type Store interface { + // Get retrieves a cached response by key + Get(key string) (*CachedResponse, bool) + + // Set stores a response with the given key + Set(key string, response *CachedResponse) + + // Delete removes a cached response + Delete(key string) + + // Lock acquires a lock for processing a key (returns true if lock acquired) + Lock(key string) bool + + // Unlock releases the lock for a key + Unlock(key string) +} + +// MemoryStore implements Store using in-memory storage +type MemoryStore struct { + mu sync.RWMutex + responses map[string]*CachedResponse + locks map[string]bool + ttl time.Duration +} + +// NewMemoryStore creates a new in-memory idempotency store +func NewMemoryStore(ttl time.Duration) *MemoryStore { + store := &MemoryStore{ + responses: make(map[string]*CachedResponse), + locks: make(map[string]bool), + ttl: ttl, + } + go store.cleanup() + return store +} + +func (s *MemoryStore) cleanup() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + s.mu.Lock() + now := time.Now() + for key, resp := range s.responses { + if now.After(resp.ExpiresAt) { + delete(s.responses, key) + delete(s.locks, key) + } + } + s.mu.Unlock() + } +} + +// Get retrieves a cached response +func (s *MemoryStore) Get(key string) (*CachedResponse, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + resp, ok := s.responses[key] + if !ok { + return nil, false + } + + if time.Now().After(resp.ExpiresAt) { + return nil, false + } + + return resp, true +} + +// Set stores a response +func (s *MemoryStore) Set(key string, response *CachedResponse) { + s.mu.Lock() + defer s.mu.Unlock() + + response.CreatedAt = time.Now() + response.ExpiresAt = time.Now().Add(s.ttl) + s.responses[key] = response +} + +// Delete removes a cached response +func (s *MemoryStore) Delete(key string) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.responses, key) + delete(s.locks, key) +} + +// Lock acquires a processing lock +func (s *MemoryStore) Lock(key string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if s.locks[key] { + return false + } + s.locks[key] = true + return true +} + +// Unlock releases a processing lock +func (s *MemoryStore) Unlock(key string) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.locks, key) +} + +// ResponseRecorder captures the response for caching +type ResponseRecorder struct { + http.ResponseWriter + statusCode int + body bytes.Buffer + headers http.Header +} + +// 
NewResponseRecorder creates a new response recorder +func NewResponseRecorder(w http.ResponseWriter) *ResponseRecorder { + return &ResponseRecorder{ + ResponseWriter: w, + statusCode: http.StatusOK, + headers: make(http.Header), + } +} + +// WriteHeader captures the status code +func (r *ResponseRecorder) WriteHeader(code int) { + r.statusCode = code + r.ResponseWriter.WriteHeader(code) +} + +// Write captures the response body +func (r *ResponseRecorder) Write(b []byte) (int, error) { + r.body.Write(b) + return r.ResponseWriter.Write(b) +} + +// Header returns the header map +func (r *ResponseRecorder) Header() http.Header { + return r.ResponseWriter.Header() +} + +// ToCachedResponse converts the recorded response to a cached response +func (r *ResponseRecorder) ToCachedResponse() *CachedResponse { + headers := make(map[string]string) + for k, v := range r.ResponseWriter.Header() { + if len(v) > 0 { + headers[k] = v[0] + } + } + + return &CachedResponse{ + StatusCode: r.statusCode, + Headers: headers, + Body: r.body.Bytes(), + } +} + +// Middleware provides idempotency handling +type Middleware struct { + store Store +} + +// NewMiddleware creates a new idempotency middleware +func NewMiddleware(store Store) *Middleware { + return &Middleware{store: store} +} + +var ( + defaultStore *MemoryStore + once sync.Once +) + +// GetDefaultStore returns the default idempotency store +func GetDefaultStore() *MemoryStore { + once.Do(func() { + defaultStore = NewMemoryStore(DefaultTTL) + }) + return defaultStore +} + +// Handler returns the middleware handler +func (m *Middleware) Handler(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Only apply to POST, PUT, PATCH requests + if r.Method != http.MethodPost && r.Method != http.MethodPut && r.Method != http.MethodPatch { + next.ServeHTTP(w, r) + return + } + + key := r.Header.Get(HeaderIdempotencyKey) + if key == "" { + // No idempotency key, proceed normally + next.ServeHTTP(w, r) + return + } + + // Validate key length + if len(key) > MaxKeyLength { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(map[string]any{ + "code": "IDEMPOTENCY_KEY_TOO_LONG", + "message": "Idempotency key exceeds maximum length of 256 characters", + }) + return + } + + // Check for cached response + if cached, ok := m.store.Get(key); ok { + log.Debug("Replaying cached response for idempotency key: %s", key) + w.Header().Set(HeaderIdempotencyReplayed, "true") + for k, v := range cached.Headers { + w.Header().Set(k, v) + } + w.WriteHeader(cached.StatusCode) + w.Write(cached.Body) + return + } + + // Try to acquire lock + if !m.store.Lock(key) { + // Another request is processing with this key + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Retry-After", "1") + w.WriteHeader(http.StatusConflict) + json.NewEncoder(w).Encode(map[string]any{ + "code": "IDEMPOTENCY_KEY_IN_USE", + "message": "A request with this idempotency key is currently being processed", + }) + return + } + defer m.store.Unlock(key) + + // Record the response + recorder := NewResponseRecorder(w) + next.ServeHTTP(recorder, r) + + // Cache successful responses (2xx and 4xx) + // Don't cache 5xx errors as they may be transient + if recorder.statusCode < 500 { + m.store.Set(key, recorder.ToCachedResponse()) + } + }) +} + +// IdempotencyInfo represents information about an idempotency key +type IdempotencyInfo struct { + Key string `json:"key"` + CreatedAt 
time.Time `json:"created_at"` + ExpiresAt time.Time `json:"expires_at"` + Replayed bool `json:"replayed"` +} + +// GetInfo retrieves information about a cached idempotency key +func GetInfo(key string) (*IdempotencyInfo, bool) { + store := GetDefaultStore() + cached, ok := store.Get(key) + if !ok { + return nil, false + } + + return &IdempotencyInfo{ + Key: key, + CreatedAt: cached.CreatedAt, + ExpiresAt: cached.ExpiresAt, + }, true +} diff --git a/modules/operations/progress.go b/modules/operations/progress.go new file mode 100644 index 0000000000..54c00cac45 --- /dev/null +++ b/modules/operations/progress.go @@ -0,0 +1,479 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +// Package operations provides tracking for long-running operations +// with real-time progress updates via Server-Sent Events (SSE). +package operations + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// OperationType represents the type of long-running operation +type OperationType string + +const ( + OpChunkedUpload OperationType = "chunked_upload" + OpRepoMirrorSync OperationType = "mirror_sync" + OpRepoPush OperationType = "repo_push" + OpRepoClone OperationType = "repo_clone" + OpWikiGenerate OperationType = "wiki_generate" + OpBatchOperation OperationType = "batch_operation" +) + +// OperationStatus represents the current status of an operation +type OperationStatus string + +const ( + StatusPending OperationStatus = "pending" + StatusRunning OperationStatus = "running" + StatusComplete OperationStatus = "complete" + StatusFailed OperationStatus = "failed" + StatusCancelled OperationStatus = "cancelled" +) + +// Phase represents a phase within an operation +type Phase struct { + Name string `json:"name"` + Status OperationStatus `json:"status"` + Progress int `json:"progress"` // 0-100 + Message string `json:"message,omitempty"` +} + +// ProgressUpdate represents a progress update event +type ProgressUpdate struct { + OperationID string `json:"operation_id"` + Type OperationType `json:"type"` + Status OperationStatus `json:"status"` + CurrentPhase string `json:"current_phase,omitempty"` + Phases []Phase `json:"phases,omitempty"` + Progress int `json:"progress"` // Overall progress 0-100 + BytesTotal int64 `json:"bytes_total,omitempty"` + BytesDone int64 `json:"bytes_done,omitempty"` + ItemsTotal int `json:"items_total,omitempty"` + ItemsDone int `json:"items_done,omitempty"` + Message string `json:"message,omitempty"` + Error string `json:"error,omitempty"` + StartedAt time.Time `json:"started_at"` + UpdatedAt time.Time `json:"updated_at"` + EstimatedETA *time.Time `json:"estimated_eta,omitempty"` + SpeedBPS int64 `json:"speed_bps,omitempty"` // bytes per second + Metadata map[string]any `json:"metadata,omitempty"` +} + +// Operation tracks a long-running operation +type Operation struct { + mu sync.RWMutex + id string + opType OperationType + userID int64 + status OperationStatus + phases []Phase + currentPhase int + progress int + bytesTotal int64 + bytesDone int64 + itemsTotal int + itemsDone int + message string + errorMsg string + startedAt time.Time + updatedAt time.Time + metadata map[string]any + subscribers map[chan ProgressUpdate]struct{} + ctx context.Context + cancel context.CancelFunc +} + +// Manager handles operation tracking +type Manager struct { + mu sync.RWMutex + operations map[string]*Operation + ttl time.Duration +} + +var ( + defaultManager *Manager + once sync.Once 
+) + +// GetManager returns the global operation manager +func GetManager() *Manager { + once.Do(func() { + defaultManager = &Manager{ + operations: make(map[string]*Operation), + ttl: 30 * time.Minute, + } + go defaultManager.cleanup() + }) + return defaultManager +} + +// cleanup periodically removes completed operations +func (m *Manager) cleanup() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for range ticker.C { + m.mu.Lock() + now := time.Now() + for id, op := range m.operations { + op.mu.RLock() + isComplete := op.status == StatusComplete || op.status == StatusFailed || op.status == StatusCancelled + age := now.Sub(op.updatedAt) + op.mu.RUnlock() + + if isComplete && age > m.ttl { + delete(m.operations, id) + log.Debug("Cleaned up operation %s", id) + } + } + m.mu.Unlock() + } +} + +// StartOperation creates and registers a new operation +func (m *Manager) StartOperation(ctx context.Context, id string, opType OperationType, userID int64, phases []string) *Operation { + opCtx, cancel := context.WithCancel(ctx) + + phaseList := make([]Phase, len(phases)) + for i, name := range phases { + status := StatusPending + if i == 0 { + status = StatusRunning + } + phaseList[i] = Phase{ + Name: name, + Status: status, + } + } + + op := &Operation{ + id: id, + opType: opType, + userID: userID, + status: StatusRunning, + phases: phaseList, + currentPhase: 0, + startedAt: time.Now(), + updatedAt: time.Now(), + metadata: make(map[string]any), + subscribers: make(map[chan ProgressUpdate]struct{}), + ctx: opCtx, + cancel: cancel, + } + + m.mu.Lock() + m.operations[id] = op + m.mu.Unlock() + + op.broadcast() + return op +} + +// GetOperation retrieves an operation by ID +func (m *Manager) GetOperation(id string) (*Operation, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + op, ok := m.operations[id] + return op, ok +} + +// Subscribe to operation progress updates +func (op *Operation) Subscribe() <-chan ProgressUpdate { + ch := make(chan ProgressUpdate, 10) + + op.mu.Lock() + op.subscribers[ch] = struct{}{} + op.mu.Unlock() + + // Send current state immediately + go func() { + select { + case ch <- op.GetProgress(): + case <-op.ctx.Done(): + } + }() + + return ch +} + +// Unsubscribe from operation updates +func (op *Operation) Unsubscribe(ch <-chan ProgressUpdate) { + op.mu.Lock() + defer op.mu.Unlock() + + for sub := range op.subscribers { + if sub == ch { + delete(op.subscribers, sub) + close(sub) + break + } + } +} + +// broadcast sends update to all subscribers +func (op *Operation) broadcast() { + op.mu.RLock() + update := op.buildUpdate() + subscribers := make([]chan ProgressUpdate, 0, len(op.subscribers)) + for ch := range op.subscribers { + subscribers = append(subscribers, ch) + } + op.mu.RUnlock() + + for _, ch := range subscribers { + select { + case ch <- update: + default: + // Skip if channel is full + } + } +} + +func (op *Operation) buildUpdate() ProgressUpdate { + update := ProgressUpdate{ + OperationID: op.id, + Type: op.opType, + Status: op.status, + Phases: op.phases, + Progress: op.progress, + BytesTotal: op.bytesTotal, + BytesDone: op.bytesDone, + ItemsTotal: op.itemsTotal, + ItemsDone: op.itemsDone, + Message: op.message, + Error: op.errorMsg, + StartedAt: op.startedAt, + UpdatedAt: op.updatedAt, + Metadata: op.metadata, + } + + if op.currentPhase < len(op.phases) { + update.CurrentPhase = op.phases[op.currentPhase].Name + } + + // Calculate speed and ETA + elapsed := time.Since(op.startedAt).Seconds() + if elapsed > 0 && op.bytesDone > 0 { + 
update.SpeedBPS = int64(float64(op.bytesDone) / elapsed) + if op.bytesTotal > 0 && update.SpeedBPS > 0 { + remaining := op.bytesTotal - op.bytesDone + etaSeconds := float64(remaining) / float64(update.SpeedBPS) + eta := time.Now().Add(time.Duration(etaSeconds) * time.Second) + update.EstimatedETA = &eta + } + } + + return update +} + +// GetProgress returns current progress state +func (op *Operation) GetProgress() ProgressUpdate { + op.mu.RLock() + defer op.mu.RUnlock() + return op.buildUpdate() +} + +// UpdateProgress updates the operation progress +func (op *Operation) UpdateProgress(progress int, message string) { + op.mu.Lock() + op.progress = progress + op.message = message + op.updatedAt = time.Now() + if op.currentPhase < len(op.phases) { + op.phases[op.currentPhase].Progress = progress + op.phases[op.currentPhase].Message = message + } + op.mu.Unlock() + op.broadcast() +} + +// UpdateBytes updates byte progress +func (op *Operation) UpdateBytes(done, total int64) { + op.mu.Lock() + op.bytesDone = done + op.bytesTotal = total + if total > 0 { + op.progress = int(float64(done) / float64(total) * 100) + } + op.updatedAt = time.Now() + op.mu.Unlock() + op.broadcast() +} + +// UpdateItems updates item progress +func (op *Operation) UpdateItems(done, total int) { + op.mu.Lock() + op.itemsDone = done + op.itemsTotal = total + if total > 0 { + op.progress = done * 100 / total + } + op.updatedAt = time.Now() + op.mu.Unlock() + op.broadcast() +} + +// NextPhase moves to the next phase +func (op *Operation) NextPhase() { + op.mu.Lock() + if op.currentPhase < len(op.phases) { + op.phases[op.currentPhase].Status = StatusComplete + op.phases[op.currentPhase].Progress = 100 + } + op.currentPhase++ + if op.currentPhase < len(op.phases) { + op.phases[op.currentPhase].Status = StatusRunning + } + op.updatedAt = time.Now() + op.mu.Unlock() + op.broadcast() +} + +// SetMetadata sets operation metadata +func (op *Operation) SetMetadata(key string, value any) { + op.mu.Lock() + op.metadata[key] = value + op.mu.Unlock() +} + +// Complete marks the operation as complete +func (op *Operation) Complete(result map[string]any) { + op.mu.Lock() + op.status = StatusComplete + op.progress = 100 + for i := range op.phases { + op.phases[i].Status = StatusComplete + op.phases[i].Progress = 100 + } + if result != nil { + for k, v := range result { + op.metadata[k] = v + } + } + op.updatedAt = time.Now() + op.mu.Unlock() + op.broadcast() + op.closeSubscribers() +} + +// Fail marks the operation as failed +func (op *Operation) Fail(err error) { + op.mu.Lock() + op.status = StatusFailed + op.errorMsg = err.Error() + if op.currentPhase < len(op.phases) { + op.phases[op.currentPhase].Status = StatusFailed + } + op.updatedAt = time.Now() + op.mu.Unlock() + op.broadcast() + op.closeSubscribers() +} + +// Cancel cancels the operation +func (op *Operation) Cancel() { + op.mu.Lock() + op.status = StatusCancelled + op.updatedAt = time.Now() + op.mu.Unlock() + op.cancel() + op.broadcast() + op.closeSubscribers() +} + +func (op *Operation) closeSubscribers() { + op.mu.Lock() + for ch := range op.subscribers { + close(ch) + } + op.subscribers = make(map[chan ProgressUpdate]struct{}) + op.mu.Unlock() +} + +// Context returns the operation's context +func (op *Operation) Context() context.Context { + return op.ctx +} + +// ID returns the operation ID +func (op *Operation) ID() string { + return op.id +} + +// ServeSSE serves Server-Sent Events for operation progress +func ServeSSE(w http.ResponseWriter, r *http.Request, opID 
string) { + manager := GetManager() + op, ok := manager.GetOperation(opID) + if !ok { + http.Error(w, "Operation not found", http.StatusNotFound) + return + } + + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + // Subscribe to updates + updates := op.Subscribe() + defer op.Unsubscribe(updates) + + // Send initial connection event + fmt.Fprintf(w, "event: connected\ndata: {\"operation_id\":\"%s\"}\n\n", opID) + flusher.Flush() + + for { + select { + case update, ok := <-updates: + if !ok { + // Channel closed, operation complete + fmt.Fprintf(w, "event: close\ndata: {}\n\n") + flusher.Flush() + return + } + + data, err := json.Marshal(update) + if err != nil { + log.Error("Failed to marshal progress update: %v", err) + continue + } + + eventType := "progress" + switch update.Status { + case StatusComplete: + eventType = "complete" + case StatusFailed: + eventType = "error" + case StatusCancelled: + eventType = "cancelled" + } + + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, data) + flusher.Flush() + + // Close connection after terminal events + if update.Status == StatusComplete || update.Status == StatusFailed || update.Status == StatusCancelled { + return + } + + case <-r.Context().Done(): + return + } + } +} diff --git a/modules/webhook/retry.go b/modules/webhook/retry.go new file mode 100644 index 0000000000..aa11286d2c --- /dev/null +++ b/modules/webhook/retry.go @@ -0,0 +1,331 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package webhook + +import ( + "context" + "math" + "math/rand" + "sync" + "time" + + "code.gitea.io/gitea/modules/log" +) + +// RetryConfig configures retry behavior +type RetryConfig struct { + // MaxRetries is the maximum number of retry attempts + MaxRetries int + + // InitialDelay is the delay before the first retry + InitialDelay time.Duration + + // MaxDelay is the maximum delay between retries + MaxDelay time.Duration + + // Multiplier is the factor by which delay increases + Multiplier float64 + + // Jitter adds randomness to delays (0-1) + Jitter float64 + + // RetryableStatusCodes are HTTP status codes that should trigger a retry + RetryableStatusCodes []int +} + +// DefaultRetryConfig returns the default retry configuration +func DefaultRetryConfig() RetryConfig { + return RetryConfig{ + MaxRetries: 5, + InitialDelay: 1 * time.Second, + MaxDelay: 5 * time.Minute, + Multiplier: 2.0, + Jitter: 0.2, + RetryableStatusCodes: []int{ + 408, // Request Timeout + 429, // Too Many Requests + 500, // Internal Server Error + 502, // Bad Gateway + 503, // Service Unavailable + 504, // Gateway Timeout + }, + } +} + +// DeliveryAttempt represents a single webhook delivery attempt +type DeliveryAttempt struct { + AttemptNumber int `json:"attempt_number"` + StatusCode int `json:"status_code,omitempty"` + Error string `json:"error,omitempty"` + Duration time.Duration `json:"duration"` + Timestamp time.Time `json:"timestamp"` + NextRetryAt *time.Time `json:"next_retry_at,omitempty"` +} + +// DeliveryResult represents the final result of a webhook delivery +type DeliveryResult struct { + Success bool `json:"success"` + Attempts []DeliveryAttempt `json:"attempts"` + FinalStatus int `json:"final_status,omitempty"` + FinalError string `json:"final_error,omitempty"` + TotalTime time.Duration `json:"total_time"` + WebhookID int64 `json:"webhook_id"` + DeliveryUUID string `json:"delivery_uuid"` +} + +// RetryQueue manages webhook delivery retries +type RetryQueue struct { + mu sync.RWMutex + pending map[string]*pendingDelivery + config RetryConfig + ctx context.Context + cancel context.CancelFunc + deliverF func(ctx context.Context, delivery *pendingDelivery) (int, error) +} + +type pendingDelivery struct { + ID string + WebhookID int64 + URL string + Payload []byte + Headers map[string]string + Attempts []DeliveryAttempt + NextAttempt time.Time + AttemptCount int + ResultChan chan *DeliveryResult +} + +// NewRetryQueue creates a new retry queue +func NewRetryQueue(config RetryConfig, deliverFunc func(ctx context.Context, d *pendingDelivery) (int, error)) *RetryQueue { + ctx, cancel := context.WithCancel(context.Background()) + q := &RetryQueue{ + pending: make(map[string]*pendingDelivery), + config: config, + ctx: ctx, + cancel: cancel, + deliverF: deliverFunc, + } + go q.processLoop() + return q +} + +// Stop stops the retry queue +func (q *RetryQueue) Stop() { + q.cancel() +} + +// Enqueue adds a delivery to the retry queue +func (q *RetryQueue) Enqueue(id string, webhookID int64, url string, payload []byte, headers map[string]string) <-chan *DeliveryResult { + resultChan := make(chan *DeliveryResult, 1) + + delivery := &pendingDelivery{ + ID: id, + WebhookID: webhookID, + URL: url, + Payload: payload, + Headers: headers, + Attempts: make([]DeliveryAttempt, 0), + NextAttempt: time.Now(), + ResultChan: resultChan, + } + + q.mu.Lock() + q.pending[id] = delivery + q.mu.Unlock() + + return resultChan +} + +// GetStatus returns the status of a pending delivery +func (q 
*RetryQueue) GetStatus(id string) (*DeliveryResult, bool) { + q.mu.RLock() + defer q.mu.RUnlock() + + d, ok := q.pending[id] + if !ok { + return nil, false + } + + return &DeliveryResult{ + Success: false, + Attempts: d.Attempts, + WebhookID: d.WebhookID, + DeliveryUUID: d.ID, + }, true +} + +func (q *RetryQueue) processLoop() { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-q.ctx.Done(): + return + case <-ticker.C: + q.processRetries() + } + } +} + +func (q *RetryQueue) processRetries() { + q.mu.Lock() + now := time.Now() + var toProcess []*pendingDelivery + + for _, d := range q.pending { + if now.After(d.NextAttempt) { + toProcess = append(toProcess, d) + } + } + q.mu.Unlock() + + for _, d := range toProcess { + q.attemptDelivery(d) + } +} + +func (q *RetryQueue) attemptDelivery(d *pendingDelivery) { + start := time.Now() + statusCode, err := q.deliverF(q.ctx, d) + duration := time.Since(start) + + attempt := DeliveryAttempt{ + AttemptNumber: d.AttemptCount + 1, + StatusCode: statusCode, + Duration: duration, + Timestamp: start, + } + + if err != nil { + attempt.Error = err.Error() + } + + d.AttemptCount++ + d.Attempts = append(d.Attempts, attempt) + + // Check if successful + if err == nil && statusCode >= 200 && statusCode < 300 { + q.completeDelivery(d, true, statusCode, "") + return + } + + // Check if should retry + if d.AttemptCount >= q.config.MaxRetries { + errMsg := "max retries exceeded" + if err != nil { + errMsg = err.Error() + } + q.completeDelivery(d, false, statusCode, errMsg) + return + } + + // Check if status code is retryable + if !q.isRetryable(statusCode, err) { + errMsg := "non-retryable error" + if err != nil { + errMsg = err.Error() + } + q.completeDelivery(d, false, statusCode, errMsg) + return + } + + // Schedule next retry + delay := q.calculateDelay(d.AttemptCount) + nextRetry := time.Now().Add(delay) + d.NextAttempt = nextRetry + + // Update the last attempt with next retry time + if len(d.Attempts) > 0 { + d.Attempts[len(d.Attempts)-1].NextRetryAt = &nextRetry + } + + log.Debug("Scheduling webhook retry %d/%d for %s in %v", + d.AttemptCount, q.config.MaxRetries, d.ID, delay) +} + +func (q *RetryQueue) completeDelivery(d *pendingDelivery, success bool, statusCode int, errMsg string) { + q.mu.Lock() + delete(q.pending, d.ID) + q.mu.Unlock() + + var totalTime time.Duration + for _, a := range d.Attempts { + totalTime += a.Duration + } + + result := &DeliveryResult{ + Success: success, + Attempts: d.Attempts, + FinalStatus: statusCode, + FinalError: errMsg, + TotalTime: totalTime, + WebhookID: d.WebhookID, + DeliveryUUID: d.ID, + } + + select { + case d.ResultChan <- result: + default: + } + close(d.ResultChan) +} + +func (q *RetryQueue) isRetryable(statusCode int, err error) bool { + // Network errors are always retryable + if err != nil && statusCode == 0 { + return true + } + + for _, code := range q.config.RetryableStatusCodes { + if statusCode == code { + return true + } + } + + return false +} + +func (q *RetryQueue) calculateDelay(attemptNumber int) time.Duration { + // Exponential backoff: delay = initialDelay * multiplier^attempt + delay := float64(q.config.InitialDelay) * math.Pow(q.config.Multiplier, float64(attemptNumber-1)) + + // Apply max delay cap + if delay > float64(q.config.MaxDelay) { + delay = float64(q.config.MaxDelay) + } + + // Add jitter + if q.config.Jitter > 0 { + jitterAmount := delay * q.config.Jitter + delay = delay - jitterAmount + (rand.Float64() * 2 * jitterAmount) + } + + 
return time.Duration(delay) +} + +// Stats represents retry queue statistics +type Stats struct { + PendingDeliveries int `json:"pending_deliveries"` + TotalRetries int `json:"total_retries"` + SuccessRate float64 `json:"success_rate"` + LastUpdated time.Time `json:"last_updated"` +} + +// GetStats returns statistics about the retry queue +func (q *RetryQueue) GetStats() Stats { + q.mu.RLock() + defer q.mu.RUnlock() + + totalRetries := 0 + for _, d := range q.pending { + totalRetries += d.AttemptCount + } + + return Stats{ + PendingDeliveries: len(q.pending), + TotalRetries: totalRetries, + LastUpdated: time.Now(), + } +} diff --git a/routers/api/v2/api.go b/routers/api/v2/api.go index 4ca7462f38..b960dba584 100644 --- a/routers/api/v2/api.go +++ b/routers/api/v2/api.go @@ -26,6 +26,7 @@ import ( auth_model "code.gitea.io/gitea/models/auth" apierrors "code.gitea.io/gitea/modules/errors" "code.gitea.io/gitea/modules/graceful" + "code.gitea.io/gitea/modules/idempotency" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/web" @@ -45,6 +46,10 @@ func Routes() *web.Router { m.Use(middleware.RateLimitInfo()) m.Use(securityHeaders()) + // Idempotency middleware for POST/PUT/PATCH requests + idempotencyMiddleware := idempotency.NewMiddleware(idempotency.GetDefaultStore()) + m.Use(idempotencyMiddleware.Handler) + if setting.CORSConfig.Enabled { m.Use(cors.Handler(cors.Options{ AllowedOrigins: setting.CORSConfig.AllowDomain, @@ -68,6 +73,21 @@ func Routes() *web.Router { m.Get("/docs", DocsScalar) m.Get("/swagger.json", SwaggerJSON) + // Health check endpoints + m.Group("/health", func() { + m.Get("", HealthCheck) + m.Get("/live", LivenessCheck) + m.Get("/ready", ReadinessCheck) + m.Get("/component/{component}", ComponentHealthCheck) + }) + + // Operation progress endpoints (SSE) + m.Group("/operations", func() { + m.Get("/{id}/progress", OperationProgress) + m.Get("/{id}", GetOperation) + m.Delete("/{id}", CancelOperation) + }) + // Authenticated endpoints m.Group("", func() { // User info diff --git a/routers/api/v2/health.go b/routers/api/v2/health.go new file mode 100644 index 0000000000..7f80c42d2a --- /dev/null +++ b/routers/api/v2/health.go @@ -0,0 +1,161 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT + +package v2 + +import ( + "net/http" + + "code.gitea.io/gitea/modules/health" + "code.gitea.io/gitea/services/context" +) + +// HealthCheckResponse represents the health check response +type HealthCheckResponse struct { + Status string `json:"status"` + Version string `json:"version"` + Uptime float64 `json:"uptime_seconds"` + Timestamp string `json:"timestamp"` + Components map[string]*ComponentStatus `json:"components"` + System *health.SystemInfo `json:"system,omitempty"` +} + +// ComponentStatus represents a component's health status +type ComponentStatus struct { + Name string `json:"name"` + Status string `json:"status"` + Message string `json:"message,omitempty"` + DurationMs float64 `json:"duration_ms"` + LastChecked string `json:"last_checked"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +// HealthCheck performs a comprehensive health check +// @Summary Health check +// @Description Returns the health status of all components +// @Tags health +// @Produce json +// @Param detailed query bool false "Include system information" +// @Success 200 {object} HealthCheckResponse +// @Router /health [get] +func HealthCheck(ctx *context.APIContext) { + includeSystem := ctx.FormBool("detailed") + manager := health.GetManager() + + result := manager.Check(ctx.Req.Context(), includeSystem) + + // Convert to response format + components := make(map[string]*ComponentStatus) + for name, check := range result.Components { + components[name] = &ComponentStatus{ + Name: check.Name, + Status: string(check.Status), + Message: check.Message, + DurationMs: float64(check.Duration.Milliseconds()), + LastChecked: check.LastChecked.Format("2006-01-02T15:04:05Z07:00"), + Metadata: check.Metadata, + } + } + + response := HealthCheckResponse{ + Status: string(result.Status), + Version: result.Version, + Uptime: result.Uptime.Seconds(), + Timestamp: result.Timestamp.Format("2006-01-02T15:04:05Z07:00"), + Components: components, + System: result.System, + } + + // Set status code based on health + statusCode := http.StatusOK + if result.Status == health.StatusUnhealthy { + statusCode = http.StatusServiceUnavailable + } else if result.Status == health.StatusDegraded { + statusCode = http.StatusOK // Still OK but degraded + } + + ctx.JSON(statusCode, response) +} + +// LivenessCheck performs a simple liveness check +// @Summary Liveness probe +// @Description Returns OK if the service is running +// @Tags health +// @Produce json +// @Success 200 {object} map[string]any +// @Router /health/live [get] +func LivenessCheck(ctx *context.APIContext) { + manager := health.GetManager() + check := manager.LivenessCheck() + + ctx.JSON(http.StatusOK, map[string]any{ + "status": string(check.Status), + "message": check.Message, + }) +} + +// ReadinessCheck performs a readiness check +// @Summary Readiness probe +// @Description Returns OK if the service is ready to handle requests +// @Tags health +// @Produce json +// @Success 200 {object} map[string]any +// @Success 503 {object} map[string]any +// @Router /health/ready [get] +func ReadinessCheck(ctx *context.APIContext) { + manager := health.GetManager() + result := manager.ReadinessCheck(ctx.Req.Context()) + + response := map[string]any{ + "status": string(result.Status), + "timestamp": result.Timestamp.Format("2006-01-02T15:04:05Z07:00"), + } + + if result.Status == health.StatusUnhealthy { + ctx.JSON(http.StatusServiceUnavailable, response) + return + } + + ctx.JSON(http.StatusOK, response) +} + +// ComponentHealthCheck 
checks a specific component +// @Summary Component health check +// @Description Returns the health status of a specific component +// @Tags health +// @Produce json +// @Param component path string true "Component name" +// @Success 200 {object} ComponentStatus +// @Failure 404 {object} map[string]any +// @Router /health/component/{component} [get] +func ComponentHealthCheck(ctx *context.APIContext) { + component := ctx.PathParam("component") + + manager := health.GetManager() + check, found := manager.CheckSingle(ctx.Req.Context(), component) + + if !found { + ctx.JSON(http.StatusNotFound, map[string]any{ + "code": "COMPONENT_NOT_FOUND", + "message": "Health check component not found", + "component": component, + }) + return + } + + response := ComponentStatus{ + Name: check.Name, + Status: string(check.Status), + Message: check.Message, + DurationMs: float64(check.Duration.Milliseconds()), + LastChecked: check.LastChecked.Format("2006-01-02T15:04:05Z07:00"), + Metadata: check.Metadata, + } + + statusCode := http.StatusOK + if check.Status == health.StatusUnhealthy { + statusCode = http.StatusServiceUnavailable + } + + ctx.JSON(statusCode, response) +} diff --git a/routers/api/v2/operations.go b/routers/api/v2/operations.go new file mode 100644 index 0000000000..c00cb65de0 --- /dev/null +++ b/routers/api/v2/operations.go @@ -0,0 +1,147 @@ +// Copyright 2026 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v2 + +import ( + "net/http" + + apierrors "code.gitea.io/gitea/modules/errors" + "code.gitea.io/gitea/modules/operations" + "code.gitea.io/gitea/services/context" +) + +// OperationProgressResponse represents an operation progress response +type OperationProgressResponse struct { + OperationID string `json:"operation_id"` + Type string `json:"type"` + Status string `json:"status"` + CurrentPhase string `json:"current_phase,omitempty"` + Phases []Phase `json:"phases,omitempty"` + Progress int `json:"progress"` + BytesTotal int64 `json:"bytes_total,omitempty"` + BytesDone int64 `json:"bytes_done,omitempty"` + ItemsTotal int `json:"items_total,omitempty"` + ItemsDone int `json:"items_done,omitempty"` + Message string `json:"message,omitempty"` + Error string `json:"error,omitempty"` + StartedAt string `json:"started_at"` + UpdatedAt string `json:"updated_at"` + EstimatedETA string `json:"estimated_eta,omitempty"` + SpeedBPS int64 `json:"speed_bps,omitempty"` +} + +// Phase represents a phase in an operation +type Phase struct { + Name string `json:"name"` + Status string `json:"status"` + Progress int `json:"progress"` + Message string `json:"message,omitempty"` +} + +// OperationProgress streams operation progress via SSE +// @Summary Stream operation progress +// @Description Streams real-time progress updates for a long-running operation via Server-Sent Events +// @Tags operations +// @Produce text/event-stream +// @Param id path string true "Operation ID" +// @Success 200 {object} OperationProgressResponse "SSE stream of progress updates" +// @Failure 404 {object} map[string]any +// @Router /operations/{id}/progress [get] +func OperationProgress(ctx *context.APIContext) { + opID := ctx.PathParam("id") + + // ServeSSE handles all the response logic + operations.ServeSSE(ctx.Resp, ctx.Req, opID) +} + +// GetOperation retrieves the current state of an operation +// @Summary Get operation status +// @Description Returns the current status of a long-running operation +// @Tags operations +// @Produce json +// @Param id path string true "Operation ID" +// 
@Success 200 {object} OperationProgressResponse +// @Failure 404 {object} map[string]any +// @Router /operations/{id} [get] +func GetOperation(ctx *context.APIContext) { + opID := ctx.PathParam("id") + + manager := operations.GetManager() + op, ok := manager.GetOperation(opID) + if !ok { + ctx.APIErrorWithCode(apierrors.ResourceNotFound, map[string]any{ + "resource": "operation", + "id": opID, + }) + return + } + + progress := op.GetProgress() + + // Convert phases + phases := make([]Phase, len(progress.Phases)) + for i, p := range progress.Phases { + phases[i] = Phase{ + Name: p.Name, + Status: string(p.Status), + Progress: p.Progress, + Message: p.Message, + } + } + + response := OperationProgressResponse{ + OperationID: progress.OperationID, + Type: string(progress.Type), + Status: string(progress.Status), + CurrentPhase: progress.CurrentPhase, + Phases: phases, + Progress: progress.Progress, + BytesTotal: progress.BytesTotal, + BytesDone: progress.BytesDone, + ItemsTotal: progress.ItemsTotal, + ItemsDone: progress.ItemsDone, + Message: progress.Message, + Error: progress.Error, + StartedAt: progress.StartedAt.Format("2006-01-02T15:04:05Z07:00"), + UpdatedAt: progress.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"), + SpeedBPS: progress.SpeedBPS, + } + + if progress.EstimatedETA != nil { + response.EstimatedETA = progress.EstimatedETA.Format("2006-01-02T15:04:05Z07:00") + } + + ctx.JSON(http.StatusOK, response) +} + +// CancelOperation cancels a running operation +// @Summary Cancel operation +// @Description Cancels a long-running operation if possible +// @Tags operations +// @Produce json +// @Param id path string true "Operation ID" +// @Success 200 {object} map[string]any +// @Failure 404 {object} map[string]any +// @Router /operations/{id} [delete] +func CancelOperation(ctx *context.APIContext) { + opID := ctx.PathParam("id") + + manager := operations.GetManager() + op, ok := manager.GetOperation(opID) + if !ok { + ctx.APIErrorWithCode(apierrors.ResourceNotFound, map[string]any{ + "resource": "operation", + "id": opID, + }) + return + } + + op.Cancel() + + ctx.JSON(http.StatusOK, map[string]any{ + "operation_id": opID, + "status": "cancelled", + "message": "Operation cancellation requested", + }) +}
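
Usage sketch (not part of the patch): the snippet below shows how a caller might register a health checker and guard an outbound call with one of the new circuit breakers. The names pingDB, callExternalService and the breaker name "external-service" are hypothetical placeholders; only the health and circuitbreaker APIs come from this change.

package main

import (
	"context"
	"errors"
	"log"

	"code.gitea.io/gitea/modules/circuitbreaker"
	"code.gitea.io/gitea/modules/health"
)

// pingDB and callExternalService stand in for real dependencies.
func pingDB(ctx context.Context) error              { return nil }
func callExternalService(ctx context.Context) error { return nil }

func main() {
	ctx := context.Background()

	// Register a database checker with the shared health manager; the v2
	// /health endpoint will then report a "database" component.
	health.GetManager().RegisterChecker("database", health.NewDatabaseChecker(pingDB))

	// Fetch (or lazily create) a named breaker from the registry and run
	// the outbound call through it. With the default config, five
	// consecutive failures open the breaker and Execute returns
	// ErrCircuitOpen until the 30 second timeout elapses.
	cb := circuitbreaker.GetRegistry().Get("external-service")
	if err := cb.Execute(ctx, callExternalService); err != nil {
		if errors.Is(err, circuitbreaker.ErrCircuitOpen) {
			log.Println("circuit open, request rejected without calling the service")
		}
	}
}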