Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ func analyzeIndex(
}

tokens := [][]byte{}
tokenUnpackBuf := &token.UnpackBuffer{}
for {
data := readTokenBlock()
if len(data) == 0 { // empty block - section separator
break
}
block := token.Block{}
if err := block.Unpack(data); err != nil {
if err := block.Unpack(data, b.Info.BinaryDataVer, tokenUnpackBuf); err != nil {
logger.Fatal("error unpacking tokens", zap.Error(err))
}
for i := range block.Len() {
Expand Down Expand Up @@ -189,14 +190,15 @@ func analyzeIndex(
lidsUniq := map[[16]byte]int{}
lidsLens := make([]int, len(tokens))
tokenLIDs := []uint32{}
lidUnpackBuf := &lids.UnpackBuffer{}
for {
data := readLIDBlock()
if len(data) == 0 { // empty block - section separator
break
}

block := &lids.Block{}
if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil {
if err := block.Unpack(data, ver, b.Info.BinaryDataVer, lidUnpackBuf); err != nil {
logger.Fatal("error unpacking lids block", zap.Error(err))
}

Expand Down
1 change: 1 addition & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func startStore(
TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
DocBlocksZstdLevel: cfg.Compression.DocBlockZstdCompressionLevel,
DocBlockSize: int(cfg.DocsSorting.DocBlockSize),
TokenFreqThreshold: cfg.Sealing.Tokens.FreqThreshold,
},
Fraction: frac.Config{
Search: frac.SearchConfig{
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ type Config struct {
// BlockSize sets max lids (postings) saved per LIDs block.
BlockSize int `config:"block_size" default:"65536"`
} `config:"lids"`

Tokens struct {
// FreqThreshold specifies minimum number of lids (postings) a token should have
// so that frequency for that token will be stored inside token blocks.
FreqThreshold int `config:"freq_threshold" default:"50"`
} `config:"tokens"`
} `config:"sealing"`

Cluster struct {
Expand Down
5 changes: 4 additions & 1 deletion config/frac_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ const (

// BinaryDataV4 - delta bitpack encoded MIDs and LIDs
BinaryDataV4

// BinaryDataV5 - token block has doc frequencies for heavy tokens
BinaryDataV5
)

const CurrentFracVersion = BinaryDataV4
const CurrentFracVersion = BinaryDataV5
1 change: 1 addition & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (

DefaultBulkRequestsLimit = 32
DefaultSearchRequestsLimit = 32
DefaultTokenFreqThreshold = 50

BulkMaxTries = 3

Expand Down
2 changes: 2 additions & 0 deletions frac/common/seal_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ type SealParams struct {
DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block.
LIDBlockSize int
DocBlockSize int // DocBlockSize is decompressed payload size of document block.

TokenFreqThreshold int // TokenFreqThreshold Min lids count to store frequency for a token.
}
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1901,11 +1901,11 @@ func (s *FractionTestSuite) TestFractionInfo() {
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1500),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
case *Remote:
s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value")
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450),
s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1500),
"index on disk doesn't match. actual value: %d", info.IndexOnDisk)
default:
s.Require().Fail("unsupported fraction type")
Expand Down
2 changes: 1 addition & 1 deletion frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
Expand Down
2 changes: 1 addition & 1 deletion frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider {
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
Expand Down
164 changes: 136 additions & 28 deletions frac/sealed/token/block_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,108 @@ import (
"encoding/binary"
"fmt"
"math"
"sort"
"unsafe"

"go.uber.org/zap"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/packer"
"github.com/ozontech/seq-db/pattern"
"github.com/ozontech/seq-db/storage"
"github.com/ozontech/seq-db/util"
)

const sizeOfUint32 = uint32(unsafe.Sizeof(uint32(0)))

type Block struct {
Payload []byte
Offsets []uint32
Payload []byte
Offsets []uint32
FreqIndexes []uint16 // indexes of tokens which have doc freqs (frequencies)
Freqs []uint32 // frequencies of certain tokens (how many docs have this token included at least once)
}

func (b *Block) Size() int {
const selfSize = int(unsafe.Sizeof(Block{}))
return selfSize + cap(b.Payload) + cap(b.Offsets)*int(sizeOfUint32)
return selfSize +
cap(b.Payload) +
cap(b.Offsets)*util.SizeOfUint32 +
cap(b.FreqIndexes)*util.SizeOfUint16 +
cap(b.Freqs)*util.SizeOfUint32
}

func (b Block) Pack(dst []byte) []byte {
return append(dst, b.Payload...)
func (b Block) Pack(dst []byte, buf []uint32) []byte {
dst = binary.LittleEndian.AppendUint32(dst, uint32(len(b.Payload)))
dst = append(dst, b.Payload...)
dst = packer.CompressDeltaBitpackUint16(dst, b.FreqIndexes, buf)
return packer.CompressDeltaBitpackUint32(dst, b.Freqs, buf)
}

func (b *Block) Unpack(data []byte) error {
var offset uint32
func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, unpackBuf *UnpackBuffer) error {
if fracVer >= config.BinaryDataV5 {
unpackBuf.Reset(fracVer)
return b.unpackV5(data, unpackBuf)
}
return b.unpackV1(data)
}

func (b *Block) unpackV1(data []byte) error {
b.Payload = data
for i := 0; len(data) != 0; i++ {
l := binary.LittleEndian.Uint32(data)
data = data[sizeOfUint32:]
offset += sizeOfUint32
return b.parseTokenPayload(data)
}

func (b *Block) unpackV5(data []byte, buf *UnpackBuffer) error {
if len(data) < util.SizeOfUint32 {
return fmt.Errorf("token block too short: %d bytes", len(data))
}

payloadLen := binary.LittleEndian.Uint32(data[:util.SizeOfUint32])
data = data[util.SizeOfUint32:]
if uint32(len(data)) < payloadLen {
return fmt.Errorf("invalid token block payload length: %d, data len %d", payloadLen, len(data))
}

payload := data[:payloadLen]
data = data[payloadLen:]

b.Payload = append(b.Payload[:0], payload...)

if err := b.parseTokenPayload(payload); err != nil {
return err
}

var err error
var freqIndexes []uint16
data, freqIndexes, err = packer.DecompressDeltaBitpackUint16(data, buf.decompressedUint16, buf.compressed)
if err != nil {
return err
}
b.FreqIndexes = append(b.FreqIndexes, freqIndexes...)

var freqs []uint32
_, freqs, err = packer.DecompressDeltaBitpackUint32(data, buf.decompressedUint32, buf.compressed)
if err != nil {
return err
}
b.Freqs = append(b.Freqs, freqs...)

return nil
}

func (b *Block) parseTokenPayload(payload []byte) error {
var offset uint32
for i := 0; len(payload) != 0; i++ {
l := binary.LittleEndian.Uint32(payload)
payload = payload[util.SizeOfUint32:]
offset += uint32(util.SizeOfUint32)
if l == math.MaxUint32 {
continue
}
if l > uint32(len(data)) {
if l > uint32(len(payload)) {
return fmt.Errorf("wrong field block for token %d, in pos %d", i, offset)
}
b.Offsets = append(b.Offsets, offset-sizeOfUint32)
data = data[l:]
b.Offsets = append(b.Offsets, offset-uint32(util.SizeOfUint32))
payload = payload[l:]
offset += l
}
return nil
Expand All @@ -55,10 +116,24 @@ func (b *Block) Len() int {
return len(b.Offsets)
}

// GetFreq returns frequency for a token if stored or 0 otherwise
func (b *Block) GetFreq(index int) uint32 {
if b.Freqs == nil {
return 0
}

idx := uint16(index)
found := sort.Search(len(b.FreqIndexes), func(i int) bool { return b.FreqIndexes[i] >= idx })
if found < len(b.FreqIndexes) && b.FreqIndexes[found] == idx {
return b.Freqs[found]
}
return 0
}

func (b *Block) GetToken(index int) []byte {
offset := b.Offsets[index]
l := binary.LittleEndian.Uint32(b.Payload[offset:])
offset += sizeOfUint32 // skip val length
offset += uint32(util.SizeOfUint32) // skip val length
return b.Payload[offset : offset+l]
}

Expand Down Expand Up @@ -90,16 +165,26 @@ func (b *Block) find(from, to int, searcher pattern.Searcher) ([]int, error) {
// NOT THREAD SAFE. Do not use concurrently.
// Use your own BlockLoader instance for each search query
type BlockLoader struct {
fracName string
cache *cache.Cache[*Block]
reader *storage.IndexReader
fracName string
fracVer config.BinaryDataVersion
cache *cache.Cache[*Block]
reader *storage.IndexReader
unpackBuf *UnpackBuffer
blockBuf []byte
}

func NewBlockLoader(fracName string, reader *storage.IndexReader, c *cache.Cache[*Block]) *BlockLoader {
func NewBlockLoader(
fracName string,
fracVer config.BinaryDataVersion,
reader *storage.IndexReader,
c *cache.Cache[*Block],
) *BlockLoader {
return &BlockLoader{
fracName: fracName,
cache: c,
reader: reader,
fracName: fracName,
fracVer: fracVer,
cache: c,
reader: reader,
unpackBuf: &UnpackBuffer{},
}
}

Expand All @@ -120,11 +205,34 @@ func (l *BlockLoader) Load(index uint32) *Block {
}

func (l *BlockLoader) read(index uint32) (*Block, error) {
data, _, err := l.reader.ReadIndexBlock(index, nil)
var err error
l.blockBuf, _, err = l.reader.ReadIndexBlock(index, l.blockBuf)
if err != nil {
return nil, err
}
block := Block{}
err = block.Unpack(data)
return &block, err
block := &Block{}
err = block.Unpack(l.blockBuf, l.fracVer, l.unpackBuf)
return block, err
}

type UnpackBuffer struct {
decompressedUint32 []uint32 // temporary buffer for bitpack
decompressedUint16 []uint16 // temporary buffer for bitpack
compressed []uint32 // temporary buffer for bitpack
}

func (b *UnpackBuffer) Reset(fracVer config.BinaryDataVersion) {
if fracVer < config.BinaryDataV5 {
return
}
if b.decompressedUint32 == nil {
b.decompressedUint32 = make([]uint32, 0, 256)
} else {
b.decompressedUint32 = b.decompressedUint32[:0]
}
if b.compressed == nil {
b.compressed = make([]uint32, 0, 256)
} else {
b.compressed = b.compressed[:0]
}
}
Loading