From ff7585111f11d1f0da7c4652c695c6997659e374 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 17 Nov 2025 07:26:10 +0000 Subject: [PATCH 1/2] Initial plan From 5650131d600ce44725b8d9186129abc6c68a1efa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 25 Dec 2025 09:37:40 +0800 Subject: [PATCH 2/2] Add ContentType support for Blob and Chunk resources - Add SourceResponseHeadersToDestination field to ChunkSpec to specify headers to forward - Update blob_from_chunk_controller to extract ContentType from HEAD responses - Update blob_to_chunk_controller to set SourceResponseHeadersToDestination with Content-Type - Update chunk_runner to forward specified source headers to destination requests - Regenerate API code (deepcopy, openapi, clientset, etc.) Co-authored-by: wzshiming <6565744+wzshiming@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 +- pkg/apis/task/v1alpha1/blob_types.go | 11 +++++ pkg/apis/task/v1alpha1/chunk_types.go | 5 ++ .../task/v1alpha1/zz_generated.deepcopy.go | 17 +++++++ pkg/controller/blob_from_chunk_controller.go | 1 + pkg/controller/blob_to_chunk_controller.go | 27 +++++++++-- pkg/openapi/zz_generated.openapi.go | 46 +++++++++++++++++++ pkg/runner/chunk_runner.go | 41 +++++++++++------ 9 files changed, 132 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 840d966..45d19fa 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gorilla/handlers v1.5.2 github.com/spf13/cobra v1.9.1 github.com/wzshiming/ioswmr v0.0.0-20250120111738-c884d3ebe493 - github.com/wzshiming/sss v0.4.0 + github.com/wzshiming/sss v0.7.0 golang.org/x/sync v0.17.0 k8s.io/apimachinery v0.33.6 k8s.io/apiserver v0.33.6 diff --git a/go.sum b/go.sum index 7711363..1c16d04 100644 --- a/go.sum +++ b/go.sum @@ -169,8 +169,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7 github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk= github.com/wzshiming/ioswmr v0.0.0-20250120111738-c884d3ebe493 h1:bJHJ032u4s+R4+z0V9n+X+TkxEaeZvgyPV82qT1FrkA= github.com/wzshiming/ioswmr v0.0.0-20250120111738-c884d3ebe493/go.mod h1:TwwDyS1wnJG3AvKliA+PPB0kliN3yEjsabtH4o7xySQ= -github.com/wzshiming/sss v0.4.0 h1:EqlEAixN7Zsum68pjZLhaB0X3YmTFwiEN4RuzBGbMdc= -github.com/wzshiming/sss v0.4.0/go.mod h1:4FNGdjjLOzcmZtcCOW+cpL4dmJvg4pMeek0UQy5/FCg= +github.com/wzshiming/sss v0.7.0 h1:YNGJMJ+LBv9dhEUoWTRRl35tru8V2FoW816AR8fZtLg= +github.com/wzshiming/sss v0.7.0/go.mod h1:4FNGdjjLOzcmZtcCOW+cpL4dmJvg4pMeek0UQy5/FCg= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 h1:S2dVYn90KE98chqDkyE9Z4N61UnQd+KOfgp5Iu53llk= diff --git a/pkg/apis/task/v1alpha1/blob_types.go b/pkg/apis/task/v1alpha1/blob_types.go index a5e09d3..0117e0f 100644 --- a/pkg/apis/task/v1alpha1/blob_types.go +++ b/pkg/apis/task/v1alpha1/blob_types.go @@ -98,6 +98,12 @@ type BlobStatus struct { // +optional CompletionTime *metav1.Time `json:"completionTime,omitempty"` + // SourceResponseHeaders specifies which headers from the source response + // should be forwarded to destination requests. + // only works when there is a multiple chunk. + // +optional + SourceResponseHeaders map[string]string `json:"sourceResponseHeaders,omitempty"` + // Conditions holds conditions for the Blob. // +patchMergeKey=type // +patchStrategy=merge @@ -116,6 +122,11 @@ type BlobSpec struct { // Destination is the destination of the blob. Destination []BlobDestination `json:"destination"` + // SourceResponseHeadersToDestination specifies which headers from the source response + // should be forwarded to destination requests (e.g., ["Content-Type", "Content-Disposition"]). + // +optional + SourceResponseHeadersToDestination []string `json:"sourceResponseHeadersToDestination,omitempty"` + // Priority represents the relative importance of this blob when multiple blobs exist. Priority int64 `json:"priority"` diff --git a/pkg/apis/task/v1alpha1/chunk_types.go b/pkg/apis/task/v1alpha1/chunk_types.go index 658832a..4d0619c 100644 --- a/pkg/apis/task/v1alpha1/chunk_types.go +++ b/pkg/apis/task/v1alpha1/chunk_types.go @@ -162,6 +162,11 @@ type ChunkSpec struct { // Destination is the destination of the chunk. Destination []ChunkHTTP `json:"destination,omitempty"` + // SourceResponseHeadersToDestination specifies which headers from the source response + // should be forwarded to destination requests (e.g., ["Content-Type", "Content-Disposition"]). + // +optional + SourceResponseHeadersToDestination []string `json:"sourceResponseHeadersToDestination,omitempty"` + // Priority represents the relative importance of this chunk when multiple chunks exist. Priority int64 `json:"priority"` diff --git a/pkg/apis/task/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/task/v1alpha1/zz_generated.deepcopy.go index f24494d..a8f5ec5 100644 --- a/pkg/apis/task/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/task/v1alpha1/zz_generated.deepcopy.go @@ -260,6 +260,11 @@ func (in *BlobSpec) DeepCopyInto(out *BlobSpec) { *out = make([]BlobDestination, len(*in)) copy(*out, *in) } + if in.SourceResponseHeadersToDestination != nil { + in, out := &in.SourceResponseHeadersToDestination, &out.SourceResponseHeadersToDestination + *out = make([]string, len(*in)) + copy(*out, *in) + } return } @@ -280,6 +285,13 @@ func (in *BlobStatus) DeepCopyInto(out *BlobStatus) { in, out := &in.CompletionTime, &out.CompletionTime *out = (*in).DeepCopy() } + if in.SourceResponseHeaders != nil { + in, out := &in.SourceResponseHeaders, &out.SourceResponseHeaders + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]Condition, len(*in)) @@ -434,6 +446,11 @@ func (in *ChunkSpec) DeepCopyInto(out *ChunkSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.SourceResponseHeadersToDestination != nil { + in, out := &in.SourceResponseHeadersToDestination, &out.SourceResponseHeadersToDestination + *out = make([]string, len(*in)) + copy(*out, *in) + } return } diff --git a/pkg/controller/blob_from_chunk_controller.go b/pkg/controller/blob_from_chunk_controller.go index 1f7ade4..03475fb 100644 --- a/pkg/controller/blob_from_chunk_controller.go +++ b/pkg/controller/blob_from_chunk_controller.go @@ -274,6 +274,7 @@ func (c *BlobFromChunkController) fromHeadChunk(ctx context.Context, blob *v1alp blob.Status.AcceptRanges = chunk.Status.SourceResponse.Headers["accept-ranges"] == "bytes" } } + blob.Status.SourceResponseHeaders = chunk.Status.SourceResponse.Headers blob.Status.Phase = v1alpha1.BlobPhaseRunning case v1alpha1.ChunkPhaseFailed: blob.Status.Retry = chunk.Status.Retry diff --git a/pkg/controller/blob_to_chunk_controller.go b/pkg/controller/blob_to_chunk_controller.go index 63e0cd6..961ed29 100644 --- a/pkg/controller/blob_to_chunk_controller.go +++ b/pkg/controller/blob_to_chunk_controller.go @@ -22,6 +22,7 @@ import ( "net/http" "net/textproto" "slices" + "strings" "time" "github.com/OpenCIDN/cidn/pkg/apis/task/v1alpha1" @@ -346,10 +347,11 @@ func (c *BlobToChunkController) toOneChunk(ctx context.Context, blob *v1alpha1.B }, }, Spec: v1alpha1.ChunkSpec{ - Total: blob.Status.Total, - Priority: blob.Spec.Priority, - Sha256: blob.Spec.ContentSha256, - MaximumRetry: blob.Spec.MaximumRetry - blob.Status.Retry, + Total: blob.Status.Total, + Priority: blob.Spec.Priority, + Sha256: blob.Spec.ContentSha256, + MaximumRetry: blob.Spec.MaximumRetry - blob.Status.Retry, + SourceResponseHeadersToDestination: blob.Spec.SourceResponseHeadersToDestination, }, Status: v1alpha1.ChunkStatus{ Phase: v1alpha1.ChunkPhasePending, @@ -570,7 +572,22 @@ func (c *BlobToChunkController) toMultipart(ctx context.Context, blob *v1alpha1. mp, err := s3.GetMultipart(ctx, dst.Path) if err != nil { - mp, err = s3.NewMultipart(ctx, dst.Path) + opts := []sss.WriterOptions{} + if len(blob.Spec.SourceResponseHeadersToDestination) > 0 && len(blob.Status.SourceResponseHeaders) > 0 { + for _, headerKey := range blob.Spec.SourceResponseHeadersToDestination { + switch strings.ToLower(headerKey) { + case "content-type": + if headerValue, ok := blob.Status.SourceResponseHeaders[headerKey]; ok { + opts = append(opts, sss.WithContentType(headerValue)) + } + case "content-disposition": + if headerValue, ok := blob.Status.SourceResponseHeaders[headerKey]; ok { + opts = append(opts, sss.WithContentDisposition(headerValue)) + } + } + } + } + mp, err = s3.NewMultipart(ctx, dst.Path, opts...) if err != nil { return nil, err } diff --git a/pkg/openapi/zz_generated.openapi.go b/pkg/openapi/zz_generated.openapi.go index 9e7a72e..b00007c 100644 --- a/pkg/openapi/zz_generated.openapi.go +++ b/pkg/openapi/zz_generated.openapi.go @@ -567,6 +567,21 @@ func schema_pkg_apis_task_v1alpha1_BlobSpec(ref common.ReferenceCallback) common }, }, }, + "sourceResponseHeadersToDestination": { + SchemaProps: spec.SchemaProps{ + Description: "SourceResponseHeadersToDestination specifies which headers from the source response should be forwarded to destination requests (e.g., [\"Content-Type\", \"Content-Disposition\"]).", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, "priority": { SchemaProps: spec.SchemaProps{ Description: "Priority represents the relative importance of this blob when multiple blobs exist.", @@ -727,6 +742,22 @@ func schema_pkg_apis_task_v1alpha1_BlobStatus(ref common.ReferenceCallback) comm Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), }, }, + "sourceResponseHeaders": { + SchemaProps: spec.SchemaProps{ + Description: "SourceResponseHeaders specifies which headers from the source response should be forwarded to destination requests. only works when there is a multiple chunk.", + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, "conditions": { VendorExtensible: spec.VendorExtensible{ Extensions: spec.Extensions{ @@ -1009,6 +1040,21 @@ func schema_pkg_apis_task_v1alpha1_ChunkSpec(ref common.ReferenceCallback) commo }, }, }, + "sourceResponseHeadersToDestination": { + SchemaProps: spec.SchemaProps{ + Description: "SourceResponseHeadersToDestination specifies which headers from the source response should be forwarded to destination requests (e.g., [\"Content-Type\", \"Content-Disposition\"]).", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, "priority": { SchemaProps: spec.SchemaProps{ Description: "Priority represents the relative importance of this chunk when multiple chunks exist.", diff --git a/pkg/runner/chunk_runner.go b/pkg/runner/chunk_runner.go index 8ab9abc..be4a156 100644 --- a/pkg/runner/chunk_runner.go +++ b/pkg/runner/chunk_runner.go @@ -364,7 +364,7 @@ func (r *ChunkRunner) tryAddBearer(ctx context.Context, chunk *v1alpha1.Chunk) e return nil } -func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, s *state) (io.ReadCloser, int64) { +func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, s *state) (io.ReadCloser, int64, map[string]string) { err := r.tryAddBearer(ctx, chunk) if err != nil { if errors.Is(err, ErrBearerNotReady) { @@ -377,13 +377,13 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, return ss }) r.unmarkRecord(chunk.Name) - return nil, 0 + return nil, 0, nil } else if errors.Is(err, ErrAuthentication) { s.handleProcessError("AuthenticationError", err) } else { s.handleProcessError("BearerFetchError", err) } - return nil, 0 + return nil, 0, nil } srcReq, err := r.buildRequest(ctx, &chunk.Spec.Source, nil, 0) @@ -395,7 +395,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, } else { s.handleProcessError("BuildRequestError", err) } - return nil, 0 + return nil, 0, nil } srcResp, err := r.httpClient.Do(srcReq) @@ -410,7 +410,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, } else { s.handleProcessError("SourceRequestError", err) } - return nil, 0 + return nil, 0, nil } headers := map[string]string{} @@ -442,7 +442,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, if srcResp.Body != nil { srcResp.Body.Close() } - return nil, 0 + return nil, 0, nil } } else { if srcResp.StatusCode >= http.StatusMultipleChoices { @@ -460,7 +460,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, if srcResp.Body != nil { srcResp.Body.Close() } - return nil, 0 + return nil, 0, nil } } @@ -473,7 +473,7 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, if srcResp.Body != nil { srcResp.Body.Close() } - return nil, 0 + return nil, 0, nil } for k, v := range chunk.Spec.Source.Response.Headers { @@ -485,14 +485,27 @@ func (r *ChunkRunner) sourceRequest(ctx context.Context, chunk *v1alpha1.Chunk, if srcResp.Body != nil { srcResp.Body.Close() } - return nil, 0 + return nil, 0, nil } } - return srcResp.Body, srcResp.ContentLength + return srcResp.Body, srcResp.ContentLength, headers } -func (r *ChunkRunner) destinationRequest(ctx context.Context, dest *v1alpha1.ChunkHTTP, dr *swmrCount, contentLength int64) (string, bool, error) { +func (r *ChunkRunner) destinationRequest(ctx context.Context, chunk *v1alpha1.Chunk, dest *v1alpha1.ChunkHTTP, dr *swmrCount, contentLength int64, sourceHeaders map[string]string) (string, bool, error) { + // Apply headers from source response to destination request if specified + if len(chunk.Spec.SourceResponseHeadersToDestination) > 0 && sourceHeaders != nil { + if dest.Request.Headers == nil { + dest.Request.Headers = make(map[string]string) + } + for _, headerName := range chunk.Spec.SourceResponseHeadersToDestination { + headerNameLower := strings.ToLower(headerName) + if value, exists := sourceHeaders[headerNameLower]; exists { + dest.Request.Headers[headerName] = value + } + } + } + destReq, err := r.buildRequest(ctx, dest, dr.NewReader(), contentLength) if err != nil { retry, err := utils.IsNetworkError(err) @@ -552,7 +565,7 @@ func (r *ChunkRunner) process(continues <-chan struct{}, chunk *v1alpha1.Chunk) stopProgress := r.startProgressUpdater(ctx, s, &gsr, &gdrs) defer stopProgress() - body, contentLength := r.sourceRequest(ctx, chunk, s) + body, contentLength, sourceHeaders := r.sourceRequest(ctx, chunk, s) if body == nil { return } @@ -647,13 +660,13 @@ func (r *ChunkRunner) process(continues <-chan struct{}, chunk *v1alpha1.Chunk) i := i dr := drs[i] g.Go(func() error { - etag, retry, err := r.destinationRequest(ctx, &dest, dr, contentLength) + etag, retry, err := r.destinationRequest(ctx, chunk, &dest, dr, contentLength, sourceHeaders) if err != nil { if !retry { return err } time.Sleep(time.Second) - etag, _, err = r.destinationRequest(ctx, &dest, dr, contentLength) + etag, _, err = r.destinationRequest(ctx, chunk, &dest, dr, contentLength, sourceHeaders) if err != nil { return err }