mirror of
https://codeberg.org/forgejo/forgejo
synced 2024-11-25 11:16:11 +01:00
ad98ea63ee
Fix `Uploaded artifacts should be overwritten` https://github.com/go-gitea/gitea/issues/28549 When upload different content to uploaded artifact, it checks that content size is not match in db record with previous artifact size, then the new artifact is refused. Now if it finds uploading content size is not matching db record when receiving chunks, it updates db records to follow the latest size value.
205 lines
6.8 KiB
Go
205 lines
6.8 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"path/filepath"
|
|
"sort"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/models/actions"
|
|
"code.gitea.io/gitea/models/db"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/storage"
|
|
)
|
|
|
|
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
|
|
artifact *actions.ActionArtifact,
|
|
contentSize, runID int64,
|
|
) (int64, error) {
|
|
// parse content-range header, format: bytes 0-1023/146515
|
|
contentRange := ctx.Req.Header.Get("Content-Range")
|
|
start, end, length := int64(0), int64(0), int64(0)
|
|
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
|
|
log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
|
|
return -1, fmt.Errorf("parse content range error: %v", err)
|
|
}
|
|
// build chunk store path
|
|
storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
|
|
// use io.TeeReader to avoid reading all body to md5 sum.
|
|
// it writes data to hasher after reading end
|
|
// if hash is not matched, delete the read-end result
|
|
hasher := md5.New()
|
|
r := io.TeeReader(ctx.Req.Body, hasher)
|
|
// save chunk to storage
|
|
writtenSize, err := st.Save(storagePath, r, -1)
|
|
if err != nil {
|
|
return -1, fmt.Errorf("save chunk to storage error: %v", err)
|
|
}
|
|
// check md5
|
|
reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
|
|
chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
|
log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
|
|
// if md5 not match, delete the chunk
|
|
if reqMd5String != chunkMd5String || writtenSize != contentSize {
|
|
if err := st.Delete(storagePath); err != nil {
|
|
log.Error("Error deleting chunk: %s, %v", storagePath, err)
|
|
}
|
|
return -1, fmt.Errorf("md5 not match")
|
|
}
|
|
log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
|
|
storagePath, contentSize, artifact.ID, start, end)
|
|
// return chunk total size
|
|
return length, nil
|
|
}
|
|
|
|
type chunkFileItem struct {
|
|
RunID int64
|
|
ArtifactID int64
|
|
Start int64
|
|
End int64
|
|
Path string
|
|
}
|
|
|
|
func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
|
|
storageDir := fmt.Sprintf("tmp%d", runID)
|
|
var chunks []*chunkFileItem
|
|
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
|
|
baseName := filepath.Base(fpath)
|
|
// when read chunks from storage, it only contains storage dir and basename,
|
|
// no matter the subdirectory setting in storage config
|
|
item := chunkFileItem{Path: storageDir + "/" + baseName}
|
|
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
|
|
return fmt.Errorf("parse content range error: %v", err)
|
|
}
|
|
chunks = append(chunks, &item)
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
// chunks group by artifact id
|
|
chunksMap := make(map[int64][]*chunkFileItem)
|
|
for _, c := range chunks {
|
|
chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
|
|
}
|
|
return chunksMap, nil
|
|
}
|
|
|
|
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
|
|
// read all db artifacts by name
|
|
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
|
|
RunID: runID,
|
|
ArtifactName: artifactName,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// read all uploading chunks from storage
|
|
chunksMap, err := listChunksByRunID(st, runID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// range db artifacts to merge chunks
|
|
for _, art := range artifacts {
|
|
chunks, ok := chunksMap[art.ID]
|
|
if !ok {
|
|
log.Debug("artifact %d chunks not found", art.ID)
|
|
continue
|
|
}
|
|
if err := mergeChunksForArtifact(ctx, chunks, st, art); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact) error {
|
|
sort.Slice(chunks, func(i, j int) bool {
|
|
return chunks[i].Start < chunks[j].Start
|
|
})
|
|
allChunks := make([]*chunkFileItem, 0)
|
|
startAt := int64(-1)
|
|
// check if all chunks are uploaded and in order and clean repeated chunks
|
|
for _, c := range chunks {
|
|
// startAt is -1 means this is the first chunk
|
|
// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
|
|
// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
|
|
if c.Start == (startAt + 1) {
|
|
allChunks = append(allChunks, c)
|
|
startAt = c.End
|
|
}
|
|
}
|
|
// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
|
|
if startAt+1 != artifact.FileCompressedSize {
|
|
log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
|
|
return nil
|
|
}
|
|
// use multiReader
|
|
readers := make([]io.Reader, 0, len(allChunks))
|
|
closeReaders := func() {
|
|
for _, r := range readers {
|
|
_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
|
|
}
|
|
readers = nil
|
|
}
|
|
defer closeReaders()
|
|
for _, c := range allChunks {
|
|
var readCloser io.ReadCloser
|
|
var err error
|
|
if readCloser, err = st.Open(c.Path); err != nil {
|
|
return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
|
|
}
|
|
readers = append(readers, readCloser)
|
|
}
|
|
mergedReader := io.MultiReader(readers...)
|
|
|
|
// if chunk is gzip, use gz as extension
|
|
// download-artifact action will use content-encoding header to decide if it should decompress the file
|
|
extension := "chunk"
|
|
if artifact.ContentEncoding == "gzip" {
|
|
extension = "chunk.gz"
|
|
}
|
|
|
|
// save merged file
|
|
storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
|
|
written, err := st.Save(storagePath, mergedReader, -1)
|
|
if err != nil {
|
|
return fmt.Errorf("save merged file error: %v", err)
|
|
}
|
|
if written != artifact.FileCompressedSize {
|
|
return fmt.Errorf("merged file size is not equal to chunk length")
|
|
}
|
|
|
|
defer func() {
|
|
closeReaders() // close before delete
|
|
// drop chunks
|
|
for _, c := range chunks {
|
|
if err := st.Delete(c.Path); err != nil {
|
|
log.Warn("Error deleting chunk: %s, %v", c.Path, err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// save storage path to artifact
|
|
log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
|
|
// if artifact is already uploaded, delete the old file
|
|
if artifact.StoragePath != "" {
|
|
if err := st.Delete(artifact.StoragePath); err != nil {
|
|
log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
|
|
}
|
|
}
|
|
|
|
artifact.StoragePath = storagePath
|
|
artifact.Status = int64(actions.ArtifactStatusUploadConfirmed)
|
|
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
|
|
return fmt.Errorf("update artifact error: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|