gitea/models/repo/upload_session.go
logikonline 7e36d7d55c feat(api): add chunked upload support for large release attachments
Implement resumable chunked uploads to improve reliability for large file
uploads that may fail due to network issues or timeouts.

New API endpoints:
- POST /repos/{owner}/{repo}/releases/{id}/assets/upload-session
- PUT /repos/{owner}/{repo}/uploads/{session_id}/chunks/{chunk_number}
- GET /repos/{owner}/{repo}/uploads/{session_id}
- POST /repos/{owner}/{repo}/uploads/{session_id}/complete
- DELETE /repos/{owner}/{repo}/uploads/{session_id}

Features:
- Resumable uploads with session status tracking
- Out-of-order chunk uploads supported
- Configurable chunk size (default 10MB, max 100MB)
- Automatic cleanup of expired sessions (24h expiry, hourly cleanup)
- Progress tracking with bytes/chunks received counts

Files added:
- models/repo/upload_session.go - Session model and DB operations
- services/attachment/chunked.go - Chunk storage and assembly logic
- routers/api/v1/repo/upload.go - API endpoint handlers
- models/migrations/v1_26/v325.go - Database migration
2026-01-08 08:58:56 -05:00

262 lines
8.4 KiB
Go

// Copyright 2024 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package repo
import (
"context"
"fmt"
"os"
"path"
"time"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)
// UploadSessionStatus enumerates the lifecycle states of a chunked upload
// session. The numeric values are persisted in the database, so the order of
// the constants below must not change.
type UploadSessionStatus int

const (
	// UploadSessionStatusActive marks a session that is still accepting chunks.
	UploadSessionStatusActive UploadSessionStatus = iota

	// UploadSessionStatusComplete marks a session for which every chunk has
	// been received.
	UploadSessionStatusComplete

	// UploadSessionStatusExpired marks a session that has passed its expiry.
	UploadSessionStatusExpired

	// UploadSessionStatusFailed marks a session whose upload failed.
	UploadSessionStatusFailed
)
// UploadSession is the database record that tracks one chunked (resumable)
// upload of a large file.
type UploadSession struct {
	// ID is the auto-incremented primary key.
	ID int64 `xorm:"pk autoincr"`
	// UUID is the unique external identifier of the session.
	UUID string `xorm:"uuid UNIQUE NOT NULL"`
	// RepoID is the repository the upload belongs to.
	RepoID int64 `xorm:"INDEX NOT NULL"`
	// ReleaseID is 0 if this is not a release attachment.
	ReleaseID int64 `xorm:"INDEX"`
	// IssueID is 0 if this is not an issue attachment.
	IssueID int64 `xorm:"INDEX"`
	// UploaderID is the user who started the upload.
	UploaderID int64 `xorm:"INDEX NOT NULL"`
	// FileName is the name of the file being uploaded.
	FileName string `xorm:"NOT NULL"`
	// FileSize is the total size in bytes, or -1 if unknown.
	FileSize int64 `xorm:"DEFAULT -1"`
	// ChunkSize is the size in bytes of each chunk.
	ChunkSize int64 `xorm:"NOT NULL"`
	// ChunksExpected is the number of chunks derived from FileSize and
	// ChunkSize, or -1 if unknown.
	ChunksExpected int64 `xorm:"DEFAULT -1"`
	// ChunksReceived counts the chunks stored so far.
	ChunksReceived int64 `xorm:"DEFAULT 0"`
	// BytesReceived counts the bytes stored so far.
	BytesReceived int64 `xorm:"DEFAULT 0"`
	// Status is the current lifecycle state of the session.
	Status UploadSessionStatus `xorm:"DEFAULT 0"`
	// TempPath is the relative path to the temporary chunk storage.
	TempPath string `xorm:"NOT NULL"`
	// CreatedUnix is filled in by xorm when the row is created.
	CreatedUnix timeutil.TimeStamp `xorm:"created"`
	// UpdatedUnix is filled in by xorm when the row is updated.
	UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
	// ExpiresUnix is the moment after which the session counts as expired.
	ExpiresUnix timeutil.TimeStamp `xorm:"INDEX"`
}
// init registers the UploadSession model with the db package.
func init() {
	db.RegisterModel(&UploadSession{})
}
const (
	// DefaultChunkSize is the chunk size in bytes (10 MiB) used when a
	// session does not specify a positive one.
	DefaultChunkSize = 10 << 20

	// DefaultSessionExpiry is how long (24 hours) a session stays valid when
	// no explicit expiry is set.
	DefaultSessionExpiry = 24 * time.Hour
)
// UploadSessionTempPath returns the slash-separated relative path of the
// temporary chunk directory for the upload session identified by uuid.
//
// The directory is sharded by the first two bytes of the UUID:
// "upload-sessions/<uuid[0]>/<uuid[1]>/<uuid>". A UUID shorter than two bytes
// cannot be sharded; instead of panicking on the slice expression it is
// placed under the placeholder shard "_/_", so the result never collapses to
// the shared "upload-sessions" root.
func UploadSessionTempPath(uuid string) string {
	const root = "upload-sessions"
	if len(uuid) < 2 {
		return path.Join(root, "_", "_", uuid)
	}
	return path.Join(root, uuid[0:1], uuid[1:2], uuid)
}
// GetTempPath returns the session's temp directory, built by joining the
// configured application data path with the session's relative TempPath.
func (s *UploadSession) GetTempPath() string {
	base := setting.AppDataPath
	return path.Join(base, s.TempPath)
}
// GetChunkPath returns the path of the file holding chunk chunkNumber: a file
// named "chunk_<chunkNumber>" inside the session's temp directory.
func (s *UploadSession) GetChunkPath(chunkNumber int64) string {
	dir := s.GetTempPath()
	chunkFile := fmt.Sprintf("chunk_%d", chunkNumber)
	return path.Join(dir, chunkFile)
}
// IsExpired reports whether the session's ExpiresUnix timestamp lies strictly
// before the current time.
func (s *UploadSession) IsExpired() bool {
	deadline := s.ExpiresUnix.AsTime()
	return deadline.Before(time.Now())
}
// IsComplete reports whether the upload is finished. When both the file size
// and the expected chunk count are known (positive), the answer comes from
// comparing received against expected chunks; otherwise it falls back to the
// stored Status.
func (s *UploadSession) IsComplete() bool {
	countsKnown := s.FileSize > 0 && s.ChunksExpected > 0
	if !countsKnown {
		return s.Status == UploadSessionStatusComplete
	}
	return s.ChunksReceived >= s.ChunksExpected
}
// CalculateChunksExpected returns the number of chunks needed to cover
// FileSize bytes with chunks of ChunkSize bytes, rounding up so a trailing
// partial chunk is counted.
//
// It returns -1 ("unknown") when FileSize is not positive, and also when
// ChunkSize is not positive: a zero ChunkSize would otherwise cause a
// division-by-zero panic and a negative one a meaningless count.
func (s *UploadSession) CalculateChunksExpected() int64 {
	if s.FileSize <= 0 || s.ChunkSize <= 0 {
		return -1
	}
	chunks := s.FileSize / s.ChunkSize
	if s.FileSize%s.ChunkSize != 0 {
		// The final chunk is smaller than ChunkSize but still counts.
		chunks++
	}
	return chunks
}
// ErrUploadSessionNotExist is the error returned when no upload session
// matches the requested ID or UUID.
type ErrUploadSessionNotExist struct {
	ID   int64
	UUID string
}

// IsErrUploadSessionNotExist reports whether err is an
// ErrUploadSessionNotExist value. Only the error itself is inspected; wrapped
// errors are not unwrapped.
func IsErrUploadSessionNotExist(err error) bool {
	switch err.(type) {
	case ErrUploadSessionNotExist:
		return true
	default:
		return false
	}
}

// Error implements the error interface.
func (err ErrUploadSessionNotExist) Error() string {
	const format = "upload session does not exist [id: %d, uuid: %s]"
	return fmt.Sprintf(format, err.ID, err.UUID)
}

// Unwrap lets errors.Is(err, util.ErrNotExist) match this error.
func (err ErrUploadSessionNotExist) Unwrap() error {
	return util.ErrNotExist
}
// ErrUploadSessionExpired is the error used for an upload session that has
// expired.
type ErrUploadSessionExpired struct {
	UUID string
}

// IsErrUploadSessionExpired reports whether err is an ErrUploadSessionExpired
// value. Only the error itself is inspected; wrapped errors are not unwrapped.
func IsErrUploadSessionExpired(err error) bool {
	switch err.(type) {
	case ErrUploadSessionExpired:
		return true
	default:
		return false
	}
}

// Error implements the error interface.
func (err ErrUploadSessionExpired) Error() string {
	const format = "upload session has expired [uuid: %s]"
	return fmt.Sprintf(format, err.UUID)
}
// CreateUploadSession fills in the defaults of session, creates its temp
// directory on disk and inserts the record into the database.
//
// Defaults applied to session before the insert:
//   - ChunkSize falls back to DefaultChunkSize when not positive.
//   - ChunksExpected is computed when FileSize is positive.
//   - ExpiresUnix is set to now + DefaultSessionExpiry when zero.
//   - TempPath is always derived from the UUID.
//
// An error is returned when the UUID is too short to derive a temp path,
// when the temp directory cannot be created, or when the insert fails. If the
// insert fails, the directory created for it is removed again on a
// best-effort basis so a failed create does not leak it.
func CreateUploadSession(ctx context.Context, session *UploadSession) error {
	// The temp path is sharded on the first two bytes of the UUID, so reject
	// anything shorter up front with an error rather than a slice panic.
	if len(session.UUID) < 2 {
		return fmt.Errorf("invalid upload session uuid %q", session.UUID)
	}

	if session.ChunkSize <= 0 {
		session.ChunkSize = DefaultChunkSize
	}
	if session.FileSize > 0 {
		session.ChunksExpected = session.CalculateChunksExpected()
	}
	if session.ExpiresUnix == 0 {
		session.ExpiresUnix = timeutil.TimeStamp(time.Now().Add(DefaultSessionExpiry).Unix())
	}
	session.TempPath = UploadSessionTempPath(session.UUID)

	// Create the temp directory before the row exists so a stored session
	// always has somewhere to put its chunks.
	tempDir := session.GetTempPath()
	if err := os.MkdirAll(tempDir, 0o755); err != nil {
		return fmt.Errorf("failed to create temp directory: %w", err)
	}

	if err := db.Insert(ctx, session); err != nil {
		// Best-effort cleanup. os.Remove only deletes an empty directory, so
		// if the insert failed because another session already owns this UUID
		// (and this directory), its chunks are left alone. The insert error is
		// the one worth reporting, so the cleanup result is ignored.
		_ = os.Remove(tempDir)
		return err
	}
	return nil
}
// GetUploadSessionByUUID looks up the upload session whose uuid column equals
// uuid. It returns ErrUploadSessionNotExist when no row matches.
func GetUploadSessionByUUID(ctx context.Context, uuid string) (*UploadSession, error) {
	var found UploadSession
	exists, err := db.GetEngine(ctx).Where("uuid=?", uuid).Get(&found)
	switch {
	case err != nil:
		return nil, err
	case !exists:
		return nil, ErrUploadSessionNotExist{ID: 0, UUID: uuid}
	}
	return &found, nil
}
// GetUploadSessionByID looks up the upload session with primary key id. It
// returns ErrUploadSessionNotExist when no row matches.
func GetUploadSessionByID(ctx context.Context, id int64) (*UploadSession, error) {
	var found UploadSession
	exists, err := db.GetEngine(ctx).ID(id).Get(&found)
	switch {
	case err != nil:
		return nil, err
	case !exists:
		return nil, ErrUploadSessionNotExist{ID: id}
	}
	return &found, nil
}
// UpdateUploadSession stamps session.UpdatedUnix with the current time and
// writes the row identified by session.ID back to the database, including all
// columns in the UPDATE (AllCols).
func UpdateUploadSession(ctx context.Context, session *UploadSession) error {
	now := time.Now().Unix()
	session.UpdatedUnix = timeutil.TimeStamp(now)

	if _, err := db.GetEngine(ctx).ID(session.ID).AllCols().Update(session); err != nil {
		return err
	}
	return nil
}
// UpdateUploadSessionChunks stamps session.UpdatedUnix with the current time
// and persists only the progress columns (chunks_received, bytes_received)
// together with updated_unix for the row identified by session.ID.
func UpdateUploadSessionChunks(ctx context.Context, session *UploadSession) error {
	now := time.Now().Unix()
	session.UpdatedUnix = timeutil.TimeStamp(now)

	row := db.GetEngine(ctx).ID(session.ID)
	if _, err := row.Cols("chunks_received", "bytes_received", "updated_unix").Update(session); err != nil {
		return err
	}
	return nil
}
// DeleteUploadSession removes the session's temp directory (best effort) and
// then deletes its database row, selected by session.ID.
//
// A failure to remove the temp directory is logged and does not stop the row
// from being deleted; only the database error is returned.
func DeleteUploadSession(ctx context.Context, session *UploadSession) error {
	// With an empty TempPath, GetTempPath resolves to the application data
	// directory itself. Never hand that to RemoveAll; a partially populated
	// session simply has no temp files to clean up.
	if session.TempPath != "" {
		// RemoveAll already returns nil for a path that does not exist, so
		// any error here is a real failure.
		if err := os.RemoveAll(session.GetTempPath()); err != nil {
			log.Warn("Failed to remove temp directory for upload session %s: %v", session.UUID, err)
		}
	}

	// Delete by primary key only. Passing the populated session as the bean
	// would make xorm add every non-zero field as an extra WHERE condition,
	// so a stale in-memory copy could silently match (and delete) no rows.
	_, err := db.GetEngine(ctx).ID(session.ID).Delete(new(UploadSession))
	return err
}
// DeleteUploadSessionByUUID loads the session identified by uuid and deletes
// it together with its temp files. A lookup error (including
// ErrUploadSessionNotExist) is returned unchanged.
func DeleteUploadSessionByUUID(ctx context.Context, uuid string) error {
	target, lookupErr := GetUploadSessionByUUID(ctx, uuid)
	if lookupErr == nil {
		return DeleteUploadSession(ctx, target)
	}
	return lookupErr
}
// GetExpiredUploadSessions returns the upload sessions whose expiry timestamp
// is in the past and whose status is still UploadSessionStatusActive.
// Sessions in any other status are not returned, even if their expiry has
// passed.
//
// On a query error it returns a nil slice together with the error.
func GetExpiredUploadSessions(ctx context.Context) ([]*UploadSession, error) {
	sessions := make([]*UploadSession, 0, 10)
	// Run the query as its own statement: in "return sessions, Find(&sessions)"
	// the Go spec does not say whether sessions is read before or after Find
	// has filled it.
	err := db.GetEngine(ctx).
		Where("expires_unix < ? AND status = ?", time.Now().Unix(), UploadSessionStatusActive).
		Find(&sessions)
	if err != nil {
		return nil, err
	}
	return sessions, nil
}
// DeleteExpiredUploadSessions deletes every session returned by
// GetExpiredUploadSessions and reports how many were deleted successfully.
// A session that fails to delete is logged and skipped; it does not abort the
// sweep or produce an error. Only a failure to list the expired sessions is
// returned as an error.
func DeleteExpiredUploadSessions(ctx context.Context) (int, error) {
	expired, err := GetExpiredUploadSessions(ctx)
	if err != nil {
		return 0, err
	}

	deleted := 0
	for i := range expired {
		current := expired[i]
		if delErr := DeleteUploadSession(ctx, current); delErr == nil {
			deleted++
		} else {
			log.Warn("Failed to delete expired upload session %s: %v", current.UUID, delErr)
		}
	}
	return deleted, nil
}
// GetActiveUploadSessionsForRelease returns the upload sessions attached to
// the release releaseID whose status is UploadSessionStatusActive. The result
// is an empty, non-nil slice when nothing matches.
//
// On a query error it returns a nil slice together with the error.
func GetActiveUploadSessionsForRelease(ctx context.Context, releaseID int64) ([]*UploadSession, error) {
	sessions := make([]*UploadSession, 0)
	// Run the query as its own statement: in "return sessions, Find(&sessions)"
	// the Go spec does not say whether sessions is read before or after Find
	// has filled it.
	err := db.GetEngine(ctx).
		Where("release_id = ? AND status = ?", releaseID, UploadSessionStatusActive).
		Find(&sessions)
	if err != nil {
		return nil, err
	}
	return sessions, nil
}
// GetUploadSessionsForUser returns every upload session started by the user
// userID, regardless of status, ordered by creation time with the newest
// first. The result is an empty, non-nil slice when nothing matches.
//
// On a query error it returns a nil slice together with the error.
func GetUploadSessionsForUser(ctx context.Context, userID int64) ([]*UploadSession, error) {
	sessions := make([]*UploadSession, 0)
	// Run the query as its own statement: in "return sessions, Find(&sessions)"
	// the Go spec does not say whether sessions is read before or after Find
	// has filled it.
	err := db.GetEngine(ctx).
		Where("uploader_id = ?", userID).
		Desc("created_unix").
		Find(&sessions)
	if err != nil {
		return nil, err
	}
	return sessions, nil
}