gitea/services/attachment/chunked.go
logikonline 7e36d7d55c feat(api): add chunked upload support for large release attachments
Implement resumable chunked uploads to improve reliability for large file
uploads that may fail due to network issues or timeouts.

New API endpoints:
- POST /repos/{owner}/{repo}/releases/{id}/assets/upload-session
- PUT /repos/{owner}/{repo}/uploads/{session_id}/chunks/{chunk_number}
- GET /repos/{owner}/{repo}/uploads/{session_id}
- POST /repos/{owner}/{repo}/uploads/{session_id}/complete
- DELETE /repos/{owner}/{repo}/uploads/{session_id}

Features:
- Resumable uploads with session status tracking
- Out-of-order chunk uploads supported
- Configurable chunk size (default 10MB, max 100MB)
- Automatic cleanup of expired sessions (24h expiry, hourly cleanup)
- Progress tracking with bytes/chunks received counts

Files added:
- models/repo/upload_session.go - Session model and DB operations
- services/attachment/chunked.go - Chunk storage and assembly logic
- routers/api/v1/repo/upload.go - API endpoint handlers
- models/migrations/v1_26/v325.go - Database migration
2026-01-08 08:58:56 -05:00

317 lines
8.8 KiB
Go

// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package attachment
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"

	"code.gitea.io/gitea/models/db"
	repo_model "code.gitea.io/gitea/models/repo"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
	"code.gitea.io/gitea/services/context/upload"
	"github.com/google/uuid"
)
// ChunkedUploadOptions contains options for creating a chunked upload session.
type ChunkedUploadOptions struct {
	RepoID     int64 // repository the resulting attachment belongs to
	ReleaseID  int64 // release the attachment is for, if any
	IssueID    int64 // issue the attachment is for, if any
	UploaderID int64 // user performing the upload
	FileName   string
	FileSize   int64 // -1 if unknown
	ChunkSize  int64 // 0 to use default
}
// CreateChunkedUploadSession registers a new chunked upload session in the
// database and returns it. A non-positive opts.ChunkSize falls back to
// repo_model.DefaultChunkSize; the session starts in the active state.
func CreateChunkedUploadSession(ctx context.Context, opts ChunkedUploadOptions) (*repo_model.UploadSession, error) {
	chunkSize := opts.ChunkSize
	if chunkSize <= 0 {
		chunkSize = repo_model.DefaultChunkSize
	}
	session := &repo_model.UploadSession{
		UUID:       uuid.New().String(),
		RepoID:     opts.RepoID,
		ReleaseID:  opts.ReleaseID,
		IssueID:    opts.IssueID,
		UploaderID: opts.UploaderID,
		FileName:   opts.FileName,
		FileSize:   opts.FileSize,
		ChunkSize:  chunkSize,
		Status:     repo_model.UploadSessionStatusActive,
	}
	if err := repo_model.CreateUploadSession(ctx, session); err != nil {
		return nil, fmt.Errorf("failed to create upload session: %w", err)
	}
	return session, nil
}
// SaveChunk saves a chunk to the upload session.
//
// chunkNumber is zero-based and chunks may arrive out of order. When
// size > 0 the number of bytes actually written is validated against it.
// Re-uploading a chunk that was already received (e.g. a client retry)
// replaces the stored chunk without inflating the session's
// ChunksReceived/BytesReceived counters.
func SaveChunk(ctx context.Context, session *repo_model.UploadSession, chunkNumber int64, data io.Reader, size int64) error {
	if session.Status != repo_model.UploadSessionStatusActive {
		return fmt.Errorf("upload session is not active")
	}
	if session.IsExpired() {
		session.Status = repo_model.UploadSessionStatusExpired
		_ = repo_model.UpdateUploadSession(ctx, session)
		return repo_model.ErrUploadSessionExpired{UUID: session.UUID}
	}
	// Validate chunk number
	if chunkNumber < 0 {
		return fmt.Errorf("invalid chunk number: %d", chunkNumber)
	}
	if session.ChunksExpected > 0 && chunkNumber >= session.ChunksExpected {
		return fmt.Errorf("chunk number %d exceeds expected chunks %d", chunkNumber, session.ChunksExpected)
	}
	// Ensure temp directory exists
	tempDir := session.GetTempPath()
	if err := os.MkdirAll(tempDir, 0o755); err != nil {
		return fmt.Errorf("failed to create temp directory: %w", err)
	}
	chunkPath := session.GetChunkPath(chunkNumber)
	// Detect a re-upload: if the chunk file already exists, remember its
	// size so the counters can be corrected instead of double-counted.
	prevSize := int64(-1)
	if fi, err := os.Stat(chunkPath); err == nil && !fi.IsDir() {
		prevSize = fi.Size()
	}
	// Write chunk to temp file
	file, err := os.Create(chunkPath)
	if err != nil {
		return fmt.Errorf("failed to create chunk file: %w", err)
	}
	defer file.Close()
	written, err := io.Copy(file, data)
	if err != nil {
		os.Remove(chunkPath)
		return fmt.Errorf("failed to write chunk: %w", err)
	}
	// Validate size if provided
	if size > 0 && written != size {
		os.Remove(chunkPath)
		return fmt.Errorf("chunk size mismatch: expected %d, got %d", size, written)
	}
	// Update session progress; on a re-upload swap the old chunk's bytes
	// for the new ones and leave the chunk count unchanged.
	if prevSize >= 0 {
		session.BytesReceived -= prevSize
	} else {
		session.ChunksReceived++
	}
	session.BytesReceived += written
	if err := repo_model.UpdateUploadSessionChunks(ctx, session); err != nil {
		return fmt.Errorf("failed to update session: %w", err)
	}
	log.Debug("Saved chunk %d for session %s (total: %d chunks, %d bytes)",
		chunkNumber, session.UUID, session.ChunksReceived, session.BytesReceived)
	return nil
}
// GetReceivedChunks returns the chunk numbers already stored on disk for the
// session, sorted ascending. A missing temp directory simply means nothing
// has been received yet and yields an empty (non-nil) slice.
func GetReceivedChunks(session *repo_model.UploadSession) ([]int64, error) {
	entries, err := os.ReadDir(session.GetTempPath())
	if os.IsNotExist(err) {
		return []int64{}, nil
	}
	if err != nil {
		return nil, err
	}
	received := make([]int64, 0, len(entries))
	for _, entry := range entries {
		// Only plain files named "chunk_<n>" count; anything else
		// (subdirectories, unparseable suffixes) is skipped.
		if entry.IsDir() || !strings.HasPrefix(entry.Name(), "chunk_") {
			continue
		}
		if num, parseErr := strconv.ParseInt(strings.TrimPrefix(entry.Name(), "chunk_"), 10, 64); parseErr == nil {
			received = append(received, num)
		}
	}
	sort.Slice(received, func(a, b int) bool { return received[a] < received[b] })
	return received, nil
}
// AssembleChunks assembles all chunks into a single file and creates an
// attachment.
//
// Chunks must form the contiguous sequence 0..n-1 (and match
// session.ChunksExpected when that is set). The first bytes are sniffed to
// verify the file type against allowedTypes before anything is stored. On
// verification or storage failure the session is marked failed; on success
// it is marked complete and its temp files are removed in the background.
func AssembleChunks(ctx context.Context, session *repo_model.UploadSession, allowedTypes string) (*repo_model.Attachment, error) {
	if session.Status != repo_model.UploadSessionStatusActive {
		return nil, fmt.Errorf("upload session is not active")
	}
	// Get list of chunks
	chunks, err := GetReceivedChunks(session)
	if err != nil {
		return nil, fmt.Errorf("failed to get received chunks: %w", err)
	}
	if len(chunks) == 0 {
		return nil, fmt.Errorf("no chunks received")
	}
	// Verify we have all chunks
	if session.ChunksExpected > 0 && int64(len(chunks)) != session.ChunksExpected {
		return nil, fmt.Errorf("chunk count mismatch: expected %d, got %d", session.ChunksExpected, len(chunks))
	}
	// Verify chunks are sequential starting from 0
	for i, chunk := range chunks {
		if chunk != int64(i) {
			return nil, fmt.Errorf("missing chunk %d", i)
		}
	}
	// Open each chunk exactly once; files stay open until assembly
	// finishes (or fails) and are closed by the deferred loop.
	readers := make([]io.Reader, 0, len(chunks))
	files := make([]*os.File, 0, len(chunks))
	defer func() {
		for _, f := range files {
			f.Close()
		}
	}()
	for _, chunkNum := range chunks {
		f, err := os.Open(session.GetChunkPath(chunkNum))
		if err != nil {
			return nil, fmt.Errorf("failed to open chunk %d: %w", chunkNum, err)
		}
		files = append(files, f)
		readers = append(readers, f)
	}
	combined := io.MultiReader(readers...)
	// Read up to the first 1KB for type verification.
	buf := make([]byte, 1024)
	n, _ := io.ReadAtLeast(combined, buf, 1)
	buf = buf[:n]
	// Verify file type
	if err := upload.Verify(buf, session.FileName, allowedTypes); err != nil {
		session.Status = repo_model.UploadSessionStatusFailed
		_ = repo_model.UpdateUploadSession(ctx, session)
		return nil, err
	}
	// Prepend the sniffed bytes to the remaining stream rather than
	// closing and reopening every chunk file a second time.
	finalReader := io.MultiReader(bytes.NewReader(buf), combined)
	// Create the attachment
	attach := &repo_model.Attachment{
		UUID:       uuid.New().String(),
		RepoID:     session.RepoID,
		ReleaseID:  session.ReleaseID,
		IssueID:    session.IssueID,
		UploaderID: session.UploaderID,
		Name:       session.FileName,
	}
	err = db.WithTx(ctx, func(ctx context.Context) error {
		// Save to storage first so the DB row never references a missing blob.
		size, err := storage.Attachments.Save(attach.RelativePath(), finalReader, session.BytesReceived)
		if err != nil {
			return fmt.Errorf("failed to save attachment: %w", err)
		}
		attach.Size = size
		// Insert into database
		if err := db.Insert(ctx, attach); err != nil {
			// Clean up storage on failure
			_ = storage.Attachments.Delete(attach.RelativePath())
			return fmt.Errorf("failed to insert attachment: %w", err)
		}
		return nil
	})
	if err != nil {
		session.Status = repo_model.UploadSessionStatusFailed
		_ = repo_model.UpdateUploadSession(ctx, session)
		return nil, err
	}
	// Mark session as complete and clean up
	session.Status = repo_model.UploadSessionStatusComplete
	_ = repo_model.UpdateUploadSession(ctx, session)
	// Clean up temp files (in background to not block response)
	go func() {
		if err := os.RemoveAll(session.GetTempPath()); err != nil {
			log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, err)
		}
	}()
	log.Info("Assembled %d chunks into attachment %s (%d bytes) for session %s",
		len(chunks), attach.Name, attach.Size, session.UUID)
	return attach, nil
}
// CancelUploadSession cancels an upload session: it is marked failed,
// its temp chunk files are removed (best effort), and the session row
// is deleted from the database.
func CancelUploadSession(ctx context.Context, session *repo_model.UploadSession) error {
	session.Status = repo_model.UploadSessionStatusFailed
	if err := repo_model.UpdateUploadSession(ctx, session); err != nil {
		return err
	}
	// Temp-file removal is best effort; a failure is logged, not returned.
	removeErr := os.RemoveAll(session.GetTempPath())
	if removeErr != nil && !os.IsNotExist(removeErr) {
		log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, removeErr)
	}
	return repo_model.DeleteUploadSession(ctx, session)
}
// CleanupExpiredSessions is intended to be run periodically; it deletes all
// expired upload sessions and logs how many were removed (when any were).
func CleanupExpiredSessions(ctx context.Context) error {
	deleted, err := repo_model.DeleteExpiredUploadSessions(ctx)
	if err != nil {
		return err
	}
	if deleted > 0 {
		log.Info("Cleaned up %d expired upload sessions", deleted)
	}
	return nil
}
// GetTempStoragePath returns the base directory under which chunked-upload
// temp files (per-session chunk directories) are stored.
func GetTempStoragePath() string {
	const sessionsDir = "upload-sessions"
	return path.Join(setting.AppDataPath, sessionsDir)
}