gitea/sdk/python/gitea/client.py
logikonline ad82306b52 feat(sdk): add CLI tool and SDK libraries for developer tooling (Phase 4)
Add comprehensive developer tooling for Gitea integration:

CLI Tool (cmd/gitea-cli/):
- gitea-cli auth login/logout/status - Authentication management
- gitea-cli upload release-asset - Chunked upload with progress
- gitea-cli upload resume - Resume interrupted uploads
- gitea-cli upload list - List pending upload sessions
- Parallel chunk uploads with configurable workers
- SHA256 checksum verification
- Progress bar with speed and ETA display

Go SDK (sdk/go/):
- GiteaClient with token authentication
- User, Repository, Release, Attachment types
- ChunkedUpload with parallel workers
- Progress callbacks for upload tracking
- Functional options pattern (WithChunkSize, WithParallel, etc.)

Python SDK (sdk/python/):
- GiteaClient with requests-based HTTP
- Full type hints and dataclasses
- ThreadPoolExecutor for parallel uploads
- Resume capability for interrupted uploads
- Exception hierarchy (APIError, UploadError, etc.)

TypeScript SDK (sdk/typescript/):
- Full TypeScript types and interfaces
- Async/await API design
- Browser and Node.js compatible
- Web Crypto API for checksums
- ESM and CJS build outputs

All SDKs support (see the Python sketch after this list):
- Chunked uploads for large files
- Parallel upload workers
- Progress tracking with callbacks
- Checksum verification
- Resume interrupted uploads
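
A minimal end-to-end sketch with the Python SDK (host, repo, tag, and token are
placeholders; names follow the sdk/python client below):

    from gitea import GiteaClient

    client = GiteaClient("https://gitea.example.com", token="<token>")
    release = client.get_release_by_tag("owner", "repo", "v1.0.0")
    with open("dist.tar.gz", "rb") as f:
        client.upload_release_asset(
            "owner", "repo", release.id, f, "dist.tar.gz",
            parallel=4,
            progress_callback=lambda p: print(f"{p.percent:.1f}%"),
        )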

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-09 12:07:07 -05:00


# Copyright 2026 The Gitea Authors. All rights reserved.
# SPDX-License-Identifier: MIT
"""Gitea API client with chunked upload support."""
import hashlib
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, BinaryIO, Callable, Dict, List, Optional
from urllib.parse import quote
import requests
from .exceptions import APIError, AuthenticationError, NotFoundError, UploadError
from .models import (
User,
Repository,
Release,
UploadSession,
UploadResult,
Progress,
)
class GiteaClient:
"""Client for the Gitea API.
Args:
base_url: The base URL of the Gitea instance (e.g., "https://gitea.example.com")
token: API token for authentication
timeout: Request timeout in seconds (default: 30)
"""
    DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024  # 10 MB
    MAX_PARALLEL = 8  # Hard upper bound on concurrent chunk uploads
def __init__(
self,
base_url: str,
token: Optional[str] = None,
timeout: int = 30,
):
self.base_url = base_url.rstrip("/")
self.token = token
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "gitea-sdk-python/1.0",
})
if token:
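            # Gitea's REST API accepts personal access tokens via the
            # "Authorization: token <value>" scheme used here.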
self.session.headers["Authorization"] = f"token {token}"
    def _url(self, path: str) -> str:
        """Build the full URL for an API path."""
        # Concatenate rather than urljoin(): because API paths start with "/",
        # urljoin would discard any sub-path in base_url
        # (e.g. "https://example.com/gitea").
        return self.base_url + path
def _request(
self,
method: str,
path: str,
json: Optional[Dict[str, Any]] = None,
data: Optional[bytes] = None,
headers: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""Make an API request."""
url = self._url(path)
req_headers = headers or {}
try:
if data is not None:
resp = self.session.request(
method,
url,
data=data,
headers=req_headers,
timeout=self.timeout,
)
else:
resp = self.session.request(
method,
url,
json=json,
timeout=self.timeout,
)
        except requests.RequestException as e:
            raise APIError(f"Request failed: {e}") from e
if resp.status_code == 401:
raise AuthenticationError()
        elif resp.status_code == 404:
            raise NotFoundError(path)
elif resp.status_code >= 400:
try:
error_data = resp.json()
raise APIError(
message=error_data.get("message", resp.text),
code=error_data.get("code"),
status=resp.status_code,
details=error_data.get("details"),
)
except ValueError:
raise APIError(resp.text, status=resp.status_code)
if resp.status_code == 204 or not resp.content:
return {}
try:
return resp.json()
except ValueError:
return {"content": resp.text}
# User methods
def get_current_user(self) -> User:
"""Get the currently authenticated user."""
data = self._request("GET", "/api/v1/user")
return User.from_dict(data)
def get_user(self, username: str) -> User:
"""Get a user by username."""
data = self._request("GET", f"/api/v1/users/{quote(username)}")
return User.from_dict(data)
# Repository methods
def get_repository(self, owner: str, repo: str) -> Repository:
"""Get a repository by owner and name."""
data = self._request("GET", f"/api/v1/repos/{quote(owner)}/{quote(repo)}")
return Repository.from_dict(data)
def list_user_repos(self, username: str) -> List[Repository]:
"""List repositories for a user."""
data = self._request("GET", f"/api/v1/users/{quote(username)}/repos")
return [Repository.from_dict(r) for r in data]
# Release methods
def get_release(self, owner: str, repo: str, release_id: int) -> Release:
"""Get a release by ID."""
data = self._request(
"GET",
f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases/{release_id}",
)
return Release.from_dict(data)
def get_release_by_tag(self, owner: str, repo: str, tag: str) -> Release:
"""Get a release by tag name."""
data = self._request(
"GET",
f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases/tags/{quote(tag)}",
)
return Release.from_dict(data)
def list_releases(self, owner: str, repo: str) -> List[Release]:
"""List all releases for a repository."""
data = self._request(
"GET",
f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases",
)
return [Release.from_dict(r) for r in data]
# Chunked upload methods
def upload_release_asset(
self,
owner: str,
repo: str,
release_id: int,
file: BinaryIO,
filename: str,
chunk_size: int = DEFAULT_CHUNK_SIZE,
parallel: int = 4,
verify_checksum: bool = True,
progress_callback: Optional[Callable[[Progress], None]] = None,
) -> UploadResult:
"""Upload a release asset using chunked upload.
Args:
owner: Repository owner
repo: Repository name
release_id: Release ID
file: File-like object to upload
filename: Name for the uploaded file
chunk_size: Size of each chunk in bytes (default: 10MB)
parallel: Number of parallel upload workers (default: 4)
verify_checksum: Whether to verify file checksum (default: True)
progress_callback: Callback function for progress updates
Returns:
UploadResult with details of the uploaded asset
"""
        # Determine the file size
file.seek(0, 2) # Seek to end
file_size = file.tell()
file.seek(0) # Seek back to start
# Calculate checksum if needed
checksum = None
if verify_checksum:
hasher = hashlib.sha256()
while True:
chunk = file.read(65536)
if not chunk:
break
hasher.update(chunk)
checksum = hasher.hexdigest()
file.seek(0)
# Create upload session
session = self._create_upload_session(
owner, repo, release_id, filename, file_size, chunk_size, checksum
)
# Upload chunks
        try:
            self._upload_chunks(session, file, parallel, progress_callback)
        except UploadError:
            # Chunk-level failures already carry session and chunk context.
            raise
        except Exception as e:
            raise UploadError(
                f"Upload failed: {e}",
                session_id=session.id,
            ) from e
# Complete upload
result = self._complete_upload(session.id)
return result
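    # Usage sketch (hypothetical repo and release id; Progress exposes the
    # fields constructed in _upload_chunks below):
    #
    #     with open("app.tar.gz", "rb") as f:
    #         client.upload_release_asset(
    #             "owner", "repo", 42, f, "app.tar.gz",
    #             progress_callback=lambda p: print(
    #                 f"{p.percent:5.1f}% {p.speed / 1e6:.1f} MB/s "
    #                 f"eta {p.eta_seconds:.0f}s"
    #             ),
    #         )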
def _create_upload_session(
self,
owner: str,
repo: str,
release_id: int,
filename: str,
file_size: int,
chunk_size: int,
checksum: Optional[str] = None,
) -> UploadSession:
"""Create a chunked upload session."""
body = {
"name": filename,
"size": file_size,
"chunk_size": chunk_size,
}
if checksum:
body["checksum"] = checksum
data = self._request(
"POST",
f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases/{release_id}/assets/upload-session",
json=body,
)
return UploadSession.from_dict(data)
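    # The response is expected to carry at least the UploadSession fields this
    # client reads: id, chunk_size, file_size, total_chunks, chunks_received,
    # and status. (Assumed contract of the upload-session endpoint.)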
def _upload_chunks(
self,
session: UploadSession,
file: BinaryIO,
parallel: int,
progress_callback: Optional[Callable[[Progress], None]] = None,
) -> None:
"""Upload file chunks in parallel."""
total_chunks = session.total_chunks
chunk_size = session.chunk_size
file_size = session.file_size
        bytes_uploaded = 0
        chunks_done = 0
        start_time = time.time()
def upload_chunk(chunk_num: int, chunk_data: bytes) -> int:
"""Upload a single chunk."""
self._request(
"PUT",
f"/api/v1/repos/uploads/{session.id}/chunks/{chunk_num}",
data=chunk_data,
headers={"Content-Type": "application/octet-stream"},
)
return len(chunk_data)
        # Prepare chunks up front; note this buffers the entire file in memory.
        chunks = []
for chunk_num in range(total_chunks):
offset = chunk_num * chunk_size
size = min(chunk_size, file_size - offset)
file.seek(offset)
chunk_data = file.read(size)
chunks.append((chunk_num, chunk_data))
# Upload in parallel
parallel = min(parallel, self.MAX_PARALLEL)
with ThreadPoolExecutor(max_workers=parallel) as executor:
futures = {
executor.submit(upload_chunk, num, data): num
for num, data in chunks
}
            for future in as_completed(futures):
                chunk_num = futures[future]
                try:
                    bytes_done = future.result()
                    bytes_uploaded += bytes_done
                    chunks_done += 1
                    if progress_callback:
                        elapsed = time.time() - start_time
                        speed = bytes_uploaded / elapsed if elapsed > 0 else 0
                        remaining = file_size - bytes_uploaded
                        eta = remaining / speed if speed > 0 else 0
                        progress = Progress(
                            bytes_done=bytes_uploaded,
                            bytes_total=file_size,
                            # as_completed() yields futures out of order, so a
                            # completion counter is used instead of chunk_num + 1.
                            chunks_done=chunks_done,
                            chunks_total=total_chunks,
                            percent=bytes_uploaded / file_size * 100,
                            speed=speed,
                            eta_seconds=eta,
                        )
                        progress_callback(progress)
                except Exception as e:
                    raise UploadError(
                        f"Failed to upload chunk {chunk_num}: {e}",
                        session_id=session.id,
                        chunk=chunk_num,
                    ) from e
def _complete_upload(self, session_id: str) -> UploadResult:
"""Complete a chunked upload."""
data = self._request(
"POST",
f"/api/v1/repos/uploads/{session_id}/complete",
)
return UploadResult.from_dict(data)
def cancel_upload(self, session_id: str) -> None:
"""Cancel an upload session."""
self._request("DELETE", f"/api/v1/repos/uploads/{session_id}")
def get_upload_session(self, session_id: str) -> UploadSession:
"""Get the status of an upload session."""
data = self._request("GET", f"/api/v1/repos/uploads/{session_id}")
return UploadSession.from_dict(data)
def resume_upload(
self,
session_id: str,
file: BinaryIO,
progress_callback: Optional[Callable[[Progress], None]] = None,
) -> UploadResult:
"""Resume an interrupted upload.
Args:
session_id: ID of the upload session to resume
file: File-like object to upload
progress_callback: Callback function for progress updates
Returns:
UploadResult with details of the uploaded asset
"""
session = self.get_upload_session(session_id)
if session.status == "complete":
raise UploadError("Upload already completed", session_id=session_id)
if session.status == "expired":
raise UploadError("Upload session has expired", session_id=session_id)
# Upload remaining chunks
self._upload_remaining_chunks(session, file, progress_callback)
# Complete upload
return self._complete_upload(session_id)
def _upload_remaining_chunks(
self,
session: UploadSession,
file: BinaryIO,
progress_callback: Optional[Callable[[Progress], None]] = None,
) -> None:
"""Upload remaining chunks for a resumed upload."""
        # Like _upload_chunks, but sequential, starting after the last chunk the
        # server reports having received. Assumes chunks_received describes a
        # contiguous prefix of chunks 0..chunks_received-1.
total_chunks = session.total_chunks
chunk_size = session.chunk_size
file_size = session.file_size
start_chunk = session.chunks_received
bytes_uploaded = start_chunk * chunk_size
start_time = time.time()
for chunk_num in range(start_chunk, total_chunks):
offset = chunk_num * chunk_size
size = min(chunk_size, file_size - offset)
file.seek(offset)
chunk_data = file.read(size)
self._request(
"PUT",
f"/api/v1/repos/uploads/{session.id}/chunks/{chunk_num}",
data=chunk_data,
headers={"Content-Type": "application/octet-stream"},
)
bytes_uploaded += len(chunk_data)
if progress_callback:
elapsed = time.time() - start_time
speed = (bytes_uploaded - start_chunk * chunk_size) / elapsed if elapsed > 0 else 0
remaining = file_size - bytes_uploaded
eta = remaining / speed if speed > 0 else 0
progress = Progress(
bytes_done=bytes_uploaded,
bytes_total=file_size,
chunks_done=chunk_num + 1,
chunks_total=total_chunks,
percent=bytes_uploaded / file_size * 100,
speed=speed,
eta_seconds=eta,
)
progress_callback(progress)
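# Resume sketch (hypothetical session id). The file passed in must be the same
# content the session was created for, since remaining chunks are re-read by
# byte offset:
#
#     session_id = "<pending-session-id>"
#     with open("app.tar.gz", "rb") as f:
#         result = client.resume_upload(session_id, f)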