gitea/modules/idempotency/idempotency.go
logikonline 81bb23f0da fix: resolve golangci-lint errors (batch 1)
- cmd/gitea-cli: fix errcheck, perfsprint, use modules/json, http constants
- models/migrations: remove unused nolint directive
- models/organization: interface{} -> any
- modules/health: rename HealthResponse -> Response to avoid stutter
- modules/idempotency: use modules/json, fix errcheck, rename IdempotencyInfo -> Info
- modules/structs: fix Verified_At naming, use omitzero

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 17:46:44 -05:00

312 lines
7.5 KiB
Go

// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
// Package idempotency provides middleware for idempotent handling of POST, PUT
// and PATCH requests. Clients can send an Idempotency-Key header so that a
// request is processed once, with subsequent requests that carry the same key
// returning the cached response.
package idempotency
import (
"bytes"
"net/http"
"sync"
"time"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)
const (
	// HeaderIdempotencyKey is the request header from which the middleware
	// reads the client-supplied idempotency key.
	HeaderIdempotencyKey = "Idempotency-Key"
	// HeaderIdempotencyReplayed is the response header that the middleware sets
	// to "true" when a response is replayed from the cache.
	HeaderIdempotencyReplayed = "Idempotency-Replayed"
	// DefaultTTL is the default time-to-live for cached responses; it is the
	// TTL used for the store returned by GetDefaultStore.
	DefaultTTL = 24 * time.Hour
	// MaxKeyLength is the maximum allowed length for an idempotency key. The
	// middleware compares it against len(key), i.e. the length in bytes.
	MaxKeyLength = 256
)
// CachedResponse stores a cached API response: the status code, a flattened
// view of the response headers and the body bytes, together with the
// timestamps that bound its lifetime in a Store.
type CachedResponse struct {
	// StatusCode is the HTTP status code that is written on replay.
	StatusCode int `json:"status_code"`
	// Headers holds a single value per header name.
	// ResponseRecorder.ToCachedResponse keeps only the first value of each
	// header, so additional values of multi-valued headers are not stored.
	Headers map[string]string `json:"headers"`
	// Body is the raw response body that is written on replay.
	Body []byte `json:"body"`
	// CreatedAt is overwritten by MemoryStore.Set when the response is stored.
	CreatedAt time.Time `json:"created_at"`
	// ExpiresAt is overwritten by MemoryStore.Set; MemoryStore treats the
	// entry as absent once the current time is after this instant.
	ExpiresAt time.Time `json:"expires_at"`
}
// Store defines the interface for idempotency key storage. It combines a
// key/value cache of responses with a per-key, non-blocking processing lock.
type Store interface {
	// Get retrieves a cached response by key. The boolean is false when no
	// usable response is available for the key.
	Get(key string) (*CachedResponse, bool)
	// Set stores a response with the given key.
	Set(key string, response *CachedResponse)
	// Delete removes a cached response.
	Delete(key string)
	// Lock tries to acquire the processing lock for a key without blocking.
	// It returns true if the lock was acquired and false if it is already held.
	Lock(key string) bool
	// Unlock releases the lock for a key.
	Unlock(key string)
}
// MemoryStore implements Store using in-memory storage. Use NewMemoryStore to
// create one: the zero value has nil maps, so Set and Lock would panic on it.
type MemoryStore struct {
	// mu guards responses and locks; every method takes it before touching
	// either map.
	mu sync.RWMutex
	// responses maps an idempotency key to its cached response.
	responses map[string]*CachedResponse
	// locks holds the keys that currently have a request in flight. Entries
	// are only ever stored as true and are deleted on release.
	locks map[string]bool
	// ttl is added to the current time in Set to compute ExpiresAt.
	ttl time.Duration
}
// NewMemoryStore builds an empty in-memory idempotency store whose entries
// expire ttl after they are stored.
//
// It also starts the background cleanup goroutine for the store. That
// goroutine has no stop signal, so it lives for the rest of the process.
func NewMemoryStore(ttl time.Duration) *MemoryStore {
	s := new(MemoryStore)
	s.ttl = ttl
	s.responses = map[string]*CachedResponse{}
	s.locks = map[string]bool{}

	go s.cleanup()

	return s
}
// cleanup runs for the lifetime of the store and, every five minutes, evicts
// cached responses whose ExpiresAt has passed. It is started as a goroutine by
// NewMemoryStore and never returns.
func (s *MemoryStore) cleanup() {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		s.mu.Lock()
		now := time.Now()
		for key, resp := range s.responses {
			if now.After(resp.ExpiresAt) {
				// Evict only the cached response. The processing lock for the
				// same key is deliberately left alone: a new request may have
				// acquired it after the old response expired, and dropping it
				// here would let a second concurrent request with the same key
				// through. Locks are released by Unlock.
				delete(s.responses, key)
			}
		}
		s.mu.Unlock()
	}
}
// Get looks up the response cached under key. It reports false when the key
// is unknown or when the stored response has passed its ExpiresAt; an expired
// entry is left in the map for the cleanup loop to remove.
func (s *MemoryStore) Get(key string) (*CachedResponse, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	entry, found := s.responses[key]
	if found && !time.Now().After(entry.ExpiresAt) {
		return entry, true
	}
	return nil, false
}
// Set stores response under key, replacing any previous entry.
//
// The response is stamped in place: CreatedAt is set to the current time and
// ExpiresAt to that same instant plus the store's TTL. Both timestamps are
// derived from a single clock reading so that ExpiresAt-CreatedAt is exactly
// the TTL. The pointer itself is stored, not a copy.
func (s *MemoryStore) Set(key string, response *CachedResponse) {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()
	response.CreatedAt = now
	response.ExpiresAt = now.Add(s.ttl)
	s.responses[key] = response
}
// Delete drops the cached response stored under key. Deleting an unknown key
// is a no-op.
//
// NOTE(review): this also clears any processing-lock entry for the key, so
// calling it while a request for that key is in flight releases that lock —
// confirm callers expect this.
func (s *MemoryStore) Delete(key string) {
	s.mu.Lock()
	delete(s.locks, key)
	delete(s.responses, key)
	s.mu.Unlock()
}
// Lock tries to take the processing lock for key without blocking. It returns
// true when the caller now holds the lock and false when the lock was already
// held.
func (s *MemoryStore) Lock(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	held := s.locks[key]
	if !held {
		s.locks[key] = true
	}
	return !held
}
// Unlock releases the processing lock for key by removing its entry from the
// lock table. Unlocking a key that is not locked is a no-op.
func (s *MemoryStore) Unlock(key string) {
	s.mu.Lock()
	delete(s.locks, key)
	s.mu.Unlock()
}
// ResponseRecorder captures the response for caching. It wraps an
// http.ResponseWriter, passing every call through to it while remembering the
// status code and body that were written.
type ResponseRecorder struct {
	// ResponseWriter is the wrapped writer that actually sends the response.
	http.ResponseWriter
	// statusCode is the last code passed to WriteHeader; NewResponseRecorder
	// initialises it to http.StatusOK.
	statusCode int
	// body accumulates every byte slice passed to Write.
	body bytes.Buffer
	// headers is allocated by NewResponseRecorder but is not read by any
	// method shown here; Header and ToCachedResponse use the wrapped writer's
	// header map instead.
	headers http.Header
}
// NewResponseRecorder wraps w in a recorder. The recorded status starts out as
// http.StatusOK, which is what remains recorded if the handler never calls
// WriteHeader.
func NewResponseRecorder(w http.ResponseWriter) *ResponseRecorder {
	rec := new(ResponseRecorder)
	rec.ResponseWriter = w
	rec.statusCode = http.StatusOK
	rec.headers = http.Header{}
	return rec
}
// WriteHeader records code as the response status and then forwards the call
// to the wrapped ResponseWriter. Each call overwrites the previously recorded
// code.
func (r *ResponseRecorder) WriteHeader(code int) {
	r.statusCode = code
	r.ResponseWriter.WriteHeader(code)
}
// Write appends b to the recorded body and then forwards it to the wrapped
// ResponseWriter, returning that writer's result. The whole of b is recorded
// regardless of how many bytes the wrapped writer reports as written.
func (r *ResponseRecorder) Write(b []byte) (int, error) {
	r.body.Write(b)
	return r.ResponseWriter.Write(b)
}
// Header returns the header map of the wrapped ResponseWriter, so headers set
// through the recorder go straight to the real response.
func (r *ResponseRecorder) Header() http.Header {
	return r.ResponseWriter.Header()
}
// ToCachedResponse snapshots what has been recorded so far into a
// CachedResponse: the recorded status code, the recorded body, and the wrapped
// writer's headers flattened to one value per name.
//
// Only the first value of each header is kept, and headers with no values are
// skipped. Body is the recorder's buffer contents, not a copy. CreatedAt and
// ExpiresAt are left at their zero values.
func (r *ResponseRecorder) ToCachedResponse() *CachedResponse {
	flattened := map[string]string{}
	for name, values := range r.ResponseWriter.Header() {
		if len(values) == 0 {
			continue
		}
		flattened[name] = values[0]
	}

	snapshot := new(CachedResponse)
	snapshot.StatusCode = r.statusCode
	snapshot.Headers = flattened
	snapshot.Body = r.body.Bytes()
	return snapshot
}
// Middleware provides idempotency handling for HTTP handlers; see Handler.
type Middleware struct {
	// store holds the cached responses and per-key processing locks.
	store Store
}
// NewMiddleware returns an idempotency middleware backed by store. The store
// is used as given; it is not validated here.
func NewMiddleware(store Store) *Middleware {
	m := new(Middleware)
	m.store = store
	return m
}
var (
	// defaultStore is the process-wide store returned by GetDefaultStore. It
	// is nil until GetDefaultStore is first called.
	defaultStore *MemoryStore
	// once ensures defaultStore is created a single time.
	once sync.Once
)
// GetDefaultStore returns the shared in-memory idempotency store, creating it
// with DefaultTTL on the first call. Every call returns the same instance.
func GetDefaultStore() *MemoryStore {
	initDefault := func() {
		defaultStore = NewMemoryStore(DefaultTTL)
	}
	once.Do(initDefault)
	return defaultStore
}
// Handler wraps next with idempotency-key handling.
//
// Requests other than POST, PUT and PATCH, and requests without an
// Idempotency-Key header, are passed straight to next. Otherwise:
//
//   - a key longer than MaxKeyLength is rejected with 400;
//   - if a response is cached for the key it is replayed, with the
//     Idempotency-Replayed header set to "true", and next is not called;
//   - if another request currently holds the lock for the key, 409 is
//     returned with a Retry-After header;
//   - otherwise next runs under the lock and its response is cached unless
//     its status is 5xx.
//
// NOTE(review): the cache key is the raw header value; it is not scoped by
// user, method or path here, so identical keys on different requests share a
// cached response — confirm that scoping happens elsewhere if it is required.
func (m *Middleware) Handler(next http.Handler) http.Handler {
	// replay writes a cached response back to the client and marks it as
	// replayed.
	replay := func(w http.ResponseWriter, key string, cached *CachedResponse) {
		log.Debug("Replaying cached response for idempotency key: %s", key)
		w.Header().Set(HeaderIdempotencyReplayed, "true")
		for k, v := range cached.Headers {
			w.Header().Set(k, v)
		}
		w.WriteHeader(cached.StatusCode)
		_, _ = w.Write(cached.Body)
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Only apply to POST, PUT, PATCH requests
		if r.Method != http.MethodPost && r.Method != http.MethodPut && r.Method != http.MethodPatch {
			next.ServeHTTP(w, r)
			return
		}

		key := r.Header.Get(HeaderIdempotencyKey)
		if key == "" {
			// No idempotency key, proceed normally
			next.ServeHTTP(w, r)
			return
		}

		// Validate key length
		if len(key) > MaxKeyLength {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusBadRequest)
			_ = json.NewEncoder(w).Encode(map[string]any{
				"code":    "IDEMPOTENCY_KEY_TOO_LONG",
				"message": "Idempotency key exceeds maximum length of 256 characters",
			})
			return
		}

		// Fast path: replay without taking the lock.
		if cached, ok := m.store.Get(key); ok {
			replay(w, key, cached)
			return
		}

		// Try to acquire lock
		if !m.store.Lock(key) {
			// Another request is processing with this key
			w.Header().Set("Content-Type", "application/json")
			w.Header().Set("Retry-After", "1")
			w.WriteHeader(http.StatusConflict)
			_ = json.NewEncoder(w).Encode(map[string]any{
				"code":    "IDEMPOTENCY_KEY_IN_USE",
				"message": "A request with this idempotency key is currently being processed",
			})
			return
		}
		defer m.store.Unlock(key)

		// Re-check the cache now that the lock is held. A request with the
		// same key may have stored its response and released the lock between
		// the lookup above and our Lock call; without this check the handler
		// would run a second time for the same key.
		if cached, ok := m.store.Get(key); ok {
			replay(w, key, cached)
			return
		}

		// Record the response
		recorder := NewResponseRecorder(w)
		next.ServeHTTP(recorder, r)

		// Cache every response below 500 (in practice 2xx-4xx).
		// Don't cache 5xx errors as they may be transient
		if recorder.statusCode < 500 {
			m.store.Set(key, recorder.ToCachedResponse())
		}
	})
}
// Info represents information about an idempotency key, as returned by
// GetInfo.
type Info struct {
	// Key is the idempotency key that was looked up.
	Key string `json:"key"`
	// CreatedAt and ExpiresAt are copied from the cached response.
	CreatedAt time.Time `json:"created_at"`
	ExpiresAt time.Time `json:"expires_at"`
	// Replayed is not populated by GetInfo and is therefore false in values
	// it returns.
	Replayed bool `json:"replayed"`
}
// GetInfo looks key up in the default store and, when a cached response is
// found, returns its creation and expiry timestamps together with the key.
// The boolean is false (and the Info nil) when the default store has no
// usable entry for key. Replayed is always left false.
func GetInfo(key string) (*Info, bool) {
	cached, found := GetDefaultStore().Get(key)
	if !found {
		return nil, false
	}

	info := new(Info)
	info.Key = key
	info.CreatedAt = cached.CreatedAt
	info.ExpiresAt = cached.ExpiresAt
	return info, true
}