gitea/services/attachment/chunked.go
logikonline 4d1424df80 feat(api): add Phase 1 API enhancements for reliability and tracing
- Add X-Request-ID header middleware for request tracing
  - Extracts the ID from incoming headers or generates a short UUID
  - Included in all error responses for debugging

- Add rate limit headers (X-RateLimit-Limit/Remaining/Reset)
  - Currently informational, configurable via API.RateLimitPerHour
  - Prepared for future enforcement

- Add chunk checksum verification for uploads
  - Optional X-Chunk-Checksum header with SHA-256 hash
  - Verifies data integrity during chunked uploads (see the sketch below)

- Standardize error responses with RFC 7807 Problem Details
  - Added type, title, status, detail, instance fields
  - Maintains backward compatibility with legacy fields
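
- For illustration, a client could compute the checksum header like this
  (a sketch, not code from this commit; `chunkBytes` and `req` are assumed):

      sum := sha256.Sum256(chunkBytes)
      req.Header.Set("X-Chunk-Checksum", hex.EncodeToString(sum[:]))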

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 11:14:18 -05:00

// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package attachment

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"

	"code.gitea.io/gitea/models/db"
	repo_model "code.gitea.io/gitea/models/repo"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
	"code.gitea.io/gitea/services/context/upload"

	"github.com/google/uuid"
)

// ChunkedUploadOptions contains options for creating a chunked upload session
type ChunkedUploadOptions struct {
	RepoID     int64
	ReleaseID  int64
	IssueID    int64
	UploaderID int64
	FileName   string
	FileSize   int64 // -1 if unknown
	ChunkSize  int64 // 0 to use default
}

// CreateChunkedUploadSession creates a new chunked upload session
func CreateChunkedUploadSession(ctx context.Context, opts ChunkedUploadOptions) (*repo_model.UploadSession, error) {
	if opts.ChunkSize <= 0 {
		opts.ChunkSize = repo_model.DefaultChunkSize
	}

	session := &repo_model.UploadSession{
		UUID:       uuid.New().String(),
		RepoID:     opts.RepoID,
		ReleaseID:  opts.ReleaseID,
		IssueID:    opts.IssueID,
		UploaderID: opts.UploaderID,
		FileName:   opts.FileName,
		FileSize:   opts.FileSize,
		ChunkSize:  opts.ChunkSize,
		Status:     repo_model.UploadSessionStatusActive,
	}
	if err := repo_model.CreateUploadSession(ctx, session); err != nil {
		return nil, fmt.Errorf("failed to create upload session: %w", err)
	}
	return session, nil
}
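
// Illustrative call (a sketch, not wiring from this repo; repo and doer are
// assumed to come from the caller's request context):
//
//	session, err := CreateChunkedUploadSession(ctx, ChunkedUploadOptions{
//		RepoID:     repo.ID,
//		UploaderID: doer.ID,
//		FileName:   "release-asset.tar.gz",
//		FileSize:   -1, // unknown up front
//	})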

// ErrChecksumMismatch is returned when the chunk checksum doesn't match
type ErrChecksumMismatch struct {
	Expected string
	Actual   string
}

func (e ErrChecksumMismatch) Error() string {
	return fmt.Sprintf("checksum mismatch: expected %s, got %s", e.Expected, e.Actual)
}

// IsErrChecksumMismatch returns true if the error is a checksum mismatch
func IsErrChecksumMismatch(err error) bool {
	_, ok := err.(ErrChecksumMismatch)
	return ok
}
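
// A caller can branch on this error type to pick a response, for example
// (a sketch; the surrounding HTTP handler is assumed, not part of this file):
//
//	err := SaveChunkWithOptions(ctx, session, opts)
//	if IsErrChecksumMismatch(err) {
//		// respond with 422 Unprocessable Entity so the client re-sends the chunk
//	}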

// ChunkSaveOptions contains options for saving a chunk
type ChunkSaveOptions struct {
	ChunkNumber int64
	Data        io.Reader
	Size        int64
	// Checksum is the expected SHA-256 checksum of the chunk (hex-encoded).
	// If empty, checksum verification is skipped.
	Checksum string
}

// SaveChunk saves a chunk to the upload session
func SaveChunk(ctx context.Context, session *repo_model.UploadSession, chunkNumber int64, data io.Reader, size int64) error {
	return SaveChunkWithOptions(ctx, session, ChunkSaveOptions{
		ChunkNumber: chunkNumber,
		Data:        data,
		Size:        size,
	})
}

// SaveChunkWithOptions saves a chunk to the upload session with additional options
func SaveChunkWithOptions(ctx context.Context, session *repo_model.UploadSession, opts ChunkSaveOptions) error {
	if session.Status != repo_model.UploadSessionStatusActive {
		return fmt.Errorf("upload session is not active")
	}
	if session.IsExpired() {
		session.Status = repo_model.UploadSessionStatusExpired
		_ = repo_model.UpdateUploadSession(ctx, session)
		return repo_model.ErrUploadSessionExpired{UUID: session.UUID}
	}

	// Validate chunk number
	if opts.ChunkNumber < 0 {
		return fmt.Errorf("invalid chunk number: %d", opts.ChunkNumber)
	}
	if session.ChunksExpected > 0 && opts.ChunkNumber >= session.ChunksExpected {
		return fmt.Errorf("chunk number %d exceeds expected chunks %d", opts.ChunkNumber, session.ChunksExpected)
	}

	// Ensure temp directory exists
	tempDir := session.GetTempPath()
	if err := os.MkdirAll(tempDir, 0o755); err != nil {
		return fmt.Errorf("failed to create temp directory: %w", err)
	}

	// Write chunk to temp file, computing checksum if needed
	chunkPath := session.GetChunkPath(opts.ChunkNumber)
	file, err := os.Create(chunkPath)
	if err != nil {
		return fmt.Errorf("failed to create chunk file: %w", err)
	}
	defer file.Close()

	var written int64
	var actualChecksum string
	if opts.Checksum != "" {
		// Compute checksum while writing
		hasher := sha256.New()
		multiWriter := io.MultiWriter(file, hasher)
		written, err = io.Copy(multiWriter, opts.Data)
		if err != nil {
			os.Remove(chunkPath)
			return fmt.Errorf("failed to write chunk: %w", err)
		}
		actualChecksum = hex.EncodeToString(hasher.Sum(nil))
	} else {
		written, err = io.Copy(file, opts.Data)
		if err != nil {
			os.Remove(chunkPath)
			return fmt.Errorf("failed to write chunk: %w", err)
		}
	}

	// Validate size if provided
	if opts.Size > 0 && written != opts.Size {
		os.Remove(chunkPath)
		return fmt.Errorf("chunk size mismatch: expected %d, got %d", opts.Size, written)
	}

	// Verify checksum if provided
	if opts.Checksum != "" && actualChecksum != opts.Checksum {
		os.Remove(chunkPath)
		return ErrChecksumMismatch{Expected: opts.Checksum, Actual: actualChecksum}
	}

	// Update session
	session.ChunksReceived++
	session.BytesReceived += written
	if err := repo_model.UpdateUploadSessionChunks(ctx, session); err != nil {
		return fmt.Errorf("failed to update session: %w", err)
	}

	log.Debug("Saved chunk %d for session %s (total: %d chunks, %d bytes)",
		opts.ChunkNumber, session.UUID, session.ChunksReceived, session.BytesReceived)
	return nil
}
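
// Illustrative save with verification enabled (a sketch; chunkBytes and the
// chunk number come from the caller, and bytes/sha256/hex imports are assumed
// on the calling side):
//
//	sum := sha256.Sum256(chunkBytes)
//	err := SaveChunkWithOptions(ctx, session, ChunkSaveOptions{
//		ChunkNumber: 0,
//		Data:        bytes.NewReader(chunkBytes),
//		Size:        int64(len(chunkBytes)),
//		Checksum:    hex.EncodeToString(sum[:]),
//	})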

// GetReceivedChunks returns the list of chunk numbers that have been received
func GetReceivedChunks(session *repo_model.UploadSession) ([]int64, error) {
	tempDir := session.GetTempPath()
	entries, err := os.ReadDir(tempDir)
	if err != nil {
		if os.IsNotExist(err) {
			return []int64{}, nil
		}
		return nil, err
	}

	chunks := make([]int64, 0, len(entries))
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		if !strings.HasPrefix(name, "chunk_") {
			continue
		}
		numStr := strings.TrimPrefix(name, "chunk_")
		num, err := strconv.ParseInt(numStr, 10, 64)
		if err != nil {
			continue
		}
		chunks = append(chunks, num)
	}
	sort.Slice(chunks, func(i, j int) bool { return chunks[i] < chunks[j] })
	return chunks, nil
}
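
// One possible caller-side use is resumption: list the chunks already on disk
// and re-send only the gaps (a sketch; the comparison logic is illustrative):
//
//	got, _ := GetReceivedChunks(session)
//	have := make(map[int64]bool, len(got))
//	for _, n := range got {
//		have[n] = true
//	}
//	for n := int64(0); n < session.ChunksExpected; n++ {
//		if !have[n] {
//			// re-upload chunk n
//		}
//	}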

// AssembleChunks assembles all chunks into a single file and creates an attachment
func AssembleChunks(ctx context.Context, session *repo_model.UploadSession, allowedTypes string) (*repo_model.Attachment, error) {
	if session.Status != repo_model.UploadSessionStatusActive {
		return nil, fmt.Errorf("upload session is not active")
	}

	// Get list of chunks
	chunks, err := GetReceivedChunks(session)
	if err != nil {
		return nil, fmt.Errorf("failed to get received chunks: %w", err)
	}
	if len(chunks) == 0 {
		return nil, fmt.Errorf("no chunks received")
	}

	// Verify we have all chunks in sequence
	if session.ChunksExpected > 0 && int64(len(chunks)) != session.ChunksExpected {
		return nil, fmt.Errorf("chunk count mismatch: expected %d, got %d", session.ChunksExpected, len(chunks))
	}
	// Verify chunks are sequential starting from 0
	for i, chunk := range chunks {
		if chunk != int64(i) {
			return nil, fmt.Errorf("missing chunk %d", i)
		}
	}

	// Create a reader that concatenates all chunks
	readers := make([]io.Reader, 0, len(chunks))
	files := make([]*os.File, 0, len(chunks))
	defer func() {
		for _, f := range files {
			f.Close()
		}
	}()
	for _, chunkNum := range chunks {
		chunkPath := session.GetChunkPath(chunkNum)
		f, err := os.Open(chunkPath)
		if err != nil {
			return nil, fmt.Errorf("failed to open chunk %d: %w", chunkNum, err)
		}
		files = append(files, f)
		readers = append(readers, f)
	}

	// Read first 1KB for type verification
	combinedReader := io.MultiReader(readers...)
	buf := make([]byte, 1024)
	n, _ := io.ReadAtLeast(combinedReader, buf, 1)
	buf = buf[:n]

	// Verify file type
	if err := upload.Verify(buf, session.FileName, allowedTypes); err != nil {
		session.Status = repo_model.UploadSessionStatusFailed
		_ = repo_model.UpdateUploadSession(ctx, session)
		return nil, err
	}

	// Re-open files so the final reader starts from the beginning
	for _, f := range files {
		f.Close()
	}
	files = files[:0]
	readers = readers[:0]
	for _, chunkNum := range chunks {
		chunkPath := session.GetChunkPath(chunkNum)
		f, err := os.Open(chunkPath)
		if err != nil {
			return nil, fmt.Errorf("failed to reopen chunk %d: %w", chunkNum, err)
		}
		files = append(files, f)
		readers = append(readers, f)
	}
	finalReader := io.MultiReader(readers...)

	// Create the attachment
	attach := &repo_model.Attachment{
		UUID:       uuid.New().String(),
		RepoID:     session.RepoID,
		ReleaseID:  session.ReleaseID,
		IssueID:    session.IssueID,
		UploaderID: session.UploaderID,
		Name:       session.FileName,
	}
	err = db.WithTx(ctx, func(ctx context.Context) error {
		// Save to storage
		size, err := storage.Attachments.Save(attach.RelativePath(), finalReader, session.BytesReceived)
		if err != nil {
			return fmt.Errorf("failed to save attachment: %w", err)
		}
		attach.Size = size
		// Insert into database
		if err := db.Insert(ctx, attach); err != nil {
			// Clean up storage on failure
			_ = storage.Attachments.Delete(attach.RelativePath())
			return fmt.Errorf("failed to insert attachment: %w", err)
		}
		return nil
	})
	if err != nil {
		session.Status = repo_model.UploadSessionStatusFailed
		_ = repo_model.UpdateUploadSession(ctx, session)
		return nil, err
	}

	// Mark session as complete and clean up
	session.Status = repo_model.UploadSessionStatusComplete
	_ = repo_model.UpdateUploadSession(ctx, session)

	// Clean up temp files (in background to not block response)
	go func() {
		if err := os.RemoveAll(session.GetTempPath()); err != nil {
			log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, err)
		}
	}()

	log.Info("Assembled %d chunks into attachment %s (%d bytes) for session %s",
		len(chunks), attach.Name, attach.Size, session.UUID)
	return attach, nil
}
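
// End-to-end lifecycle (a sketch; readers and sizes are assumed, and reading
// allowedTypes from setting.Attachment.AllowedTypes is an assumption here):
//
//	session, _ := CreateChunkedUploadSession(ctx, opts)
//	_ = SaveChunk(ctx, session, 0, r0, size0)
//	_ = SaveChunk(ctx, session, 1, r1, size1)
//	attach, err := AssembleChunks(ctx, session, setting.Attachment.AllowedTypes)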

// CancelUploadSession cancels an upload session and cleans up
func CancelUploadSession(ctx context.Context, session *repo_model.UploadSession) error {
	session.Status = repo_model.UploadSessionStatusFailed
	if err := repo_model.UpdateUploadSession(ctx, session); err != nil {
		return err
	}
	// Clean up temp files
	if err := os.RemoveAll(session.GetTempPath()); err != nil && !os.IsNotExist(err) {
		log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, err)
	}
	return repo_model.DeleteUploadSession(ctx, session)
}

// CleanupExpiredSessions is intended to be run periodically to clean up expired sessions
func CleanupExpiredSessions(ctx context.Context) error {
	count, err := repo_model.DeleteExpiredUploadSessions(ctx)
	if err != nil {
		return err
	}
	if count > 0 {
		log.Info("Cleaned up %d expired upload sessions", count)
	}
	return nil
}

// GetTempStoragePath returns the base path for chunked upload temp storage
func GetTempStoragePath() string {
	return path.Join(setting.AppDataPath, "upload-sessions")
}