Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ const (
// We can remove it in the future releases.
IndexDelFileSuffix = ".index.del"

RemoteFractionSuffix = ".remote"
RemoteFractionSuffix = ".remote"
RemoteFractionInfoSuffix = ".remote-info"

FracCacheFileSuffix = ".frac-cache"

Expand Down
2 changes: 0 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2347,7 +2347,6 @@ func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Seal
nil,
s.config,
testSkipMaskProvider{},
false,
)

s.fraction = sealed
Expand Down Expand Up @@ -2409,7 +2408,6 @@ func (s *RemoteFractionTestSuite) SetupTest() {
s.config,
s3cli,
testSkipMaskProvider{},
false,
)

s.fraction = remoteFrac
Expand Down
128 changes: 78 additions & 50 deletions frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package frac
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"

"go.uber.org/zap"

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
Expand Down Expand Up @@ -43,8 +45,6 @@ type Remote struct {
docsCache *cache.Cache[[]byte]
docsReader storage.DocsReader

// IsLegacy is true for fractions that use the old single .index file format.
IsLegacy bool
legacyFile storage.ImmutableFile
legacyReader storage.IndexReader

Expand Down Expand Up @@ -79,10 +79,9 @@ func NewRemote(
indexCache *IndexCache,
docsCache *cache.Cache[[]byte],
info *common.Info,
config *Config,
cfg *Config,
s3cli *s3.Client,
skipMaskProvider skipMaskProvider,
isLegacy bool,
) *Remote {
f := &Remote{
ctx: ctx,
Expand All @@ -95,12 +94,10 @@ func NewRemote(

info: info,
BaseFileName: baseFile,
Config: config,
Config: cfg,

s3cli: s3cli,
skipMaskProvider: skipMaskProvider,

IsLegacy: isLegacy,
}

// Fast path if fraction-info cache exists AND it has valid index size.
Expand All @@ -116,7 +113,7 @@ func NewRemote(
// https://github.com/ozontech/seq-db/issues/92

if err := f.loadInfo(); err != nil {
logger.Error(
logger.Fatal(
"cannot open info file: any subsequent operation will fail",
zap.String("fraction", filepath.Base(f.BaseFileName)),
zap.Error(err),
Expand Down Expand Up @@ -175,7 +172,8 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
lidReader := &f.lidReader
idReader := &f.idReader

if f.IsLegacy {
isLegacy := f.IsSingleIndex()
if isLegacy {
tokenReader = &f.legacyReader
lidReader = &f.legacyReader
idReader = &f.legacyReader
Expand All @@ -192,7 +190,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, isLegacy, tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
idsProvider: seqids.NewProvider(
Expand All @@ -219,6 +217,7 @@ func (f *Remote) Suicide() {
// FIXME(dkharms): We need to rename `.remote` file to `._remote` to commit deletion intent.
// Now, we might have fraction leaks in S3 storage since [Suicide] is not atomic.
util.MustRemoveFileByPath(f.BaseFileName + consts.RemoteFractionSuffix)
util.MustRemoveFileByPath(f.BaseFileName + consts.RemoteFractionInfoSuffix)

f.docsCache.Release()
f.indexCache.Release()
Expand Down Expand Up @@ -252,27 +251,60 @@ func (f *Remote) String() string {
return fracToString(f, "remote")
}

func (f *Remote) IsSingleIndex() bool {
return f.info.BinaryDataVer < config.BinaryDataV3
}

func (f *Remote) loadInfo() error {
var err error
err := f.tryLoadInfoLocal()
if err == nil {
return nil
}

if f.IsLegacy {
if err := f.openInfoLegacy(); err != nil {
return err
}
if f.info, err = loadInfoLegacy(f.legacyReader); err != nil {
logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
logger.Warn(
"cannot open local info file for remote fraction, falling back to S3",
zap.String("fraction", f.BaseFileName),
zap.Error(err),
)

err = f.tryLoadInfoRemote()
if err == nil {
return nil
}

if err := f.openInfo(); err != nil {
return err
logger.Warn(
"cannot open remote info file, falling back to legacy index",
zap.String("fraction", f.BaseFileName),
zap.Error(err),
)

return f.loadInfoLegacy()
}

func (f *Remote) loadInfoLegacy() error {
err := f.openIndexLegacyRemote()
if err == nil {
f.info, err = loadInfoLegacy(f.legacyReader)
}
return err
}

func (f *Remote) tryLoadInfoLocal() error {
remoteInfoPath := f.BaseFileName + consts.RemoteFractionInfoSuffix
file, err := os.Open(remoteInfoPath)
if err == nil {
defer file.Close()
f.info, err = loadInfo(file)
}
return err
}

if f.info, err = loadInfo(f.infoFile); err != nil {
logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err))
func (f *Remote) tryLoadInfoRemote() error {
err := f.openInfoRemote()
if err == nil {
f.info, err = loadInfo(f.infoFile)
}
return nil
return err
}

func (f *Remote) init() error {
Expand All @@ -291,7 +323,7 @@ func (f *Remote) init() error {
return nil
}

if f.IsLegacy {
if f.IsSingleIndex() {
(&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader)
f.isInited = true
return nil
Expand All @@ -308,44 +340,36 @@ func (f *Remote) init() error {
return nil
}

func (f *Remote) openInfoLegacy() error {
func (f *Remote) openIndexLegacyRemote() error {
if f.legacyFile != nil {
return nil
}

return f.openRemoteFile(consts.IndexFileSuffix, func(file storage.ImmutableFile) {
f.legacyFile = file
f.legacyReader = storage.NewIndexReader(
f.readLimiter, file.Name(),
file, f.indexCache.LegacyRegistry,
)
f.legacyReader = storage.NewIndexReader(f.readLimiter, file.Name(), file, f.indexCache.LegacyRegistry)
})
}

func (f *Remote) openInfo() error {
func (f *Remote) openInfoRemote() error {
if f.infoFile != nil {
return nil
}

return f.openRemoteFile(
consts.InfoFileSuffix,
func(file storage.ImmutableFile) {
f.infoFile = file
},
)
return f.openRemoteFile(consts.InfoFileSuffix, func(file storage.ImmutableFile) {
f.infoFile = file
})
}

func (f *Remote) openIndex() error {
if f.IsLegacy {
return f.openInfoLegacy()
if f.IsSingleIndex() {
return f.openIndexLegacyRemote()
}

if err := f.openInfo(); err != nil {
if err := f.openInfoRemote(); err != nil {
return err
}

if f.tokenFile == nil {
if err := f.openRemoteFile(
err := f.openRemoteFile(
consts.TokenFileSuffix,
func(file storage.ImmutableFile) {
f.tokenFile = file
Expand All @@ -354,13 +378,14 @@ func (f *Remote) openIndex() error {
file, f.indexCache.TokenRegistry,
)
},
); err != nil {
)
if err != nil {
return err
}
}

if f.offsetsFile == nil {
if err := f.openRemoteFile(
err := f.openRemoteFile(
consts.OffsetsFileSuffix,
func(file storage.ImmutableFile) {
f.offsetsFile = file
Expand All @@ -369,13 +394,14 @@ func (f *Remote) openIndex() error {
file, f.indexCache.OffsetsRegistry,
)
},
); err != nil {
)
if err != nil {
return err
}
}

if f.idFile == nil {
if err := f.openRemoteFile(
err := f.openRemoteFile(
consts.IDFileSuffix,
func(file storage.ImmutableFile) {
f.idFile = file
Expand All @@ -384,13 +410,14 @@ func (f *Remote) openIndex() error {
file, f.indexCache.IDRegistry,
)
},
); err != nil {
)
if err != nil {
return err
}
}

if f.lidFile == nil {
if err := f.openRemoteFile(
err := f.openRemoteFile(
consts.LIDFileSuffix,
func(file storage.ImmutableFile) {
f.lidFile = file
Expand All @@ -399,7 +426,8 @@ func (f *Remote) openIndex() error {
file, f.indexCache.LIDRegistry,
)
},
); err != nil {
)
if err != nil {
return err
}
}
Expand Down Expand Up @@ -482,7 +510,7 @@ func (f *Remote) computeIndexSize() {
f.lidFile,
}

if f.IsLegacy {
if f.IsSingleIndex() {
files = []storage.ImmutableFile{
f.legacyFile,
}
Expand Down
Loading
Loading