From 8e0efaedf1b6d43103fef6a5ff8beb6b5663f9e8 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Thu, 11 Jun 2026 11:30:23 +0400 Subject: [PATCH] token frequency (configurable) --- cmd/index_analyzer/main.go | 6 +- cmd/seq-db/seq-db.go | 1 + config/config.go | 6 + config/frac_version.go | 5 +- consts/consts.go | 1 + frac/common/seal_params.go | 2 + frac/fraction_test.go | 4 +- frac/remote.go | 2 +- frac/sealed.go | 2 +- frac/sealed/token/block_loader.go | 164 ++++++++++++++++++++----- frac/sealed/token/block_loader_test.go | 91 ++++++++++++++ frac/sealed_source.go | 2 +- fracmanager/config.go | 3 + indexwriter/blocks.go | 14 ++- indexwriter/blocks_test.go | 37 ++++++ indexwriter/index.go | 4 +- packer/delta_bitpacker.go | 40 ++++++ packer/delta_bitpacker_test.go | 72 +++++++++++ util/size.go | 1 + 19 files changed, 418 insertions(+), 39 deletions(-) create mode 100644 frac/sealed/token/block_loader_test.go diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index b7422da4..c84ad734 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -147,13 +147,14 @@ func analyzeIndex( } tokens := [][]byte{} + tokenUnpackBuf := &token.UnpackBuffer{} for { data := readTokenBlock() if len(data) == 0 { // empty block - section separator break } block := token.Block{} - if err := block.Unpack(data); err != nil { + if err := block.Unpack(data, b.Info.BinaryDataVer, tokenUnpackBuf); err != nil { logger.Fatal("error unpacking tokens", zap.Error(err)) } for i := range block.Len() { @@ -189,6 +190,7 @@ func analyzeIndex( lidsUniq := map[[16]byte]int{} lidsLens := make([]int, len(tokens)) tokenLIDs := []uint32{} + lidUnpackBuf := &lids.UnpackBuffer{} for { data := readLIDBlock() if len(data) == 0 { // empty block - section separator @@ -196,7 +198,7 @@ func analyzeIndex( } block := &lids.Block{} - if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil { + if err := block.Unpack(data, ver, b.Info.BinaryDataVer, lidUnpackBuf); err != nil { logger.Fatal("error unpacking lids block", zap.Error(err)) } diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index de29ac4c..df908fea 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -272,6 +272,7 @@ func startStore( TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel, DocBlocksZstdLevel: cfg.Compression.DocBlockZstdCompressionLevel, DocBlockSize: int(cfg.DocsSorting.DocBlockSize), + TokenFreqThreshold: cfg.Sealing.Tokens.FreqThreshold, }, Fraction: frac.Config{ Search: frac.SearchConfig{ diff --git a/config/config.go b/config/config.go index 0d929a7f..f32ee574 100644 --- a/config/config.go +++ b/config/config.go @@ -75,6 +75,12 @@ type Config struct { // BlockSize sets max lids (postings) saved per LIDs block. BlockSize int `config:"block_size" default:"65536"` } `config:"lids"` + + Tokens struct { + // FreqThreshold specifies minimum number of lids (postings) a token should have + // so that frequency for that token will be stored inside token blocks. + FreqThreshold int `config:"freq_threshold" default:"50"` + } `config:"tokens"` } `config:"sealing"` Cluster struct { diff --git a/config/frac_version.go b/config/frac_version.go index 73c3261a..624b718b 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -21,6 +21,9 @@ const ( // BinaryDataV4 - delta bitpack encoded MIDs and LIDs BinaryDataV4 + + // BinaryDataV5 - token block has doc frequencies for heavy tokens + BinaryDataV5 ) -const CurrentFracVersion = BinaryDataV4 +const CurrentFracVersion = BinaryDataV5 diff --git a/consts/consts.go b/consts/consts.go index 3341aecd..1ea8120b 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -23,6 +23,7 @@ const ( DefaultBulkRequestsLimit = 32 DefaultSearchRequestsLimit = 32 + DefaultTokenFreqThreshold = 50 BulkMaxTries = 3 diff --git a/frac/common/seal_params.go b/frac/common/seal_params.go index 05f89696..e0fd362e 100644 --- a/frac/common/seal_params.go +++ b/frac/common/seal_params.go @@ -10,4 +10,6 @@ type SealParams struct { DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. LIDBlockSize int DocBlockSize int // DocBlockSize is decompressed payload size of document block. + + TokenFreqThreshold int // TokenFreqThreshold Min lids count to store frequency for a token. } diff --git a/frac/fraction_test.go b/frac/fraction_test.go index d5fc39ed..ab69509d 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1901,11 +1901,11 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") case *Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1500), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1500), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") diff --git a/frac/remote.go b/frac/remote.go index 9630b8ca..b249a27e 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -191,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs), - tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, tokenReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, diff --git a/frac/sealed.go b/frac/sealed.go index dcf91f3f..20759b8c 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -514,7 +514,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs), - tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, tokenReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, diff --git a/frac/sealed/token/block_loader.go b/frac/sealed/token/block_loader.go index 77c73798..058b687f 100644 --- a/frac/sealed/token/block_loader.go +++ b/frac/sealed/token/block_loader.go @@ -5,47 +5,108 @@ import ( "encoding/binary" "fmt" "math" + "sort" "unsafe" "go.uber.org/zap" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/packer" "github.com/ozontech/seq-db/pattern" "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/util" ) -const sizeOfUint32 = uint32(unsafe.Sizeof(uint32(0))) - type Block struct { - Payload []byte - Offsets []uint32 + Payload []byte + Offsets []uint32 + FreqIndexes []uint16 // indexes of tokens which have doc freqs (frequencies) + Freqs []uint32 // frequencies of certain tokens (how many docs have this token included at least once) } func (b *Block) Size() int { const selfSize = int(unsafe.Sizeof(Block{})) - return selfSize + cap(b.Payload) + cap(b.Offsets)*int(sizeOfUint32) + return selfSize + + cap(b.Payload) + + cap(b.Offsets)*util.SizeOfUint32 + + cap(b.FreqIndexes)*util.SizeOfUint16 + + cap(b.Freqs)*util.SizeOfUint32 } -func (b Block) Pack(dst []byte) []byte { - return append(dst, b.Payload...) +func (b Block) Pack(dst []byte, buf []uint32) []byte { + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(b.Payload))) + dst = append(dst, b.Payload...) + dst = packer.CompressDeltaBitpackUint16(dst, b.FreqIndexes, buf) + return packer.CompressDeltaBitpackUint32(dst, b.Freqs, buf) } -func (b *Block) Unpack(data []byte) error { - var offset uint32 +func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, unpackBuf *UnpackBuffer) error { + if fracVer >= config.BinaryDataV5 { + unpackBuf.Reset(fracVer) + return b.unpackV5(data, unpackBuf) + } + return b.unpackV1(data) +} + +func (b *Block) unpackV1(data []byte) error { b.Payload = data - for i := 0; len(data) != 0; i++ { - l := binary.LittleEndian.Uint32(data) - data = data[sizeOfUint32:] - offset += sizeOfUint32 + return b.parseTokenPayload(data) +} + +func (b *Block) unpackV5(data []byte, buf *UnpackBuffer) error { + if len(data) < util.SizeOfUint32 { + return fmt.Errorf("token block too short: %d bytes", len(data)) + } + + payloadLen := binary.LittleEndian.Uint32(data[:util.SizeOfUint32]) + data = data[util.SizeOfUint32:] + if uint32(len(data)) < payloadLen { + return fmt.Errorf("invalid token block payload length: %d, data len %d", payloadLen, len(data)) + } + + payload := data[:payloadLen] + data = data[payloadLen:] + + b.Payload = append(b.Payload[:0], payload...) + + if err := b.parseTokenPayload(payload); err != nil { + return err + } + + var err error + var freqIndexes []uint16 + data, freqIndexes, err = packer.DecompressDeltaBitpackUint16(data, buf.decompressedUint16, buf.compressed) + if err != nil { + return err + } + b.FreqIndexes = append(b.FreqIndexes, freqIndexes...) + + var freqs []uint32 + _, freqs, err = packer.DecompressDeltaBitpackUint32(data, buf.decompressedUint32, buf.compressed) + if err != nil { + return err + } + b.Freqs = append(b.Freqs, freqs...) + + return nil +} + +func (b *Block) parseTokenPayload(payload []byte) error { + var offset uint32 + for i := 0; len(payload) != 0; i++ { + l := binary.LittleEndian.Uint32(payload) + payload = payload[util.SizeOfUint32:] + offset += uint32(util.SizeOfUint32) if l == math.MaxUint32 { continue } - if l > uint32(len(data)) { + if l > uint32(len(payload)) { return fmt.Errorf("wrong field block for token %d, in pos %d", i, offset) } - b.Offsets = append(b.Offsets, offset-sizeOfUint32) - data = data[l:] + b.Offsets = append(b.Offsets, offset-uint32(util.SizeOfUint32)) + payload = payload[l:] offset += l } return nil @@ -55,10 +116,24 @@ func (b *Block) Len() int { return len(b.Offsets) } +// GetFreq returns frequency for a token if stored or 0 otherwise +func (b *Block) GetFreq(index int) uint32 { + if b.Freqs == nil { + return 0 + } + + idx := uint16(index) + found := sort.Search(len(b.FreqIndexes), func(i int) bool { return b.FreqIndexes[i] >= idx }) + if found < len(b.FreqIndexes) && b.FreqIndexes[found] == idx { + return b.Freqs[found] + } + return 0 +} + func (b *Block) GetToken(index int) []byte { offset := b.Offsets[index] l := binary.LittleEndian.Uint32(b.Payload[offset:]) - offset += sizeOfUint32 // skip val length + offset += uint32(util.SizeOfUint32) // skip val length return b.Payload[offset : offset+l] } @@ -90,16 +165,26 @@ func (b *Block) find(from, to int, searcher pattern.Searcher) ([]int, error) { // NOT THREAD SAFE. Do not use concurrently. // Use your own BlockLoader instance for each search query type BlockLoader struct { - fracName string - cache *cache.Cache[*Block] - reader *storage.IndexReader + fracName string + fracVer config.BinaryDataVersion + cache *cache.Cache[*Block] + reader *storage.IndexReader + unpackBuf *UnpackBuffer + blockBuf []byte } -func NewBlockLoader(fracName string, reader *storage.IndexReader, c *cache.Cache[*Block]) *BlockLoader { +func NewBlockLoader( + fracName string, + fracVer config.BinaryDataVersion, + reader *storage.IndexReader, + c *cache.Cache[*Block], +) *BlockLoader { return &BlockLoader{ - fracName: fracName, - cache: c, - reader: reader, + fracName: fracName, + fracVer: fracVer, + cache: c, + reader: reader, + unpackBuf: &UnpackBuffer{}, } } @@ -120,11 +205,34 @@ func (l *BlockLoader) Load(index uint32) *Block { } func (l *BlockLoader) read(index uint32) (*Block, error) { - data, _, err := l.reader.ReadIndexBlock(index, nil) + var err error + l.blockBuf, _, err = l.reader.ReadIndexBlock(index, l.blockBuf) if err != nil { return nil, err } - block := Block{} - err = block.Unpack(data) - return &block, err + block := &Block{} + err = block.Unpack(l.blockBuf, l.fracVer, l.unpackBuf) + return block, err +} + +type UnpackBuffer struct { + decompressedUint32 []uint32 // temporary buffer for bitpack + decompressedUint16 []uint16 // temporary buffer for bitpack + compressed []uint32 // temporary buffer for bitpack +} + +func (b *UnpackBuffer) Reset(fracVer config.BinaryDataVersion) { + if fracVer < config.BinaryDataV5 { + return + } + if b.decompressedUint32 == nil { + b.decompressedUint32 = make([]uint32, 0, 256) + } else { + b.decompressedUint32 = b.decompressedUint32[:0] + } + if b.compressed == nil { + b.compressed = make([]uint32, 0, 256) + } else { + b.compressed = b.compressed[:0] + } } diff --git a/frac/sealed/token/block_loader_test.go b/frac/sealed/token/block_loader_test.go new file mode 100644 index 00000000..738331f8 --- /dev/null +++ b/frac/sealed/token/block_loader_test.go @@ -0,0 +1,91 @@ +package token + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/config" +) + +func TestBlock_PackUnpack_NoFreq(t *testing.T) { + src := Block{ + Payload: packTokenPayload([]byte("foo"), []byte("bar")), + } + + var buf []uint32 + packed := src.Pack(nil, buf) + var dst Block + require.NoError(t, dst.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + + assert.Equal(t, 2, dst.Len()) + assert.Equal(t, []byte("foo"), dst.GetToken(0)) + assert.Equal(t, []byte("bar"), dst.GetToken(1)) + + assert.Empty(t, dst.FreqIndexes) + assert.Empty(t, dst.Freqs) +} + +func TestBlock_PackUnpack_WithFreq(t *testing.T) { + src := Block{ + Payload: packTokenPayload([]byte("dog"), []byte("cat"), []byte("horse"), []byte("duck")), + FreqIndexes: []uint16{0, 2}, + Freqs: []uint32{100, 200}, + } + + var buf []uint32 + packed := src.Pack(nil, buf) + var dst Block + require.NoError(t, dst.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + + assert.Equal(t, src.Payload, dst.Payload) + + assert.Equal(t, uint32(100), dst.GetFreq(0)) + assert.Equal(t, uint32(0), dst.GetFreq(1)) + assert.Equal(t, uint32(200), dst.GetFreq(2)) + assert.Equal(t, uint32(0), dst.GetFreq(3)) +} + +func TestBlock_Unpack_Legacy(t *testing.T) { + legacy := packTokenPayload([]byte("legacy")) + + var dst Block + require.NoError(t, dst.Unpack(legacy, config.BinaryDataV4, &UnpackBuffer{})) + + assert.Equal(t, legacy, dst.Payload) + assert.Equal(t, []uint32{0}, dst.Offsets) + assert.Empty(t, dst.FreqIndexes) + assert.Empty(t, dst.Freqs) +} + +func TestBlock_UnpackBufferReuse(t *testing.T) { + src := Block{ + Payload: packTokenPayload([]byte("a"), []byte("b")), + FreqIndexes: []uint16{1}, + Freqs: []uint32{64}, + } + + var packBuf []uint32 + packed := src.Pack(nil, packBuf) + + var dst1, dst2 Block + require.NoError(t, dst1.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + require.NoError(t, dst2.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + + assert.Equal(t, dst1.FreqIndexes, dst2.FreqIndexes) + assert.Equal(t, dst1.Freqs, dst2.Freqs) + + assert.Equal(t, uint32(0), dst2.GetFreq(0)) + assert.Equal(t, uint32(64), dst2.GetFreq(1)) +} + +func packTokenPayload(tokens ...[]byte) []byte { + var payload []byte + for _, tok := range tokens { + payload = binary.LittleEndian.AppendUint32(payload, uint32(len(tok))) + payload = append(payload, tok...) + } + return payload +} diff --git a/frac/sealed_source.go b/frac/sealed_source.go index beb7e4bd..a36ca34d 100644 --- a/frac/sealed_source.go +++ b/frac/sealed_source.go @@ -41,7 +41,7 @@ func NewSealedSource(f *Sealed) *SealedSource { f.info.BinaryDataVer, ), lidsLoader: lids.NewLoader(f.Info().BinaryDataVer, &f.lidReader, f.indexCache.LIDs), - tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &f.tokenReader, f.indexCache.Tokens), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, &f.tokenReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, &f.tokenReader, f.indexCache.TokenTable), } } diff --git a/fracmanager/config.go b/fracmanager/config.go index e295aada..e909efb2 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -69,6 +69,9 @@ func FillConfigWithDefault(config *Config) *Config { if config.SealParams.TokenTableZstdLevel == 0 { config.SealParams.TokenTableZstdLevel = zstdDefaultLevel } + if config.SealParams.TokenFreqThreshold == 0 { + config.SealParams.TokenFreqThreshold = consts.DefaultTokenFreqThreshold + } if config.ReplayWorkers == 0 { config.ReplayWorkers = consts.DefaultReplayWorkers } diff --git a/indexwriter/blocks.go b/indexwriter/blocks.go index 3064491b..636f8fe9 100644 --- a/indexwriter/blocks.go +++ b/indexwriter/blocks.go @@ -3,6 +3,7 @@ package indexwriter import ( "encoding/binary" "iter" + "math" "unsafe" "github.com/ozontech/seq-db/frac/sealed/lids" @@ -47,7 +48,7 @@ type unpackedIDBlock struct { func tokenBlock( it iter.Seq2[string, iter.Seq2[TokenPosting, error]], - accumulate func([]uint32) error, blockCapacity int, + accumulate func([]uint32) error, blockCapacity int, tokenFreqThreshold int, ) iter.Seq2[tokenFieldBlock, error] { return func(yield func(tokenFieldBlock, error) bool) { var ( @@ -87,6 +88,8 @@ func tokenBlock( block.payload.Payload = block.payload.Payload[:0] block.payload.Offsets = block.payload.Offsets[:0] + block.payload.FreqIndexes = block.payload.FreqIndexes[:0] + block.payload.Freqs = block.payload.Freqs[:0] block.ext.minTID = currentTID + 1 blockIdx++ @@ -120,10 +123,19 @@ func tokenBlock( } } + tokenIndex := uint32(len(block.payload.Offsets)) block.payload.Offsets = append(block.payload.Offsets, uint32(len(block.payload.Payload))) block.payload.Payload = binary.LittleEndian.AppendUint32(block.payload.Payload, uint32(len(tok))) block.payload.Payload = append(block.payload.Payload, tok...) + if len(tlids) >= tokenFreqThreshold { + if tokenIndex > math.MaxUint16 { + panic("unsupported token block size") + } + block.payload.FreqIndexes = append(block.payload.FreqIndexes, uint16(tokenIndex)) + block.payload.Freqs = append(block.payload.Freqs, uint32(len(tlids))) + } + if err := accumulate(tlids); err != nil { yield(tokenFieldBlock{}, err) return diff --git a/indexwriter/blocks_test.go b/indexwriter/blocks_test.go index bb513130..c49f1d97 100644 --- a/indexwriter/blocks_test.go +++ b/indexwriter/blocks_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/token" @@ -58,6 +59,41 @@ func (m *mockSource) ID() iter.Seq2[DocLocation, error] { } } +func TestBlocksBuilder_BuildTokenBlocksWithFreq(t *testing.T) { + const ( + blockSize = 1024 + tokenFreqThreshold = 50 + ) + manyLids := make([]uint32, tokenFreqThreshold) + for i := range manyLids { + manyLids[i] = uint32(i + 1) + } + + src := mockSource{ + tokens: [][]byte{ + []byte("rare"), + []byte("common"), + }, + fields: []string{"f1"}, + fieldMaxTIDs: []uint32{2}, + tokenLIDs: [][]uint32{ + {1, 2, 3}, + manyLids, + }, + } + + var blocks []unpackedTokenBlock + for pair, err := range tokenBlock(src.TokenTriplet(), func([]uint32) error { return nil }, blockSize, tokenFreqThreshold) { + assert.NoError(t, err) + blocks = append(blocks, pair.First) + } + + require.Len(t, blocks, 1) + + assert.Equal(t, uint32(0), blocks[0].payload.GetFreq(0)) + assert.Equal(t, uint32(tokenFreqThreshold), blocks[0].payload.GetFreq(1)) +} + func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { src := mockSource{ tokens: [][]byte{ @@ -119,6 +155,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { src.TokenTriplet(), lidAccumulator.add, blockSize, + 50, ) // In our test case, each token is 4 bytes long. Also for each token we use uint32 to encode the length. diff --git a/indexwriter/index.go b/indexwriter/index.go index c28c7b6e..56bfa33e 100644 --- a/indexwriter/index.go +++ b/indexwriter/index.go @@ -153,7 +153,7 @@ func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err ) var allFieldsTables []token.FieldTable - for pair, err := range tokenBlock(src.TokenTriplet(), lidAccumulator.add, consts.RegularBlockSize) { + for pair, err := range tokenBlock(src.TokenTriplet(), lidAccumulator.add, consts.RegularBlockSize, s.params.TokenFreqThreshold) { if err != nil { return err } @@ -221,7 +221,7 @@ func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock { // packTokenBlock packs token data into a compressed index block. func (s *IndexWriter) packTokenBlock(block unpackedTokenBlock) indexBlock { - s.buf1 = block.payload.Pack(s.buf1[:0]) // Pack token data + s.buf1 = block.payload.Pack(s.buf1[:0], s.buf32[:0]) // Pack token data b := s.newIndexBlockZSTD(s.buf1, s.params.TokenListZstdLevel) // Store TID range in extended metadata b.ext1 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) diff --git a/packer/delta_bitpacker.go b/packer/delta_bitpacker.go index 36fd68df..7185d23e 100644 --- a/packer/delta_bitpacker.go +++ b/packer/delta_bitpacker.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "unsafe" "github.com/ronanh/intcomp" @@ -13,6 +14,12 @@ const ( sizeOfUint32 = int(unsafe.Sizeof(uint32(0))) ) +var uint32Pool = sync.Pool{ + New: func() any { + return make([]uint32, 0, 16*1024) + }, +} + // CompressDeltaBitpackUint32 works on top of intcomp library. intcomp can only compress slices which are multiple of 128, but // this function supports slices of any length. Residual part is always less than 128 numbers and is not delta encoded, // since we know the number of blocks with length non-multiple of 128 is very low. @@ -36,6 +43,19 @@ func CompressDeltaBitpackUint32(dst []byte, values, buf []uint32) []byte { return dst } +// CompressDeltaBitpackUint16 uses a temporary buffer to copy and cast values from uint16 to uint32 so it's a bit slower than CompressDeltaBitpackUint32. +func CompressDeltaBitpackUint16(dst []byte, values []uint16, buf []uint32) []byte { + uint32Values, _ := uint32Pool.Get().([]uint32) + uint32Values = uint32Values[:0] + + for _, i := range values { + uint32Values = append(uint32Values, uint32(i)) + } + dst = CompressDeltaBitpackUint32(dst, uint32Values, buf) + uint32Pool.Put(uint32Values) + return dst +} + func DecompressDeltaBitpackUint32(data []byte, buf, compressed []uint32) ([]byte, []uint32, error) { if len(data) < sizeOfUint32 { return nil, nil, fmt.Errorf("not enough data. slice len %d", len(data)) @@ -81,6 +101,26 @@ func DecompressDeltaBitpackUint32(data []byte, buf, compressed []uint32) ([]byte return data, buf, nil } +// DecompressDeltaBitpackUint16 works on top of DecompressDeltaBitpackUint32 so it's a bit slower +func DecompressDeltaBitpackUint16(data []byte, buf []uint16, compressed []uint32) ([]byte, []uint16, error) { + uint32Values, _ := uint32Pool.Get().([]uint32) + uint32Values = uint32Values[:0] + + var ( + values []uint32 + err error + ) + + data, values, err = DecompressDeltaBitpackUint32(data, uint32Values, compressed) + + for _, i := range values { + buf = append(buf, uint16(i)) + } + uint32Pool.Put(uint32Values) + + return data, buf, err +} + // CompressDeltaBitpackUint64 works on top of intcomp library. intcomp can only compress uint64 slices which are multiple of 256, but // this function supports slices of any length. Residual part is always less than 256 uint64 numbers and is not delta encoded, // since we know the number of blocks with length non-multiple of 256 is very low. diff --git a/packer/delta_bitpacker_test.go b/packer/delta_bitpacker_test.go index 049c658c..9334e8dc 100644 --- a/packer/delta_bitpacker_test.go +++ b/packer/delta_bitpacker_test.go @@ -7,6 +7,68 @@ import ( "github.com/stretchr/testify/require" ) +func TestCompressDeltaBitpackUint16(t *testing.T) { + testCases := []struct { + name string + values []uint16 + }{ + { + name: "empty", + values: []uint16{}, + }, + { + name: "small_single_value", + values: []uint16{1}, + }, + { + name: "small_few_values", + values: []uint16{1, 4, 7, 8, 10}, + }, + { + name: "small_127_values", + values: generateUint16(127), + }, + { + name: "small_128", + values: generateUint16(128), + }, + { + name: "small_129", + values: generateUint16(129), + }, + { + name: "midium_4k", + values: generateUint16(4096), + }, + { + name: "midium_4k_more", + values: generateUint16(4105), + }, + { + name: "midium_64k", + values: generateUint16(64 * 1024), + }, + { + name: "midium_64k_more", + values: generateUint16(64*1024 + 34), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + compressed := CompressDeltaBitpackUint16([]byte{}, tc.values, []uint32{}) + tmp := make([]uint32, 0, len(compressed)/sizeOfUint32) + _, decompressed, err := DecompressDeltaBitpackUint16(compressed, []uint16{}, tmp) + require.NoError(t, err) + if len(tc.values) > 0 { + require.Equal(t, tc.values, decompressed) + } else { + require.Equal(t, 0, len(decompressed)) + } + }) + } +} + func TestCompressDeltaBitpackUint32(t *testing.T) { testCases := []struct { name string @@ -131,6 +193,16 @@ func TestCompressDeltaBitpackUint64(t *testing.T) { } } +func generateUint16(n int) []uint16 { + v := make([]uint16, n) + last := uint16(100) + for i := range v { + v[i] = last + last += uint16(1 + rand.Intn(5)) + } + return v +} + func generateUint32(n int) []uint32 { v := make([]uint32, n) last := uint32(100) diff --git a/util/size.go b/util/size.go index 048b6405..4f22b845 100644 --- a/util/size.go +++ b/util/size.go @@ -4,6 +4,7 @@ import "unsafe" const ( SizeOfString = int(unsafe.Sizeof("")) + SizeOfUint16 = int(unsafe.Sizeof(uint16(0))) SizeOfUint32 = int(unsafe.Sizeof(uint32(0))) SizeOfUint64 = int(unsafe.Sizeof(uint64(0))) SizeOfPointer = int(unsafe.Sizeof(int(0)))