// Copyright 2026 MarketAlly. All rights reserved.
// SPDX-License-Identifier: MIT

package attachment

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"slices"
	"strconv"
	"strings"

	"code.gitea.io/gitea/models/db"
	repo_model "code.gitea.io/gitea/models/repo"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
	"code.gitea.io/gitea/services/context/upload"

	"github.com/google/uuid"
)

// ChunkedUploadOptions contains options for creating a chunked upload session
type ChunkedUploadOptions struct {
	RepoID     int64
	ReleaseID  int64
	IssueID    int64
	UploaderID int64
	FileName   string
	FileSize   int64 // -1 if unknown
	ChunkSize  int64 // 0 to use default
}

// CreateChunkedUploadSession creates a new chunked upload session
func CreateChunkedUploadSession(ctx context.Context, opts ChunkedUploadOptions) (*repo_model.UploadSession, error) {
	if opts.ChunkSize <= 0 {
		opts.ChunkSize = repo_model.DefaultChunkSize
	}

	session := &repo_model.UploadSession{
		UUID:       uuid.New().String(),
		RepoID:     opts.RepoID,
		ReleaseID:  opts.ReleaseID,
		IssueID:    opts.IssueID,
		UploaderID: opts.UploaderID,
		FileName:   opts.FileName,
		FileSize:   opts.FileSize,
		ChunkSize:  opts.ChunkSize,
		Status:     repo_model.UploadSessionStatusActive,
	}

	if err := repo_model.CreateUploadSession(ctx, session); err != nil {
		return nil, fmt.Errorf("failed to create upload session: %w", err)
	}
	return session, nil
}

// ErrChecksumMismatch is returned when the chunk checksum doesn't match
type ErrChecksumMismatch struct {
	Expected string
	Actual   string
}

func (e ErrChecksumMismatch) Error() string {
	return fmt.Sprintf("checksum mismatch: expected %s, got %s", e.Expected, e.Actual)
}

// IsErrChecksumMismatch returns true if the error is a checksum mismatch
func IsErrChecksumMismatch(err error) bool {
	_, ok := err.(ErrChecksumMismatch)
	return ok
}

// ChunkSaveOptions contains options for saving a chunk
type ChunkSaveOptions struct {
	ChunkNumber int64
	Data        io.Reader
	Size        int64
	// Checksum is the expected SHA-256 checksum of the chunk (hex-encoded).
	// If empty, checksum verification is skipped.
	Checksum string
}

// SaveChunk saves a chunk to the upload session
func SaveChunk(ctx context.Context, session *repo_model.UploadSession, chunkNumber int64, data io.Reader, size int64) error {
	return SaveChunkWithOptions(ctx, session, ChunkSaveOptions{
		ChunkNumber: chunkNumber,
		Data:        data,
		Size:        size,
	})
}

// SaveChunkWithOptions saves a chunk to the upload session with additional options
func SaveChunkWithOptions(ctx context.Context, session *repo_model.UploadSession, opts ChunkSaveOptions) error {
	if session.Status != repo_model.UploadSessionStatusActive {
		return errors.New("upload session is not active")
	}
	if session.IsExpired() {
		session.Status = repo_model.UploadSessionStatusExpired
		_ = repo_model.UpdateUploadSession(ctx, session)
		return repo_model.ErrUploadSessionExpired{UUID: session.UUID}
	}

	// Validate chunk number
	if opts.ChunkNumber < 0 {
		return fmt.Errorf("invalid chunk number: %d", opts.ChunkNumber)
	}
	if session.ChunksExpected > 0 && opts.ChunkNumber >= session.ChunksExpected {
		return fmt.Errorf("chunk number %d exceeds expected chunks %d", opts.ChunkNumber, session.ChunksExpected)
	}

	// Ensure temp directory exists
	tempDir := session.GetTempPath()
	if err := os.MkdirAll(tempDir, 0o755); err != nil {
		return fmt.Errorf("failed to create temp directory: %w", err)
	}

	// Write chunk to temp file, computing checksum if needed
	chunkPath := session.GetChunkPath(opts.ChunkNumber)
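	// The chunk is streamed to disk in a single pass below; when a checksum
	// is supplied, io.MultiWriter feeds the same bytes to a SHA-256 hasher,
	// so the payload is never buffered in memory or read twice.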
	// If this chunk was already uploaded (e.g. a retried request), record its
	// previous size so the session counters are not inflated by the retry
	var existed bool
	var priorSize int64
	if fi, statErr := os.Stat(chunkPath); statErr == nil && !fi.IsDir() {
		existed = true
		priorSize = fi.Size()
	}

	file, err := os.Create(chunkPath)
	if err != nil {
		return fmt.Errorf("failed to create chunk file: %w", err)
	}
	defer file.Close()

	var written int64
	var actualChecksum string
	if opts.Checksum != "" {
		// Compute checksum while writing
		hasher := sha256.New()
		multiWriter := io.MultiWriter(file, hasher)
		written, err = io.Copy(multiWriter, opts.Data)
		if err != nil {
			_ = file.Close() // close before removing so the delete also succeeds on Windows
			_ = os.Remove(chunkPath)
			return fmt.Errorf("failed to write chunk: %w", err)
		}
		actualChecksum = hex.EncodeToString(hasher.Sum(nil))
	} else {
		written, err = io.Copy(file, opts.Data)
		if err != nil {
			_ = file.Close()
			_ = os.Remove(chunkPath)
			return fmt.Errorf("failed to write chunk: %w", err)
		}
	}

	// Validate size if provided
	if opts.Size > 0 && written != opts.Size {
		_ = file.Close()
		_ = os.Remove(chunkPath)
		return fmt.Errorf("chunk size mismatch: expected %d, got %d", opts.Size, written)
	}

	// Verify checksum if provided
	if opts.Checksum != "" && actualChecksum != opts.Checksum {
		_ = file.Close()
		_ = os.Remove(chunkPath)
		return ErrChecksumMismatch{Expected: opts.Checksum, Actual: actualChecksum}
	}

	// Update session counters; a retried chunk replaces its earlier bytes
	// instead of being counted twice
	if !existed {
		session.ChunksReceived++
	}
	session.BytesReceived += written - priorSize
	if err := repo_model.UpdateUploadSessionChunks(ctx, session); err != nil {
		return fmt.Errorf("failed to update session: %w", err)
	}

	log.Debug("Saved chunk %d for session %s (total: %d chunks, %d bytes)",
		opts.ChunkNumber, session.UUID, session.ChunksReceived, session.BytesReceived)
	return nil
}

// GetReceivedChunks returns the list of chunk numbers that have been received
func GetReceivedChunks(session *repo_model.UploadSession) ([]int64, error) {
	tempDir := session.GetTempPath()
	entries, err := os.ReadDir(tempDir)
	if err != nil {
		if os.IsNotExist(err) {
			return []int64{}, nil
		}
		return nil, err
	}

	chunks := make([]int64, 0, len(entries))
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		if !strings.HasPrefix(name, "chunk_") {
			continue
		}
		numStr := strings.TrimPrefix(name, "chunk_")
		num, err := strconv.ParseInt(numStr, 10, 64)
		if err != nil {
			continue
		}
		chunks = append(chunks, num)
	}
	slices.Sort(chunks)
	return chunks, nil
}

// AssembleChunks assembles all chunks into a single file and creates an attachment
func AssembleChunks(ctx context.Context, session *repo_model.UploadSession, allowedTypes string) (*repo_model.Attachment, error) {
	if session.Status != repo_model.UploadSessionStatusActive {
		return nil, errors.New("upload session is not active")
	}

	// Get list of chunks
	chunks, err := GetReceivedChunks(session)
	if err != nil {
		return nil, fmt.Errorf("failed to get received chunks: %w", err)
	}
	if len(chunks) == 0 {
		return nil, errors.New("no chunks received")
	}

	// Verify we have all chunks
	if session.ChunksExpected > 0 && int64(len(chunks)) != session.ChunksExpected {
		return nil, fmt.Errorf("chunk count mismatch: expected %d, got %d", session.ChunksExpected, len(chunks))
	}

	// Verify chunks are sequential starting from 0
	for i, chunk := range chunks {
		if chunk != int64(i) {
			return nil, fmt.Errorf("missing chunk %d", i)
		}
	}

	// Create a reader that concatenates all chunks
	readers := make([]io.Reader, 0, len(chunks))
	files := make([]*os.File, 0, len(chunks))
	defer func() {
		for _, f := range files {
			f.Close()
		}
	}()

	for _, chunkNum := range chunks {
		chunkPath := session.GetChunkPath(chunkNum)
		f, err := os.Open(chunkPath)
		if err != nil {
			return nil, fmt.Errorf("failed to open chunk %d: %w", chunkNum, err)
		}
		files = append(files, f)
		readers = append(readers, f)
	}

	// Read the first 1KB for type verification
	combinedReader := io.MultiReader(readers...)
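	// Sniffing through the MultiReader avoids concatenating the chunks into a
	// temporary file just to inspect their first bytes; the cost is that the
	// chunk files must be reopened below before the full payload is streamed
	// to storage.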
	buf := make([]byte, 1024)
	n, err := io.ReadAtLeast(combinedReader, buf, 1)
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, fmt.Errorf("failed to read chunk data: %w", err)
	}
	buf = buf[:n]

	// Verify file type
	if err := upload.Verify(buf, session.FileName, allowedTypes); err != nil {
		session.Status = repo_model.UploadSessionStatusFailed
		_ = repo_model.UpdateUploadSession(ctx, session)
		return nil, err
	}

	// Re-open the chunk files so the final reader streams the whole payload
	// from the beginning
	for _, f := range files {
		f.Close()
	}
	files = files[:0]
	readers = readers[:0]
	for _, chunkNum := range chunks {
		chunkPath := session.GetChunkPath(chunkNum)
		f, err := os.Open(chunkPath)
		if err != nil {
			return nil, fmt.Errorf("failed to reopen chunk %d: %w", chunkNum, err)
		}
		files = append(files, f)
		readers = append(readers, f)
	}
	finalReader := io.MultiReader(readers...)

	// Create the attachment
	attach := &repo_model.Attachment{
		UUID:       uuid.New().String(),
		RepoID:     session.RepoID,
		ReleaseID:  session.ReleaseID,
		IssueID:    session.IssueID,
		UploaderID: session.UploaderID,
		Name:       session.FileName,
	}

	err = db.WithTx(ctx, func(ctx context.Context) error {
		// Save to storage
		size, err := storage.Attachments.Save(attach.RelativePath(), finalReader, session.BytesReceived)
		if err != nil {
			return fmt.Errorf("failed to save attachment: %w", err)
		}
		attach.Size = size

		// Insert into database
		if err := db.Insert(ctx, attach); err != nil {
			// Clean up storage on failure
			_ = storage.Attachments.Delete(attach.RelativePath())
			return fmt.Errorf("failed to insert attachment: %w", err)
		}
		return nil
	})
	if err != nil {
		session.Status = repo_model.UploadSessionStatusFailed
		_ = repo_model.UpdateUploadSession(ctx, session)
		return nil, err
	}

	// Mark session as complete and clean up
	session.Status = repo_model.UploadSessionStatusComplete
	_ = repo_model.UpdateUploadSession(ctx, session)

	// Clean up temp files (in the background, so the response is not blocked)
	go func() {
		if err := os.RemoveAll(session.GetTempPath()); err != nil {
			log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, err)
		}
	}()

	log.Info("Assembled %d chunks into attachment %s (%d bytes) for session %s",
		len(chunks), attach.Name, attach.Size, session.UUID)
	return attach, nil
}

// CancelUploadSession cancels an upload session and cleans up
func CancelUploadSession(ctx context.Context, session *repo_model.UploadSession) error {
	session.Status = repo_model.UploadSessionStatusFailed
	if err := repo_model.UpdateUploadSession(ctx, session); err != nil {
		return err
	}

	// Clean up temp files
	if err := os.RemoveAll(session.GetTempPath()); err != nil && !os.IsNotExist(err) {
		log.Warn("Failed to clean up temp files for session %s: %v", session.UUID, err)
	}

	return repo_model.DeleteUploadSession(ctx, session)
}

// CleanupExpiredSessions is intended to be run periodically to clean up expired sessions
func CleanupExpiredSessions(ctx context.Context) error {
	count, err := repo_model.DeleteExpiredUploadSessions(ctx)
	if err != nil {
		return err
	}
	if count > 0 {
		log.Info("Cleaned up %d expired upload sessions", count)
	}
	return nil
}

// GetTempStoragePath returns the base path for chunked upload temp storage
func GetTempStoragePath() string {
	return filepath.Join(setting.AppDataPath, "upload-sessions")
}
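// Usage sketch (illustrative only; the request wiring and the chunkReaders
// and chunkSizes variables are hypothetical, not part of this package):
//
//	session, err := CreateChunkedUploadSession(ctx, ChunkedUploadOptions{
//		RepoID:     repo.ID,
//		UploaderID: doer.ID,
//		FileName:   "build.tar.gz",
//		FileSize:   totalSize, // -1 if unknown
//	})
//	// ... typically one request per chunk ...
//	for i, r := range chunkReaders {
//		if err := SaveChunk(ctx, session, int64(i), r, chunkSizes[i]); err != nil {
//			// handle or retry; re-sending the same chunk number is safe
//		}
//	}
//	attach, err := AssembleChunks(ctx, session, setting.Attachment.AllowedTypes)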