Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/gateway/zcn/dStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func getFileReader(ctx context.Context,
} else {
ds = &downloadStatus{}
downloads[remotePath] = ds
log.Println("^^^^^^^^getFileReader: starting download:", remotePath)
ds.wg.Add(1)
mu.Unlock()

Expand Down
24 changes: 16 additions & 8 deletions cmd/gateway/zcn/gateway-zcn.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,19 @@ func (z *ZCN) NewGatewayLayer(creds madmin.Credentials) (minio.ObjectLayer, erro
}

zob := &zcnObjects{
alloc: allocation,
metrics: minio.NewMetrics(),
alloc: allocation,
metrics: minio.NewMetrics(),
copyTracker: newOperationTracker(),
}

return zob, nil
}

type zcnObjects struct {
minio.GatewayUnsupported
alloc *sdk.Allocation
metrics *minio.BackendMetrics
alloc *sdk.Allocation
metrics *minio.BackendMetrics
copyTracker *operationTracker
}

// Shutdown Remove temporary directory
Expand Down Expand Up @@ -637,8 +639,9 @@ func (zob *zcnObjects) PutMultipleObjects(

return objectInfo, nil
}
func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
var srcRemotePath, dstRemotePath string

func (zob *zcnObjects) copyZusObject(srcBucket, srcObject, destBucket, destObject string) (dstRemotePath string, err error) {
var srcRemotePath string
if srcBucket == rootBucketName {
srcRemotePath = filepath.Join(rootPath, srcObject)
} else {
Expand All @@ -655,11 +658,16 @@ func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, des
RemotePath: srcRemotePath,
DestPath: dstRemotePath,
}
err = zob.alloc.DoMultiOperation([]sdk.OperationRequest{
return dstRemotePath, zob.alloc.DoMultiOperation([]sdk.OperationRequest{
copyOp,
})
}

func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
log.Println("move object - srcBucket:", srcBucket, "srcObject:", srcObject, "destBucket:", destBucket, "destObject:", destObject)
dstRemotePath, err := zob.copyZusObject(srcBucket, srcObject, destBucket, destObject)
if err != nil {
return
return minio.ObjectInfo{}, err
}

var ref *sdk.ORef
Expand Down
136 changes: 97 additions & 39 deletions cmd/gateway/zcn/multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"github.com/0chain/gosdk/zboxcore/sdk"
"github.com/google/uuid"

"github.com/minio/minio-go/v7/pkg/tags"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/gateway/zcn/seqpriorityqueue"
)

var (
FileMap = make(map[string]*MultiPartFile)
mapLock sync.Mutex
// alloc *sdk.Allocation
FileMap = make(map[string]*MultiPartFile)
mapLock sync.Mutex
localStorageDir = "store"
)

Expand Down Expand Up @@ -67,6 +67,7 @@ func (mpf *MultiPartFile) UpdateFileSize(partID int, size int64) {
log.Println("see last part, partID:", partID, "file size:", mpf.fileSize)
mpf.readyToUpload = true
close(mpf.readyUploadC)
mpf.readyUploadC = nil
mpf.lastPartSize = size
return
}
Expand All @@ -85,12 +86,14 @@ func (mpf *MultiPartFile) UpdateFileSize(partID int, size int64) {
}

func (mpf *MultiPartFile) notifyEnd() {
log.Println("notify end")
mpf.lock.Lock()
defer mpf.lock.Unlock()
if !mpf.readyToUpload {
mpf.readyToUpload = true
if mpf.readyUploadC != nil {
close(mpf.readyUploadC)
mpf.readyUploadC = nil
}
}
}
Expand All @@ -101,11 +104,6 @@ func (zob *zcnObjects) NewMultipartUpload(ctx context.Context, bucket string, ob
}

func (zob *zcnObjects) newMultiPartUpload(localStorageDir, bucket, object string) (string, error) {
// var objectSize int64
// objectSize := int64(371917281)
// objectSize := int64(22491196)
// log.Println("initial upload...")

// Generate a unique upload ID
uploadID := uuid.New().String()
mapLock.Lock()
Expand Down Expand Up @@ -337,9 +335,15 @@ func (zob *zcnObjects) PutObjectPart(ctx context.Context, bucket, object, upload
}

func (zob *zcnObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, err error) {
// var objectSize int64
// objectSize := int64(371917281)
// objectSize := int64(22491196)
if zob.copyTracker.isCopying(uploadID) {
// avoid uploading
log.Println("complete multipart upload - remove copying, uploadID:", uploadID)
zob.copyTracker.remove(uploadID)
return minio.ObjectInfo{
Bucket: bucket,
Name: object,
}, nil
}

mapLock.Lock()
multiPartFile, ok := FileMap[uploadID]
Expand Down Expand Up @@ -388,23 +392,6 @@ func (zob *zcnObjects) constructCompleteObject(bucket, uploadID, object, localSt
break
}

// func() {
// // Open the part file for reading
// partFile, err := os.Open(partFilename)
// if err != nil {
// log.Panicf("could not open part file: %v, err: %v", partFilename, err)
// }
// defer partFile.Close()

// data, err := io.ReadAll(partFile)
// if err != nil {
// log.Panicf("read part: %v failed, err: %v", partNumber, err)
// }

// multiPartFile.dataC <- data
// log.Println("^^^^^^^^^ uploading part:", partNumber, "size:", len(data))
// }()

// Read the ETag of the part
partETagBytes, err := os.ReadFile(partETagFilename)
if err != nil {
Expand All @@ -418,17 +405,6 @@ func (zob *zcnObjects) constructCompleteObject(bucket, uploadID, object, localSt
// Get the concatenated ETag value
eTag := strings.Join(partETags, "")

// Close the temporary file
// if err := tmpCompleteObjectFile.Close(); err != nil {
// return "", err
// }

// Rename the temporary file to its final destination
// completeObjectFilename := filepath.Join(localStorageDir, bucket, object)
// if err := os.Rename(tmpCompleteObjectFilename, completeObjectFilename); err != nil {
// return "", err
// }

return eTag, nil
}

Expand Down Expand Up @@ -512,3 +488,85 @@ func (zob *zcnObjects) AbortMultipartUpload(ctx context.Context, bucket string,
close(multiPartFile.cancelC)
return cleanupPartFilesAndDirs(bucket, uploadID, object, localStorageDir)
}

// operationTracker keeps track of the operations in progress
// to avoid multiple uploads/copies of the same object.
// this is used when multipart operations get invovled.
type operationTracker struct {
mu sync.Mutex
ops map[string]*sync.Once
}

func newOperationTracker() *operationTracker {
return &operationTracker{
ops: make(map[string]*sync.Once),
}
}

func (mt *operationTracker) doOnce(uploadID string, f func()) {
mt.mu.Lock()
once, ok := mt.ops[uploadID]
if !ok {
once = &sync.Once{}
mt.ops[uploadID] = once
}
mt.mu.Unlock()

once.Do(f)
}

func (mt *operationTracker) remove(uploadID string) {
mt.mu.Lock()
delete(mt.ops, uploadID)
mt.mu.Unlock()
}

func (mt *operationTracker) isCopying(uploadID string) bool {
mt.mu.Lock()
_, ok := mt.ops[uploadID]
mt.mu.Unlock()
return ok
}

func (zob *zcnObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string, partID int, startOffset, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (pi minio.PartInfo, err error) {
log.Println("copy object part, partID:", partID, "srcBucket:", srcBucket, "srcObject:", srcObject, "destBucket:", destBucket, "destObject:", destObject, "uploadID:", uploadID)
// Check if the source object exists
srcInfo, err = zob.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
if err != nil {
return
}

zob.copyTracker.doOnce(uploadID, func() {
log.Println("move zus object, srcBucket:", srcBucket, "srcObject:", srcObject, "destBucket:", destBucket, "destObject:", destObject)
_, err = zob.copyZusObject(srcBucket, srcObject, destBucket, destObject)
if err != nil {
return
}
})

if err != nil {
return pi, err
}

// Mock the part copy action
pi = minio.PartInfo{
PartNumber: partID,
LastModified: srcInfo.ModTime,
ETag: srcInfo.ETag,
Size: length,
}

return pi, nil
}

func (zob *zcnObjects) GetObjectTags(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (*tags.Tags, error) {
log.Println("get object tagging...")
return tags.NewTags(map[string]string{
"zus": "storage",
}, object != "")
}

// IsTaggingSupported returns whether object tagging is supported or not for this layer.
func (zob *zcnObjects) IsTaggingSupported() bool {
return true
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
cloud.google.com/go/storage v1.27.0
github.com/0chain/errors v1.0.3
github.com/0chain/gosdk v1.10.1-0.20231214200656-5a5e14d7d186
github.com/0chain/gosdk v1.10.1-0.20231220114355-386555cc9d11
github.com/Azure/azure-pipeline-go v0.2.2
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/Shopify/sarama v1.28.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ github.com/0chain/gosdk v1.10.1-0.20231217024042-6fa927150de8 h1:6CxvCuRCn1qR2A2
github.com/0chain/gosdk v1.10.1-0.20231217024042-6fa927150de8/go.mod h1:tNJnOciWF1KYaMpTlLBM5BluG8E0GizgN6biUNLwC6o=
github.com/0chain/gosdk v1.10.1-0.20231217061908-572640e971a5 h1:iFwTLabwjafWpQIMJznRBPBEn0nx1uFC7QlQfy9x6l0=
github.com/0chain/gosdk v1.10.1-0.20231217061908-572640e971a5/go.mod h1:tNJnOciWF1KYaMpTlLBM5BluG8E0GizgN6biUNLwC6o=
github.com/0chain/gosdk v1.10.1-0.20231220114355-386555cc9d11 h1:zfqy8rwbrAaAwMFiIKS9YGZiOOa7PPvhSYXuSUORzUM=
github.com/0chain/gosdk v1.10.1-0.20231220114355-386555cc9d11/go.mod h1:tNJnOciWF1KYaMpTlLBM5BluG8E0GizgN6biUNLwC6o=
github.com/Azure/azure-amqp-common-go/v2 v2.1.0/go.mod h1:R8rea+gJRuJR6QxTir/XuEd+YuKoUiazDC/N96FiDEU=
github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4=
github.com/Azure/azure-pipeline-go v0.2.2 h1:6oiIS9yaG6XCCzhgAgKFfIWyo4LLCiDhZot6ltoThhY=
Expand Down