Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func startStore(
TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
DocBlocksZstdLevel: cfg.Compression.DocBlockZstdCompressionLevel,
DocBlockSize: int(cfg.DocsSorting.DocBlockSize),
SkipFsync: cfg.Resources.SkipFsync,
},
Fraction: frac.Config{
Search: frac.SearchConfig{
Expand All @@ -283,6 +284,7 @@ func startStore(
},
SkipSortDocs: !cfg.DocsSorting.Enabled,
KeepMetaFile: false,
SkipFsync: cfg.Resources.SkipFsync,
},
OffloadingEnabled: cfg.Offloading.Enabled,
OffloadingRetention: cfg.Offloading.Retention,
Expand Down
15 changes: 8 additions & 7 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@
cfg *Config,
skipMaskProvider skipMaskProvider,
) *Active {
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync)
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, cfg.SkipFsync)

metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats)
metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats, cfg.SkipFsync)

f := &Active{
TokenList: NewActiveTokenList(config.IndexWorkers),
Expand Down Expand Up @@ -128,24 +128,25 @@
baseFileName string,
readLimiter *storage.ReadLimiter,
docsFile *os.File,
docsStats os.FileInfo) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
docsStats os.FileInfo,
skipFsync bool) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
legacyMetaFileName := baseFileName + consts.MetaFileSuffix

if _, err := os.Stat(legacyMetaFileName); err == nil {
// .meta file exists
metaFile, metaStats := mustOpenFile(legacyMetaFileName, config.SkipFsync)
metaFile, metaStats := mustOpenFile(legacyMetaFileName, skipFsync)
metaSize := uint64(metaStats.Size())
metaReader := storage.NewDocBlocksReader(readLimiter, metaFile)
writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), skipFsync)
logger.Info("using legacy meta file format", zap.String("fraction", baseFileName))
return metaFile, writer, &metaReader, nil, metaSize
}

logger.Info("using new WAL format", zap.String("fraction", baseFileName))
walFileName := baseFileName + consts.WalFileSuffix
metaFile, metaStats := mustOpenFile(walFileName, config.SkipFsync)
metaFile, metaStats := mustOpenFile(walFileName, skipFsync)
metaSize := uint64(metaStats.Size())
writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), skipFsync)
walReader, err := storage.NewWalReader(readLimiter, metaFile, baseFileName)
if err != nil {
logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err))
Expand Down Expand Up @@ -300,7 +301,7 @@
}

var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "seq_db_store",

Check failure on line 304 in frac/active.go

View workflow job for this annotation

GitHub Actions / lint

string `seq_db_store` has 5 occurrences, make it a constant (goconst)
Subsystem: "bulk",
Name: "stages_seconds",
Buckets: metric.SecondsBuckets,
Expand Down
15 changes: 10 additions & 5 deletions frac/active_sealing_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ActiveSealingSource struct {
blocksOffsets []uint64 // Document block offsets
docsReader *storage.DocsReader // Document storage reader
lastErr error // Last error
skipFsync bool
}

// NewActiveSealingSource creates a new data source for sealing
Expand Down Expand Up @@ -87,6 +88,7 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe
docPosMap: active.DocsPositions.idToPos,
blocksOffsets: active.DocBlocks.vals,
docsReader: &active.sortReader,
skipFsync: active.Config.SkipFsync,
}

src.prepareInfo()
Expand Down Expand Up @@ -371,18 +373,21 @@ func (src *ActiveSealingSource) SortDocs() error {
}
src.info.DocsOnDisk = uint64(stat.Size())

// Synchronize and rename file
if err := sdocsFile.Sync(); err != nil {
return err
if !src.skipFsync {
if err := sdocsFile.Sync(); err != nil {
return err
}
}
if err := sdocsFile.Close(); err != nil {
return err
}
if err := os.Rename(sdocsFile.Name(), src.info.Path+consts.SdocsFileSuffix); err != nil {
return err
}
if err := util.SyncPath(filepath.Dir(src.info.Path)); err != nil {
return err
if !src.skipFsync {
if err := util.SyncPath(filepath.Dir(src.info.Path)); err != nil {
return err
}
}

// Log compression statistics
Expand Down
2 changes: 2 additions & 0 deletions frac/common/seal_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ type SealParams struct {

DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block.
DocBlockSize int // DocBlockSize is decompressed payload size of document block.

SkipFsync bool
}
1 change: 1 addition & 0 deletions frac/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ type Config struct {

SkipSortDocs bool
KeepMetaFile bool
SkipFsync bool
}

type SearchConfig struct {
Expand Down
14 changes: 9 additions & 5 deletions frac/sealed/sealing/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {
return nil, err
}

// Ensure data is flushed to disk
if err := indexFile.Sync(); err != nil {
return nil, err
if !params.SkipFsync {
// Ensure data is flushed to disk
if err := indexFile.Sync(); err != nil {
return nil, err
}
}

// Get final file size for metadata
Expand All @@ -81,8 +83,10 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) {
return nil, err
}

// Ensure directory metadata is synced to disk
util.MustSyncPath(filepath.Dir(info.Path))
if !params.SkipFsync {
// Ensure directory metadata is synced to disk
util.MustSyncPath(filepath.Dir(info.Path))
}

// Build preloaded data structure for fast query access
lidsTable := indexSealer.LIDsTable()
Expand Down
Loading