Files
logikonline 7e037935cc fix: resolve remaining golangci-lint errors (batch 3)
- modules/pages/config.go: use slices.Contains for template validation
- modules/webhook/retry.go: use slices.Contains for retryable status codes
- routers/api/v1/org/profile.go: extract helper to remove duplicate code
- cmd/gitea-cli/cmd/upload.go: apply gofumpt formatting, add nolint directive for waitgroup

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 17:53:59 -05:00

327 lines
7.7 KiB
Go

// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package webhook
import (
"context"
"math"
"math/rand"
"slices"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
)
// RetryConfig configures retry behavior
type RetryConfig struct {
// MaxRetries is the maximum number of retry attempts
MaxRetries int
// InitialDelay is the delay before the first retry
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// Multiplier is the factor by which delay increases
Multiplier float64
// Jitter adds randomness to delays (0-1)
Jitter float64
// RetryableStatusCodes are HTTP status codes that should trigger a retry
RetryableStatusCodes []int
}
// DefaultRetryConfig returns the default retry configuration
func DefaultRetryConfig() RetryConfig {
return RetryConfig{
MaxRetries: 5,
InitialDelay: 1 * time.Second,
MaxDelay: 5 * time.Minute,
Multiplier: 2.0,
Jitter: 0.2,
RetryableStatusCodes: []int{
408, // Request Timeout
429, // Too Many Requests
500, // Internal Server Error
502, // Bad Gateway
503, // Service Unavailable
504, // Gateway Timeout
},
}
}
// DeliveryAttempt represents a single webhook delivery attempt
type DeliveryAttempt struct {
AttemptNumber int `json:"attempt_number"`
StatusCode int `json:"status_code,omitempty"`
Error string `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Timestamp time.Time `json:"timestamp"`
NextRetryAt *time.Time `json:"next_retry_at,omitempty"`
}
// DeliveryResult represents the final result of a webhook delivery
type DeliveryResult struct {
Success bool `json:"success"`
Attempts []DeliveryAttempt `json:"attempts"`
FinalStatus int `json:"final_status,omitempty"`
FinalError string `json:"final_error,omitempty"`
TotalTime time.Duration `json:"total_time"`
WebhookID int64 `json:"webhook_id"`
DeliveryUUID string `json:"delivery_uuid"`
}
// RetryQueue manages webhook delivery retries
type RetryQueue struct {
mu sync.RWMutex
pending map[string]*pendingDelivery
config RetryConfig
ctx context.Context
cancel context.CancelFunc
deliverF func(ctx context.Context, delivery *pendingDelivery) (int, error)
}
type pendingDelivery struct {
ID string
WebhookID int64
URL string
Payload []byte
Headers map[string]string
Attempts []DeliveryAttempt
NextAttempt time.Time
AttemptCount int
ResultChan chan *DeliveryResult
}
// NewRetryQueue creates a new retry queue
func NewRetryQueue(config RetryConfig, deliverFunc func(ctx context.Context, d *pendingDelivery) (int, error)) *RetryQueue {
ctx, cancel := context.WithCancel(context.Background())
q := &RetryQueue{
pending: make(map[string]*pendingDelivery),
config: config,
ctx: ctx,
cancel: cancel,
deliverF: deliverFunc,
}
go q.processLoop()
return q
}
// Stop stops the retry queue
func (q *RetryQueue) Stop() {
q.cancel()
}
// Enqueue adds a delivery to the retry queue
func (q *RetryQueue) Enqueue(id string, webhookID int64, url string, payload []byte, headers map[string]string) <-chan *DeliveryResult {
resultChan := make(chan *DeliveryResult, 1)
delivery := &pendingDelivery{
ID: id,
WebhookID: webhookID,
URL: url,
Payload: payload,
Headers: headers,
Attempts: make([]DeliveryAttempt, 0),
NextAttempt: time.Now(),
ResultChan: resultChan,
}
q.mu.Lock()
q.pending[id] = delivery
q.mu.Unlock()
return resultChan
}
// GetStatus returns the status of a pending delivery
func (q *RetryQueue) GetStatus(id string) (*DeliveryResult, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
d, ok := q.pending[id]
if !ok {
return nil, false
}
return &DeliveryResult{
Success: false,
Attempts: d.Attempts,
WebhookID: d.WebhookID,
DeliveryUUID: d.ID,
}, true
}
func (q *RetryQueue) processLoop() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-q.ctx.Done():
return
case <-ticker.C:
q.processRetries()
}
}
}
func (q *RetryQueue) processRetries() {
q.mu.Lock()
now := time.Now()
var toProcess []*pendingDelivery
for _, d := range q.pending {
if now.After(d.NextAttempt) {
toProcess = append(toProcess, d)
}
}
q.mu.Unlock()
for _, d := range toProcess {
q.attemptDelivery(d)
}
}
func (q *RetryQueue) attemptDelivery(d *pendingDelivery) {
start := time.Now()
statusCode, err := q.deliverF(q.ctx, d)
duration := time.Since(start)
attempt := DeliveryAttempt{
AttemptNumber: d.AttemptCount + 1,
StatusCode: statusCode,
Duration: duration,
Timestamp: start,
}
if err != nil {
attempt.Error = err.Error()
}
d.AttemptCount++
d.Attempts = append(d.Attempts, attempt)
// Check if successful
if err == nil && statusCode >= 200 && statusCode < 300 {
q.completeDelivery(d, true, statusCode, "")
return
}
// Check if should retry
if d.AttemptCount >= q.config.MaxRetries {
errMsg := "max retries exceeded"
if err != nil {
errMsg = err.Error()
}
q.completeDelivery(d, false, statusCode, errMsg)
return
}
// Check if status code is retryable
if !q.isRetryable(statusCode, err) {
errMsg := "non-retryable error"
if err != nil {
errMsg = err.Error()
}
q.completeDelivery(d, false, statusCode, errMsg)
return
}
// Schedule next retry
delay := q.calculateDelay(d.AttemptCount)
nextRetry := time.Now().Add(delay)
d.NextAttempt = nextRetry
// Update the last attempt with next retry time
if len(d.Attempts) > 0 {
d.Attempts[len(d.Attempts)-1].NextRetryAt = &nextRetry
}
log.Debug("Scheduling webhook retry %d/%d for %s in %v",
d.AttemptCount, q.config.MaxRetries, d.ID, delay)
}
func (q *RetryQueue) completeDelivery(d *pendingDelivery, success bool, statusCode int, errMsg string) {
q.mu.Lock()
delete(q.pending, d.ID)
q.mu.Unlock()
var totalTime time.Duration
for _, a := range d.Attempts {
totalTime += a.Duration
}
result := &DeliveryResult{
Success: success,
Attempts: d.Attempts,
FinalStatus: statusCode,
FinalError: errMsg,
TotalTime: totalTime,
WebhookID: d.WebhookID,
DeliveryUUID: d.ID,
}
select {
case d.ResultChan <- result:
default:
}
close(d.ResultChan)
}
func (q *RetryQueue) isRetryable(statusCode int, err error) bool {
// Network errors are always retryable
if err != nil && statusCode == 0 {
return true
}
return slices.Contains(q.config.RetryableStatusCodes, statusCode)
}
func (q *RetryQueue) calculateDelay(attemptNumber int) time.Duration {
// Exponential backoff: delay = initialDelay * multiplier^attempt
delay := float64(q.config.InitialDelay) * math.Pow(q.config.Multiplier, float64(attemptNumber-1))
// Apply max delay cap
if delay > float64(q.config.MaxDelay) {
delay = float64(q.config.MaxDelay)
}
// Add jitter
if q.config.Jitter > 0 {
jitterAmount := delay * q.config.Jitter
delay = delay - jitterAmount + (rand.Float64() * 2 * jitterAmount)
}
return time.Duration(delay)
}
// Stats represents retry queue statistics
type Stats struct {
PendingDeliveries int `json:"pending_deliveries"`
TotalRetries int `json:"total_retries"`
SuccessRate float64 `json:"success_rate"`
LastUpdated time.Time `json:"last_updated"`
}
// GetStats returns statistics about the retry queue
func (q *RetryQueue) GetStats() Stats {
q.mu.RLock()
defer q.mu.RUnlock()
totalRetries := 0
for _, d := range q.pending {
totalRetries += d.AttemptCount
}
return Stats{
PendingDeliveries: len(q.pending),
TotalRetries: totalRetries,
LastUpdated: time.Now(),
}
}