Implement resumable chunked uploads to improve reliability for large file
uploads that may fail due to network issues or timeouts.
New API endpoints:
- POST /repos/{owner}/{repo}/releases/{id}/assets/upload-session
- PUT /repos/{owner}/{repo}/uploads/{session_id}/chunks/{chunk_number}
- GET /repos/{owner}/{repo}/uploads/{session_id}
- POST /repos/{owner}/{repo}/uploads/{session_id}/complete
- DELETE /repos/{owner}/{repo}/uploads/{session_id}
Features:
- Resumable uploads with session status tracking
- Out-of-order chunk uploads supported
- Configurable chunk size (default 10MB, max 100MB)
- Automatic cleanup of expired sessions (24h expiry, hourly cleanup)
- Progress tracking with bytes/chunks received counts
Files added:
- models/repo/upload_session.go - Session model and DB operations
- services/attachment/chunked.go - Chunk storage and assembly logic
- routers/api/v1/repo/upload.go - API endpoint handlers
- models/migrations/v1_26/v325.go - Database migration
262 lines
8.4 KiB
Go
262 lines
8.4 KiB
Go
// Copyright 2024 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package repo
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"time"

	"code.gitea.io/gitea/models/db"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/timeutil"
	"code.gitea.io/gitea/modules/util"
)
|
|
|
|
// UploadSessionStatus is the lifecycle state of an upload session.
// The numeric values are spelled out because they are stored in the
// "status" column of the upload session table.
type UploadSessionStatus int

const (
	// UploadSessionStatusActive marks a session that is still accepting chunks.
	UploadSessionStatusActive UploadSessionStatus = 0
	// UploadSessionStatusComplete marks a session whose chunks have all been received.
	UploadSessionStatusComplete UploadSessionStatus = 1
	// UploadSessionStatusExpired marks a session that has expired.
	UploadSessionStatusExpired UploadSessionStatus = 2
	// UploadSessionStatusFailed marks a session whose upload failed.
	UploadSessionStatusFailed UploadSessionStatus = 3
)
|
|
|
|
// UploadSession represents a chunked upload session for large files
|
|
type UploadSession struct {
|
|
ID int64 `xorm:"pk autoincr"`
|
|
UUID string `xorm:"uuid UNIQUE NOT NULL"`
|
|
RepoID int64 `xorm:"INDEX NOT NULL"`
|
|
ReleaseID int64 `xorm:"INDEX"` // 0 if not a release attachment
|
|
IssueID int64 `xorm:"INDEX"` // 0 if not an issue attachment
|
|
UploaderID int64 `xorm:"INDEX NOT NULL"`
|
|
FileName string `xorm:"NOT NULL"`
|
|
FileSize int64 `xorm:"DEFAULT -1"` // -1 if unknown
|
|
ChunkSize int64 `xorm:"NOT NULL"`
|
|
ChunksExpected int64 `xorm:"DEFAULT -1"` // -1 if unknown (calculated from FileSize/ChunkSize)
|
|
ChunksReceived int64 `xorm:"DEFAULT 0"`
|
|
BytesReceived int64 `xorm:"DEFAULT 0"`
|
|
Status UploadSessionStatus `xorm:"DEFAULT 0"`
|
|
TempPath string `xorm:"NOT NULL"` // relative path to temp storage
|
|
CreatedUnix timeutil.TimeStamp `xorm:"created"`
|
|
UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
|
|
ExpiresUnix timeutil.TimeStamp `xorm:"INDEX"`
|
|
}
|
|
|
|
func init() {
|
|
db.RegisterModel(new(UploadSession))
|
|
}
|
|
|
|
const (
	// DefaultChunkSize is the chunk size, in bytes, used when a session does
	// not specify one: 10 MiB.
	DefaultChunkSize = 10 << 20

	// DefaultSessionExpiry is how long a session stays valid when no explicit
	// expiry is given: 24 hours.
	DefaultSessionExpiry = 24 * time.Hour
)
|
|
|
|
// UploadSessionTempPath returns the slash-separated relative directory used
// for the temporary chunks of the session identified by uuid. The directory
// is sharded by the first and second byte of the UUID:
// "upload-sessions/<uuid[0]>/<uuid[1]>/<uuid>".
//
// uuid must be at least two bytes long; shorter values cause a slice
// bounds panic.
func UploadSessionTempPath(uuid string) string {
	shard1, shard2 := uuid[:1], uuid[1:2]
	return path.Join("upload-sessions", shard1, shard2, uuid)
}
|
|
|
|
// GetTempPath returns the full path to the temp directory for this session
|
|
func (s *UploadSession) GetTempPath() string {
|
|
return path.Join(setting.AppDataPath, s.TempPath)
|
|
}
|
|
|
|
// GetChunkPath returns the path to a specific chunk file
|
|
func (s *UploadSession) GetChunkPath(chunkNumber int64) string {
|
|
return path.Join(s.GetTempPath(), fmt.Sprintf("chunk_%d", chunkNumber))
|
|
}
|
|
|
|
// IsExpired checks if the session has expired
|
|
func (s *UploadSession) IsExpired() bool {
|
|
return s.ExpiresUnix.AsTime().Before(time.Now())
|
|
}
|
|
|
|
// IsComplete checks if all chunks have been received
|
|
func (s *UploadSession) IsComplete() bool {
|
|
if s.FileSize > 0 && s.ChunksExpected > 0 {
|
|
return s.ChunksReceived >= s.ChunksExpected
|
|
}
|
|
return s.Status == UploadSessionStatusComplete
|
|
}
|
|
|
|
// CalculateChunksExpected calculates the expected number of chunks based on file size
|
|
func (s *UploadSession) CalculateChunksExpected() int64 {
|
|
if s.FileSize <= 0 {
|
|
return -1
|
|
}
|
|
chunks := s.FileSize / s.ChunkSize
|
|
if s.FileSize%s.ChunkSize > 0 {
|
|
chunks++
|
|
}
|
|
return chunks
|
|
}
|
|
|
|
// ErrUploadSessionNotExist represents a "UploadSessionNotExist" kind of error.
|
|
type ErrUploadSessionNotExist struct {
|
|
ID int64
|
|
UUID string
|
|
}
|
|
|
|
// IsErrUploadSessionNotExist checks if an error is a ErrUploadSessionNotExist.
|
|
func IsErrUploadSessionNotExist(err error) bool {
|
|
_, ok := err.(ErrUploadSessionNotExist)
|
|
return ok
|
|
}
|
|
|
|
func (err ErrUploadSessionNotExist) Error() string {
|
|
return fmt.Sprintf("upload session does not exist [id: %d, uuid: %s]", err.ID, err.UUID)
|
|
}
|
|
|
|
func (err ErrUploadSessionNotExist) Unwrap() error {
|
|
return util.ErrNotExist
|
|
}
|
|
|
|
// ErrUploadSessionExpired represents an expired session error. UUID identifies
// the session that has expired.
type ErrUploadSessionExpired struct {
	UUID string
}

// IsErrUploadSessionExpired reports whether err, or any error it wraps, is an
// ErrUploadSessionExpired. errors.As is used instead of a plain type assertion
// so that errors wrapped with %w are still recognised.
func IsErrUploadSessionExpired(err error) bool {
	var target ErrUploadSessionExpired
	return errors.As(err, &target)
}

// Error implements the error interface.
func (err ErrUploadSessionExpired) Error() string {
	return fmt.Sprintf("upload session has expired [uuid: %s]", err.UUID)
}
|
|
|
|
// CreateUploadSession creates a new upload session
|
|
func CreateUploadSession(ctx context.Context, session *UploadSession) error {
|
|
if session.ChunkSize <= 0 {
|
|
session.ChunkSize = DefaultChunkSize
|
|
}
|
|
if session.FileSize > 0 {
|
|
session.ChunksExpected = session.CalculateChunksExpected()
|
|
}
|
|
if session.ExpiresUnix == 0 {
|
|
session.ExpiresUnix = timeutil.TimeStamp(time.Now().Add(DefaultSessionExpiry).Unix())
|
|
}
|
|
session.TempPath = UploadSessionTempPath(session.UUID)
|
|
|
|
// Create the temp directory
|
|
tempDir := session.GetTempPath()
|
|
if err := os.MkdirAll(tempDir, 0o755); err != nil {
|
|
return fmt.Errorf("failed to create temp directory: %w", err)
|
|
}
|
|
|
|
return db.Insert(ctx, session)
|
|
}
|
|
|
|
// GetUploadSessionByUUID returns an upload session by UUID
|
|
func GetUploadSessionByUUID(ctx context.Context, uuid string) (*UploadSession, error) {
|
|
session := &UploadSession{}
|
|
has, err := db.GetEngine(ctx).Where("uuid=?", uuid).Get(session)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if !has {
|
|
return nil, ErrUploadSessionNotExist{0, uuid}
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
// GetUploadSessionByID returns an upload session by ID
|
|
func GetUploadSessionByID(ctx context.Context, id int64) (*UploadSession, error) {
|
|
session := &UploadSession{}
|
|
has, err := db.GetEngine(ctx).ID(id).Get(session)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if !has {
|
|
return nil, ErrUploadSessionNotExist{ID: id}
|
|
}
|
|
return session, nil
|
|
}
|
|
|
|
// UpdateUploadSession updates an upload session
|
|
func UpdateUploadSession(ctx context.Context, session *UploadSession) error {
|
|
session.UpdatedUnix = timeutil.TimeStamp(time.Now().Unix())
|
|
_, err := db.GetEngine(ctx).ID(session.ID).AllCols().Update(session)
|
|
return err
|
|
}
|
|
|
|
// UpdateUploadSessionChunks updates the chunk counts for a session
|
|
func UpdateUploadSessionChunks(ctx context.Context, session *UploadSession) error {
|
|
session.UpdatedUnix = timeutil.TimeStamp(time.Now().Unix())
|
|
_, err := db.GetEngine(ctx).ID(session.ID).Cols("chunks_received", "bytes_received", "updated_unix").Update(session)
|
|
return err
|
|
}
|
|
|
|
// DeleteUploadSession deletes an upload session and its temp files
|
|
func DeleteUploadSession(ctx context.Context, session *UploadSession) error {
|
|
// Delete temp directory
|
|
tempDir := session.GetTempPath()
|
|
if err := os.RemoveAll(tempDir); err != nil && !os.IsNotExist(err) {
|
|
log.Warn("Failed to remove temp directory for upload session %s: %v", session.UUID, err)
|
|
}
|
|
|
|
_, err := db.GetEngine(ctx).ID(session.ID).Delete(session)
|
|
return err
|
|
}
|
|
|
|
// DeleteUploadSessionByUUID deletes an upload session by UUID
|
|
func DeleteUploadSessionByUUID(ctx context.Context, uuid string) error {
|
|
session, err := GetUploadSessionByUUID(ctx, uuid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return DeleteUploadSession(ctx, session)
|
|
}
|
|
|
|
// GetExpiredUploadSessions returns all expired upload sessions
|
|
func GetExpiredUploadSessions(ctx context.Context) ([]*UploadSession, error) {
|
|
sessions := make([]*UploadSession, 0, 10)
|
|
return sessions, db.GetEngine(ctx).
|
|
Where("expires_unix < ? AND status = ?", time.Now().Unix(), UploadSessionStatusActive).
|
|
Find(&sessions)
|
|
}
|
|
|
|
// DeleteExpiredUploadSessions deletes all expired upload sessions
|
|
func DeleteExpiredUploadSessions(ctx context.Context) (int, error) {
|
|
sessions, err := GetExpiredUploadSessions(ctx)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
count := 0
|
|
for _, session := range sessions {
|
|
if err := DeleteUploadSession(ctx, session); err != nil {
|
|
log.Warn("Failed to delete expired upload session %s: %v", session.UUID, err)
|
|
continue
|
|
}
|
|
count++
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
// GetActiveUploadSessionsForRelease returns active upload sessions for a release
|
|
func GetActiveUploadSessionsForRelease(ctx context.Context, releaseID int64) ([]*UploadSession, error) {
|
|
sessions := make([]*UploadSession, 0)
|
|
return sessions, db.GetEngine(ctx).
|
|
Where("release_id = ? AND status = ?", releaseID, UploadSessionStatusActive).
|
|
Find(&sessions)
|
|
}
|
|
|
|
// GetUploadSessionsForUser returns all upload sessions for a user
|
|
func GetUploadSessionsForUser(ctx context.Context, userID int64) ([]*UploadSession, error) {
|
|
sessions := make([]*UploadSession, 0)
|
|
return sessions, db.GetEngine(ctx).
|
|
Where("uploader_id = ?", userID).
|
|
Desc("created_unix").
|
|
Find(&sessions)
|
|
}
|