feat(api): add Phase 3 reliability and UX enhancements

Add comprehensive reliability patterns and user experience improvements:

- Operation progress tracking via SSE (modules/operations/progress.go)
  - Real-time progress updates for long-running operations
  - Phase-based progress with completion percentages
  - Byte/item progress tracking with ETA calculations
  - Automatic cleanup of completed operations

- Idempotency key middleware (modules/idempotency/idempotency.go)
  - POST/PUT/PATCH request deduplication
  - 24-hour response caching
  - Concurrent request protection with locking
  - Idempotency-Replayed header for cache hits

- Webhook retry with exponential backoff (modules/webhook/retry.go)
  - Configurable max retries and delays
  - Jitter for thundering herd prevention
  - Retryable status code configuration
  - Delivery attempt tracking

- Circuit breaker pattern (modules/circuitbreaker/circuitbreaker.go)
  - Closed/Open/Half-Open state machine
  - Configurable failure thresholds
  - Global registry for service management
  - State change callbacks

- Enhanced health checks (modules/health/health.go)
  - Kubernetes-compatible liveness/readiness probes
  - Per-component health status
  - System metrics (goroutines, memory, CPU)
  - Circuit breaker status integration

New v2 API endpoints:
- GET /api/v2/health - comprehensive health check
- GET /api/v2/health/live - liveness probe
- GET /api/v2/health/ready - readiness probe
- GET /api/v2/health/component/{name} - component status
- GET /api/v2/operations/{id} - operation status
- GET /api/v2/operations/{id}/progress - SSE progress stream
- DELETE /api/v2/operations/{id} - cancel operation
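
Usage sketch (assumes a local instance on :3000; OP_ID is a placeholder) of a
minimal Go client consuming the SSE progress stream:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Placeholder URL; authentication omitted.
	resp, err := http.Get("http://localhost:3000/api/v2/operations/OP_ID/progress")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// SSE frames arrive as "event: <type>" / "data: <json>" pairs
	// separated by blank lines.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		if line := scanner.Text(); strings.HasPrefix(line, "data: ") {
			fmt.Println(strings.TrimPrefix(line, "data: "))
		}
	}
}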

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
David H. Friedel Jr. 2026-01-09 11:54:00 -05:00
parent 9094d8b503
commit a703bcc60f
8 changed files with 2218 additions and 0 deletions

modules/circuitbreaker/circuitbreaker.go Normal file

@@ -0,0 +1,392 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// Package circuitbreaker implements the circuit breaker pattern for external service calls.
// This prevents cascading failures by temporarily blocking requests to failing services.
package circuitbreaker
import (
"context"
"errors"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
// State represents the circuit breaker state
type State int
const (
// StateClosed means the circuit is closed and requests flow through
StateClosed State = iota
// StateOpen means the circuit is open and requests are blocked
StateOpen
// StateHalfOpen means the circuit is testing if the service has recovered
StateHalfOpen
)
func (s State) String() string {
switch s {
case StateClosed:
return "closed"
case StateOpen:
return "open"
case StateHalfOpen:
return "half-open"
default:
return "unknown"
}
}
// Common errors
var (
ErrCircuitOpen = errors.New("circuit breaker is open")
ErrTooManyRequests = errors.New("too many requests in half-open state")
)
// Config configures the circuit breaker behavior
type Config struct {
// Name identifies this circuit breaker
Name string
// MaxFailures is the number of failures before opening the circuit
MaxFailures int
// SuccessThreshold is the number of successes needed to close from half-open
SuccessThreshold int
// Timeout is how long the circuit stays open before testing
Timeout time.Duration
// MaxConcurrentInHalfOpen is the max concurrent requests when half-open
MaxConcurrentInHalfOpen int
// OnStateChange is called when the state changes
OnStateChange func(name string, from, to State)
}
// DefaultConfig returns a sensible default configuration
func DefaultConfig(name string) Config {
return Config{
Name: name,
MaxFailures: 5,
SuccessThreshold: 2,
Timeout: 30 * time.Second,
MaxConcurrentInHalfOpen: 1,
}
}
// CircuitBreaker implements the circuit breaker pattern
type CircuitBreaker struct {
mu sync.RWMutex
config Config
state State
failures int
successes int
lastFailure time.Time
lastStateChange time.Time
halfOpenRequests int
generation uint64 // prevents counting old requests
}
// New creates a new circuit breaker
func New(config Config) *CircuitBreaker {
if config.MaxFailures <= 0 {
config.MaxFailures = 5
}
if config.SuccessThreshold <= 0 {
config.SuccessThreshold = 2
}
if config.Timeout <= 0 {
config.Timeout = 30 * time.Second
}
if config.MaxConcurrentInHalfOpen <= 0 {
config.MaxConcurrentInHalfOpen = 1
}
return &CircuitBreaker{
config: config,
state: StateClosed,
lastStateChange: time.Now(),
}
}
// Execute runs the given function if the circuit allows
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func(context.Context) error) error {
// Check the context first: acquiring a half-open slot and then bailing on
// ctx.Done() would leak the slot, since afterRequest would never be called.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
generation, err := cb.beforeRequest()
if err != nil {
return err
}
// Execute the function
err = fn(ctx)
cb.afterRequest(generation, err == nil)
return err
}
// beforeRequest checks if the request should proceed
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mu.Lock()
defer cb.mu.Unlock()
now := time.Now()
switch cb.state {
case StateClosed:
return cb.generation, nil
case StateOpen:
// Check if timeout has elapsed
if now.Sub(cb.lastStateChange) >= cb.config.Timeout {
cb.toHalfOpen()
cb.halfOpenRequests++
return cb.generation, nil
}
return 0, ErrCircuitOpen
case StateHalfOpen:
if cb.halfOpenRequests >= cb.config.MaxConcurrentInHalfOpen {
return 0, ErrTooManyRequests
}
cb.halfOpenRequests++
return cb.generation, nil
}
return cb.generation, nil
}
// afterRequest processes the result of a request
func (cb *CircuitBreaker) afterRequest(generation uint64, success bool) {
cb.mu.Lock()
defer cb.mu.Unlock()
// Ignore results from old generations
if generation != cb.generation {
return
}
if success {
cb.onSuccess()
} else {
cb.onFailure()
}
}
func (cb *CircuitBreaker) onSuccess() {
switch cb.state {
case StateClosed:
cb.failures = 0
case StateHalfOpen:
cb.halfOpenRequests--
cb.successes++
if cb.successes >= cb.config.SuccessThreshold {
cb.toClosed()
}
}
}
func (cb *CircuitBreaker) onFailure() {
switch cb.state {
case StateClosed:
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.config.MaxFailures {
cb.toOpen()
}
case StateHalfOpen:
cb.halfOpenRequests--
cb.toOpen()
}
}
func (cb *CircuitBreaker) toOpen() {
if cb.state == StateOpen {
return
}
from := cb.state
cb.state = StateOpen
cb.lastStateChange = time.Now()
cb.generation++
log.Warn("Circuit breaker %s: %s -> open (failures: %d)", cb.config.Name, from, cb.failures)
if cb.config.OnStateChange != nil {
go cb.config.OnStateChange(cb.config.Name, from, StateOpen)
}
}
func (cb *CircuitBreaker) toClosed() {
if cb.state == StateClosed {
return
}
from := cb.state
cb.state = StateClosed
cb.failures = 0
cb.successes = 0
cb.halfOpenRequests = 0
cb.lastStateChange = time.Now()
cb.generation++
log.Info("Circuit breaker %s: %s -> closed", cb.config.Name, from)
if cb.config.OnStateChange != nil {
go cb.config.OnStateChange(cb.config.Name, from, StateClosed)
}
}
func (cb *CircuitBreaker) toHalfOpen() {
if cb.state == StateHalfOpen {
return
}
from := cb.state
cb.state = StateHalfOpen
cb.successes = 0
cb.halfOpenRequests = 0
cb.lastStateChange = time.Now()
cb.generation++
log.Debug("Circuit breaker %s: %s -> half-open", cb.config.Name, from)
if cb.config.OnStateChange != nil {
go cb.config.OnStateChange(cb.config.Name, from, StateHalfOpen)
}
}
// State returns the current state
func (cb *CircuitBreaker) State() State {
cb.mu.RLock()
defer cb.mu.RUnlock()
return cb.state
}
// Stats returns statistics about the circuit breaker
type Stats struct {
Name string `json:"name"`
State string `json:"state"`
Failures int `json:"failures"`
Successes int `json:"successes"`
LastFailure time.Time `json:"last_failure,omitempty"`
LastStateChange time.Time `json:"last_state_change"`
Generation uint64 `json:"generation"`
}
// Stats returns the current statistics
func (cb *CircuitBreaker) Stats() Stats {
cb.mu.RLock()
defer cb.mu.RUnlock()
return Stats{
Name: cb.config.Name,
State: cb.state.String(),
Failures: cb.failures,
Successes: cb.successes,
LastFailure: cb.lastFailure,
LastStateChange: cb.lastStateChange,
Generation: cb.generation,
}
}
// Reset forces the circuit breaker back to closed state
func (cb *CircuitBreaker) Reset() {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.state = StateClosed
cb.failures = 0
cb.successes = 0
cb.halfOpenRequests = 0
cb.lastStateChange = time.Now()
cb.generation++
log.Info("Circuit breaker %s: manually reset to closed", cb.config.Name)
}
// Registry manages multiple circuit breakers
type Registry struct {
mu sync.RWMutex
breakers map[string]*CircuitBreaker
}
var (
defaultRegistry *Registry
registryOnce sync.Once
)
// GetRegistry returns the default circuit breaker registry
func GetRegistry() *Registry {
registryOnce.Do(func() {
defaultRegistry = &Registry{
breakers: make(map[string]*CircuitBreaker),
}
})
return defaultRegistry
}
// Get returns a circuit breaker by name, creating it if necessary
func (r *Registry) Get(name string, config ...Config) *CircuitBreaker {
r.mu.RLock()
cb, ok := r.breakers[name]
r.mu.RUnlock()
if ok {
return cb
}
r.mu.Lock()
defer r.mu.Unlock()
// Double-check after acquiring write lock
if cb, ok := r.breakers[name]; ok {
return cb
}
var cfg Config
if len(config) > 0 {
cfg = config[0]
cfg.Name = name
} else {
cfg = DefaultConfig(name)
}
cb = New(cfg)
r.breakers[name] = cb
return cb
}
// Stats returns stats for all circuit breakers
func (r *Registry) Stats() map[string]Stats {
r.mu.RLock()
defer r.mu.RUnlock()
stats := make(map[string]Stats)
for name, cb := range r.breakers {
stats[name] = cb.Stats()
}
return stats
}
// ResetAll resets all circuit breakers
func (r *Registry) ResetAll() {
r.mu.RLock()
defer r.mu.RUnlock()
for _, cb := range r.breakers {
cb.Reset()
}
}
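
A minimal usage sketch for the breaker above; "avatars-service" and
fetchAvatar are illustrative names, not code from this commit.

package main

import (
	"context"
	"errors"
	"fmt"

	"code.gitea.io/gitea/modules/circuitbreaker"
)

// fetchAvatar stands in for any outbound call that can fail.
func fetchAvatar(ctx context.Context) error { return nil }

func main() {
	// Get (or lazily create) a named breaker with the package defaults:
	// open after 5 failures, stay open 30s, close after 2 half-open successes.
	cb := circuitbreaker.GetRegistry().Get("avatars-service")
	if err := cb.Execute(context.Background(), fetchAvatar); errors.Is(err, circuitbreaker.ErrCircuitOpen) {
		// Fail fast instead of waiting on a known-bad dependency.
		fmt.Println("dependency unavailable, failing fast")
	}
}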

modules/health/health.go Normal file

@@ -0,0 +1,377 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// Package health provides comprehensive health checking for Gitea services.
package health
import (
"context"
"runtime"
"sync"
"time"
"code.gitea.io/gitea/modules/circuitbreaker"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)
// Status represents the health status of a component
type Status string
const (
StatusHealthy Status = "healthy"
StatusDegraded Status = "degraded"
StatusUnhealthy Status = "unhealthy"
StatusUnknown Status = "unknown"
)
// ComponentCheck represents a health check for a single component
type ComponentCheck struct {
Name string `json:"name"`
Status Status `json:"status"`
Message string `json:"message,omitempty"`
Duration time.Duration `json:"duration_ms"`
LastChecked time.Time `json:"last_checked"`
Metadata map[string]any `json:"metadata,omitempty"`
}
// HealthResponse represents the complete health check response
type HealthResponse struct {
Status Status `json:"status"`
Version string `json:"version"`
Uptime time.Duration `json:"uptime_seconds"`
Timestamp time.Time `json:"timestamp"`
Components map[string]*ComponentCheck `json:"components"`
System *SystemInfo `json:"system,omitempty"`
Circuits map[string]circuitbreaker.Stats `json:"circuit_breakers,omitempty"`
}
// SystemInfo contains system-level health information
type SystemInfo struct {
GoVersion string `json:"go_version"`
NumGoroutines int `json:"goroutines"`
MemoryAllocMB float64 `json:"memory_alloc_mb"`
MemorySysMB float64 `json:"memory_sys_mb"`
NumCPU int `json:"num_cpu"`
GOMAXPROCS int `json:"gomaxprocs"`
}
// Checker is a function that performs a health check
type Checker func(ctx context.Context) *ComponentCheck
// Manager manages health checks
type Manager struct {
mu sync.RWMutex
checkers map[string]Checker
cache map[string]*ComponentCheck
cacheTTL time.Duration
startTime time.Time
version string
}
var (
defaultManager *Manager
managerOnce sync.Once
)
// GetManager returns the default health manager
func GetManager() *Manager {
managerOnce.Do(func() {
defaultManager = &Manager{
checkers: make(map[string]Checker),
cache: make(map[string]*ComponentCheck),
cacheTTL: 5 * time.Second,
startTime: time.Now(),
version: setting.AppVer,
}
})
return defaultManager
}
// RegisterChecker registers a new health checker
func (m *Manager) RegisterChecker(name string, checker Checker) {
m.mu.Lock()
defer m.mu.Unlock()
m.checkers[name] = checker
log.Debug("Registered health checker: %s", name)
}
// UnregisterChecker removes a health checker
func (m *Manager) UnregisterChecker(name string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.checkers, name)
delete(m.cache, name)
}
// Check performs all health checks
func (m *Manager) Check(ctx context.Context, includeSystem bool) *HealthResponse {
m.mu.RLock()
checkers := make(map[string]Checker)
for k, v := range m.checkers {
checkers[k] = v
}
m.mu.RUnlock()
response := &HealthResponse{
Status: StatusHealthy,
Version: m.version,
Uptime: time.Since(m.startTime),
Timestamp: time.Now(),
Components: make(map[string]*ComponentCheck),
}
// Run checks concurrently
var wg sync.WaitGroup
var mu sync.Mutex
for name, checker := range checkers {
wg.Add(1)
go func(name string, checker Checker) {
defer wg.Done()
result := m.runCheck(ctx, name, checker)
mu.Lock()
response.Components[name] = result
mu.Unlock()
}(name, checker)
}
wg.Wait()
// Calculate overall status
response.Status = m.calculateOverallStatus(response.Components)
// Include system info if requested
if includeSystem {
response.System = getSystemInfo()
response.Circuits = circuitbreaker.GetRegistry().Stats()
}
return response
}
func (m *Manager) runCheck(ctx context.Context, name string, checker Checker) *ComponentCheck {
// Check cache first
m.mu.RLock()
cached, ok := m.cache[name]
m.mu.RUnlock()
if ok && time.Since(cached.LastChecked) < m.cacheTTL {
return cached
}
// Run the check with timeout
checkCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
start := time.Now()
result := checker(checkCtx)
result.Duration = time.Since(start)
result.LastChecked = time.Now()
// Cache the result
m.mu.Lock()
m.cache[name] = result
m.mu.Unlock()
return result
}
func (m *Manager) calculateOverallStatus(components map[string]*ComponentCheck) Status {
unhealthyCount := 0
degradedCount := 0
for _, check := range components {
switch check.Status {
case StatusUnhealthy:
unhealthyCount++
case StatusDegraded:
degradedCount++
}
}
// If any critical component is unhealthy, overall is unhealthy
if unhealthyCount > 0 {
return StatusUnhealthy
}
// If any component is degraded, overall is degraded
if degradedCount > 0 {
return StatusDegraded
}
return StatusHealthy
}
func getSystemInfo() *SystemInfo {
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
return &SystemInfo{
GoVersion: runtime.Version(),
NumGoroutines: runtime.NumGoroutine(),
MemoryAllocMB: float64(memStats.Alloc) / 1024 / 1024,
MemorySysMB: float64(memStats.Sys) / 1024 / 1024,
NumCPU: runtime.NumCPU(),
GOMAXPROCS: runtime.GOMAXPROCS(0),
}
}
// CheckSingle performs a single component check
func (m *Manager) CheckSingle(ctx context.Context, name string) (*ComponentCheck, bool) {
m.mu.RLock()
checker, ok := m.checkers[name]
m.mu.RUnlock()
if !ok {
return nil, false
}
return m.runCheck(ctx, name, checker), true
}
// LivenessCheck performs a quick liveness check (is the service running?)
func (m *Manager) LivenessCheck() *ComponentCheck {
return &ComponentCheck{
Name: "liveness",
Status: StatusHealthy,
Message: "service is running",
LastChecked: time.Now(),
}
}
// ReadinessCheck performs a readiness check (can the service handle requests?)
func (m *Manager) ReadinessCheck(ctx context.Context) *HealthResponse {
// Readiness runs every registered component check but skips system info
return m.Check(ctx, false)
}
// NewDatabaseChecker creates a database health checker
func NewDatabaseChecker(pingFunc func(ctx context.Context) error) Checker {
return func(ctx context.Context) *ComponentCheck {
check := &ComponentCheck{
Name: "database",
}
if err := pingFunc(ctx); err != nil {
check.Status = StatusUnhealthy
check.Message = err.Error()
} else {
check.Status = StatusHealthy
check.Message = "connected"
}
return check
}
}
// NewCacheChecker creates a cache health checker
func NewCacheChecker(pingFunc func(ctx context.Context) error) Checker {
return func(ctx context.Context) *ComponentCheck {
check := &ComponentCheck{
Name: "cache",
}
if err := pingFunc(ctx); err != nil {
check.Status = StatusDegraded // Cache is usually not critical
check.Message = err.Error()
} else {
check.Status = StatusHealthy
check.Message = "connected"
}
return check
}
}
// NewGitChecker creates a git service health checker
func NewGitChecker(checkFunc func(ctx context.Context) (string, error)) Checker {
return func(ctx context.Context) *ComponentCheck {
check := &ComponentCheck{
Name: "git",
}
version, err := checkFunc(ctx)
if err != nil {
check.Status = StatusUnhealthy
check.Message = err.Error()
} else {
check.Status = StatusHealthy
check.Message = "available"
check.Metadata = map[string]any{
"version": version,
}
}
return check
}
}
// NewSSHChecker creates an SSH service health checker
func NewSSHChecker(isEnabled bool, port int) Checker {
return func(ctx context.Context) *ComponentCheck {
check := &ComponentCheck{
Name: "ssh",
}
if !isEnabled {
check.Status = StatusHealthy
check.Message = "disabled"
} else {
check.Status = StatusHealthy
check.Message = "listening"
check.Metadata = map[string]any{
"port": port,
}
}
return check
}
}
// NewExternalServiceChecker creates a checker for external services
func NewExternalServiceChecker(name string, checkFunc func(ctx context.Context) error) Checker {
return func(ctx context.Context) *ComponentCheck {
check := &ComponentCheck{
Name: name,
}
if err := checkFunc(ctx); err != nil {
check.Status = StatusDegraded
check.Message = err.Error()
} else {
check.Status = StatusHealthy
check.Message = "connected"
}
return check
}
}
// NewQueueChecker creates a checker for the task queue
func NewQueueChecker(getQueueStats func() (pending, processing int)) Checker {
return func(ctx context.Context) *ComponentCheck {
check := &ComponentCheck{
Name: "queue",
}
pending, processing := getQueueStats()
check.Status = StatusHealthy
check.Message = "running"
check.Metadata = map[string]any{
"pending": pending,
"processing": processing,
}
// Mark as degraded if queue is backing up
if pending > 10000 {
check.Status = StatusDegraded
check.Message = "queue backlog is high"
}
return check
}
}
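
A wiring sketch for the checkers above, assuming a *sql.DB handle; the cache
ping is left as a stub.

package checks

import (
	"context"
	"database/sql"

	"code.gitea.io/gitea/modules/health"
)

// RegisterChecks wires example component checkers into the default manager.
func RegisterChecks(db *sql.DB) {
	m := health.GetManager()
	// Database is critical: an unhealthy result here makes the overall
	// status unhealthy (503 on /api/v2/health).
	m.RegisterChecker("database", health.NewDatabaseChecker(func(ctx context.Context) error {
		return db.PingContext(ctx)
	}))
	// Cache failures only degrade the overall status.
	m.RegisterChecker("cache", health.NewCacheChecker(func(ctx context.Context) error {
		return nil // stub: replace with a real cache ping
	}))
}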

modules/idempotency/idempotency.go Normal file

@@ -0,0 +1,311 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// Package idempotency provides middleware for idempotent handling of
// POST/PUT/PATCH requests. Clients can send an Idempotency-Key header to
// ensure a request is processed exactly once; subsequent requests with the
// same key receive the cached response.
package idempotency
import (
"bytes"
"encoding/json"
"net/http"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
const (
// HeaderIdempotencyKey is the header name for idempotency keys
HeaderIdempotencyKey = "Idempotency-Key"
// HeaderIdempotencyReplayed indicates the response was replayed from cache
HeaderIdempotencyReplayed = "Idempotency-Replayed"
// DefaultTTL is the default time-to-live for cached responses
DefaultTTL = 24 * time.Hour
// MaxKeyLength is the maximum allowed length for an idempotency key
MaxKeyLength = 256
)
// CachedResponse stores a cached API response
type CachedResponse struct {
StatusCode int `json:"status_code"`
Headers map[string]string `json:"headers"`
Body []byte `json:"body"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
}
// Store defines the interface for idempotency key storage
type Store interface {
// Get retrieves a cached response by key
Get(key string) (*CachedResponse, bool)
// Set stores a response with the given key
Set(key string, response *CachedResponse)
// Delete removes a cached response
Delete(key string)
// Lock acquires a lock for processing a key (returns true if lock acquired)
Lock(key string) bool
// Unlock releases the lock for a key
Unlock(key string)
}
// MemoryStore implements Store using in-memory storage
type MemoryStore struct {
mu sync.RWMutex
responses map[string]*CachedResponse
locks map[string]bool
ttl time.Duration
}
// NewMemoryStore creates a new in-memory idempotency store
func NewMemoryStore(ttl time.Duration) *MemoryStore {
store := &MemoryStore{
responses: make(map[string]*CachedResponse),
locks: make(map[string]bool),
ttl: ttl,
}
go store.cleanup()
return store
}
func (s *MemoryStore) cleanup() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
s.mu.Lock()
now := time.Now()
for key, resp := range s.responses {
if now.After(resp.ExpiresAt) {
delete(s.responses, key)
delete(s.locks, key)
}
}
s.mu.Unlock()
}
}
// Get retrieves a cached response
func (s *MemoryStore) Get(key string) (*CachedResponse, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
resp, ok := s.responses[key]
if !ok {
return nil, false
}
if time.Now().After(resp.ExpiresAt) {
return nil, false
}
return resp, true
}
// Set stores a response
func (s *MemoryStore) Set(key string, response *CachedResponse) {
s.mu.Lock()
defer s.mu.Unlock()
response.CreatedAt = time.Now()
response.ExpiresAt = time.Now().Add(s.ttl)
s.responses[key] = response
}
// Delete removes a cached response
func (s *MemoryStore) Delete(key string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.responses, key)
delete(s.locks, key)
}
// Lock acquires a processing lock
func (s *MemoryStore) Lock(key string) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.locks[key] {
return false
}
s.locks[key] = true
return true
}
// Unlock releases a processing lock
func (s *MemoryStore) Unlock(key string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.locks, key)
}
// ResponseRecorder captures the response for caching
type ResponseRecorder struct {
http.ResponseWriter
statusCode int
body bytes.Buffer
headers http.Header
}
// NewResponseRecorder creates a new response recorder
func NewResponseRecorder(w http.ResponseWriter) *ResponseRecorder {
return &ResponseRecorder{
ResponseWriter: w,
statusCode: http.StatusOK,
headers: make(http.Header),
}
}
// WriteHeader captures the status code
func (r *ResponseRecorder) WriteHeader(code int) {
r.statusCode = code
r.ResponseWriter.WriteHeader(code)
}
// Write captures the response body
func (r *ResponseRecorder) Write(b []byte) (int, error) {
r.body.Write(b)
return r.ResponseWriter.Write(b)
}
// Header returns the header map
func (r *ResponseRecorder) Header() http.Header {
return r.ResponseWriter.Header()
}
// ToCachedResponse converts the recorded response to a cached response
func (r *ResponseRecorder) ToCachedResponse() *CachedResponse {
headers := make(map[string]string)
for k, v := range r.ResponseWriter.Header() {
if len(v) > 0 {
headers[k] = v[0]
}
}
return &CachedResponse{
StatusCode: r.statusCode,
Headers: headers,
Body: r.body.Bytes(),
}
}
// Middleware provides idempotency handling
type Middleware struct {
store Store
}
// NewMiddleware creates a new idempotency middleware
func NewMiddleware(store Store) *Middleware {
return &Middleware{store: store}
}
var (
defaultStore *MemoryStore
once sync.Once
)
// GetDefaultStore returns the default idempotency store
func GetDefaultStore() *MemoryStore {
once.Do(func() {
defaultStore = NewMemoryStore(DefaultTTL)
})
return defaultStore
}
// Handler returns the middleware handler
func (m *Middleware) Handler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Only apply to POST, PUT, PATCH requests
if r.Method != http.MethodPost && r.Method != http.MethodPut && r.Method != http.MethodPatch {
next.ServeHTTP(w, r)
return
}
key := r.Header.Get(HeaderIdempotencyKey)
if key == "" {
// No idempotency key, proceed normally
next.ServeHTTP(w, r)
return
}
// Validate key length
if len(key) > MaxKeyLength {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]any{
"code": "IDEMPOTENCY_KEY_TOO_LONG",
"message": "Idempotency key exceeds maximum length of 256 characters",
})
return
}
// Check for cached response
if cached, ok := m.store.Get(key); ok {
log.Debug("Replaying cached response for idempotency key: %s", key)
w.Header().Set(HeaderIdempotencyReplayed, "true")
for k, v := range cached.Headers {
w.Header().Set(k, v)
}
w.WriteHeader(cached.StatusCode)
w.Write(cached.Body)
return
}
// Try to acquire lock
if !m.store.Lock(key) {
// Another request is processing with this key
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Retry-After", "1")
w.WriteHeader(http.StatusConflict)
json.NewEncoder(w).Encode(map[string]any{
"code": "IDEMPOTENCY_KEY_IN_USE",
"message": "A request with this idempotency key is currently being processed",
})
return
}
defer m.store.Unlock(key)
// Record the response
recorder := NewResponseRecorder(w)
next.ServeHTTP(recorder, r)
// Cache non-5xx responses (2xx, 3xx, and 4xx)
// Don't cache 5xx errors as they may be transient
if recorder.statusCode < 500 {
m.store.Set(key, recorder.ToCachedResponse())
}
})
}
// IdempotencyInfo represents information about an idempotency key
type IdempotencyInfo struct {
Key string `json:"key"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt time.Time `json:"expires_at"`
Replayed bool `json:"replayed"`
}
// GetInfo retrieves information about a cached idempotency key
func GetInfo(key string) (*IdempotencyInfo, bool) {
store := GetDefaultStore()
cached, ok := store.Get(key)
if !ok {
return nil, false
}
return &IdempotencyInfo{
Key: key,
CreatedAt: cached.CreatedAt,
ExpiresAt: cached.ExpiresAt,
}, true
}
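
A wiring sketch with plain net/http; the /api/v2/things handler is
hypothetical and exists only to demonstrate the middleware.

package main

import (
	"net/http"

	"code.gitea.io/gitea/modules/idempotency"
)

func main() {
	mux := http.NewServeMux()
	// Hypothetical endpoint used only to demonstrate the middleware.
	mux.HandleFunc("/api/v2/things", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusCreated)
		_, _ = w.Write([]byte(`{"id":1}`))
	})
	// Repeating a POST with the same Idempotency-Key within the 24h TTL
	// replays the cached response and sets Idempotency-Replayed: true.
	mw := idempotency.NewMiddleware(idempotency.GetDefaultStore())
	_ = http.ListenAndServe(":8080", mw.Handler(mux))
}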

modules/operations/progress.go Normal file

@@ -0,0 +1,479 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// Package operations provides tracking for long-running operations
// with real-time progress updates via Server-Sent Events (SSE).
package operations
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
// OperationType represents the type of long-running operation
type OperationType string
const (
OpChunkedUpload OperationType = "chunked_upload"
OpRepoMirrorSync OperationType = "mirror_sync"
OpRepoPush OperationType = "repo_push"
OpRepoClone OperationType = "repo_clone"
OpWikiGenerate OperationType = "wiki_generate"
OpBatchOperation OperationType = "batch_operation"
)
// OperationStatus represents the current status of an operation
type OperationStatus string
const (
StatusPending OperationStatus = "pending"
StatusRunning OperationStatus = "running"
StatusComplete OperationStatus = "complete"
StatusFailed OperationStatus = "failed"
StatusCancelled OperationStatus = "cancelled"
)
// Phase represents a phase within an operation
type Phase struct {
Name string `json:"name"`
Status OperationStatus `json:"status"`
Progress int `json:"progress"` // 0-100
Message string `json:"message,omitempty"`
}
// ProgressUpdate represents a progress update event
type ProgressUpdate struct {
OperationID string `json:"operation_id"`
Type OperationType `json:"type"`
Status OperationStatus `json:"status"`
CurrentPhase string `json:"current_phase,omitempty"`
Phases []Phase `json:"phases,omitempty"`
Progress int `json:"progress"` // Overall progress 0-100
BytesTotal int64 `json:"bytes_total,omitempty"`
BytesDone int64 `json:"bytes_done,omitempty"`
ItemsTotal int `json:"items_total,omitempty"`
ItemsDone int `json:"items_done,omitempty"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
UpdatedAt time.Time `json:"updated_at"`
EstimatedETA *time.Time `json:"estimated_eta,omitempty"`
SpeedBPS int64 `json:"speed_bps,omitempty"` // bytes per second
Metadata map[string]any `json:"metadata,omitempty"`
}
// Operation tracks a long-running operation
type Operation struct {
mu sync.RWMutex
id string
opType OperationType
userID int64
status OperationStatus
phases []Phase
currentPhase int
progress int
bytesTotal int64
bytesDone int64
itemsTotal int
itemsDone int
message string
errorMsg string
startedAt time.Time
updatedAt time.Time
metadata map[string]any
subscribers map[chan ProgressUpdate]struct{}
ctx context.Context
cancel context.CancelFunc
}
// Manager handles operation tracking
type Manager struct {
mu sync.RWMutex
operations map[string]*Operation
ttl time.Duration
}
var (
defaultManager *Manager
once sync.Once
)
// GetManager returns the global operation manager
func GetManager() *Manager {
once.Do(func() {
defaultManager = &Manager{
operations: make(map[string]*Operation),
ttl: 30 * time.Minute,
}
go defaultManager.cleanup()
})
return defaultManager
}
// cleanup periodically removes completed operations
func (m *Manager) cleanup() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
m.mu.Lock()
now := time.Now()
for id, op := range m.operations {
op.mu.RLock()
isComplete := op.status == StatusComplete || op.status == StatusFailed || op.status == StatusCancelled
age := now.Sub(op.updatedAt)
op.mu.RUnlock()
if isComplete && age > m.ttl {
delete(m.operations, id)
log.Debug("Cleaned up operation %s", id)
}
}
m.mu.Unlock()
}
}
// StartOperation creates and registers a new operation
func (m *Manager) StartOperation(ctx context.Context, id string, opType OperationType, userID int64, phases []string) *Operation {
opCtx, cancel := context.WithCancel(ctx)
phaseList := make([]Phase, len(phases))
for i, name := range phases {
status := StatusPending
if i == 0 {
status = StatusRunning
}
phaseList[i] = Phase{
Name: name,
Status: status,
}
}
op := &Operation{
id: id,
opType: opType,
userID: userID,
status: StatusRunning,
phases: phaseList,
currentPhase: 0,
startedAt: time.Now(),
updatedAt: time.Now(),
metadata: make(map[string]any),
subscribers: make(map[chan ProgressUpdate]struct{}),
ctx: opCtx,
cancel: cancel,
}
m.mu.Lock()
m.operations[id] = op
m.mu.Unlock()
op.broadcast()
return op
}
// GetOperation retrieves an operation by ID
func (m *Manager) GetOperation(id string) (*Operation, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
op, ok := m.operations[id]
return op, ok
}
// Subscribe to operation progress updates
func (op *Operation) Subscribe() <-chan ProgressUpdate {
ch := make(chan ProgressUpdate, 10)
op.mu.Lock()
op.subscribers[ch] = struct{}{}
op.mu.Unlock()
// Send current state immediately
go func() {
select {
case ch <- op.GetProgress():
case <-op.ctx.Done():
}
}()
return ch
}
// Unsubscribe from operation updates
func (op *Operation) Unsubscribe(ch <-chan ProgressUpdate) {
op.mu.Lock()
defer op.mu.Unlock()
for sub := range op.subscribers {
if sub == ch {
delete(op.subscribers, sub)
close(sub)
break
}
}
}
// broadcast sends update to all subscribers
func (op *Operation) broadcast() {
op.mu.RLock()
update := op.buildUpdate()
subscribers := make([]chan ProgressUpdate, 0, len(op.subscribers))
for ch := range op.subscribers {
subscribers = append(subscribers, ch)
}
op.mu.RUnlock()
for _, ch := range subscribers {
select {
case ch <- update:
default:
// Skip if channel is full
}
}
}
func (op *Operation) buildUpdate() ProgressUpdate {
update := ProgressUpdate{
OperationID: op.id,
Type: op.opType,
Status: op.status,
Phases: op.phases,
Progress: op.progress,
BytesTotal: op.bytesTotal,
BytesDone: op.bytesDone,
ItemsTotal: op.itemsTotal,
ItemsDone: op.itemsDone,
Message: op.message,
Error: op.errorMsg,
StartedAt: op.startedAt,
UpdatedAt: op.updatedAt,
Metadata: op.metadata,
}
if op.currentPhase < len(op.phases) {
update.CurrentPhase = op.phases[op.currentPhase].Name
}
// Calculate speed and ETA
elapsed := time.Since(op.startedAt).Seconds()
if elapsed > 0 && op.bytesDone > 0 {
update.SpeedBPS = int64(float64(op.bytesDone) / elapsed)
if op.bytesTotal > 0 && update.SpeedBPS > 0 {
remaining := op.bytesTotal - op.bytesDone
etaSeconds := float64(remaining) / float64(update.SpeedBPS)
eta := time.Now().Add(time.Duration(etaSeconds) * time.Second)
update.EstimatedETA = &eta
}
}
return update
}
// GetProgress returns current progress state
func (op *Operation) GetProgress() ProgressUpdate {
op.mu.RLock()
defer op.mu.RUnlock()
return op.buildUpdate()
}
// UpdateProgress updates the operation progress
func (op *Operation) UpdateProgress(progress int, message string) {
op.mu.Lock()
op.progress = progress
op.message = message
op.updatedAt = time.Now()
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Progress = progress
op.phases[op.currentPhase].Message = message
}
op.mu.Unlock()
op.broadcast()
}
// UpdateBytes updates byte progress
func (op *Operation) UpdateBytes(done, total int64) {
op.mu.Lock()
op.bytesDone = done
op.bytesTotal = total
if total > 0 {
op.progress = int(float64(done) / float64(total) * 100)
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
}
// UpdateItems updates item progress
func (op *Operation) UpdateItems(done, total int) {
op.mu.Lock()
op.itemsDone = done
op.itemsTotal = total
if total > 0 {
op.progress = done * 100 / total
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
}
// NextPhase moves to the next phase
func (op *Operation) NextPhase() {
op.mu.Lock()
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Status = StatusComplete
op.phases[op.currentPhase].Progress = 100
}
op.currentPhase++
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Status = StatusRunning
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
}
// SetMetadata sets operation metadata
func (op *Operation) SetMetadata(key string, value any) {
op.mu.Lock()
op.metadata[key] = value
op.mu.Unlock()
}
// Complete marks the operation as complete
func (op *Operation) Complete(result map[string]any) {
op.mu.Lock()
op.status = StatusComplete
op.progress = 100
for i := range op.phases {
op.phases[i].Status = StatusComplete
op.phases[i].Progress = 100
}
if result != nil {
for k, v := range result {
op.metadata[k] = v
}
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
op.closeSubscribers()
}
// Fail marks the operation as failed
func (op *Operation) Fail(err error) {
op.mu.Lock()
op.status = StatusFailed
op.errorMsg = err.Error()
if op.currentPhase < len(op.phases) {
op.phases[op.currentPhase].Status = StatusFailed
}
op.updatedAt = time.Now()
op.mu.Unlock()
op.broadcast()
op.closeSubscribers()
}
// Cancel cancels the operation
func (op *Operation) Cancel() {
op.mu.Lock()
op.status = StatusCancelled
op.updatedAt = time.Now()
op.mu.Unlock()
op.cancel()
op.broadcast()
op.closeSubscribers()
}
func (op *Operation) closeSubscribers() {
op.mu.Lock()
for ch := range op.subscribers {
close(ch)
}
op.subscribers = make(map[chan ProgressUpdate]struct{})
op.mu.Unlock()
}
// Context returns the operation's context
func (op *Operation) Context() context.Context {
return op.ctx
}
// ID returns the operation ID
func (op *Operation) ID() string {
return op.id
}
// ServeSSE serves Server-Sent Events for operation progress
func ServeSSE(w http.ResponseWriter, r *http.Request, opID string) {
manager := GetManager()
op, ok := manager.GetOperation(opID)
if !ok {
http.Error(w, "Operation not found", http.StatusNotFound)
return
}
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
return
}
// Subscribe to updates
updates := op.Subscribe()
defer op.Unsubscribe(updates)
// Send initial connection event
fmt.Fprintf(w, "event: connected\ndata: {\"operation_id\":\"%s\"}\n\n", opID)
flusher.Flush()
for {
select {
case update, ok := <-updates:
if !ok {
// Channel closed, operation complete
fmt.Fprintf(w, "event: close\ndata: {}\n\n")
flusher.Flush()
return
}
data, err := json.Marshal(update)
if err != nil {
log.Error("Failed to marshal progress update: %v", err)
continue
}
eventType := "progress"
switch update.Status {
case StatusComplete:
eventType = "complete"
case StatusFailed:
eventType = "error"
case StatusCancelled:
eventType = "cancelled"
}
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, data)
flusher.Flush()
// Close connection after terminal events
if update.Status == StatusComplete || update.Status == StatusFailed || update.Status == StatusCancelled {
return
}
case <-r.Context().Done():
return
}
}
}
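
A producer-side sketch; the ID, user ID, and phase names are illustrative.

package tasks

import (
	"context"

	"code.gitea.io/gitea/modules/operations"
)

// RunMirrorSync sketches how a long-running job reports progress that the
// /operations/{id}/progress SSE endpoint streams to clients.
func RunMirrorSync(ctx context.Context, id string, userID int64) {
	op := operations.GetManager().StartOperation(ctx, id, operations.OpRepoMirrorSync, userID,
		[]string{"fetch", "update-refs"})
	// Byte progress also feeds the SpeedBPS and EstimatedETA fields.
	op.UpdateBytes(512, 2048)
	op.NextPhase() // "fetch" completes, "update-refs" starts running
	op.Complete(map[string]any{"refs_updated": 3})
}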

modules/webhook/retry.go Normal file

@@ -0,0 +1,331 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package webhook
import (
"context"
"math"
"math/rand"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
// RetryConfig configures retry behavior
type RetryConfig struct {
// MaxRetries is the maximum number of retry attempts
MaxRetries int
// InitialDelay is the delay before the first retry
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// Multiplier is the factor by which delay increases
Multiplier float64
// Jitter adds randomness to delays (0-1)
Jitter float64
// RetryableStatusCodes are HTTP status codes that should trigger a retry
RetryableStatusCodes []int
}
// DefaultRetryConfig returns the default retry configuration
func DefaultRetryConfig() RetryConfig {
return RetryConfig{
MaxRetries: 5,
InitialDelay: 1 * time.Second,
MaxDelay: 5 * time.Minute,
Multiplier: 2.0,
Jitter: 0.2,
RetryableStatusCodes: []int{
408, // Request Timeout
429, // Too Many Requests
500, // Internal Server Error
502, // Bad Gateway
503, // Service Unavailable
504, // Gateway Timeout
},
}
}
// DeliveryAttempt represents a single webhook delivery attempt
type DeliveryAttempt struct {
AttemptNumber int `json:"attempt_number"`
StatusCode int `json:"status_code,omitempty"`
Error string `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Timestamp time.Time `json:"timestamp"`
NextRetryAt *time.Time `json:"next_retry_at,omitempty"`
}
// DeliveryResult represents the final result of a webhook delivery
type DeliveryResult struct {
Success bool `json:"success"`
Attempts []DeliveryAttempt `json:"attempts"`
FinalStatus int `json:"final_status,omitempty"`
FinalError string `json:"final_error,omitempty"`
TotalTime time.Duration `json:"total_time"`
WebhookID int64 `json:"webhook_id"`
DeliveryUUID string `json:"delivery_uuid"`
}
// RetryQueue manages webhook delivery retries
type RetryQueue struct {
mu sync.RWMutex
pending map[string]*pendingDelivery
config RetryConfig
ctx context.Context
cancel context.CancelFunc
deliverF func(ctx context.Context, delivery *pendingDelivery) (int, error)
}
type pendingDelivery struct {
ID string
WebhookID int64
URL string
Payload []byte
Headers map[string]string
Attempts []DeliveryAttempt
NextAttempt time.Time
AttemptCount int
ResultChan chan *DeliveryResult
}
// NewRetryQueue creates a new retry queue
func NewRetryQueue(config RetryConfig, deliverFunc func(ctx context.Context, d *pendingDelivery) (int, error)) *RetryQueue {
ctx, cancel := context.WithCancel(context.Background())
q := &RetryQueue{
pending: make(map[string]*pendingDelivery),
config: config,
ctx: ctx,
cancel: cancel,
deliverF: deliverFunc,
}
go q.processLoop()
return q
}
// Stop stops the retry queue
func (q *RetryQueue) Stop() {
q.cancel()
}
// Enqueue adds a delivery to the retry queue
func (q *RetryQueue) Enqueue(id string, webhookID int64, url string, payload []byte, headers map[string]string) <-chan *DeliveryResult {
resultChan := make(chan *DeliveryResult, 1)
delivery := &pendingDelivery{
ID: id,
WebhookID: webhookID,
URL: url,
Payload: payload,
Headers: headers,
Attempts: make([]DeliveryAttempt, 0),
NextAttempt: time.Now(),
ResultChan: resultChan,
}
q.mu.Lock()
q.pending[id] = delivery
q.mu.Unlock()
return resultChan
}
// GetStatus returns the status of a pending delivery
func (q *RetryQueue) GetStatus(id string) (*DeliveryResult, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
d, ok := q.pending[id]
if !ok {
return nil, false
}
return &DeliveryResult{
Success: false,
Attempts: d.Attempts,
WebhookID: d.WebhookID,
DeliveryUUID: d.ID,
}, true
}
func (q *RetryQueue) processLoop() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-q.ctx.Done():
return
case <-ticker.C:
q.processRetries()
}
}
}
func (q *RetryQueue) processRetries() {
q.mu.Lock()
now := time.Now()
var toProcess []*pendingDelivery
for _, d := range q.pending {
if now.After(d.NextAttempt) {
toProcess = append(toProcess, d)
}
}
q.mu.Unlock()
for _, d := range toProcess {
q.attemptDelivery(d)
}
}
func (q *RetryQueue) attemptDelivery(d *pendingDelivery) {
start := time.Now()
statusCode, err := q.deliverF(q.ctx, d)
duration := time.Since(start)
attempt := DeliveryAttempt{
AttemptNumber: d.AttemptCount + 1,
StatusCode: statusCode,
Duration: duration,
Timestamp: start,
}
if err != nil {
attempt.Error = err.Error()
}
// Guard mutations with the queue lock: GetStatus reads Attempts concurrently
q.mu.Lock()
d.AttemptCount++
d.Attempts = append(d.Attempts, attempt)
q.mu.Unlock()
// Check if successful
if err == nil && statusCode >= 200 && statusCode < 300 {
q.completeDelivery(d, true, statusCode, "")
return
}
// Check if should retry
if d.AttemptCount >= q.config.MaxRetries {
errMsg := "max retries exceeded"
if err != nil {
errMsg = err.Error()
}
q.completeDelivery(d, false, statusCode, errMsg)
return
}
// Check if status code is retryable
if !q.isRetryable(statusCode, err) {
errMsg := "non-retryable error"
if err != nil {
errMsg = err.Error()
}
q.completeDelivery(d, false, statusCode, errMsg)
return
}
// Schedule next retry
delay := q.calculateDelay(d.AttemptCount)
nextRetry := time.Now().Add(delay)
q.mu.Lock()
d.NextAttempt = nextRetry
// Update the last attempt with next retry time
if len(d.Attempts) > 0 {
d.Attempts[len(d.Attempts)-1].NextRetryAt = &nextRetry
}
q.mu.Unlock()
log.Debug("Scheduling webhook retry %d/%d for %s in %v",
d.AttemptCount, q.config.MaxRetries, d.ID, delay)
}
func (q *RetryQueue) completeDelivery(d *pendingDelivery, success bool, statusCode int, errMsg string) {
q.mu.Lock()
delete(q.pending, d.ID)
q.mu.Unlock()
var totalTime time.Duration
for _, a := range d.Attempts {
totalTime += a.Duration
}
result := &DeliveryResult{
Success: success,
Attempts: d.Attempts,
FinalStatus: statusCode,
FinalError: errMsg,
TotalTime: totalTime,
WebhookID: d.WebhookID,
DeliveryUUID: d.ID,
}
select {
case d.ResultChan <- result:
default:
}
close(d.ResultChan)
}
func (q *RetryQueue) isRetryable(statusCode int, err error) bool {
// Network errors are always retryable
if err != nil && statusCode == 0 {
return true
}
for _, code := range q.config.RetryableStatusCodes {
if statusCode == code {
return true
}
}
return false
}
func (q *RetryQueue) calculateDelay(attemptNumber int) time.Duration {
// Exponential backoff: delay = initialDelay * multiplier^attempt
delay := float64(q.config.InitialDelay) * math.Pow(q.config.Multiplier, float64(attemptNumber-1))
// Apply max delay cap
if delay > float64(q.config.MaxDelay) {
delay = float64(q.config.MaxDelay)
}
// Add jitter
if q.config.Jitter > 0 {
jitterAmount := delay * q.config.Jitter
delay = delay - jitterAmount + (rand.Float64() * 2 * jitterAmount)
}
return time.Duration(delay)
}
// Stats represents retry queue statistics
type Stats struct {
PendingDeliveries int `json:"pending_deliveries"`
TotalRetries int `json:"total_retries"`
SuccessRate float64 `json:"success_rate"`
LastUpdated time.Time `json:"last_updated"`
}
// GetStats returns statistics about the retry queue
func (q *RetryQueue) GetStats() Stats {
q.mu.RLock()
defer q.mu.RUnlock()
totalRetries := 0
for _, d := range q.pending {
totalRetries += d.AttemptCount
}
return Stats{
PendingDeliveries: len(q.pending),
TotalRetries: totalRetries,
LastUpdated: time.Now(),
}
}
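
A deliver-function sketch for NewRetryQueue. Because the callback takes the
unexported *pendingDelivery type, this wiring has to live inside package
webhook; deliverHTTP is an illustrative name.

package webhook

import (
	"bytes"
	"context"
	"net/http"
)

// deliverHTTP posts the payload to the webhook URL and reports the status
// code back to the retry queue.
func deliverHTTP(ctx context.Context, d *pendingDelivery) (int, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, d.URL, bytes.NewReader(d.Payload))
	if err != nil {
		return 0, err
	}
	for k, v := range d.Headers {
		req.Header.Set(k, v)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err // status 0 plus an error is treated as retryable
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

// Usage: q := NewRetryQueue(DefaultRetryConfig(), deliverHTTP)
//        result := <-q.Enqueue(uuid, hookID, url, payload, headers)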


@@ -26,6 +26,7 @@ import (
auth_model "code.gitea.io/gitea/models/auth"
apierrors "code.gitea.io/gitea/modules/errors"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/idempotency"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/modules/web"
@@ -45,6 +46,10 @@ func Routes() *web.Router {
m.Use(middleware.RateLimitInfo())
m.Use(securityHeaders())
// Idempotency middleware for POST/PUT/PATCH requests
idempotencyMiddleware := idempotency.NewMiddleware(idempotency.GetDefaultStore())
m.Use(idempotencyMiddleware.Handler)
if setting.CORSConfig.Enabled {
m.Use(cors.Handler(cors.Options{
AllowedOrigins: setting.CORSConfig.AllowDomain,
@@ -68,6 +73,21 @@
m.Get("/docs", DocsScalar)
m.Get("/swagger.json", SwaggerJSON)
// Health check endpoints
m.Group("/health", func() {
m.Get("", HealthCheck)
m.Get("/live", LivenessCheck)
m.Get("/ready", ReadinessCheck)
m.Get("/component/{component}", ComponentHealthCheck)
})
// Operation progress endpoints (SSE)
m.Group("/operations", func() {
m.Get("/{id}/progress", OperationProgress)
m.Get("/{id}", GetOperation)
m.Delete("/{id}", CancelOperation)
})
// Authenticated endpoints
m.Group("", func() {
// User info

routers/api/v2/health.go Normal file

@@ -0,0 +1,161 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package v2
import (
"net/http"
"code.gitea.io/gitea/modules/health"
"code.gitea.io/gitea/services/context"
)
// HealthCheckResponse represents the health check response
type HealthCheckResponse struct {
Status string `json:"status"`
Version string `json:"version"`
Uptime float64 `json:"uptime_seconds"`
Timestamp string `json:"timestamp"`
Components map[string]*ComponentStatus `json:"components"`
System *health.SystemInfo `json:"system,omitempty"`
}
// ComponentStatus represents a component's health status
type ComponentStatus struct {
Name string `json:"name"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
DurationMs float64 `json:"duration_ms"`
LastChecked string `json:"last_checked"`
Metadata map[string]any `json:"metadata,omitempty"`
}
// HealthCheck performs a comprehensive health check
// @Summary Health check
// @Description Returns the health status of all components
// @Tags health
// @Produce json
// @Param detailed query bool false "Include system information"
// @Success 200 {object} HealthCheckResponse
// @Router /health [get]
func HealthCheck(ctx *context.APIContext) {
includeSystem := ctx.FormBool("detailed")
manager := health.GetManager()
result := manager.Check(ctx.Req.Context(), includeSystem)
// Convert to response format
components := make(map[string]*ComponentStatus)
for name, check := range result.Components {
components[name] = &ComponentStatus{
Name: check.Name,
Status: string(check.Status),
Message: check.Message,
DurationMs: float64(check.Duration.Milliseconds()),
LastChecked: check.LastChecked.Format("2006-01-02T15:04:05Z07:00"),
Metadata: check.Metadata,
}
}
response := HealthCheckResponse{
Status: string(result.Status),
Version: result.Version,
Uptime: result.Uptime.Seconds(),
Timestamp: result.Timestamp.Format("2006-01-02T15:04:05Z07:00"),
Components: components,
System: result.System,
}
// Set status code based on health
statusCode := http.StatusOK
if result.Status == health.StatusUnhealthy {
statusCode = http.StatusServiceUnavailable
} else if result.Status == health.StatusDegraded {
statusCode = http.StatusOK // Still OK but degraded
}
ctx.JSON(statusCode, response)
}
// LivenessCheck performs a simple liveness check
// @Summary Liveness probe
// @Description Returns OK if the service is running
// @Tags health
// @Produce json
// @Success 200 {object} map[string]any
// @Router /health/live [get]
func LivenessCheck(ctx *context.APIContext) {
manager := health.GetManager()
check := manager.LivenessCheck()
ctx.JSON(http.StatusOK, map[string]any{
"status": string(check.Status),
"message": check.Message,
})
}
// ReadinessCheck performs a readiness check
// @Summary Readiness probe
// @Description Returns OK if the service is ready to handle requests
// @Tags health
// @Produce json
// @Success 200 {object} map[string]any
// @Failure 503 {object} map[string]any
// @Router /health/ready [get]
func ReadinessCheck(ctx *context.APIContext) {
manager := health.GetManager()
result := manager.ReadinessCheck(ctx.Req.Context())
response := map[string]any{
"status": string(result.Status),
"timestamp": result.Timestamp.Format("2006-01-02T15:04:05Z07:00"),
}
if result.Status == health.StatusUnhealthy {
ctx.JSON(http.StatusServiceUnavailable, response)
return
}
ctx.JSON(http.StatusOK, response)
}
// ComponentHealthCheck checks a specific component
// @Summary Component health check
// @Description Returns the health status of a specific component
// @Tags health
// @Produce json
// @Param component path string true "Component name"
// @Success 200 {object} ComponentStatus
// @Failure 404 {object} map[string]any
// @Router /health/component/{component} [get]
func ComponentHealthCheck(ctx *context.APIContext) {
component := ctx.PathParam("component")
manager := health.GetManager()
check, found := manager.CheckSingle(ctx.Req.Context(), component)
if !found {
ctx.JSON(http.StatusNotFound, map[string]any{
"code": "COMPONENT_NOT_FOUND",
"message": "Health check component not found",
"component": component,
})
return
}
response := ComponentStatus{
Name: check.Name,
Status: string(check.Status),
Message: check.Message,
DurationMs: float64(check.Duration.Milliseconds()),
LastChecked: check.LastChecked.Format("2006-01-02T15:04:05Z07:00"),
Metadata: check.Metadata,
}
statusCode := http.StatusOK
if check.Status == health.StatusUnhealthy {
statusCode = http.StatusServiceUnavailable
}
ctx.JSON(statusCode, response)
}


@@ -0,0 +1,147 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package v2
import (
"net/http"
apierrors "code.gitea.io/gitea/modules/errors"
"code.gitea.io/gitea/modules/operations"
"code.gitea.io/gitea/services/context"
)
// OperationProgressResponse represents an operation progress response
type OperationProgressResponse struct {
OperationID string `json:"operation_id"`
Type string `json:"type"`
Status string `json:"status"`
CurrentPhase string `json:"current_phase,omitempty"`
Phases []Phase `json:"phases,omitempty"`
Progress int `json:"progress"`
BytesTotal int64 `json:"bytes_total,omitempty"`
BytesDone int64 `json:"bytes_done,omitempty"`
ItemsTotal int `json:"items_total,omitempty"`
ItemsDone int `json:"items_done,omitempty"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
StartedAt string `json:"started_at"`
UpdatedAt string `json:"updated_at"`
EstimatedETA string `json:"estimated_eta,omitempty"`
SpeedBPS int64 `json:"speed_bps,omitempty"`
}
// Phase represents a phase in an operation
type Phase struct {
Name string `json:"name"`
Status string `json:"status"`
Progress int `json:"progress"`
Message string `json:"message,omitempty"`
}
// OperationProgress streams operation progress via SSE
// @Summary Stream operation progress
// @Description Streams real-time progress updates for a long-running operation via Server-Sent Events
// @Tags operations
// @Produce text/event-stream
// @Param id path string true "Operation ID"
// @Success 200 {object} OperationProgressResponse "SSE stream of progress updates"
// @Failure 404 {object} map[string]any
// @Router /operations/{id}/progress [get]
func OperationProgress(ctx *context.APIContext) {
opID := ctx.PathParam("id")
// ServeSSE handles all the response logic
operations.ServeSSE(ctx.Resp, ctx.Req, opID)
}
// GetOperation retrieves the current state of an operation
// @Summary Get operation status
// @Description Returns the current status of a long-running operation
// @Tags operations
// @Produce json
// @Param id path string true "Operation ID"
// @Success 200 {object} OperationProgressResponse
// @Failure 404 {object} map[string]any
// @Router /operations/{id} [get]
func GetOperation(ctx *context.APIContext) {
opID := ctx.PathParam("id")
manager := operations.GetManager()
op, ok := manager.GetOperation(opID)
if !ok {
ctx.APIErrorWithCode(apierrors.ResourceNotFound, map[string]any{
"resource": "operation",
"id": opID,
})
return
}
progress := op.GetProgress()
// Convert phases
phases := make([]Phase, len(progress.Phases))
for i, p := range progress.Phases {
phases[i] = Phase{
Name: p.Name,
Status: string(p.Status),
Progress: p.Progress,
Message: p.Message,
}
}
response := OperationProgressResponse{
OperationID: progress.OperationID,
Type: string(progress.Type),
Status: string(progress.Status),
CurrentPhase: progress.CurrentPhase,
Phases: phases,
Progress: progress.Progress,
BytesTotal: progress.BytesTotal,
BytesDone: progress.BytesDone,
ItemsTotal: progress.ItemsTotal,
ItemsDone: progress.ItemsDone,
Message: progress.Message,
Error: progress.Error,
StartedAt: progress.StartedAt.Format("2006-01-02T15:04:05Z07:00"),
UpdatedAt: progress.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"),
SpeedBPS: progress.SpeedBPS,
}
if progress.EstimatedETA != nil {
response.EstimatedETA = progress.EstimatedETA.Format("2006-01-02T15:04:05Z07:00")
}
ctx.JSON(http.StatusOK, response)
}
// CancelOperation cancels a running operation
// @Summary Cancel operation
// @Description Cancels a long-running operation if possible
// @Tags operations
// @Produce json
// @Param id path string true "Operation ID"
// @Success 200 {object} map[string]any
// @Failure 404 {object} map[string]any
// @Router /operations/{id} [delete]
func CancelOperation(ctx *context.APIContext) {
opID := ctx.PathParam("id")
manager := operations.GetManager()
op, ok := manager.GetOperation(opID)
if !ok {
ctx.APIErrorWithCode(apierrors.ResourceNotFound, map[string]any{
"resource": "operation",
"id": opID,
})
return
}
op.Cancel()
ctx.JSON(http.StatusOK, map[string]any{
"operation_id": opID,
"status": "cancelled",
"message": "Operation cancellation requested",
})
}