Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gorilla/handlers v1.5.2
github.com/spf13/cobra v1.9.1
github.com/wzshiming/ioswmr v0.0.0-20250120111738-c884d3ebe493
github.com/wzshiming/sss v0.4.0
github.com/wzshiming/sss v0.7.0
golang.org/x/sync v0.17.0
k8s.io/apimachinery v0.33.6
k8s.io/apiserver v0.33.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk=
github.com/wzshiming/ioswmr v0.0.0-20250120111738-c884d3ebe493 h1:bJHJ032u4s+R4+z0V9n+X+TkxEaeZvgyPV82qT1FrkA=
github.com/wzshiming/ioswmr v0.0.0-20250120111738-c884d3ebe493/go.mod h1:TwwDyS1wnJG3AvKliA+PPB0kliN3yEjsabtH4o7xySQ=
github.com/wzshiming/sss v0.4.0 h1:EqlEAixN7Zsum68pjZLhaB0X3YmTFwiEN4RuzBGbMdc=
github.com/wzshiming/sss v0.4.0/go.mod h1:4FNGdjjLOzcmZtcCOW+cpL4dmJvg4pMeek0UQy5/FCg=
github.com/wzshiming/sss v0.7.0 h1:YNGJMJ+LBv9dhEUoWTRRl35tru8V2FoW816AR8fZtLg=
github.com/wzshiming/sss v0.7.0/go.mod h1:4FNGdjjLOzcmZtcCOW+cpL4dmJvg4pMeek0UQy5/FCg=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 h1:S2dVYn90KE98chqDkyE9Z4N61UnQd+KOfgp5Iu53llk=
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/task/v1alpha1/blob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ type BlobStatus struct {
// +optional
CompletionTime *metav1.Time `json:"completionTime,omitempty"`

// SourceResponseHeaders specifies which headers from the source response
// should be forwarded to destination requests.
// only works when there is a multiple chunk.
// +optional
SourceResponseHeaders map[string]string `json:"sourceResponseHeaders,omitempty"`

// Conditions holds conditions for the Blob.
// +patchMergeKey=type
// +patchStrategy=merge
Expand All @@ -116,6 +122,11 @@ type BlobSpec struct {
// Destination is the destination of the blob.
Destination []BlobDestination `json:"destination"`

// SourceResponseHeadersToDestination specifies which headers from the source response
// should be forwarded to destination requests (e.g., ["Content-Type", "Content-Disposition"]).
// +optional
SourceResponseHeadersToDestination []string `json:"sourceResponseHeadersToDestination,omitempty"`

// Priority represents the relative importance of this blob when multiple blobs exist.
Priority int64 `json:"priority"`

Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/task/v1alpha1/chunk_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ type ChunkSpec struct {
// Destination is the destination of the chunk.
Destination []ChunkHTTP `json:"destination,omitempty"`

// SourceResponseHeadersToDestination specifies which headers from the source response
// should be forwarded to destination requests (e.g., ["Content-Type", "Content-Disposition"]).
// +optional
SourceResponseHeadersToDestination []string `json:"sourceResponseHeadersToDestination,omitempty"`

// Priority represents the relative importance of this chunk when multiple chunks exist.
Priority int64 `json:"priority"`

Expand Down
17 changes: 17 additions & 0 deletions pkg/apis/task/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/controller/blob_from_chunk_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ func (c *BlobFromChunkController) fromHeadChunk(ctx context.Context, blob *v1alp
blob.Status.AcceptRanges = chunk.Status.SourceResponse.Headers["accept-ranges"] == "bytes"
}
}
blob.Status.SourceResponseHeaders = chunk.Status.SourceResponse.Headers
blob.Status.Phase = v1alpha1.BlobPhaseRunning
case v1alpha1.ChunkPhaseFailed:
blob.Status.Retry = chunk.Status.Retry
Expand Down
27 changes: 22 additions & 5 deletions pkg/controller/blob_to_chunk_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/textproto"
"slices"
"strings"
"time"

"github.com/OpenCIDN/cidn/pkg/apis/task/v1alpha1"
Expand Down Expand Up @@ -346,10 +347,11 @@ func (c *BlobToChunkController) toOneChunk(ctx context.Context, blob *v1alpha1.B
},
},
Spec: v1alpha1.ChunkSpec{
Total: blob.Status.Total,
Priority: blob.Spec.Priority,
Sha256: blob.Spec.ContentSha256,
MaximumRetry: blob.Spec.MaximumRetry - blob.Status.Retry,
Total: blob.Status.Total,
Priority: blob.Spec.Priority,
Sha256: blob.Spec.ContentSha256,
MaximumRetry: blob.Spec.MaximumRetry - blob.Status.Retry,
SourceResponseHeadersToDestination: blob.Spec.SourceResponseHeadersToDestination,
},
Status: v1alpha1.ChunkStatus{
Phase: v1alpha1.ChunkPhasePending,
Expand Down Expand Up @@ -570,7 +572,22 @@ func (c *BlobToChunkController) toMultipart(ctx context.Context, blob *v1alpha1.

mp, err := s3.GetMultipart(ctx, dst.Path)
if err != nil {
mp, err = s3.NewMultipart(ctx, dst.Path)
opts := []sss.WriterOptions{}
if len(blob.Spec.SourceResponseHeadersToDestination) > 0 && len(blob.Status.SourceResponseHeaders) > 0 {
for _, headerKey := range blob.Spec.SourceResponseHeadersToDestination {
switch strings.ToLower(headerKey) {
case "content-type":
if headerValue, ok := blob.Status.SourceResponseHeaders[headerKey]; ok {
opts = append(opts, sss.WithContentType(headerValue))
}
case "content-disposition":
if headerValue, ok := blob.Status.SourceResponseHeaders[headerKey]; ok {
opts = append(opts, sss.WithContentDisposition(headerValue))
}
}
}
}
mp, err = s3.NewMultipart(ctx, dst.Path, opts...)
if err != nil {
return nil, err
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 27 additions & 14 deletions pkg/runner/chunk_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (r *ChunkRunner) tryAddBearer(ctx context.Context, chunk *v1alpha1.Chunk) e
return nil
}

func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, s *state) (io.ReadCloser, int64) {
func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, s *state) (io.ReadCloser, int64, map[string]string) {
err := r.tryAddBearer(ctx, chunk)
if err != nil {
if errors.Is(err, ErrBearerNotReady) {
Expand All @@ -377,13 +377,13 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk,
return ss
})
r.unmarkRecord(chunk.Name)
return nil, 0
return nil, 0, nil
} else if errors.Is(err, ErrAuthentication) {
s.handleProcessError("AuthenticationError", err)
} else {
s.handleProcessError("BearerFetchError", err)
}
return nil, 0
return nil, 0, nil
}

srcReq, err := r.buildRequest(ctx, &chunk.Spec.Source, nil, 0)
Expand All @@ -395,7 +395,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk,
} else {
s.handleProcessError("BuildRequestError", err)
}
return nil, 0
return nil, 0, nil
}

srcResp, err := r.httpClient.Do(srcReq)
Expand All @@ -410,7 +410,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk,
} else {
s.handleProcessError("SourceRequestError", err)
}
return nil, 0
return nil, 0, nil
}

headers := map[string]string{}
Expand Down Expand Up @@ -442,7 +442,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk,
if srcResp.Body != nil {
srcResp.Body.Close()
}
return nil, 0
return nil, 0, nil
}
} else {
if srcResp.StatusCode >= http.StatusMultipleChoices {
Expand All @@ -460,7 +460,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk,
if srcResp.Body != nil {
srcResp.Body.Close()
}
return nil, 0
return nil, 0, nil
}
}

Expand All @@ -473,7 +473,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk,
if srcResp.Body != nil {
srcResp.Body.Close()
}
return nil, 0
return nil, 0, nil
}

for k, v := range chunk.Spec.Source.Response.Headers {
Expand All @@ -485,14 +485,27 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk,
if srcResp.Body != nil {
srcResp.Body.Close()
}
return nil, 0
return nil, 0, nil
}
}

return srcResp.Body, srcResp.ContentLength
return srcResp.Body, srcResp.ContentLength, headers
}

func (r *ChunkRunner) destinationRequest(ctx context.Context, dest *v1alpha1.ChunkHTTP, dr *swmrCount, contentLength int64) (string, bool, error) {
func (r *ChunkRunner) destinationRequest(ctx context.Context, chunk *v1alpha1.Chunk, dest *v1alpha1.ChunkHTTP, dr *swmrCount, contentLength int64, sourceHeaders map[string]string) (string, bool, error) {
// Apply headers from source response to destination request if specified
if len(chunk.Spec.SourceResponseHeadersToDestination) > 0 && sourceHeaders != nil {
if dest.Request.Headers == nil {
dest.Request.Headers = make(map[string]string)
}
for _, headerName := range chunk.Spec.SourceResponseHeadersToDestination {
headerNameLower := strings.ToLower(headerName)
if value, exists := sourceHeaders[headerNameLower]; exists {
dest.Request.Headers[headerName] = value
}
}
}

destReq, err := r.buildRequest(ctx, dest, dr.NewReader(), contentLength)
if err != nil {
retry, err := utils.IsNetworkError(err)
Expand Down Expand Up @@ -552,7 +565,7 @@ func (r *ChunkRunner) process(continues <-chan struct{}, chunk *v1alpha1.Chunk)
stopProgress := r.startProgressUpdater(ctx, s, &gsr, &gdrs)
defer stopProgress()

body, contentLength := r.sourceRequest(ctx, chunk, s)
body, contentLength, sourceHeaders := r.sourceRequest(ctx, chunk, s)
if body == nil {
return
}
Expand Down Expand Up @@ -647,13 +660,13 @@ func (r *ChunkRunner) process(continues <-chan struct{}, chunk *v1alpha1.Chunk)
i := i
dr := drs[i]
g.Go(func() error {
etag, retry, err := r.destinationRequest(ctx, &dest, dr, contentLength)
etag, retry, err := r.destinationRequest(ctx, chunk, &dest, dr, contentLength, sourceHeaders)
if err != nil {
if !retry {
return err
}
time.Sleep(time.Second)
etag, _, err = r.destinationRequest(ctx, &dest, dr, contentLength)
etag, _, err = r.destinationRequest(ctx, chunk, &dest, dr, contentLength, sourceHeaders)
if err != nil {
return err
}
Expand Down