Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions asyncsearcher/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ type AsyncSearcherConfig struct {
MaxSizePerRequest int
}

func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fracmanager.List) *AsyncSearcher {
type fractionAcquirer interface {
Fractions() fracmanager.List
AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool)
}

func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fractionAcquirer) *AsyncSearcher {
if config.DataDir == "" {
logger.Fatal("can't start async searcher: DataDir is empty")
}
Expand Down Expand Up @@ -154,10 +159,10 @@ type asyncSearchInfo struct {
infoSize *atomic.Int64
}

func newAsyncSearchInfo(r AsyncSearchRequest, list fracmanager.List) asyncSearchInfo {
fracsToSearch := make([]fracSearchState, 0, len(list))
for _, f := range list {
fracsToSearch = append(fracsToSearch, fracSearchState{Name: f.Info().Name()})
func newAsyncSearchInfo(r AsyncSearchRequest, fracNames []string) asyncSearchInfo {
fracsToSearch := make([]fracSearchState, 0, len(fracNames))
for _, name := range fracNames {
fracsToSearch = append(fracsToSearch, fracSearchState{Name: name})
}
ctx, cancel := context.WithCancel(context.Background())
return asyncSearchInfo{
Expand Down Expand Up @@ -204,7 +209,7 @@ func (i *asyncSearchInfo) Status() AsyncSearchStatus {
return status
}

func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.List) error {
func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquirer) error {
if as.readOnly.Load() {
return fmt.Errorf("cannot start search on read-only mode")
}
Expand Down Expand Up @@ -235,7 +240,8 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis
return fmt.Errorf("retention time should be less than %s, got %s", maxRetention, r.Retention)
}

if ok := as.saveSearchInfo(r, fracs); !ok {
fracNames := fracs.Fractions().FilterInRange(r.Params.From, r.Params.To).Names()
if ok := as.saveSearchInfo(r, fracNames); !ok {
// Request was saved previously, skip it
return nil
}
Expand All @@ -245,13 +251,13 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis
return nil
}

func (as *AsyncSearcher) saveSearchInfo(r AsyncSearchRequest, fracs fracmanager.List) bool {
func (as *AsyncSearcher) saveSearchInfo(r AsyncSearchRequest, fracNames []string) bool {
as.requestsMu.Lock()
defer as.requestsMu.Unlock()
if _, ok := as.requests[r.ID]; ok {
return false
}
info := newAsyncSearchInfo(r, fracs)
info := newAsyncSearchInfo(r, fracNames)
as.storeSearchInfoLocked(r.ID, info)
return true
}
Expand Down Expand Up @@ -295,7 +301,7 @@ func (as *AsyncSearcher) createDataDir() {
})
}

func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fracmanager.List) {
func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fractionAcquirer) {
defer as.processWg.Done()

as.rateLimit <- struct{}{}
Expand All @@ -305,7 +311,7 @@ func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fracmanager.
asyncSearchActiveSearches.Add(-1)
}

func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) {
func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) {
qprPaths, err := as.findQPRs(id)
if err != nil {
panic(fmt.Errorf("can't find QPRs for id %q: %s", id, err))
Expand Down Expand Up @@ -340,25 +346,14 @@ func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) {
info.Request.Params.AST = ast.Root
}

fracsByName := make(map[string]frac.Fraction)
for _, f := range fracs {
fracsByName[f.Info().Name()] = f
}

for _, fracInfo := range info.Fractions {
if _, ok := processedFracs[fracInfo.Name]; ok {
continue
}
if as.shouldStopSearch(id) {
break
}

f, ok := fracsByName[fracInfo.Name]
if !ok { // oldest fracs may already be removed
logger.Info("async search: skip missing fraction", zap.String("id", id), zap.String("frac", fracInfo.Name))
continue
}
if err := as.processFrac(f, info); err != nil {
if err := as.acquireAndProcessFrac(fracInfo, info, fracs); err != nil {
as.updateSearchInfo(id, func(info *asyncSearchInfo) {
info.Error = err.Error()
})
Expand Down Expand Up @@ -400,6 +395,20 @@ func compressQPR(qpr *seq.QPR, cb func(compressed []byte) error) error {
return nil
}

func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracs fractionAcquirer) (err error) {
f, release, ok := fracs.AcquireFraction(fracInfo.Name)
if !ok { // oldest fracs may already be removed
logger.Info(
"async search: skip missing fraction",
zap.String("id", searchInfo.Request.ID),
zap.String("frac", fracInfo.Name),
)
return
}
defer release()
return as.processFrac(f, searchInfo)
}

func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err error) {
defer func() {
if panicData := util.RecoverToError(recover(), asyncSearchPanics); panicData != nil {
Expand Down
23 changes: 22 additions & 1 deletion asyncsearcher/async_searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/mappingprovider"
"github.com/ozontech/seq-db/seq"
)
Expand All @@ -25,6 +26,10 @@ func (f *fakeFrac) Info() *common.Info {
return &f.info
}

func (f *fakeFrac) IsIntersecting(from, to seq.MID) bool {
return true
}

func (f *fakeFrac) Search(context.Context, processor.SearchParams) (*seq.QPR, error) {
return &f.dp.qpr, nil
}
Expand All @@ -33,6 +38,21 @@ type fakeDP struct {
qpr seq.QPR
}

type fakeFractionProvider fracmanager.List

func (fp fakeFractionProvider) AcquireFraction(name string) (frac.Fraction, func(), bool) {
for _, f := range fp {
if f.Info().Name() == name {
return f, func() {}, true
}
}
return nil, func() {}, false
}

func (fp fakeFractionProvider) Fractions() fracmanager.List {
return fracmanager.List(fp)
}

func TestAsyncSearcherMaintain(t *testing.T) {
r := require.New(t)

Expand All @@ -50,7 +70,8 @@ func TestAsyncSearcherMaintain(t *testing.T) {
Query: "*",
Retention: time.Hour,
}
fracs := []frac.Fraction{

fracs := fakeFractionProvider{
&fakeFrac{info: common.Info{Path: "1"}},
}
r.NoError(as.StartSearch(req, fracs))
Expand Down
11 changes: 8 additions & 3 deletions frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,25 @@ func (f *Remote) String() string {
}

func (f *Remote) loadInfo() error {
var err error

if f.IsLegacy {
if err := f.openInfoLegacy(); err != nil {
return err
}

f.info = loadInfoLegacy(f.legacyReader)
if f.info, err = loadInfoLegacy(f.legacyReader); err != nil {
logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
return nil
}

if err := f.openInfo(); err != nil {
return err
}

f.info = loadInfo(f.infoFile)
if f.info, err = loadInfo(f.infoFile); err != nil {
logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
return nil
}

Expand Down
29 changes: 18 additions & 11 deletions frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package frac
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -279,14 +280,20 @@ func (f *Sealed) openDocs() {
}

func (f *Sealed) loadInfo() {
var err error

if f.IsLegacy {
f.openInfoLegacy()
f.info = loadInfoLegacy(f.legacyReader)
if f.info, err = loadInfoLegacy(f.legacyReader); err != nil {
logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
return
}

f.openInfo()
f.info = loadInfo(f.infoFile)
if f.info, err = loadInfo(f.infoFile); err != nil {
logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
}

func (f *Sealed) init(full bool) {
Expand Down Expand Up @@ -535,41 +542,41 @@ func (f *Sealed) IsIntersecting(from, to seq.MID) bool {
return f.info.IsIntersecting(from, to)
}

func loadInfoLegacy(infoReader storage.IndexReader) *common.Info {
func loadInfoLegacy(infoReader storage.IndexReader) (*common.Info, error) {
block, _, err := infoReader.ReadIndexBlock(0, nil)
if err != nil {
logger.Fatal("cannot read info block", zap.Error(err))
return nil, fmt.Errorf("cannot read info block: %w", err)
}

var bi sealed.BlockInfo
if err := bi.Unpack(block); err != nil {
logger.Fatal("cannot unpack info block", zap.Error(err))
return nil, fmt.Errorf("cannot unpack info block: %w", err)
}

return bi.Info
return bi.Info, nil
}

func loadInfo(r interface {
io.ReaderAt
Stat() (os.FileInfo, error)
},
) *common.Info {
) (*common.Info, error) {
stat, err := r.Stat()
if err != nil {
logger.Fatal("cannot stat info file", zap.Error(err))
return nil, fmt.Errorf("cannot stat info file: %w", err)
}

block := make([]byte, stat.Size())
if _, err := r.ReadAt(block, io.SeekStart); err != nil {
logger.Fatal("cannot read info block", zap.Error(err))
return nil, fmt.Errorf("cannot read info block: %w", err)
}

var bi sealed.BlockInfo
if err := bi.Unpack(block); err != nil {
logger.Fatal("cannot unpack info block", zap.Error(err))
return nil, fmt.Errorf("cannot unpack info block: %w", err)
}

return bi.Info
return bi.Info, nil
}

// computeIndexOnDisk returns the total on-disk size of index files for a local fraction.
Expand Down
25 changes: 24 additions & 1 deletion fracmanager/frac_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/logger"
Expand Down Expand Up @@ -237,7 +238,7 @@ func filterValid(ids []string, manifests map[string]*fracManifest) ([]*fracManif

switch manifest.Stage() {
case fracStageUnknown:
logger.Error("unknown fraction stage", zap.String("fraction", id), zap.Any("manifest", manifest))
logger.Error("unknown fraction stage", zap.Object("manifest", manifest))
fractionLoadErrors.Inc()
continue
case fracStageZombie:
Expand Down Expand Up @@ -339,3 +340,25 @@ func extractSuffix(filename string) string {
}
return filename[i:]
}

func (f *fracManifest) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("basePath", f.basePath)
enc.AddBool("hasDocs", f.hasDocs)
enc.AddBool("hasMeta", f.hasMeta)
enc.AddBool("hasWal", f.hasWal)
enc.AddBool("hasIndex", f.hasIndex)
enc.AddBool("hasSdocs", f.hasSdocs)
enc.AddBool("hasRemote", f.hasRemote)

enc.AddBool("hasInfo", f.hasInfo)
enc.AddBool("hasToken", f.hasToken)
enc.AddBool("hasOffsets", f.hasOffsets)
enc.AddBool("hasID", f.hasID)
enc.AddBool("hasLID", f.hasLID)

enc.AddBool("hasDocsDel", f.hasDocsDel)
enc.AddBool("hasSdocsDel", f.hasSdocsDel)
enc.AddBool("hasIndexDel", f.hasIndexDel)

return nil
}
4 changes: 4 additions & 0 deletions fracmanager/fracmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk
return &fm, stop, nil
}

func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) {
return fm.lc.registry.AcquireFraction(name)
}

func (fm *FracManager) Fractions() List {
return fm.lc.registry.AllFractions()
}
Expand Down
Loading
Loading