Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
fc42dc2
update gosdk for enterprise blobbers
Hitenjain14 Jul 27, 2024
7387079
update gosdk
Hitenjain14 Jul 27, 2024
af2bb02
update gosdk
Hitenjain14 Aug 8, 2024
52dfd91
Upload pool for disk cache (#155)
Hitenjain14 Sep 5, 2024
cbefe70
update gosdk
Hitenjain14 Sep 24, 2024
97b99fc
Merge branch 'feat/enterprise-blobber' of https://github.com/0chain/z…
Hitenjain14 Sep 24, 2024
d276778
update work file
Hitenjain14 Sep 24, 2024
170ae19
Merge remote-tracking branch 'origin' into feat/enterprise-blobber
Hitenjain14 Sep 24, 2024
e5a521e
update gosdk for enterprise blobbers
Hitenjain14 Jul 27, 2024
cd08e1c
update gosdk
Hitenjain14 Jul 27, 2024
8d04f70
update gosdk
Hitenjain14 Aug 8, 2024
8dc0e6f
update gosdk
Hitenjain14 Sep 24, 2024
e1326a2
Upload pool for disk cache (#155)
Hitenjain14 Sep 5, 2024
8ad6994
update work file
Hitenjain14 Sep 24, 2024
24c60bb
feat: replace disk cache with staging
harsh4723 Sep 26, 2024
0d8a975
Merge branch 'feat/enterprise-blobber' of https://github.com/0chain/z…
Hitenjain14 Oct 9, 2024
6074691
use another art package
Hitenjain14 Oct 9, 2024
f425e0f
use go radix
Hitenjain14 Oct 9, 2024
28b0be5
don't terminate list go radix
Hitenjain14 Oct 10, 2024
2208660
update gosdk
Hitenjain14 Oct 23, 2024
024222f
Merge branch 'feat/enterprise-blobber' into feat/art-change
Hitenjain14 Oct 23, 2024
88b521d
fix concurrent map read and write
Hitenjain14 Oct 31, 2024
afe4cd1
send copy oi to chan
Hitenjain14 Oct 31, 2024
929edd0
add logs for object
Hitenjain14 Nov 10, 2024
a3d8f73
add log for delete object
Hitenjain14 Nov 10, 2024
98bb694
add upload and download timings
Hitenjain14 Jan 17, 2025
963d4db
update go in base
Hitenjain14 Jan 17, 2025
c0eb598
update go to 1.22.5
Hitenjain14 Jan 17, 2025
41dbdc4
use alpine
Hitenjain14 Jan 17, 2025
c12b2db
change settings and update gosdk
Hitenjain14 Jan 21, 2025
5328e89
update gosdk and worker count
Hitenjain14 Jan 23, 2025
0e8b38a
decrease workers
Hitenjain14 Jan 24, 2025
4baebc2
check for mod time in cached object
Hitenjain14 Jan 27, 2025
4f08333
add log for toFree
Hitenjain14 Jan 30, 2025
60d4144
set expiry to 30 min
Hitenjain14 Jan 30, 2025
47e0c0c
set last modified in meta
Hitenjain14 Feb 7, 2025
f92ff65
fix mod time check
Hitenjain14 Feb 8, 2025
686a9ad
compare nano sec
Hitenjain14 Feb 11, 2025
bc019b6
add logs for time
Hitenjain14 Feb 12, 2025
07fddf4
Use tag latest
Apr 7, 2025
8ce8fb4
add check and change transport
Hitenjain14 Apr 21, 2025
ce7538c
Merge branch 'feat/enterprise-timings' of https://github.com/0chain/z…
Hitenjain14 Apr 21, 2025
00f9c71
update list objects
Hitenjain14 Jul 1, 2025
ca0ca60
feat: Add memory-based multipart upload for files up to 1GB
Dec 26, 2025
31298d0
feat: revert memory opt code and replace with multipart uploads
Dec 26, 2025
37c08aa
implement parallel blobber workflow
Dec 26, 2025
a61f37b
parallel chunking, increase channel buffer, remove directory creation
Dec 26, 2025
494d357
change parallel to sequential chunking to maintain order
Dec 27, 2025
c5d5ce8
more upload optimizations
Dec 27, 2025
919158b
increase buffer size for fewer reads, go mod tidy for gosdk download PR
Dec 27, 2025
bc4a64e
Handle logsearchapi connection errors gracefully
Jan 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .github/workflows/build-push-client-docker-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,7 @@ jobs:
- name: Set Docker Image Tags.
id: get_info
run: |
if [[ "${{github.ref}}" == refs/pull/* ]]; then
tag=${GITHUB_REF/\/merge/}
echo "TAG=$(echo pr-${tag:10})" >> $GITHUB_ENV
else
echo "TAG=$(echo ${GITHUB_REF#refs/*/} | sed 's/\//-/g')" >> $GITHUB_ENV
fi
echo "TAG=enterprise_latest" >> $GITHUB_ENV
echo "BRANCH=$([ -z '${{ github.event.pull_request.head.sha }}' ] && echo ${GITHUB_REF#refs/*/} || echo $GITHUB_HEAD_REF)" >> $GITHUB_ENV
echo "SHA=$([ -z '${{ github.event.pull_request.head.sha }}' ] && echo $GITHUB_SHA || echo '${{ github.event.pull_request.head.sha }}')" >> $GITHUB_ENV

Expand Down
7 changes: 1 addition & 6 deletions .github/workflows/build-push-logsearchapi-docker-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ jobs:
- name: Set Docker Image Tags.
id: get_info
run: |
if [[ "${{github.ref}}" == refs/pull/* ]]; then
tag=${GITHUB_REF/\/merge/}
echo "TAG=$(echo pr-${tag:10})" >> $GITHUB_ENV
else
echo "TAG=$(echo ${GITHUB_REF#refs/*/} | sed 's/\//-/g')" >> $GITHUB_ENV
fi
echo "TAG=enterprise_latest" >> $GITHUB_ENV
echo "BRANCH=$([ -z '${{ github.event.pull_request.head.sha }}' ] && echo ${GITHUB_REF#refs/*/} || echo $GITHUB_HEAD_REF)" >> $GITHUB_ENV
echo "SHA=$([ -z '${{ github.event.pull_request.head.sha }}' ] && echo $GITHUB_SHA || echo '${{ github.event.pull_request.head.sha }}')" >> $GITHUB_ENV

Expand Down
9 changes: 2 additions & 7 deletions .github/workflows/build-push-minio-docker-image.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,12 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: ^1.20.0
go-version: ^1.22.5

- name: Set Docker Image Tags.
id: get_info
run: |
if [[ "${{github.ref}}" == refs/pull/* ]]; then
tag=${GITHUB_REF/\/merge/}
echo "TAG=$(echo pr-${tag:10})" >> $GITHUB_ENV
else
echo "TAG=$(echo ${GITHUB_REF#refs/*/} | sed 's/\//-/g')" >> $GITHUB_ENV
fi
echo "TAG=enterprise_latest" >> $GITHUB_ENV
echo "BRANCH=$([ -z '${{ github.event.pull_request.head.sha }}' ] && echo ${GITHUB_REF#refs/*/} || echo $GITHUB_HEAD_REF)" >> $GITHUB_ENV
echo "SHA=$([ -z '${{ github.event.pull_request.head.sha }}' ] && echo $GITHUB_SHA || echo '${{ github.event.pull_request.head.sha }}')" >> $GITHUB_ENV

Expand Down
11 changes: 10 additions & 1 deletion cmd/disk-cache-backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
cacheDataFilePrefix = "part"

cacheMetaVersion = "1.0.0"
cacheExpiryDays = 90 * time.Hour * 24 // defaults to 90 days
cacheExpiryDays = time.Minute * 2 // defaults to 90 days
// SSECacheEncrypted is the metadata key indicating that the object
// is a cache entry encrypted with cache KMS master key in globalCacheKMS.
SSECacheEncrypted = "X-Minio-Internal-Encrypted-Cache"
Expand Down Expand Up @@ -333,8 +333,10 @@ func (c *diskCache) purge(ctx context.Context) {

toFree := c.toClear()
if toFree == 0 {
log.Println("No cache entries to purge")
return
}
log.Println("Purging cache entries: ", toFree)

atomic.StoreInt32(&c.purgeRunning, 1) // do not run concurrent purge()
defer atomic.StoreInt32(&c.purgeRunning, 0)
Expand Down Expand Up @@ -405,13 +407,15 @@ func (c *diskCache) purge(ctx context.Context) {
// cache writeback commit setting is enabled.
status, ok := objInfo.UserDefined[writeBackStatusHeader]
if ok && status != CommitComplete.String() {
log.Println("Skipping cache entry", objInfo.Name, "as it is not committed yet")
return nil
}
cc := cacheControlOpts(objInfo)
switch {
case cc != nil:
if cc.isStale(objInfo.ModTime) {
removeAll(cacheDir)
log.Println("purge cache entry", objInfo.Name)
scorer.adjustSaveBytes(-objInfo.Size)
// break early if sufficient disk space reclaimed.
if c.diskUsageLow() {
Expand Down Expand Up @@ -448,6 +452,7 @@ func (c *diskCache) purge(ctx context.Context) {
// clean up stale cache.json files for objects that never got cached but access count was maintained in cache.json
fi, err := os.Stat(pathJoin(cacheDir, cacheMetaJSONFile))
if err != nil || (fi != nil && fi.ModTime().Before(expiry) && len(cachedRngFiles) == 0) {
log.Println("purge cache dir", cacheDir)
removeAll(cacheDir)
if fi != nil {
scorer.adjustSaveBytes(-fi.Size())
Expand All @@ -472,6 +477,7 @@ func (c *diskCache) purge(ctx context.Context) {

scorer.purgeFunc(func(qfile queuedFile) {
fileName := qfile.name
log.Println("score purge", fileName)
removeAll(fileName)
slashIdx := strings.LastIndex(fileName, SlashSeparator)
if slashIdx >= 0 {
Expand Down Expand Up @@ -924,19 +930,22 @@ func (c *diskCache) put(ctx context.Context, bucket, object string, data io.Read
removeAll(cachePath)
return oi, IncompleteBody{Bucket: bucket, Object: object}
}
modTime := time.Now()
if writeback {
metadata["content-md5"] = md5sum
if md5bytes, err := base64.StdEncoding.DecodeString(md5sum); err == nil {
metadata["etag"] = hex.EncodeToString(md5bytes)
}
metadata[writeBackStatusHeader] = CommitPending.String()
metadata["last-modified"] = modTime.UTC().Format(http.TimeFormat)
}
return ObjectInfo{
Bucket: bucket,
Name: object,
ETag: metadata["etag"],
Size: n,
UserDefined: metadata,
ModTime: modTime,
},
c.saveMetadata(ctx, bucket, object, metadata, n, nil, "", incHitsOnly)
}
Expand Down
100 changes: 56 additions & 44 deletions cmd/disk-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sync/atomic"
"time"

"github.com/armon/go-radix"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/color"
"github.com/minio/minio/internal/config/cache"
Expand All @@ -39,7 +40,6 @@ import (
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/wildcard"
art "github.com/plar/go-adaptive-radix-tree"
)

const (
Expand Down Expand Up @@ -180,7 +180,7 @@ func (c *cacheObjects) updateMetadataIfChanged(ctx context.Context, dcache *disk
func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
// delete from backend and then delete from cache always
objInfoB, errB := c.InnerDeleteObjectFn(ctx, bucket, object, opts)

log.Println("delete object from cache", bucket, object)
if c.isCacheExclude(bucket, object) || c.skipCache() {
return
}
Expand Down Expand Up @@ -255,9 +255,9 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
// fetch diskCache if object is currently cached or nearest available cache drive
dcache, err := c.getCacheToLoc(ctx, bucket, object)
if err != nil {
log.Println("errorGettingLoc: ", err)
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
}

cacheReader, numCacheHits, cacheErr := dcache.Get(ctx, bucket, object, rs, h, opts)
if cacheErr == nil {
cacheObjSize = cacheReader.ObjInfo.Size
Expand Down Expand Up @@ -501,7 +501,6 @@ func (c *cacheObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dst

// ListObjects from disk cache
func (c *cacheObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
log.Printf("listobject cache prefix %s marker %s delim %s maxkey %d \n", prefix, marker, delimiter, maxKeys)
objInfos := []ObjectInfo{}
prefixes := map[string]bool{}

Expand Down Expand Up @@ -530,32 +529,30 @@ func (c *cacheObjects) ListObjects(ctx context.Context, bucket, prefix, marker,
rootprefix := bucket + "/" + prefix
rootMarker := bucket + "/" + marker
objectCount := 0
leafFilter := func(n art.Node) bool {
if n.Kind() == art.Leaf {
if strings.HasPrefix(string(n.Key()), rootprefix) {
if marker == "" || string(n.Key()) > rootMarker {
trimmed := strings.TrimPrefix(string(n.Key()), rootprefix)
parts := strings.Split(trimmed, delimiter)
if len(parts) > 0 && parts[0] != "" {
if (len(objInfos) + len(prefixes)) < maxKeys {
if len(parts) == 1 {
ob, ok := n.Value().(ObjectInfo)
if ok {
objInfos = append(objInfos, ob)
}
} else if delimiter != "" {
dir := prefix + parts[0] + delimiter
if marker == "" || dir > marker {
prefixes[dir] = true
}
leafFilter := func(key string, value any) bool {
if strings.HasPrefix(key, rootprefix) {
if marker == "" || key > rootMarker {
trimmed := strings.TrimPrefix(key, rootprefix)
parts := strings.Split(trimmed, delimiter)
if len(parts) > 0 && parts[0] != "" {
if (len(objInfos) + len(prefixes)) < maxKeys {
if len(parts) == 1 {
ob, ok := value.(ObjectInfo)
if ok {
objInfos = append(objInfos, ob)
}
} else if delimiter != "" {
dir := prefix + parts[0] + delimiter
if marker == "" || dir > marker {
prefixes[dir] = true
}
}
}
objectCount++
}
objectCount++
}
}
return true
return false
}
c.prefixSearch(rootprefix, leafFilter)
var uPrefixes []string
Expand All @@ -566,20 +563,21 @@ func (c *cacheObjects) ListObjects(ctx context.Context, bucket, prefix, marker,
if objectCount > maxKeys {
isTruncated = true
}
log.Printf("listobject cache prefix %s marker %s delim %s maxkey %d result %d \n", prefix, marker, delimiter, maxKeys, len(objInfos))
return ListObjectsInfo{
Objects: objInfos,
Prefixes: unique(uPrefixes),
IsTruncated: isTruncated,
}, nil
}

func (c *cacheObjects) prefixSearch(rootprefix string, leafFilter art.Callback) {
defer func() {
if r := recover(); r != nil {
log.Println("recovered from panic from list tree listobj")
}
}()
c.listTree.ForEachPrefix([]byte(rootprefix), leafFilter)
func (c *cacheObjects) prefixSearch(rootprefix string, leafFilter radix.WalkFn) {
// defer func() {
// if r := recover(); r != nil {
// log.Println("recovered from panic from list tree listobj")
// }
// }()
c.listTree.ForEachPrefix(rootprefix, leafFilter)
}

func (c *cacheObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
Expand Down Expand Up @@ -818,7 +816,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *

// fetch from backend if there is no space on cache drive
if !dcache.diskSpaceAvailable(size) {
log.Println("uploading to backend no space on cache drive")
log.Println("uploading to backend no space on cache drive")
return putObjectFn(ctx, bucket, object, r, opts)
}

Expand All @@ -845,16 +843,17 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
return putObjectFn(ctx, bucket, object, r, opts)
}
if c.commitWriteback {
log.Println("uploading to cache writeback", object)
oi, err := dcache.Put(ctx, bucket, object, r, r.Size(), nil, opts, false, true)
if err != nil {
return ObjectInfo{}, err
}
objPath := oi.Bucket + "/" + oi.Name
c.listTree.Insert([]byte(objPath), oi)
log.Println("uploading to cache writeback", object, " modTime", oi.ModTime.UnixNano())
c.listTree.Insert(objPath, oi)
coi := oi.Clone()
//go c.uploadObject(GlobalContext, oi) // use schedule to upload in batch
select {
case c.writeBackInputCh <- oi:
case c.writeBackInputCh <- coi:
default:
}
return oi, nil
Expand Down Expand Up @@ -932,15 +931,27 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *

// upload cached object to backend in async commit mode.
func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
log.Printf("uploading object %s in backend in async commit mode", oi.Name)
dcache, err := c.getCacheToLoc(ctx, oi.Bucket, oi.Name)
if err != nil {
// disk cache could not be located.
logger.LogIf(ctx, fmt.Errorf("Could not upload %s/%s to backend: %w", oi.Bucket, oi.Name, err))
return
}
objPath := oi.Bucket + "/" + oi.Name
cachedObj, ok := c.listTree.Get(objPath)
if !ok {
log.Println("object not found in list tree ", objPath)
return
}
cachedObjInfo := cachedObj.(ObjectInfo)
if !cachedObjInfo.ModTime.IsZero() && cachedObjInfo.ModTime.UnixNano() != oi.ModTime.UnixNano() {
log.Println("object modified since cached", cachedObjInfo.ModTime.UnixNano(), oi.ModTime.UnixNano(), oi.Name, cachedObjInfo.ModTime.UnixNano() != oi.ModTime.UnixNano())
return
}
log.Printf("uploading object %s in backend in async commit mode", oi.Name)
cReader, _, bErr := dcache.Get(ctx, oi.Bucket, oi.Name, nil, http.Header{}, ObjectOptions{})
if bErr != nil {
log.Println("errorGettingReader: ", bErr)
return
}
defer cReader.Close()
Expand Down Expand Up @@ -974,7 +985,7 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
size = cReader.ObjInfo.Size
} else {
delete(meta, writeBackRetryHeader)
c.deleteFromListTree(oi.Bucket + "/" + oi.Name)
c.listTree.CheckTimeAndDelete(objPath, cachedObjInfo.ModTime)
}
meta[writeBackStatusHeader] = wbCommitStatus.String()
meta["etag"] = oi.ETag
Expand All @@ -987,12 +998,12 @@ func (c *cacheObjects) uploadObject(ctx context.Context, oi ObjectInfo) {
}

func (c *cacheObjects) deleteFromListTree(key string) {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered from panic from list tree delete")
}
}()
c.listTree.Delete([]byte(key))
// defer func() {
// if r := recover(); r != nil {
// log.Println("Recovered from panic from list tree delete")
// }
// }()
c.listTree.Delete(key)
}

func (c *cacheObjects) queueWritebackRetry(oi ObjectInfo) {
Expand All @@ -1014,6 +1025,7 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
if err != nil {
return nil, err
}

c := &cacheObjects{
cache: cache,
exclude: config.Exclude,
Expand Down Expand Up @@ -1168,7 +1180,7 @@ func (c *cacheObjects) recreateListTreeOnStartUp() {
if !ok || status == CommitComplete.String() {
return nil
}
c.listTree.Insert([]byte(objInfo.Bucket+"/"+objInfo.Name), objInfo)
c.listTree.Insert(objInfo.Bucket+"/"+objInfo.Name, objInfo)
return nil
}

Expand Down
8 changes: 3 additions & 5 deletions cmd/gateway/zcn/dStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
var tempdir string

const (
pageLimit = 100
pageLimit = 500
numBlocks = 100
dirType = "d"
fileType = "f"
Expand Down Expand Up @@ -376,7 +376,7 @@ func getFileReader(ctx context.Context,

}

func putFile(ctx context.Context, alloc *sdk.Allocation, remotePath, contentType string, r io.Reader, size int64, isUpdate bool, userDefined map[string]string) (err error) {
func putFile(ctx context.Context, alloc *sdk.Allocation, remotePath, contentType string, r io.Reader, size int64, _ bool, userDefined map[string]string) (err error) {
fileName := filepath.Base(remotePath)
var customMeta string
if len(userDefined) > 0 {
Expand Down Expand Up @@ -409,9 +409,7 @@ func putFile(ctx context.Context, alloc *sdk.Allocation, remotePath, contentType
},
StreamUpload: isStreamUpload,
}
if isUpdate {
opRequest.OperationType = constants.FileOperationUpdate
}

if isStreamUpload {
err = alloc.DoMultiOperation([]sdk.OperationRequest{opRequest})
if err != nil && !isSameRootError(err) {
Expand Down
Loading