diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index fa11acaee2..28917db728 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -399,6 +399,7 @@ func prepareMigrationTasks() []*migration { newMigration(323, "Add support for actions concurrency", v1_26.AddActionsConcurrency), newMigration(324, "Fix closed milestone completeness for milestones with no issues", v1_26.FixClosedMilestoneCompleteness), + newMigration(325, "Add upload_session table for chunked uploads", v1_26.AddUploadSessionTable), } return preparedMigrations } diff --git a/models/migrations/v1_26/v325.go b/models/migrations/v1_26/v325.go new file mode 100644 index 0000000000..dfc37a3f91 --- /dev/null +++ b/models/migrations/v1_26/v325.go @@ -0,0 +1,35 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_26 + +import ( + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/xorm" +) + +// AddUploadSessionTable adds the upload_session table for chunked uploads +func AddUploadSessionTable(x *xorm.Engine) error { + type UploadSession struct { + ID int64 `xorm:"pk autoincr"` + UUID string `xorm:"uuid UNIQUE NOT NULL"` + RepoID int64 `xorm:"INDEX NOT NULL"` + ReleaseID int64 `xorm:"INDEX"` + IssueID int64 `xorm:"INDEX"` + UploaderID int64 `xorm:"INDEX NOT NULL"` + FileName string `xorm:"NOT NULL"` + FileSize int64 `xorm:"DEFAULT -1"` + ChunkSize int64 `xorm:"NOT NULL"` + ChunksExpected int64 `xorm:"DEFAULT -1"` + ChunksReceived int64 `xorm:"DEFAULT 0"` + BytesReceived int64 `xorm:"DEFAULT 0"` + Status int `xorm:"DEFAULT 0"` + TempPath string `xorm:"NOT NULL"` + CreatedUnix timeutil.TimeStamp `xorm:"created"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` + ExpiresUnix timeutil.TimeStamp `xorm:"INDEX"` + } + + return x.Sync(new(UploadSession)) +} diff --git a/models/repo/upload_session.go b/models/repo/upload_session.go new file mode 100644 index 0000000000..e28bcb8042 --- /dev/null +++ b/models/repo/upload_session.go @@ -0,0 +1,261 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT
+
+package repo
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path"
+	"time"
+
+	"code.gitea.io/gitea/models/db"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/setting"
+	"code.gitea.io/gitea/modules/timeutil"
+	"code.gitea.io/gitea/modules/util"
+)
+
+// UploadSessionStatus represents the status of an upload session
+type UploadSessionStatus int
+
+const (
+	// UploadSessionStatusActive indicates the session is active and accepting chunks
+	UploadSessionStatusActive UploadSessionStatus = iota
+	// UploadSessionStatusComplete indicates all chunks have been received
+	UploadSessionStatusComplete
+	// UploadSessionStatusExpired indicates the session has expired
+	UploadSessionStatusExpired
+	// UploadSessionStatusFailed indicates the upload failed
+	UploadSessionStatusFailed
+)
+
+// UploadSession represents a chunked upload session for large files
+type UploadSession struct {
+	ID             int64               `xorm:"pk autoincr"`
+	UUID           string              `xorm:"uuid UNIQUE NOT NULL"`
+	RepoID         int64               `xorm:"INDEX NOT NULL"`
+	ReleaseID      int64               `xorm:"INDEX"` // 0 if not a release attachment
+	IssueID        int64               `xorm:"INDEX"` // 0 if not an issue attachment
+	UploaderID     int64               `xorm:"INDEX NOT NULL"`
+	FileName       string              `xorm:"NOT NULL"`
+	FileSize       int64               `xorm:"DEFAULT -1"` // -1 if unknown
+	ChunkSize      int64               `xorm:"NOT NULL"`
+	ChunksExpected int64               `xorm:"DEFAULT -1"` // -1 if unknown (calculated from FileSize/ChunkSize)
+	ChunksReceived int64               `xorm:"DEFAULT 0"`
+	BytesReceived  int64               `xorm:"DEFAULT 0"`
+	Status         UploadSessionStatus `xorm:"DEFAULT 0"`
+	TempPath       string              `xorm:"NOT NULL"` // relative path to temp storage
+	CreatedUnix    timeutil.TimeStamp  `xorm:"created"`
+	UpdatedUnix    timeutil.TimeStamp  `xorm:"updated"`
+	ExpiresUnix    timeutil.TimeStamp  `xorm:"INDEX"`
+}
+
+func init() {
+	db.RegisterModel(new(UploadSession))
+}
+
+// DefaultChunkSize is 10MB
+const DefaultChunkSize = 10 * 1024 * 1024
+
+// DefaultSessionExpiry is 24 hours
+const DefaultSessionExpiry = 24 * time.Hour
+
+// UploadSessionTempPath returns the relative path for temporary chunk storage
+func UploadSessionTempPath(uuid string) string {
+	return path.Join("upload-sessions", uuid[0:1], uuid[1:2], uuid)
+}
+
+// GetTempPath returns the full path to the temp directory for this session
+func (s *UploadSession) GetTempPath() string {
+	return path.Join(setting.AppDataPath, s.TempPath)
+}
+
+// GetChunkPath returns the path to a specific chunk file
+func (s *UploadSession) GetChunkPath(chunkNumber int64) string {
+	return path.Join(s.GetTempPath(), fmt.Sprintf("chunk_%d", chunkNumber))
+}
+
+// IsExpired checks if the session has expired
+func (s *UploadSession) IsExpired() bool {
+	return s.ExpiresUnix.AsTime().Before(time.Now())
+}
+
+// IsComplete checks if all chunks have been received
+func (s *UploadSession) IsComplete() bool {
+	if s.FileSize > 0 && s.ChunksExpected > 0 {
+		return s.ChunksReceived >= s.ChunksExpected
+	}
+	return s.Status == UploadSessionStatusComplete
+}
+
+// CalculateChunksExpected calculates the expected number of chunks based on file size
+func (s *UploadSession) CalculateChunksExpected() int64 {
+	if s.FileSize <= 0 {
+		return -1
+	}
+	chunks := s.FileSize / s.ChunkSize
+	if s.FileSize%s.ChunkSize > 0 {
+		chunks++
+	}
+	return chunks
+}
+
+// ErrUploadSessionNotExist represents a "UploadSessionNotExist" kind of error.
+type ErrUploadSessionNotExist struct {
+	ID   int64
+	UUID string
+}
+
+// IsErrUploadSessionNotExist checks if an error is an ErrUploadSessionNotExist.
+func IsErrUploadSessionNotExist(err error) bool {
+	_, ok := err.(ErrUploadSessionNotExist)
+	return ok
+}
+
+func (err ErrUploadSessionNotExist) Error() string {
+	return fmt.Sprintf("upload session does not exist [id: %d, uuid: %s]", err.ID, err.UUID)
+}
+
+func (err ErrUploadSessionNotExist) Unwrap() error {
+	return util.ErrNotExist
+}
+
+// ErrUploadSessionExpired represents an expired session error
+type ErrUploadSessionExpired struct {
+	UUID string
+}
+
+// IsErrUploadSessionExpired checks if an error is an ErrUploadSessionExpired.
+func IsErrUploadSessionExpired(err error) bool {
+	_, ok := err.(ErrUploadSessionExpired)
+	return ok
+}
+
+func (err ErrUploadSessionExpired) Error() string {
+	return fmt.Sprintf("upload session has expired [uuid: %s]", err.UUID)
+}
+
+// CreateUploadSession creates a new upload session
+func CreateUploadSession(ctx context.Context, session *UploadSession) error {
+	if session.ChunkSize <= 0 {
+		session.ChunkSize = DefaultChunkSize
+	}
+	if session.FileSize > 0 {
+		session.ChunksExpected = session.CalculateChunksExpected()
+	}
+	if session.ExpiresUnix == 0 {
+		session.ExpiresUnix = timeutil.TimeStamp(time.Now().Add(DefaultSessionExpiry).Unix())
+	}
+	session.TempPath = UploadSessionTempPath(session.UUID)
+
+	// Create the temp directory
+	tempDir := session.GetTempPath()
+	if err := os.MkdirAll(tempDir, 0o755); err != nil {
+		return fmt.Errorf("failed to create temp directory: %w", err)
+	}
+
+	return db.Insert(ctx, session)
+}
+
+// GetUploadSessionByUUID returns an upload session by UUID
+func GetUploadSessionByUUID(ctx context.Context, uuid string) (*UploadSession, error) {
+	session := &UploadSession{}
+	has, err := db.GetEngine(ctx).Where("uuid=?", uuid).Get(session)
+	if err != nil {
+		return nil, err
+	} else if !has {
+		return nil, ErrUploadSessionNotExist{UUID: uuid}
+	}
+	return session, nil
+}
+
+// GetUploadSessionByID returns an upload session by ID
+func GetUploadSessionByID(ctx context.Context, id int64) (*UploadSession, error) {
+	session := &UploadSession{}
+	has, err := db.GetEngine(ctx).ID(id).Get(session)
+	if err != nil {
+		return nil, err
+	} else if !has {
+		return nil, ErrUploadSessionNotExist{ID: id}
+	}
+	return session, nil
+}
+
+// UpdateUploadSession updates an upload session
+func UpdateUploadSession(ctx context.Context, session *UploadSession) error {
+	session.UpdatedUnix = timeutil.TimeStamp(time.Now().Unix())
+	_, err := db.GetEngine(ctx).ID(session.ID).AllCols().Update(session)
+	return err
+}
+
+// UpdateUploadSessionChunks updates the chunk counts for a session
+func UpdateUploadSessionChunks(ctx context.Context, session *UploadSession) error {
+	session.UpdatedUnix = timeutil.TimeStamp(time.Now().Unix())
+	_, err := db.GetEngine(ctx).ID(session.ID).Cols("chunks_received", "bytes_received", "updated_unix").Update(session)
+	return err
+}
+
+// DeleteUploadSession deletes an upload session and its temp files
+func DeleteUploadSession(ctx context.Context, session *UploadSession) error {
+	// Delete temp directory
+	tempDir := session.GetTempPath()
+	if err := os.RemoveAll(tempDir); err != nil && !os.IsNotExist(err) {
+		log.Warn("Failed to remove temp directory for upload session %s: %v", session.UUID, err)
+	}
+
+	_, err := db.GetEngine(ctx).ID(session.ID).Delete(session)
+	return err
+}
+
+// DeleteUploadSessionByUUID deletes an upload session by UUID
+func DeleteUploadSessionByUUID(ctx context.Context, uuid string) error {
+	session, err := GetUploadSessionByUUID(ctx, uuid)
+	if err != nil {
+		return err
+	}
+	return DeleteUploadSession(ctx, session)
+}
+
+// GetExpiredUploadSessions returns all 
expired upload sessions +func GetExpiredUploadSessions(ctx context.Context) ([]*UploadSession, error) { + sessions := make([]*UploadSession, 0, 10) + return sessions, db.GetEngine(ctx). + Where("expires_unix < ? AND status = ?", time.Now().Unix(), UploadSessionStatusActive). + Find(&sessions) +} + +// DeleteExpiredUploadSessions deletes all expired upload sessions +func DeleteExpiredUploadSessions(ctx context.Context) (int, error) { + sessions, err := GetExpiredUploadSessions(ctx) + if err != nil { + return 0, err + } + + count := 0 + for _, session := range sessions { + if err := DeleteUploadSession(ctx, session); err != nil { + log.Warn("Failed to delete expired upload session %s: %v", session.UUID, err) + continue + } + count++ + } + return count, nil +} + +// GetActiveUploadSessionsForRelease returns active upload sessions for a release +func GetActiveUploadSessionsForRelease(ctx context.Context, releaseID int64) ([]*UploadSession, error) { + sessions := make([]*UploadSession, 0) + return sessions, db.GetEngine(ctx). + Where("release_id = ? AND status = ?", releaseID, UploadSessionStatusActive). + Find(&sessions) +} + +// GetUploadSessionsForUser returns all upload sessions for a user +func GetUploadSessionsForUser(ctx context.Context, userID int64) ([]*UploadSession, error) { + sessions := make([]*UploadSession, 0) + return sessions, db.GetEngine(ctx). + Where("uploader_id = ?", userID). + Desc("created_unix"). + Find(&sessions) +} diff --git a/modules/structs/attachment.go b/modules/structs/attachment.go index e9499d2ee7..b53d254904 100644 --- a/modules/structs/attachment.go +++ b/modules/structs/attachment.go @@ -33,3 +33,58 @@ type EditAttachmentOptions struct { // Name is the new filename for the attachment Name string `json:"name"` } + +// UploadSessionResponse response for creating an upload session +// swagger:model +type UploadSessionResponse struct { + // UUID is the unique identifier for this upload session + UUID string `json:"uuid"` + // FileName is the name of the file being uploaded + FileName string `json:"file_name"` + // FileSize is the total size of the file in bytes (-1 if unknown) + FileSize int64 `json:"file_size"` + // ChunkSize is the size of each chunk in bytes + ChunkSize int64 `json:"chunk_size"` + // ChunksExpected is the expected number of chunks (-1 if unknown) + ChunksExpected int64 `json:"chunks_expected"` + // ExpiresAt is the Unix timestamp when this session expires + ExpiresAt int64 `json:"expires_at"` +} + +// UploadChunkResponse response after uploading a chunk +// swagger:model +type UploadChunkResponse struct { + // ChunkNumber is the number of the chunk that was uploaded + ChunkNumber int64 `json:"chunk_number"` + // ChunksReceived is the total number of chunks received so far + ChunksReceived int64 `json:"chunks_received"` + // BytesReceived is the total number of bytes received so far + BytesReceived int64 `json:"bytes_received"` + // Complete indicates whether all chunks have been received + Complete bool `json:"complete"` +} + +// UploadSessionInfo contains information about an upload session for resuming +// swagger:model +type UploadSessionInfo struct { + // UUID is the unique identifier for this upload session + UUID string `json:"uuid"` + // FileName is the name of the file being uploaded + FileName string `json:"file_name"` + // FileSize is the total size of the file in bytes (-1 if unknown) + FileSize int64 `json:"file_size"` + // ChunkSize is the size of each chunk in bytes + ChunkSize int64 `json:"chunk_size"` + // ChunksExpected is 
the expected number of chunks (-1 if unknown) + ChunksExpected int64 `json:"chunks_expected"` + // ChunksReceived is the number of chunks received so far + ChunksReceived int64 `json:"chunks_received"` + // BytesReceived is the total number of bytes received so far + BytesReceived int64 `json:"bytes_received"` + // ReceivedChunks is the list of chunk numbers that have been received + ReceivedChunks []int64 `json:"received_chunks"` + // Status is the current status of the upload session (active, complete, expired, failed) + Status string `json:"status"` + // ExpiresAt is the Unix timestamp when this session expires + ExpiresAt int64 `json:"expires_at"` +} diff --git a/routers/api/v1/api.go b/routers/api/v1/api.go index 6d37c67cc4..a447a55f26 100644 --- a/routers/api/v1/api.go +++ b/routers/api/v1/api.go @@ -1330,6 +1330,8 @@ func Routes() *web.Router { m.Combo("/{attachment_id}").Get(repo.GetReleaseAttachment). Patch(reqToken(), reqRepoWriter(unit.TypeReleases), bind(api.EditAttachmentOptions{}), repo.EditReleaseAttachment). Delete(reqToken(), reqRepoWriter(unit.TypeReleases), repo.DeleteReleaseAttachment) + // Chunked upload session for large files + m.Post("/upload-session", reqToken(), reqRepoWriter(unit.TypeReleases), repo.CreateUploadSession) }) }) m.Group("/tags", func() { @@ -1338,6 +1340,15 @@ func Routes() *web.Router { Delete(reqToken(), reqRepoWriter(unit.TypeReleases), repo.DeleteReleaseByTag) }) }, reqRepoReader(unit.TypeReleases)) + // Chunked upload session management + m.Group("/uploads", func() { + m.Group("/{session_id}", func() { + m.Get("", repo.GetUploadSession) + m.Delete("", reqToken(), repo.CancelUploadSession) + m.Post("/complete", reqToken(), repo.CompleteUploadSession) + m.Put("/chunks/{chunk_number}", reqToken(), repo.UploadChunk) + }) + }) m.Post("/mirror-sync", reqToken(), reqRepoWriter(unit.TypeCode), mustNotBeArchived, repo.MirrorSync) m.Post("/push_mirrors-sync", reqAdmin(), reqToken(), mustNotBeArchived, repo.PushMirrorSync) m.Group("/push_mirrors", func() { diff --git a/routers/api/v1/repo/upload.go b/routers/api/v1/repo/upload.go new file mode 100644 index 0000000000..faa56cdd91 --- /dev/null +++ b/routers/api/v1/repo/upload.go @@ -0,0 +1,440 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package repo + +import ( + "net/http" + "strconv" + + repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/setting" + api "code.gitea.io/gitea/modules/structs" + attachment_service "code.gitea.io/gitea/services/attachment" + "code.gitea.io/gitea/services/context" + "code.gitea.io/gitea/services/convert" +) + +// CreateUploadSession creates a new chunked upload session +func CreateUploadSession(ctx *context.APIContext) { + // swagger:operation POST /repos/{owner}/{repo}/releases/{id}/assets/upload-session repository repoCreateUploadSession + // --- + // summary: Create a chunked upload session for a release attachment + // description: Creates a new upload session that allows uploading large files in chunks. Use this for files larger than 100MB or when network reliability is a concern. 
+ // produces: + // - application/json + // consumes: + // - application/json + // parameters: + // - name: owner + // in: path + // description: owner of the repo + // type: string + // required: true + // - name: repo + // in: path + // description: name of the repo + // type: string + // required: true + // - name: id + // in: path + // description: id of the release + // type: integer + // format: int64 + // required: true + // - name: name + // in: query + // description: name of the attachment + // type: string + // required: true + // - name: size + // in: query + // description: total file size in bytes (optional but recommended for validation) + // type: integer + // format: int64 + // required: false + // - name: chunk_size + // in: query + // description: chunk size in bytes (default 10MB, max 100MB) + // type: integer + // format: int64 + // required: false + // responses: + // "201": + // "$ref": "#/responses/UploadSession" + // "400": + // "$ref": "#/responses/error" + // "404": + // "$ref": "#/responses/notFound" + + if !setting.Attachment.Enabled { + ctx.APIErrorNotFound("Attachment is not enabled") + return + } + + releaseID := ctx.PathParamInt64("id") + if !checkReleaseMatchRepo(ctx, releaseID) { + return + } + + filename := ctx.FormString("name") + if filename == "" { + ctx.APIError(http.StatusBadRequest, "name is required") + return + } + + fileSize := ctx.FormInt64("size") + if fileSize == 0 { + fileSize = -1 // Unknown size + } + + chunkSize := ctx.FormInt64("chunk_size") + if chunkSize <= 0 { + chunkSize = repo_model.DefaultChunkSize + } + // Cap chunk size at 100MB + if chunkSize > 100*1024*1024 { + chunkSize = 100 * 1024 * 1024 + } + + session, err := attachment_service.CreateChunkedUploadSession(ctx, attachment_service.ChunkedUploadOptions{ + RepoID: ctx.Repo.Repository.ID, + ReleaseID: releaseID, + UploaderID: ctx.Doer.ID, + FileName: filename, + FileSize: fileSize, + ChunkSize: chunkSize, + }) + if err != nil { + ctx.APIErrorInternal(err) + return + } + + ctx.JSON(http.StatusCreated, api.UploadSessionResponse{ + UUID: session.UUID, + FileName: session.FileName, + FileSize: session.FileSize, + ChunkSize: session.ChunkSize, + ChunksExpected: session.ChunksExpected, + ExpiresAt: int64(session.ExpiresUnix), + }) +} + +// UploadChunk uploads a chunk to an upload session +func UploadChunk(ctx *context.APIContext) { + // swagger:operation PUT /repos/{owner}/{repo}/uploads/{session_id}/chunks/{chunk_number} repository repoUploadChunk + // --- + // summary: Upload a chunk to an upload session + // description: Uploads a single chunk of data to an existing upload session. Chunks can be uploaded in any order. 
+	// produces:
+	// - application/json
+	// consumes:
+	// - application/octet-stream
+	// parameters:
+	// - name: owner
+	//   in: path
+	//   description: owner of the repo
+	//   type: string
+	//   required: true
+	// - name: repo
+	//   in: path
+	//   description: name of the repo
+	//   type: string
+	//   required: true
+	// - name: session_id
+	//   in: path
+	//   description: upload session UUID
+	//   type: string
+	//   required: true
+	// - name: chunk_number
+	//   in: path
+	//   description: chunk number (0-indexed)
+	//   type: integer
+	//   format: int64
+	//   required: true
+	// - name: body
+	//   in: body
+	//   description: chunk data
+	//   required: true
+	// responses:
+	//   "200":
+	//     "$ref": "#/responses/UploadChunkResponse"
+	//   "400":
+	//     "$ref": "#/responses/error"
+	//   "404":
+	//     "$ref": "#/responses/notFound"
+	//   "410":
+	//     "$ref": "#/responses/error"
+
+	sessionUUID := ctx.PathParam("session_id")
+	chunkNumber, err := strconv.ParseInt(ctx.PathParam("chunk_number"), 10, 64)
+	if err != nil {
+		ctx.APIError(http.StatusBadRequest, "invalid chunk number")
+		return
+	}
+
+	session, err := repo_model.GetUploadSessionByUUID(ctx, sessionUUID)
+	if err != nil {
+		if repo_model.IsErrUploadSessionNotExist(err) {
+			ctx.APIErrorNotFound()
+			return
+		}
+		ctx.APIErrorInternal(err)
+		return
+	}
+
+	// Verify session belongs to this repo
+	if session.RepoID != ctx.Repo.Repository.ID {
+		ctx.APIErrorNotFound()
+		return
+	}
+
+	// Verify uploader
+	if session.UploaderID != ctx.Doer.ID {
+		ctx.APIError(http.StatusForbidden, "not the session owner")
+		return
+	}
+
+	// Check if session is expired
+	if session.IsExpired() {
+		ctx.APIError(http.StatusGone, "upload session has expired")
+		return
+	}
+
+	// Get Content-Length for size validation
+	contentLength := ctx.Req.ContentLength
+
+	err = attachment_service.SaveChunk(ctx, session, chunkNumber, ctx.Req.Body, contentLength)
+	if err != nil {
+		if repo_model.IsErrUploadSessionExpired(err) {
+			ctx.APIError(http.StatusGone, err.Error())
+			return
+		}
+		ctx.APIError(http.StatusBadRequest, err.Error())
+		return
+	}
+
+	// Refresh the session from the DB so the response reflects the new counts
+	session, err = repo_model.GetUploadSessionByUUID(ctx, sessionUUID)
+	if err != nil {
+		ctx.APIErrorInternal(err)
+		return
+	}
+
+	ctx.JSON(http.StatusOK, api.UploadChunkResponse{
+		ChunkNumber:    chunkNumber,
+		ChunksReceived: session.ChunksReceived,
+		BytesReceived:  session.BytesReceived,
+		Complete:       session.IsComplete(),
+	})
+}
+
+// GetUploadSession returns information about an upload session
+func GetUploadSession(ctx *context.APIContext) {
+	// swagger:operation GET /repos/{owner}/{repo}/uploads/{session_id} repository repoGetUploadSession
+	// ---
+	// summary: Get upload session status
+	// description: Returns the current status of an upload session, including which chunks have been received. Use this to resume interrupted uploads. 
+ // produces: + // - application/json + // parameters: + // - name: owner + // in: path + // description: owner of the repo + // type: string + // required: true + // - name: repo + // in: path + // description: name of the repo + // type: string + // required: true + // - name: session_id + // in: path + // description: upload session UUID + // type: string + // required: true + // responses: + // "200": + // "$ref": "#/responses/UploadSessionInfo" + // "404": + // "$ref": "#/responses/notFound" + + sessionUUID := ctx.PathParam("session_id") + + session, err := repo_model.GetUploadSessionByUUID(ctx, sessionUUID) + if err != nil { + if repo_model.IsErrUploadSessionNotExist(err) { + ctx.APIErrorNotFound() + return + } + ctx.APIErrorInternal(err) + return + } + + // Verify session belongs to this repo + if session.RepoID != ctx.Repo.Repository.ID { + ctx.APIErrorNotFound() + return + } + + chunks, err := attachment_service.GetReceivedChunks(session) + if err != nil { + ctx.APIErrorInternal(err) + return + } + + statusStr := "active" + switch session.Status { + case repo_model.UploadSessionStatusComplete: + statusStr = "complete" + case repo_model.UploadSessionStatusExpired: + statusStr = "expired" + case repo_model.UploadSessionStatusFailed: + statusStr = "failed" + } + + ctx.JSON(http.StatusOK, api.UploadSessionInfo{ + UUID: session.UUID, + FileName: session.FileName, + FileSize: session.FileSize, + ChunkSize: session.ChunkSize, + ChunksExpected: session.ChunksExpected, + ChunksReceived: session.ChunksReceived, + BytesReceived: session.BytesReceived, + ReceivedChunks: chunks, + Status: statusStr, + ExpiresAt: int64(session.ExpiresUnix), + }) +} + +// CompleteUploadSession finalizes an upload session and creates the attachment +func CompleteUploadSession(ctx *context.APIContext) { + // swagger:operation POST /repos/{owner}/{repo}/uploads/{session_id}/complete repository repoCompleteUploadSession + // --- + // summary: Complete an upload session + // description: Assembles all uploaded chunks into the final file and creates the attachment. Call this after all chunks have been uploaded. 
+ // produces: + // - application/json + // parameters: + // - name: owner + // in: path + // description: owner of the repo + // type: string + // required: true + // - name: repo + // in: path + // description: name of the repo + // type: string + // required: true + // - name: session_id + // in: path + // description: upload session UUID + // type: string + // required: true + // responses: + // "201": + // "$ref": "#/responses/Attachment" + // "400": + // "$ref": "#/responses/error" + // "404": + // "$ref": "#/responses/notFound" + + sessionUUID := ctx.PathParam("session_id") + + session, err := repo_model.GetUploadSessionByUUID(ctx, sessionUUID) + if err != nil { + if repo_model.IsErrUploadSessionNotExist(err) { + ctx.APIErrorNotFound() + return + } + ctx.APIErrorInternal(err) + return + } + + // Verify session belongs to this repo + if session.RepoID != ctx.Repo.Repository.ID { + ctx.APIErrorNotFound() + return + } + + // Verify uploader + if session.UploaderID != ctx.Doer.ID { + ctx.APIError(http.StatusForbidden, "not the session owner") + return + } + + // Check if session is expired + if session.IsExpired() { + ctx.APIError(http.StatusGone, "upload session has expired") + return + } + + // Assemble chunks into final attachment + attach, err := attachment_service.AssembleChunks(ctx, session, setting.Repository.Release.AllowedTypes) + if err != nil { + log.Error("Failed to assemble chunks for session %s: %v", sessionUUID, err) + ctx.APIError(http.StatusBadRequest, err.Error()) + return + } + + ctx.JSON(http.StatusCreated, convert.ToAPIAttachment(ctx.Repo.Repository, attach)) +} + +// CancelUploadSession cancels an upload session +func CancelUploadSession(ctx *context.APIContext) { + // swagger:operation DELETE /repos/{owner}/{repo}/uploads/{session_id} repository repoCancelUploadSession + // --- + // summary: Cancel an upload session + // description: Cancels an upload session and deletes any uploaded chunks. + // produces: + // - application/json + // parameters: + // - name: owner + // in: path + // description: owner of the repo + // type: string + // required: true + // - name: repo + // in: path + // description: name of the repo + // type: string + // required: true + // - name: session_id + // in: path + // description: upload session UUID + // type: string + // required: true + // responses: + // "204": + // "$ref": "#/responses/empty" + // "404": + // "$ref": "#/responses/notFound" + + sessionUUID := ctx.PathParam("session_id") + + session, err := repo_model.GetUploadSessionByUUID(ctx, sessionUUID) + if err != nil { + if repo_model.IsErrUploadSessionNotExist(err) { + ctx.APIErrorNotFound() + return + } + ctx.APIErrorInternal(err) + return + } + + // Verify session belongs to this repo + if session.RepoID != ctx.Repo.Repository.ID { + ctx.APIErrorNotFound() + return + } + + // Verify uploader or repo admin + if session.UploaderID != ctx.Doer.ID && !ctx.Repo.IsAdmin() { + ctx.APIError(http.StatusForbidden, "not the session owner") + return + } + + if err := attachment_service.CancelUploadSession(ctx, session); err != nil { + ctx.APIErrorInternal(err) + return + } + + ctx.Status(http.StatusNoContent) +} diff --git a/services/attachment/chunked.go b/services/attachment/chunked.go new file mode 100644 index 0000000000..e18e7e1e8c --- /dev/null +++ b/services/attachment/chunked.go @@ -0,0 +1,316 @@ +// Copyright 2024 The Gitea Authors. All rights reserved. 
+// SPDX-License-Identifier: MIT
+
+package attachment
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+
+	"code.gitea.io/gitea/models/db"
+	repo_model "code.gitea.io/gitea/models/repo"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/setting"
+	"code.gitea.io/gitea/modules/storage"
+	"code.gitea.io/gitea/services/context/upload"
+
+	"github.com/google/uuid"
+)
+
+// ChunkedUploadOptions contains options for creating a chunked upload session
+type ChunkedUploadOptions struct {
+	RepoID     int64
+	ReleaseID  int64
+	IssueID    int64
+	UploaderID int64
+	FileName   string
+	FileSize   int64 // -1 if unknown
+	ChunkSize  int64 // 0 to use default
+}
+
+// CreateChunkedUploadSession creates a new chunked upload session
+func CreateChunkedUploadSession(ctx context.Context, opts ChunkedUploadOptions) (*repo_model.UploadSession, error) {
+	if opts.ChunkSize <= 0 {
+		opts.ChunkSize = repo_model.DefaultChunkSize
+	}
+
+	session := &repo_model.UploadSession{
+		UUID:       uuid.New().String(),
+		RepoID:     opts.RepoID,
+		ReleaseID:  opts.ReleaseID,
+		IssueID:    opts.IssueID,
+		UploaderID: opts.UploaderID,
+		FileName:   opts.FileName,
+		FileSize:   opts.FileSize,
+		ChunkSize:  opts.ChunkSize,
+		Status:     repo_model.UploadSessionStatusActive,
+	}
+
+	if err := repo_model.CreateUploadSession(ctx, session); err != nil {
+		return nil, fmt.Errorf("failed to create upload session: %w", err)
+	}
+
+	return session, nil
+}
+
+// SaveChunk saves a chunk to the upload session
+func SaveChunk(ctx context.Context, session *repo_model.UploadSession, chunkNumber int64, data io.Reader, size int64) error {
+	if session.Status != repo_model.UploadSessionStatusActive {
+		return fmt.Errorf("upload session is not active")
+	}
+
+	if session.IsExpired() {
+		session.Status = repo_model.UploadSessionStatusExpired
+		_ = repo_model.UpdateUploadSession(ctx, session)
+		return repo_model.ErrUploadSessionExpired{UUID: session.UUID}
+	}
+
+	// Validate chunk number
+	if chunkNumber < 0 {
+		return fmt.Errorf("invalid chunk number: %d", chunkNumber)
+	}
+	if session.ChunksExpected > 0 && chunkNumber >= session.ChunksExpected {
+		return fmt.Errorf("chunk number %d exceeds expected chunks %d", chunkNumber, session.ChunksExpected)
+	}
+
+	// Ensure temp directory exists
+	tempDir := session.GetTempPath()
+	if err := os.MkdirAll(tempDir, 0o755); err != nil {
+		return fmt.Errorf("failed to create temp directory: %w", err)
+	}
+
+	// Detect a re-upload of the same chunk (e.g. a retry after a dropped
+	// connection) so the counters below are not double-counted
+	chunkPath := session.GetChunkPath(chunkNumber)
+	previousSize := int64(-1)
+	if fi, err := os.Stat(chunkPath); err == nil {
+		previousSize = fi.Size()
+	}
+
+	// Write chunk to temp file
+	file, err := os.Create(chunkPath)
+	if err != nil {
+		return fmt.Errorf("failed to create chunk file: %w", err)
+	}
+	defer file.Close()
+
+	written, err := io.Copy(file, data)
+	if err != nil {
+		os.Remove(chunkPath)
+		return fmt.Errorf("failed to write chunk: %w", err)
+	}
+
+	// Validate size if provided
+	if size > 0 && written != size {
+		os.Remove(chunkPath)
+		return fmt.Errorf("chunk size mismatch: expected %d, got %d", size, written)
+	}
+
+	// Update session; a re-uploaded chunk only adjusts the byte total
+	if previousSize >= 0 {
+		session.BytesReceived += written - previousSize
+	} else {
+		session.ChunksReceived++
+		session.BytesReceived += written
+	}
+	if err := repo_model.UpdateUploadSessionChunks(ctx, session); err != nil {
+		return fmt.Errorf("failed to update session: %w", err)
+	}
+
+	log.Debug("Saved chunk %d for session %s (total: %d chunks, %d bytes)",
+		chunkNumber, session.UUID, session.ChunksReceived, session.BytesReceived)
+
+	return nil
+}
+
+// GetReceivedChunks returns the list of chunk numbers that have been received
+func GetReceivedChunks(session *repo_model.UploadSession) ([]int64, error) {
+	tempDir := session.GetTempPath()
+	entries, err := os.ReadDir(tempDir)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return []int64{}, nil
+		}
+		return nil, err
+	}
+
+	chunks := make([]int64, 0, len(entries))
+	for _, entry := range entries {
+		if entry.IsDir() {
+			continue
+		}
+		name := entry.Name()
+		if !strings.HasPrefix(name, "chunk_") {
+			continue
+		}
+		numStr := strings.TrimPrefix(name, "chunk_")
+		num, err := strconv.ParseInt(numStr, 10, 64)
+		if err != nil {
+			continue
+		}
+		chunks = append(chunks, num)
+	}
+
+	sort.Slice(chunks, func(i, j int) bool { return chunks[i] < chunks[j] })
+	return chunks, nil
+}
+
+// AssembleChunks assembles all chunks into a single file and creates an attachment
+func AssembleChunks(ctx context.Context, session *repo_model.UploadSession, allowedTypes string) (*repo_model.Attachment, error) {
+	if session.Status != repo_model.UploadSessionStatusActive {
+		return nil, fmt.Errorf("upload session is not active")
+	}
+
+	// Get list of chunks
+	chunks, err := GetReceivedChunks(session)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get received chunks: %w", err)
+	}
+
+	if len(chunks) == 0 {
+		return nil, fmt.Errorf("no chunks received")
+	}
+
+	// Verify we have all chunks in sequence
+	if session.ChunksExpected > 0 {
+		if int64(len(chunks)) != session.ChunksExpected {
+			return nil, fmt.Errorf("chunk count mismatch: expected %d, got %d", session.ChunksExpected, len(chunks))
+		}
+	}
+
+	// Verify chunks are sequential starting from 0
+	for i, chunk := range chunks {
+		if chunk != int64(i) {
+			return nil, fmt.Errorf("missing chunk %d", i)
+		}
+	}
+
+	// Create a reader that concatenates all chunks
+	readers := make([]io.Reader, 0, len(chunks))
+	files := make([]*os.File, 0, len(chunks))
+
+	defer func() {
+		for _, f := range files {
+			f.Close()
+		}
+	}()
+
+	for _, chunkNum := range chunks {
+		chunkPath := session.GetChunkPath(chunkNum)
+		f, err := os.Open(chunkPath)
+		if err != nil {
+			return nil, fmt.Errorf("failed to open chunk %d: %w", chunkNum, err)
+		}
+		files = append(files, f)
+		readers = append(readers, f)
+	}
+
+	// Read first 1KB for type verification
+	combinedReader := io.MultiReader(readers...)
+	buf := make([]byte, 1024)
+	n, _ := io.ReadAtLeast(combinedReader, buf, 1)
+	buf = buf[:n]
+
+	// Verify file type
+	if err := upload.Verify(buf, session.FileName, allowedTypes); err != nil {
+		session.Status = repo_model.UploadSessionStatusFailed
+		_ = repo_model.UpdateUploadSession(ctx, session)
+		return nil, err
+	}
+
+	// The sniff read above consumed the start of the stream, so close and
+	// re-open the chunk files to build a fresh reader over the full upload
+	for _, f := range files {
+		f.Close()
+	}
+	files = files[:0]
+	readers = readers[:0]
+
+	for _, chunkNum := range chunks {
+		chunkPath := session.GetChunkPath(chunkNum)
+		f, err := os.Open(chunkPath)
+		if err != nil {
+			return nil, fmt.Errorf("failed to reopen chunk %d: %w", chunkNum, err)
+		}
+		files = append(files, f)
+		readers = append(readers, f)
+	}
+
+	finalReader := io.MultiReader(readers...)
+ + // Create the attachment + attach := &repo_model.Attachment{ + UUID: uuid.New().String(), + RepoID: session.RepoID, + ReleaseID: session.ReleaseID, + IssueID: session.IssueID, + UploaderID: session.UploaderID, + Name: session.FileName, + } + + err = db.WithTx(ctx, func(ctx context.Context) error { + // Save to storage + size, err := storage.Attachments.Save(attach.RelativePath(), finalReader, session.BytesReceived) + if err != nil { + return fmt.Errorf("failed to save attachment: %w", err) + } + attach.Size = size + + // Insert into database + if err := db.Insert(ctx, attach); err != nil { + // Clean up storage on failure + _ = storage.Attachments.Delete(attach.RelativePath()) + return fmt.Errorf("failed to insert attachment: %w", err) + } + + return nil + }) + + if err != nil { + session.Status = repo_model.UploadSessionStatusFailed + _ = repo_model.UpdateUploadSession(ctx, session) + return nil, err + } + + // Mark session as complete and clean up + session.Status = repo_model.UploadSessionStatusComplete + _ = repo_model.UpdateUploadSession(ctx, session) + + // Clean up temp files (in background to not block response) + go func() { + if err := os.RemoveAll(session.GetTempPath()); err != nil { + log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, err) + } + }() + + log.Info("Assembled %d chunks into attachment %s (%d bytes) for session %s", + len(chunks), attach.Name, attach.Size, session.UUID) + + return attach, nil +} + +// CancelUploadSession cancels an upload session and cleans up +func CancelUploadSession(ctx context.Context, session *repo_model.UploadSession) error { + session.Status = repo_model.UploadSessionStatusFailed + if err := repo_model.UpdateUploadSession(ctx, session); err != nil { + return err + } + + // Clean up temp files + if err := os.RemoveAll(session.GetTempPath()); err != nil && !os.IsNotExist(err) { + log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, err) + } + + return repo_model.DeleteUploadSession(ctx, session) +} + +// CleanupExpiredSessions is intended to be run periodically to clean up expired sessions +func CleanupExpiredSessions(ctx context.Context) error { + count, err := repo_model.DeleteExpiredUploadSessions(ctx) + if err != nil { + return err + } + if count > 0 { + log.Info("Cleaned up %d expired upload sessions", count) + } + return nil +} + +// GetTempStoragePath returns the base path for chunked upload temp storage +func GetTempStoragePath() string { + return path.Join(setting.AppDataPath, "upload-sessions") +} diff --git a/services/cron/tasks_extended.go b/services/cron/tasks_extended.go index 74fb12430d..14c033a2f4 100644 --- a/services/cron/tasks_extended.go +++ b/services/cron/tasks_extended.go @@ -15,6 +15,7 @@ import ( "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/updatechecker" asymkey_service "code.gitea.io/gitea/services/asymkey" + attachment_service "code.gitea.io/gitea/services/attachment" repo_service "code.gitea.io/gitea/services/repository" archiver_service "code.gitea.io/gitea/services/repository/archiver" user_service "code.gitea.io/gitea/services/user" @@ -224,6 +225,16 @@ func registerRebuildIssueIndexer() { }) } +func registerCleanupExpiredUploadSessions() { + RegisterTaskFatal("cleanup_expired_upload_sessions", &BaseConfig{ + Enabled: true, + RunAtStart: true, + Schedule: "@every 1h", + }, func(ctx context.Context, _ *user_model.User, _ Config) error { + return attachment_service.CleanupExpiredSessions(ctx) + }) +} + func initExtendedTasks() { 
registerDeleteInactiveUsers() registerDeleteRepositoryArchives() @@ -239,4 +250,5 @@ func initExtendedTasks() { registerDeleteOldSystemNotices() registerGCLFS() registerRebuildIssueIndexer() + registerCleanupExpiredUploadSessions() }
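
---

For reviewers: a minimal client-side sketch of the flow these endpoints enable, under stated assumptions. The endpoint paths and JSON fields come from this diff; the host, token, repo path, release ID, and file name are illustrative placeholders, and error handling is reduced to panics for brevity.

```go
// chunked_upload_example.go — not part of this change; a hypothetical client
// exercising the new chunked upload API: create session, PUT chunks, complete.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
)

const (
	apiBase = "https://gitea.example.com/api/v1/repos/someowner/somerepo" // placeholder
	token   = "REPLACE_WITH_TOKEN"                                        // placeholder
)

// call sends an authenticated request and optionally decodes a JSON response.
func call(method, url string, body io.Reader, out any) error {
	req, err := http.NewRequest(method, url, body)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "token "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("unexpected status %s for %s", resp.Status, url)
	}
	if out != nil {
		return json.NewDecoder(resp.Body).Decode(out)
	}
	return nil
}

func main() {
	data, err := os.ReadFile("big-asset.tar.gz") // placeholder file
	if err != nil {
		panic(err)
	}

	// 1. Create the session for release ID 1 (placeholder), sending the total
	//    size so the server can compute the expected chunk count.
	var session struct {
		UUID      string `json:"uuid"`
		ChunkSize int64  `json:"chunk_size"`
	}
	createURL := fmt.Sprintf("%s/releases/1/assets/upload-session?name=big-asset.tar.gz&size=%d", apiBase, len(data))
	if err := call(http.MethodPost, createURL, nil, &session); err != nil {
		panic(err)
	}

	// 2. PUT each chunk; chunk numbers are 0-indexed and may be sent in any
	//    order, so a client could retry or parallelize these.
	for off := int64(0); off < int64(len(data)); off += session.ChunkSize {
		end := off + session.ChunkSize
		if end > int64(len(data)) {
			end = int64(len(data))
		}
		chunkURL := fmt.Sprintf("%s/uploads/%s/chunks/%d", apiBase, session.UUID, off/session.ChunkSize)
		if err := call(http.MethodPut, chunkURL, bytes.NewReader(data[off:end]), nil); err != nil {
			panic(err)
		}
	}

	// 3. Complete the session; the server assembles the chunks into a normal
	//    release attachment and cleans up the temp files.
	completeURL := fmt.Sprintf("%s/uploads/%s/complete", apiBase, session.UUID)
	if err := call(http.MethodPost, completeURL, nil, nil); err != nil {
		panic(err)
	}
	fmt.Println("attachment created")
}
```

To resume an interrupted upload, a client would instead GET `/repos/{owner}/{repo}/uploads/{session_id}` first and skip the chunk numbers listed in `received_chunks`.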