Some checks failed
Build and Release / Lint and Test (push) Successful in 19m14s
Build and Release / Build Binaries (amd64, windows) (push) Has been cancelled
Build and Release / Build Binaries (arm64, darwin) (push) Has been cancelled
Build and Release / Build Binaries (arm64, linux) (push) Has been cancelled
Build and Release / Build Docker Image (push) Has been cancelled
Build and Release / Create Release (push) Has been cancelled
Build and Release / Build Binaries (amd64, linux) (push) Has been cancelled
Build and Release / Build Binaries (amd64, darwin) (push) Has been cancelled
Replace wg.Add(1)/go func()/defer wg.Done() pattern with the simpler wg.Go() method as required by the modernize linter. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
345 lines
8.3 KiB
Go
345 lines
8.3 KiB
Go
// Copyright 2026 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package gitea
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// UploadSession describes a chunked upload session as exchanged in JSON with
// the server. createSession hands a value of this type to the upload-session
// request as its response target.
type UploadSession struct {
	// ID is interpolated into the chunk, complete and cancel request paths.
	ID       string `json:"id"`
	FileName string `json:"file_name"`

	// FileSize, ChunkSize and TotalChunks determine how uploadChunks slices
	// the input: every chunk is ChunkSize bytes except the last one, which
	// holds the remainder of FileSize.
	FileSize    int64 `json:"file_size"`
	ChunkSize   int64 `json:"chunk_size"`
	TotalChunks int64 `json:"total_chunks"`

	// Session bookkeeping carried in the JSON payload.
	ChunksReceived int64     `json:"chunks_received"`
	Status         string    `json:"status"`
	ExpiresAt      time.Time `json:"expires_at"`

	// Checksum is omitted from the JSON when empty.
	Checksum string `json:"checksum,omitempty"`
}
|
|
|
|
// UploadResult is the JSON payload describing the asset produced by a
// completed upload. complete hands a value of this type to the "complete"
// request as its response target.
type UploadResult struct {
	// Identity and size of the stored asset.
	ID   int64  `json:"id"`
	Name string `json:"name"`
	Size int64  `json:"size"`

	// DownloadURL is carried under the "browser_download_url" JSON key.
	DownloadURL string `json:"browser_download_url"`

	// ChecksumVerified is carried under the "checksum_verified" JSON key.
	ChecksumVerified bool `json:"checksum_verified"`
}
|
|
|
|
// Progress is a snapshot of an upload in flight, as handed to a ProgressFunc.
type Progress struct {
	// Byte counters: uploaded so far, and the session's file size.
	BytesDone, BytesTotal int64

	// Chunk counters: completed so far, and the session's chunk count.
	ChunksDone, ChunksTotal int64

	// Percent is BytesDone relative to BytesTotal on a 0-100 scale.
	Percent float64

	// Speed is the average throughput since the upload started, in bytes
	// per second.
	Speed float64

	// ETA is the remaining time estimated from Speed.
	ETA time.Duration
}
|
|
|
|
// ProgressFunc is called with progress updates
|
|
type ProgressFunc func(Progress)
|
|
|
|
// ChunkedUploadOptions configures a chunked upload
|
|
type ChunkedUploadOptions struct {
|
|
FileName string
|
|
ChunkSize int64
|
|
Parallel int
|
|
VerifyChecksum bool
|
|
OnProgress ProgressFunc
|
|
}
|
|
|
|
// ChunkedUpload handles large file uploads
|
|
type ChunkedUpload struct {
|
|
client *Client
|
|
owner string
|
|
repo string
|
|
release string
|
|
session *UploadSession
|
|
options ChunkedUploadOptions
|
|
|
|
bytesWritten int64
|
|
startTime time.Time
|
|
}
|
|
|
|
// CreateChunkedUpload starts a new chunked upload session
|
|
func (c *Client) CreateChunkedUpload(ctx context.Context, owner, repo string, releaseID int64, opts ChunkedUploadOptions) (*ChunkedUpload, error) {
|
|
if opts.ChunkSize == 0 {
|
|
opts.ChunkSize = 10 * 1024 * 1024 // 10MB default
|
|
}
|
|
if opts.Parallel == 0 {
|
|
opts.Parallel = 4
|
|
}
|
|
|
|
return &ChunkedUpload{
|
|
client: c,
|
|
owner: owner,
|
|
repo: repo,
|
|
release: strconv.FormatInt(releaseID, 10),
|
|
options: opts,
|
|
}, nil
|
|
}
|
|
|
|
// Upload uploads the file from the reader
|
|
func (cu *ChunkedUpload) Upload(ctx context.Context, reader io.Reader, size int64) (*UploadResult, error) {
|
|
cu.startTime = time.Now()
|
|
|
|
// Calculate checksum if requested
|
|
var checksum string
|
|
var data []byte
|
|
var err error
|
|
|
|
// Read all data into memory for checksum calculation
|
|
// For very large files, this should be optimized to stream
|
|
data, err = io.ReadAll(reader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read file: %w", err)
|
|
}
|
|
|
|
if cu.options.VerifyChecksum {
|
|
hash := sha256.Sum256(data)
|
|
checksum = hex.EncodeToString(hash[:])
|
|
}
|
|
|
|
// Create session
|
|
session, err := cu.createSession(ctx, size, checksum)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create session: %w", err)
|
|
}
|
|
cu.session = session
|
|
|
|
// Upload chunks
|
|
if err := cu.uploadChunks(ctx, bytes.NewReader(data)); err != nil {
|
|
return nil, fmt.Errorf("failed to upload chunks: %w", err)
|
|
}
|
|
|
|
// Complete upload
|
|
result, err := cu.complete(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to complete upload: %w", err)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (cu *ChunkedUpload) createSession(ctx context.Context, size int64, checksum string) (*UploadSession, error) {
|
|
path := fmt.Sprintf("/api/v1/repos/%s/%s/releases/%s/assets/upload-session",
|
|
url.PathEscape(cu.owner), url.PathEscape(cu.repo), cu.release)
|
|
|
|
body := map[string]any{
|
|
"name": cu.options.FileName,
|
|
"size": size,
|
|
"chunk_size": cu.options.ChunkSize,
|
|
}
|
|
if checksum != "" {
|
|
body["checksum"] = checksum
|
|
}
|
|
|
|
var session UploadSession
|
|
if err := cu.client.doRequest(ctx, "POST", path, body, &session); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &session, nil
|
|
}
|
|
|
|
func (cu *ChunkedUpload) uploadChunks(ctx context.Context, reader io.ReaderAt) error {
|
|
totalChunks := cu.session.TotalChunks
|
|
chunkSize := cu.session.ChunkSize
|
|
fileSize := cu.session.FileSize
|
|
|
|
// Worker pool
|
|
type job struct {
|
|
number int64
|
|
data []byte
|
|
}
|
|
|
|
jobs := make(chan job, cu.options.Parallel)
|
|
errors := make(chan error, totalChunks)
|
|
var wg sync.WaitGroup
|
|
|
|
// Start workers
|
|
for range cu.options.Parallel {
|
|
wg.Go(func() {
|
|
for j := range jobs {
|
|
if err := cu.uploadChunk(ctx, j.number, j.data); err != nil {
|
|
errors <- err
|
|
return
|
|
}
|
|
atomic.AddInt64(&cu.bytesWritten, int64(len(j.data)))
|
|
cu.reportProgress()
|
|
}
|
|
})
|
|
}
|
|
|
|
// Queue chunks
|
|
for chunkNum := range totalChunks {
|
|
offset := chunkNum * chunkSize
|
|
size := chunkSize
|
|
if chunkNum == totalChunks-1 {
|
|
size = fileSize - offset
|
|
}
|
|
|
|
data := make([]byte, size)
|
|
n, err := reader.ReadAt(data, offset)
|
|
if err != nil && err != io.EOF {
|
|
close(jobs)
|
|
return fmt.Errorf("failed to read chunk %d: %w", chunkNum, err)
|
|
}
|
|
data = data[:n]
|
|
|
|
select {
|
|
case err := <-errors:
|
|
close(jobs)
|
|
return err
|
|
case jobs <- job{number: chunkNum, data: data}:
|
|
case <-ctx.Done():
|
|
close(jobs)
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
close(jobs)
|
|
wg.Wait()
|
|
|
|
select {
|
|
case err := <-errors:
|
|
return err
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (cu *ChunkedUpload) uploadChunk(ctx context.Context, chunkNum int64, data []byte) error {
|
|
path := fmt.Sprintf("/api/v1/repos/uploads/%s/chunks/%d", cu.session.ID, chunkNum)
|
|
return cu.client.doRequestRaw(ctx, "PUT", path, bytes.NewReader(data), "application/octet-stream", nil)
|
|
}
|
|
|
|
func (cu *ChunkedUpload) complete(ctx context.Context) (*UploadResult, error) {
|
|
path := fmt.Sprintf("/api/v1/repos/uploads/%s/complete", cu.session.ID)
|
|
|
|
var result UploadResult
|
|
if err := cu.client.doRequest(ctx, "POST", path, nil, &result); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
func (cu *ChunkedUpload) reportProgress() {
|
|
if cu.options.OnProgress == nil {
|
|
return
|
|
}
|
|
|
|
bytesWritten := atomic.LoadInt64(&cu.bytesWritten)
|
|
elapsed := time.Since(cu.startTime).Seconds()
|
|
|
|
var speed float64
|
|
var eta time.Duration
|
|
if elapsed > 0 {
|
|
speed = float64(bytesWritten) / elapsed
|
|
if speed > 0 {
|
|
remaining := cu.session.FileSize - bytesWritten
|
|
eta = time.Duration(float64(remaining)/speed) * time.Second
|
|
}
|
|
}
|
|
|
|
var percent float64
|
|
if cu.session.FileSize > 0 {
|
|
percent = float64(bytesWritten) / float64(cu.session.FileSize) * 100
|
|
}
|
|
|
|
chunksTotal := cu.session.TotalChunks
|
|
chunksDone := min(bytesWritten/cu.session.ChunkSize, chunksTotal)
|
|
|
|
cu.options.OnProgress(Progress{
|
|
BytesDone: bytesWritten,
|
|
BytesTotal: cu.session.FileSize,
|
|
ChunksDone: chunksDone,
|
|
ChunksTotal: chunksTotal,
|
|
Percent: percent,
|
|
Speed: speed,
|
|
ETA: eta,
|
|
})
|
|
}
|
|
|
|
// GetSession returns the current session
|
|
func (cu *ChunkedUpload) GetSession() *UploadSession {
|
|
return cu.session
|
|
}
|
|
|
|
// Cancel cancels the upload
|
|
func (cu *ChunkedUpload) Cancel(ctx context.Context) error {
|
|
if cu.session == nil {
|
|
return nil
|
|
}
|
|
|
|
path := "/api/v1/repos/uploads/" + cu.session.ID
|
|
return cu.client.doRequest(ctx, "DELETE", path, nil, nil)
|
|
}
|
|
|
|
// UploadReleaseAsset is a convenience method for uploading a release asset
|
|
func (c *Client) UploadReleaseAsset(ctx context.Context, owner, repo string, releaseID int64, fileName string, reader io.Reader, size int64, opts ...UploadOption) (*UploadResult, error) {
|
|
uploadOpts := ChunkedUploadOptions{
|
|
FileName: fileName,
|
|
ChunkSize: 10 * 1024 * 1024,
|
|
Parallel: 4,
|
|
VerifyChecksum: true,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(&uploadOpts)
|
|
}
|
|
|
|
upload, err := c.CreateChunkedUpload(ctx, owner, repo, releaseID, uploadOpts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return upload.Upload(ctx, reader, size)
|
|
}
|
|
|
|
// UploadOption configures an upload
|
|
type UploadOption func(*ChunkedUploadOptions)
|
|
|
|
// WithChunkSize sets the chunk size
|
|
func WithChunkSize(size int64) UploadOption {
|
|
return func(o *ChunkedUploadOptions) {
|
|
o.ChunkSize = size
|
|
}
|
|
}
|
|
|
|
// WithParallel sets the number of parallel uploads
|
|
func WithParallel(n int) UploadOption {
|
|
return func(o *ChunkedUploadOptions) {
|
|
o.Parallel = n
|
|
}
|
|
}
|
|
|
|
// WithProgress sets the progress callback
|
|
func WithProgress(fn ProgressFunc) UploadOption {
|
|
return func(o *ChunkedUploadOptions) {
|
|
o.OnProgress = fn
|
|
}
|
|
}
|
|
|
|
// WithChecksum enables checksum verification
|
|
func WithChecksum(verify bool) UploadOption {
|
|
return func(o *ChunkedUploadOptions) {
|
|
o.VerifyChecksum = verify
|
|
}
|
|
}
|