// Copyright 2026 The Gitea Authors. All rights reserved. // SPDX-License-Identifier: MIT package v2 import ( "net/http" "code.gitea.io/gitea/models/db" issues_model "code.gitea.io/gitea/models/issues" repo_model "code.gitea.io/gitea/models/repo" apierrors "code.gitea.io/gitea/modules/errors" "code.gitea.io/gitea/modules/gitrepo" "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/optional" "code.gitea.io/gitea/services/context" ) // StreamWriter wraps response writer for NDJSON streaming type StreamWriter struct { w http.ResponseWriter flusher http.Flusher encoder json.Encoder } // NewStreamWriter creates a new NDJSON stream writer func NewStreamWriter(w http.ResponseWriter) *StreamWriter { flusher, _ := w.(http.Flusher) return &StreamWriter{ w: w, flusher: flusher, encoder: json.NewEncoder(w), } } // WriteItem writes a single item to the stream func (sw *StreamWriter) WriteItem(item any) error { if err := sw.encoder.Encode(item); err != nil { return err } if sw.flusher != nil { sw.flusher.Flush() } return nil } // StreamFilesRequest represents request for streaming file contents type StreamFilesRequest struct { Owner string `json:"owner"` Repo string `json:"repo"` Ref string `json:"ref"` Paths []string `json:"paths"` } // StreamFileItem represents a single file in the stream type StreamFileItem struct { Type string `json:"type"` // "file", "error", "done" Path string `json:"path,omitempty"` Content string `json:"content,omitempty"` SHA string `json:"sha,omitempty"` Size int64 `json:"size,omitempty"` Error string `json:"error,omitempty"` Index int `json:"index,omitempty"` Total int `json:"total,omitempty"` } // StreamFiles streams file contents as NDJSON // This allows AI tools to process files as they arrive without waiting for all files func StreamFiles(ctx *context.APIContext) { var req StreamFilesRequest if err := json.NewDecoder(ctx.Req.Body).Decode(&req); err != nil { ctx.APIErrorWithCode(apierrors.ValidationFailed, map[string]any{ "error": err.Error(), }) return } if len(req.Paths) == 0 { ctx.APIErrorWithCode(apierrors.ValidationFailed, map[string]any{ "field": "paths", "message": "at least one path is required", }) return } if len(req.Paths) > 500 { ctx.APIErrorWithCode(apierrors.ValidationFailed, map[string]any{ "field": "paths", "message": "maximum 500 paths per stream", }) return } // Get repository repo, err := repo_model.GetRepositoryByOwnerAndName(ctx, req.Owner, req.Repo) if err != nil { if repo_model.IsErrRepoNotExist(err) { ctx.APIErrorWithCode(apierrors.RepoNotFound) return } ctx.APIErrorWithCode(apierrors.InternalError) return } // Check access if repo.IsPrivate && !ctx.IsSigned { ctx.APIErrorWithCode(apierrors.PermAccessDenied) return } // Open git repo gitRepo, err := gitrepo.OpenRepository(ctx, repo) if err != nil { ctx.APIErrorWithCode(apierrors.InternalError) return } defer gitRepo.Close() // Determine ref ref := req.Ref if ref == "" { ref = repo.DefaultBranch } // Get commit commit, err := gitRepo.GetCommit(ref) if err != nil { commit, err = gitRepo.GetBranchCommit(ref) if err != nil { commit, err = gitRepo.GetTagCommit(ref) if err != nil { ctx.APIErrorWithCode(apierrors.RefNotFound, map[string]any{ "ref": ref, }) return } } } // Set up streaming response ctx.Resp.Header().Set("Content-Type", "application/x-ndjson") ctx.Resp.Header().Set("Transfer-Encoding", "chunked") ctx.Resp.Header().Set("X-Content-Type-Options", "nosniff") ctx.Resp.WriteHeader(http.StatusOK) sw := NewStreamWriter(ctx.Resp) total := len(req.Paths) // Stream each file for i, path := range req.Paths { item := StreamFileItem{ Type: "file", Path: path, Index: i, Total: total, } entry, err := commit.GetTreeEntryByPath(path) if err != nil { item.Type = "error" item.Error = "file not found" _ = sw.WriteItem(item) continue } item.SHA = entry.ID.String() item.Size = entry.Size() if entry.IsDir() { item.Type = "error" item.Error = "path is a directory" _ = sw.WriteItem(item) continue } // Size limit for streaming if entry.Size() > 5*1024*1024 { // 5MB per file in stream item.Type = "error" item.Error = "file too large for streaming (>5MB)" _ = sw.WriteItem(item) continue } blob := entry.Blob() reader, err := blob.DataAsync() if err != nil { item.Type = "error" item.Error = "failed to read content" _ = sw.WriteItem(item) continue } content := make([]byte, entry.Size()) n, _ := reader.Read(content) reader.Close() item.Content = string(content[:n]) _ = sw.WriteItem(item) } // Send completion marker _ = sw.WriteItem(StreamFileItem{ Type: "done", Total: total, }) } // StreamCommitsRequest represents request for streaming commits type StreamCommitsRequest struct { Owner string `json:"owner"` Repo string `json:"repo"` Ref string `json:"ref"` Path string `json:"path"` // Optional: filter by path Limit int `json:"limit"` Offset int `json:"offset"` } // StreamCommitItem represents a single commit in the stream type StreamCommitItem struct { Type string `json:"type"` // "commit", "error", "done" SHA string `json:"sha,omitempty"` Message string `json:"message,omitempty"` Author string `json:"author,omitempty"` Email string `json:"email,omitempty"` Timestamp string `json:"timestamp,omitempty"` Files []string `json:"files,omitempty"` Error string `json:"error,omitempty"` Index int `json:"index,omitempty"` Total int `json:"total,omitempty"` } // StreamCommits streams commit history as NDJSON func StreamCommits(ctx *context.APIContext) { var req StreamCommitsRequest if err := json.NewDecoder(ctx.Req.Body).Decode(&req); err != nil { ctx.APIErrorWithCode(apierrors.ValidationFailed, map[string]any{ "error": err.Error(), }) return } // Default and max limits if req.Limit <= 0 { req.Limit = 50 } if req.Limit > 500 { req.Limit = 500 } // Get repository repo, err := repo_model.GetRepositoryByOwnerAndName(ctx, req.Owner, req.Repo) if err != nil { if repo_model.IsErrRepoNotExist(err) { ctx.APIErrorWithCode(apierrors.RepoNotFound) return } ctx.APIErrorWithCode(apierrors.InternalError) return } // Check access if repo.IsPrivate && !ctx.IsSigned { ctx.APIErrorWithCode(apierrors.PermAccessDenied) return } // Open git repo gitRepo, err := gitrepo.OpenRepository(ctx, repo) if err != nil { ctx.APIErrorWithCode(apierrors.InternalError) return } defer gitRepo.Close() ref := req.Ref if ref == "" { ref = repo.DefaultBranch } // Get head commit for the ref headCommit, err := gitRepo.GetBranchCommit(ref) if err != nil { headCommit, err = gitRepo.GetTagCommit(ref) if err != nil { ctx.APIErrorWithCode(apierrors.RefNotFound) return } } // Set up streaming ctx.Resp.Header().Set("Content-Type", "application/x-ndjson") ctx.Resp.Header().Set("Transfer-Encoding", "chunked") ctx.Resp.WriteHeader(http.StatusOK) sw := NewStreamWriter(ctx.Resp) // Get commits from head commit commits, err := headCommit.CommitsByRange(req.Offset/req.Limit+1, req.Limit, "", "", "") if err != nil { _ = sw.WriteItem(StreamCommitItem{ Type: "error", Error: "failed to get commits", }) return } total := len(commits) for i, commit := range commits { item := StreamCommitItem{ Type: "commit", SHA: commit.ID.String(), Message: commit.CommitMessage, Author: commit.Author.Name, Email: commit.Author.Email, Timestamp: commit.Author.When.Format("2006-01-02T15:04:05Z07:00"), Index: i, Total: total, } _ = sw.WriteItem(item) } _ = sw.WriteItem(StreamCommitItem{ Type: "done", Total: total, }) } // StreamIssuesRequest represents request for streaming issues type StreamIssuesRequest struct { Owner string `json:"owner"` Repo string `json:"repo"` State string `json:"state"` // "open", "closed", "all" Labels []string `json:"labels"` Limit int `json:"limit"` Offset int `json:"offset"` } // StreamIssueItem represents a single issue in the stream type StreamIssueItem struct { Type string `json:"type"` // "issue", "error", "done" Number int64 `json:"number,omitempty"` Title string `json:"title,omitempty"` Body string `json:"body,omitempty"` State string `json:"state,omitempty"` Labels []string `json:"labels,omitempty"` Author string `json:"author,omitempty"` Assignees []string `json:"assignees,omitempty"` CreatedAt string `json:"created_at,omitempty"` UpdatedAt string `json:"updated_at,omitempty"` Comments int `json:"comments,omitempty"` Error string `json:"error,omitempty"` Index int `json:"index,omitempty"` Total int `json:"total,omitempty"` } // StreamIssues streams issues as NDJSON func StreamIssues(ctx *context.APIContext) { var req StreamIssuesRequest if err := json.NewDecoder(ctx.Req.Body).Decode(&req); err != nil { ctx.APIErrorWithCode(apierrors.ValidationFailed, map[string]any{ "error": err.Error(), }) return } // Default and max limits if req.Limit <= 0 { req.Limit = 50 } if req.Limit > 200 { req.Limit = 200 } // Get repository repo, err := repo_model.GetRepositoryByOwnerAndName(ctx, req.Owner, req.Repo) if err != nil { if repo_model.IsErrRepoNotExist(err) { ctx.APIErrorWithCode(apierrors.RepoNotFound) return } ctx.APIErrorWithCode(apierrors.InternalError) return } // Check access if repo.IsPrivate && !ctx.IsSigned { ctx.APIErrorWithCode(apierrors.PermAccessDenied) return } // Determine issue state option var isClosed optional.Option[bool] switch req.State { case "closed": isClosed = optional.Some(true) case "open": isClosed = optional.Some(false) case "all": // Leave as None to get all issues default: isClosed = optional.Some(false) // default to open } // Set up streaming ctx.Resp.Header().Set("Content-Type", "application/x-ndjson") ctx.Resp.Header().Set("Transfer-Encoding", "chunked") ctx.Resp.WriteHeader(http.StatusOK) sw := NewStreamWriter(ctx.Resp) // Get issues issues, err := issues_model.Issues(ctx, &issues_model.IssuesOptions{ RepoIDs: []int64{repo.ID}, IsClosed: isClosed, Paginator: &db.ListOptions{ Page: req.Offset/req.Limit + 1, PageSize: req.Limit, }, }) if err != nil { _ = sw.WriteItem(StreamIssueItem{ Type: "error", Error: "failed to get issues", }) return } total := len(issues) for i, issue := range issues { labels := make([]string, 0, len(issue.Labels)) for _, label := range issue.Labels { labels = append(labels, label.Name) } assignees := make([]string, 0, len(issue.Assignees)) for _, assignee := range issue.Assignees { assignees = append(assignees, assignee.Name) } state := "open" if issue.IsClosed { state = "closed" } item := StreamIssueItem{ Type: "issue", Number: issue.Index, Title: issue.Title, Body: issue.Content, State: state, Labels: labels, Author: issue.Poster.Name, Assignees: assignees, CreatedAt: issue.CreatedUnix.AsTime().Format("2006-01-02T15:04:05Z07:00"), UpdatedAt: issue.UpdatedUnix.AsTime().Format("2006-01-02T15:04:05Z07:00"), Comments: issue.NumComments, Index: i, Total: total, } _ = sw.WriteItem(item) } _ = sw.WriteItem(StreamIssueItem{ Type: "done", Total: total, }) }