27 commits
51e8458 lightning/external: refactor readAllData to use cachedReader and getR… (joechenrh, Mar 6, 2026)
86736ac lightning/external: reuse sequential readers across ingest batches (joechenrh, Mar 28, 2026)
7e372c5 lightning/external: fix cached reader reuse correctness (joechenrh, Mar 28, 2026)
adeb6cf lightning/external: clean up read range APIs (joechenrh, Mar 28, 2026)
0ea901f lightning/external: clarify reader reuse docs (joechenrh, Mar 28, 2026)
37e506e external: add v1 file header constants and parser (joechenrh, Apr 7, 2026)
5a7fc6b external: add rangeProperty.compressedSize and v1 codec (joechenrh, Apr 7, 2026)
77e1907 external: add segmentCompressor and zstd encoder pool (joechenrh, Apr 7, 2026)
ec7fa24 external: add onBoundary callback to rangePropertiesCollector (joechenrh, Apr 7, 2026)
df4460c external: integrate segmentCompressor with KeyValueStore (joechenrh, Apr 7, 2026)
9d40861 external: add SegmentKVReader and zstd decoder pool (joechenrh, Apr 7, 2026)
c655d6e external: probe file header in statsReader and dispatch v0/v1 (joechenrh, Apr 7, 2026)
312a558 external: dispatch KV reads via kvStream interface (joechenrh, Apr 7, 2026)
e410ae7 external: wire Writer.flushSortedKVs to compressed v1 format (joechenrh, Apr 7, 2026)
e22823a external: wire OneFileWriter to compressed v1 format (joechenrh, Apr 7, 2026)
ad8cbba external: add Writer compressed end-to-end round-trip test (joechenrh, Apr 7, 2026)
1d98d56 variable: add tidb_global_sort_compression sysvar (joechenrh, Apr 7, 2026)
c3220c9 importer,importinto: pass tidb_global_sort_compression to encode-sort… (joechenrh, Apr 8, 2026)
3639eef external: cover segmentCompressor write-error path (joechenrh, Apr 8, 2026)
5b0bb21 external: bazel metadata and gofmt sweep after compression feature (joechenrh, Apr 8, 2026)
ba69345 external: make merge and ingest readers v1-aware via data-file header… (joechenrh, Apr 8, 2026)
ae8e8cd external: add v1 ranged reader + key-range scoped ingest (joechenrh, Apr 8, 2026)
66c5e72 external: fail closed on corrupt v1 props and non-zero merge offsets (joechenrh, Apr 8, 2026)
03fbe88 variable: default tidb_global_sort_compression to "zstd" on this branch (joechenrh, Apr 8, 2026)
199e0f7 1 (joechenrh, Apr 8, 2026)
33964fc external,importinto: log compression mode at writer creation (joechenrh, Apr 8, 2026)
70f82eb importer,importinto: prove Plan.GlobalSortCompression survives task meta (joechenrh, Apr 8, 2026)
13 changes: 13 additions & 0 deletions pkg/dxf/importinto/encode_and_sort_operator.go
@@ -119,6 +119,17 @@ func newChunkWorker(
if op.tableImporter.IsGlobalSort() {
// in case of a network partition, two nodes might run the same subtask.
workerUUID := uuid.New().String()
// Translate the snapshotted tidb_global_sort_compression value into the
// writer-side enum. Anything other than the exact string "zstd" maps to
// CompressionNone (v0), so unknown values, "none", and the empty string
// from old serialized task metas all default to the safe historical path.
compression := external.CompressionNone
if op.tableImporter.Plan.GlobalSortCompression == "zstd" {
compression = external.CompressionZstd
}
op.logger.Info("encode-and-sort worker created",
zap.Int64("task-id", op.taskID),
zap.Int64("subtask-id", op.subtaskID),
zap.String("plan-global-sort-compression", op.tableImporter.Plan.GlobalSortCompression),
zap.Int("compression-algo", int(compression)),
)
// sorted index kv storage path: /{taskID}/{subtaskID}/index/{indexID}/{workerID}
indexWriterFn := func(indexID int64) (*external.Writer, error) {
onDup, err := getOnDupForIndex(op.indicesGenKV, indexID, op.onDupKey)
@@ -133,6 +144,7 @@ func newChunkWorker(
SetMemorySizeLimit(perIndexKVMemSizePerCon).
SetBlockSize(indexBlockSize).
SetOnDup(onDup).
SetCompression(compression).
SetTiKVCodec(op.tableImporter.Backend().GetTiKVCodec())
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for index: index/{indexID}/{workerID}
@@ -150,6 +162,7 @@ func newChunkWorker(
SetMemorySizeLimit(dataKVMemSizePerCon).
SetBlockSize(dataBlockSize).
SetOnDup(getOnDupForConflictedKV(op.onDupKey)).
SetCompression(compression).
SetTiKVCodec(op.tableImporter.Backend().GetTiKVCodec())
prefix := subtaskPrefix(op.taskID, op.subtaskID)
// writer id for data: data/{workerID}

8 changes: 8 additions & 0 deletions pkg/dxf/importinto/task_executor.go
@@ -105,6 +105,14 @@ func getTableImporter(
store tidbkv.Storage,
logger *zap.Logger,
) (*importer.TableImporter, error) {
// Debug: log what came out of the task meta on the executor side. If
// the submitter logged plan-global-sort-compression="zstd" but this
// line logs "", JSON serialization is dropping the field.
logger.Info("getTableImporter (executor side): taskMeta.Plan deserialized",
zap.Int64("task-id", taskID),
zap.String("plan-global-sort-compression", taskMeta.Plan.GlobalSortCompression),
zap.String("cloud-storage-uri", taskMeta.Plan.CloudStorageURI),
)
idAlloc := kv.NewPanickingAllocators(taskMeta.Plan.TableInfo.SepAutoInc())
tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo)
if err != nil {

18 changes: 18 additions & 0 deletions pkg/executor/importer/import.go
@@ -307,6 +307,12 @@ type Plan struct {
DisableTiKVImportMode bool
MaxEngineSize config.ByteSize
CloudStorageURI string
// GlobalSortCompression selects the on-disk format for global-sort
// intermediate files. It is snapshotted from tidb_global_sort_compression
// at job submission time. Empty string or "none" means CompressionNone (v0);
// "zstd" means CompressionZstd (v1). The empty string keeps backward
// compatibility with task metas serialized before this field existed.
GlobalSortCompression string
DisablePrecheck bool
GroupKey string

@@ -545,6 +551,12 @@ func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plann
if err := p.initParameters(plan); err != nil {
return nil, err
}
// Debug: log the snapshotted compression value on the submitter node so
// the logs make it obvious what gets shipped to the executor.
logutil.Logger(ctx).Info("IMPORT INTO plan built (submitter side)",
zap.String("plan-global-sort-compression", p.GlobalSortCompression),
zap.String("cloud-storage-uri", p.CloudStorageURI),
)
return p, nil
}

@@ -713,6 +725,12 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
return err
}
p.initDefaultOptions(ctx, targetNodeCPUCnt, seCtx.GetStore())
// Snapshot tidb_global_sort_compression at job submission time so the
// encode-and-sort writers running on remote executor nodes can read it
// from Plan without needing the originating session. An unreadable or
// empty value maps to CompressionNone (v0), keeping the rollout
// default-off and safe. On this branch the snapshot is hardcoded to
// "zstd" so the compressed v1 path is exercised end to end.
p.GlobalSortCompression = "zstd"

specifiedOptions := map[string]*plannercore.LoadDataOpt{}
for _, opt := range options {

23 changes: 23 additions & 0 deletions pkg/executor/importer/import_test.go
@@ -16,6 +16,7 @@ package importer

import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
@@ -613,3 +614,25 @@ func TestGetDefMaxEngineSize(t *testing.T) {
require.Equal(t, config.ByteSize(100*units.GiB), getDefMaxEngineSize())
}
}

// TestPlan_GlobalSortCompression_JSONRoundTrip proves that the new
// GlobalSortCompression field survives the submitter -> executor JSON
// round-trip path used by TaskMeta serialization. If this test ever
// regresses (e.g. someone adds json:"-" to the field) the executor side
// will silently fall back to v0 at runtime, which is exactly what the
// Codex adversarial review flagged as a hard-to-diagnose failure mode.
func TestPlan_GlobalSortCompression_JSONRoundTrip(t *testing.T) {
original := &Plan{
GlobalSortCompression: "zstd",
CloudStorageURI: "s3://bucket/prefix",
}
bs, err := json.Marshal(original)
require.NoError(t, err)
// Must actually appear in the JSON body, not be dropped by a tag.
require.Contains(t, string(bs), `"GlobalSortCompression":"zstd"`)

var decoded Plan
require.NoError(t, json.Unmarshal(bs, &decoded))
require.Equal(t, "zstd", decoded.GlobalSortCompression)
require.Equal(t, "s3://bucket/prefix", decoded.CloudStorageURI)
}

10 changes: 10 additions & 0 deletions pkg/lightning/backend/external/BUILD.bazel
@@ -8,13 +8,17 @@ go_library(
"concurrent_reader.go",
"engine.go",
"file.go",
"file_header.go",
"iter.go",
"kv_reader.go",
"kv_stream.go",
"kvgroup.go",
"merge.go",
"merge_v2.go",
"onefile_writer.go",
"reader.go",
"segment_compressor.go",
"segment_kv_reader.go",
"split.go",
"stat_reader.go",
"testutil.go",
@@ -50,6 +54,7 @@
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pkg_errors//:errors",
@@ -72,14 +77,18 @@ go_test(
"codec_test.go",
"concurrent_reader_test.go",
"engine_test.go",
"file_header_test.go",
"file_test.go",
"iter_test.go",
"merge_test.go",
"misc_bench_test.go",
"onefile_writer_test.go",
"reader_test.go",
"segment_compressor_test.go",
"segment_kv_reader_test.go",
"sort_test.go",
"split_test.go",
"stat_reader_test.go",
"util_test.go",
"writer_test.go",
],
@@ -117,6 +126,7 @@ go_test(
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_johannesboyne_gofakes3//:gofakes3",
"@com_github_johannesboyne_gofakes3//backend/s3mem",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",

18 changes: 10 additions & 8 deletions pkg/lightning/backend/external/bench_test.go
@@ -701,13 +701,14 @@ func TestReadAllDataLargeFiles(t *testing.T) {
output := &memKVsAndBuffers{}
now := time.Now()

readRanges, err := getReadRangeFromProps(ctx, [][]byte{startKey, endKey}, statFiles, store)
readRanges, _, err := getReadRangeFromProps(ctx, [][]byte{startKey, endKey}, statFiles, store)
require.NoError(t, err)
err = readAllData(
ctx, store, dataFiles, statFiles,
ctx, store, dataFiles,
make([]cachedReader, len(dataFiles)),
startKey, endKey,
readRanges[0],
readRanges[1],
readRanges[0].Start,
readRanges[1].End,
smallBlockBufPool, largeBlockBufPool, output)
t.Logf("read all data cost: %s", time.Since(now))
intest.AssertNoError(err)
@@ -857,13 +858,14 @@ finishCreateFiles:
output := &memKVsAndBuffers{}
p.beforeTest()
now := time.Now()
readRanges, err := getReadRangeFromProps(ctx, [][]byte{readRangeStart, readRangeEnd}, statFiles, store)
readRanges, _, err := getReadRangeFromProps(ctx, [][]byte{readRangeStart, readRangeEnd}, statFiles, store)
require.NoError(t, err)
err = readAllData(
ctx, store, dataFiles, statFiles,
ctx, store, dataFiles,
make([]cachedReader, len(dataFiles)),
readRangeStart, readRangeEnd,
readRanges[0],
readRanges[1],
readRanges[0].Start,
readRanges[1].End,
smallBlockBufPool, largeBlockBufPool, output)
require.NoError(t, err)
output.build(ctx)

48 changes: 43 additions & 5 deletions pkg/lightning/backend/external/codec.go
@@ -24,11 +24,12 @@ import (
// - size: the size of the range.
// - keys: the number of keys in the range.
type rangeProperty struct {
firstKey []byte
lastKey []byte
offset uint64
size uint64
keys uint64
firstKey []byte
lastKey []byte
offset uint64
size uint64
keys uint64
compressedSize uint64 // v1: bytes occupied by this segment's zstd frame in the data file. 0 for v0 props.
}

func (r *rangeProperty) totalSize() uint64 {
@@ -97,3 +98,40 @@ func encodeProp(buf []byte, r *rangeProperty) []byte {
buf = append(buf, b[:]...)
return buf
}

// V1 stat property layout adds a trailing compressedSize uint64:
//
// [firstKeyLen u32][firstKey][lastKeyLen u32][lastKey]
// [size u64][keys u64][offset u64][compressedSize u64]
//
// In v1 stat files, prop.offset means PHYSICAL byte offset of the segment's
// zstd frame in the data file (including the 6-byte file header).
const propertyLengthExceptKeysV1 = propertyLengthExceptKeys + 8

func encodePropV1(buf []byte, r *rangeProperty) []byte {
buf = encodeProp(buf, r)
var b [8]byte
binary.BigEndian.PutUint64(b[:], r.compressedSize)
buf = append(buf, b[:]...)
return buf
}

func decodePropV1(data []byte) *rangeProperty {
rp := decodeProp(data[:len(data)-8])
rp.compressedSize = binary.BigEndian.Uint64(data[len(data)-8:])
return rp
}

// encodeMultiPropsV1 is the v1 counterpart to encodeMultiProps. It writes a
// length-prefixed sequence of v1 props (which carry the trailing
// compressedSize field). Used by v1 writers when finalizing a stat file.
func encodeMultiPropsV1(buf []byte, props []*rangeProperty) []byte {
var propLen [4]byte
for _, p := range props {
body := encodePropV1(nil, p)
binary.BigEndian.PutUint32(propLen[:], uint32(len(body)))
buf = append(buf, propLen[:4]...)
buf = append(buf, body...)
}
return buf
}
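
For orientation, here is a minimal sketch (not part of this diff) of how the length-prefixed stream produced by encodeMultiPropsV1 would be walked on the read side. The name decodeMultiPropsV1 is hypothetical, as are the error strings; the real decoding path lives in the stat reader, which this excerpt does not show. It assumes the standard library binary and errors packages are imported.

	// decodeMultiPropsV1 is a hypothetical in-package helper; it assumes the
	// buffer holds exactly the bytes produced by encodeMultiPropsV1 above,
	// i.e. a sequence of [u32 length][v1 prop body] records.
	func decodeMultiPropsV1(data []byte) ([]*rangeProperty, error) {
		props := make([]*rangeProperty, 0, 16)
		for len(data) > 0 {
			if len(data) < 4 {
				return nil, errors.New("truncated v1 prop length prefix")
			}
			n := binary.BigEndian.Uint32(data[:4])
			data = data[4:]
			if uint64(n) > uint64(len(data)) || n < propertyLengthExceptKeysV1 {
				return nil, errors.New("corrupt v1 prop body")
			}
			props = append(props, decodePropV1(data[:n]))
			data = data[n:]
		}
		return props, nil
	}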

20 changes: 20 additions & 0 deletions pkg/lightning/backend/external/codec_test.go
@@ -51,3 +51,23 @@ func TestPropertyLengthExceptKeys(t *testing.T) {
bs := encodeProp(nil, zero)
require.EqualValues(t, propertyLengthExceptKeys, len(bs))
}

func TestRangePropertyCodecV1(t *testing.T) {
prop := &rangeProperty{
firstKey: []byte("key"),
lastKey: []byte("key2"),
offset: 42, // physical compressed offset in v1
size: 2,
keys: 3,
compressedSize: 17,
}
buf := encodePropV1(nil, prop)
prop2 := decodePropV1(buf)
require.EqualValues(t, prop, prop2)
}

func TestPropertyLengthExceptKeysV1(t *testing.T) {
zero := &rangeProperty{}
bs := encodePropV1(nil, zero)
require.EqualValues(t, propertyLengthExceptKeysV1, len(bs))
}
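
The codec comment above refers to a 6-byte data-file header that file_header.go introduces but this excerpt does not include. As a reading aid, here is a sketch of what such a probe could look like; the magic bytes, the 4+2 layout, and the function name are assumptions, not the PR's actual constants.

	// Hypothetical sketch: four assumed magic bytes plus a big-endian uint16
	// version. v0 files predate the header, so a failed probe means version 0.
	var assumedMagic = [4]byte{'e', 'x', 't', 'f'} // assumption, not the real magic

	func probeFileVersion(b []byte) uint16 {
		if len(b) < 6 || !bytes.Equal(b[:4], assumedMagic[:]) {
			return 0 // headerless v0 file
		}
		return binary.BigEndian.Uint16(b[4:6])
	}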

29 changes: 21 additions & 8 deletions pkg/lightning/backend/external/engine.go
@@ -248,8 +248,9 @@ func NewExternalEngine(

func (e *Engine) loadRangeBatchData(
ctx context.Context,
cachedReaders []cachedReader,
jobKeys [][]byte,
startOffsets, estimatedEndOffsets []uint64,
startOffsets, endOffsets []uint64,
outCh chan<- engineapi.DataAndRanges,
) error {
readAndSortRateHist := metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort")
@@ -283,11 +284,11 @@
ctx,
e.storage,
e.dataFiles,
e.statsFiles,
cachedReaders,
startKey,
endKey,
startOffsets,
estimatedEndOffsets,
endOffsets,
e.smallBlockBufPool,
e.largeBlockBufPool,
&e.memKVsAndBuffers,
@@ -518,18 +519,27 @@ func (e *Engine) LoadIngestData(
currBatchSize := int(e.workerConcurrency.Load())
logutil.Logger(ctx).Info("load ingest data", zap.Int("current batchSize", currBatchSize))

readRangesPerKey, err := getReadRangeFromProps(ctx, e.jobKeys, e.statsFiles, e.storage)
readRangesPerKey, fileVersions, err := getReadRangeFromProps(ctx, e.jobKeys, e.statsFiles, e.storage)
if err != nil {
return errors.Trace(err)
}
cachedReaders := make([]cachedReader, len(e.dataFiles))
for i := range cachedReaders {
cachedReaders[i].setFormat(fileVersions[i])
}
defer func() {
if closeErr := closeCachedReaders(cachedReaders); err == nil && closeErr != nil {
err = closeErr
}
}()

for start := 0; start < len(e.jobKeys)-1; {
currBatchSize = e.handleConcurrencyChange(ctx, currBatchSize)
// want to generate N ranges, so we need N+1 keys
end := min(1+start+currBatchSize, len(e.jobKeys))
startOffsets := readRangesPerKey[start]
estimatedEndOffsets := readRangesPerKey[end-1]
err = e.loadRangeBatchData(ctx, e.jobKeys[start:end], startOffsets, estimatedEndOffsets, outCh)
startOffsets := readRangesPerKey[start].Start
endOffsets := readRangesPerKey[end-1].End
err = e.loadRangeBatchData(ctx, cachedReaders, e.jobKeys[start:end], startOffsets, endOffsets, outCh)
if err != nil {
return err
}
@@ -571,7 +581,10 @@ func (e *Engine) closeDupWriterAsNeeded(ctx context.Context) error {
}
kvStore, writer := e.dupKVStore, e.dupWriter
e.dupKVStore, e.dupWriter = nil, nil
kvStore.finish()
if err := kvStore.finish(); err != nil {
logutil.Logger(ctx).Error("finish dup kv store failed", zap.Error(err))
return err
}
if err := writer.Close(ctx); err != nil {
logutil.Logger(ctx).Error("close dup writer failed", zap.Error(err))
return err
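
LoadIngestData defers closeCachedReaders, which this excerpt does not show. Below is a minimal sketch of the error aggregation the deferred block implies, assuming cachedReader exposes an unexported close method; both the method name and the signature are guesses from the call site.

	// Hypothetical sketch of closeCachedReaders as used in the deferred block
	// above: close every reader, remember the first failure, and return it so
	// the named return err of LoadIngestData can pick it up.
	func closeCachedReaders(readers []cachedReader) error {
		var firstErr error
		for i := range readers {
			if err := readers[i].close(); err != nil && firstErr == nil {
				firstErr = err
			}
		}
		return firstErr
	}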