forgejo-federation/routers/api/actions/artifacts_chunks.go
FuXiaoHei ad98ea63ee
Fix uploaded artifacts should be overwritten (#28726)
Fix `Uploaded artifacts should be overwritten`
https://github.com/go-gitea/gitea/issues/28549

When upload different content to uploaded artifact, it checks that
content size is not match in db record with previous artifact size, then
the new artifact is refused.

Now if it finds uploading content size is not matching db record when
receiving chunks, it updates db records to follow the latest size value.
2024-01-17 11:21:16 +08:00

204 lines
6.8 KiB
Go

// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"path/filepath"
"sort"
"time"
"code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/storage"
)
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
artifact *actions.ActionArtifact,
contentSize, runID int64,
) (int64, error) {
// parse content-range header, format: bytes 0-1023/146515
contentRange := ctx.Req.Header.Get("Content-Range")
start, end, length := int64(0), int64(0), int64(0)
if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
return -1, fmt.Errorf("parse content range error: %v", err)
}
// build chunk store path
storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
// use io.TeeReader to avoid reading all body to md5 sum.
// it writes data to hasher after reading end
// if hash is not matched, delete the read-end result
hasher := md5.New()
r := io.TeeReader(ctx.Req.Body, hasher)
// save chunk to storage
writtenSize, err := st.Save(storagePath, r, -1)
if err != nil {
return -1, fmt.Errorf("save chunk to storage error: %v", err)
}
// check md5
reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
// if md5 not match, delete the chunk
if reqMd5String != chunkMd5String || writtenSize != contentSize {
if err := st.Delete(storagePath); err != nil {
log.Error("Error deleting chunk: %s, %v", storagePath, err)
}
return -1, fmt.Errorf("md5 not match")
}
log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
storagePath, contentSize, artifact.ID, start, end)
// return chunk total size
return length, nil
}
type chunkFileItem struct {
RunID int64
ArtifactID int64
Start int64
End int64
Path string
}
func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
storageDir := fmt.Sprintf("tmp%d", runID)
var chunks []*chunkFileItem
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
baseName := filepath.Base(fpath)
// when read chunks from storage, it only contains storage dir and basename,
// no matter the subdirectory setting in storage config
item := chunkFileItem{Path: storageDir + "/" + baseName}
if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
return fmt.Errorf("parse content range error: %v", err)
}
chunks = append(chunks, &item)
return nil
}); err != nil {
return nil, err
}
// chunks group by artifact id
chunksMap := make(map[int64][]*chunkFileItem)
for _, c := range chunks {
chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
}
return chunksMap, nil
}
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
// read all db artifacts by name
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
RunID: runID,
ArtifactName: artifactName,
})
if err != nil {
return err
}
// read all uploading chunks from storage
chunksMap, err := listChunksByRunID(st, runID)
if err != nil {
return err
}
// range db artifacts to merge chunks
for _, art := range artifacts {
chunks, ok := chunksMap[art.ID]
if !ok {
log.Debug("artifact %d chunks not found", art.ID)
continue
}
if err := mergeChunksForArtifact(ctx, chunks, st, art); err != nil {
return err
}
}
return nil
}
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact) error {
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].Start < chunks[j].Start
})
allChunks := make([]*chunkFileItem, 0)
startAt := int64(-1)
// check if all chunks are uploaded and in order and clean repeated chunks
for _, c := range chunks {
// startAt is -1 means this is the first chunk
// previous c.ChunkEnd + 1 == c.ChunkStart means this chunk is in order
// StartAt is not -1 and c.ChunkStart is not startAt + 1 means there is a chunk missing
if c.Start == (startAt + 1) {
allChunks = append(allChunks, c)
startAt = c.End
}
}
// if the last chunk.End + 1 is not equal to chunk.ChunkLength, means chunks are not uploaded completely
if startAt+1 != artifact.FileCompressedSize {
log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
return nil
}
// use multiReader
readers := make([]io.Reader, 0, len(allChunks))
closeReaders := func() {
for _, r := range readers {
_ = r.(io.Closer).Close() // it guarantees to be io.Closer by the following loop's Open function
}
readers = nil
}
defer closeReaders()
for _, c := range allChunks {
var readCloser io.ReadCloser
var err error
if readCloser, err = st.Open(c.Path); err != nil {
return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
}
readers = append(readers, readCloser)
}
mergedReader := io.MultiReader(readers...)
// if chunk is gzip, use gz as extension
// download-artifact action will use content-encoding header to decide if it should decompress the file
extension := "chunk"
if artifact.ContentEncoding == "gzip" {
extension = "chunk.gz"
}
// save merged file
storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
written, err := st.Save(storagePath, mergedReader, -1)
if err != nil {
return fmt.Errorf("save merged file error: %v", err)
}
if written != artifact.FileCompressedSize {
return fmt.Errorf("merged file size is not equal to chunk length")
}
defer func() {
closeReaders() // close before delete
// drop chunks
for _, c := range chunks {
if err := st.Delete(c.Path); err != nil {
log.Warn("Error deleting chunk: %s, %v", c.Path, err)
}
}
}()
// save storage path to artifact
log.Debug("[artifact] merge chunks to artifact: %d, %s, old:%s", artifact.ID, storagePath, artifact.StoragePath)
// if artifact is already uploaded, delete the old file
if artifact.StoragePath != "" {
if err := st.Delete(artifact.StoragePath); err != nil {
log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
}
}
artifact.StoragePath = storagePath
artifact.Status = int64(actions.ArtifactStatusUploadConfirmed)
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
return fmt.Errorf("update artifact error: %v", err)
}
return nil
}