diff --git a/cmd/gateway/zcn/dStorage.go b/cmd/gateway/zcn/dStorage.go index 1347773a8..cc70d73b9 100644 --- a/cmd/gateway/zcn/dStorage.go +++ b/cmd/gateway/zcn/dStorage.go @@ -223,6 +223,7 @@ func getFileReader(ctx context.Context, } else { ds = &downloadStatus{} downloads[remotePath] = ds + log.Println("^^^^^^^^getFileReader: starting download:", remotePath) ds.wg.Add(1) mu.Unlock() diff --git a/cmd/gateway/zcn/gateway-zcn.go b/cmd/gateway/zcn/gateway-zcn.go index 09328fbb7..f7635557f 100644 --- a/cmd/gateway/zcn/gateway-zcn.go +++ b/cmd/gateway/zcn/gateway-zcn.go @@ -121,8 +121,9 @@ func (z *ZCN) NewGatewayLayer(creds madmin.Credentials) (minio.ObjectLayer, erro } zob := &zcnObjects{ - alloc: allocation, - metrics: minio.NewMetrics(), + alloc: allocation, + metrics: minio.NewMetrics(), + copyTracker: newOperationTracker(), } return zob, nil @@ -130,8 +131,9 @@ func (z *ZCN) NewGatewayLayer(creds madmin.Credentials) (minio.ObjectLayer, erro type zcnObjects struct { minio.GatewayUnsupported - alloc *sdk.Allocation - metrics *minio.BackendMetrics + alloc *sdk.Allocation + metrics *minio.BackendMetrics + copyTracker *operationTracker } // Shutdown Remove temporary directory @@ -637,8 +639,9 @@ func (zob *zcnObjects) PutMultipleObjects( return objectInfo, nil } -func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { - var srcRemotePath, dstRemotePath string + +func (zob *zcnObjects) copyZusObject(srcBucket, srcObject, destBucket, destObject string) (dstRemotePath string, err error) { + var srcRemotePath string if srcBucket == rootBucketName { srcRemotePath = filepath.Join(rootPath, srcObject) } else { @@ -655,11 +658,16 @@ func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, des RemotePath: srcRemotePath, DestPath: dstRemotePath, } - err = zob.alloc.DoMultiOperation([]sdk.OperationRequest{ + return dstRemotePath, zob.alloc.DoMultiOperation([]sdk.OperationRequest{ copyOp, }) +} + +func (zob *zcnObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { + log.Println("move object - srcBucket:", srcBucket, "srcObject:", srcObject, "destBucket:", destBucket, "destObject:", destObject) + dstRemotePath, err := zob.copyZusObject(srcBucket, srcObject, destBucket, destObject) if err != nil { - return + return minio.ObjectInfo{}, err } var ref *sdk.ORef diff --git a/cmd/gateway/zcn/multipart.go b/cmd/gateway/zcn/multipart.go index ec4ddd6e5..e4f138446 100644 --- a/cmd/gateway/zcn/multipart.go +++ b/cmd/gateway/zcn/multipart.go @@ -19,14 +19,14 @@ import ( "github.com/0chain/gosdk/zboxcore/sdk" "github.com/google/uuid" + "github.com/minio/minio-go/v7/pkg/tags" minio "github.com/minio/minio/cmd" "github.com/minio/minio/cmd/gateway/zcn/seqpriorityqueue" ) var ( - FileMap = make(map[string]*MultiPartFile) - mapLock sync.Mutex - // alloc *sdk.Allocation + FileMap = make(map[string]*MultiPartFile) + mapLock sync.Mutex localStorageDir = "store" ) @@ -67,6 +67,7 @@ func (mpf *MultiPartFile) UpdateFileSize(partID int, size int64) { log.Println("see last part, partID:", partID, "file size:", mpf.fileSize) mpf.readyToUpload = true close(mpf.readyUploadC) + mpf.readyUploadC = nil mpf.lastPartSize = size return } @@ -85,12 +86,14 @@ func (mpf *MultiPartFile) UpdateFileSize(partID int, size int64) { } func (mpf *MultiPartFile) notifyEnd() { + log.Println("notify end") mpf.lock.Lock() defer mpf.lock.Unlock() if !mpf.readyToUpload { mpf.readyToUpload = true if mpf.readyUploadC != nil { close(mpf.readyUploadC) + mpf.readyUploadC = nil } } } @@ -101,11 +104,6 @@ func (zob *zcnObjects) NewMultipartUpload(ctx context.Context, bucket string, ob } func (zob *zcnObjects) newMultiPartUpload(localStorageDir, bucket, object string) (string, error) { - // var objectSize int64 - // objectSize := int64(371917281) - // objectSize := int64(22491196) - // log.Println("initial upload...") - // Generate a unique upload ID uploadID := uuid.New().String() mapLock.Lock() @@ -337,9 +335,15 @@ func (zob *zcnObjects) PutObjectPart(ctx context.Context, bucket, object, upload } func (zob *zcnObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, err error) { - // var objectSize int64 - // objectSize := int64(371917281) - // objectSize := int64(22491196) + if zob.copyTracker.isCopying(uploadID) { + // avoid uploading + log.Println("complete multipart upload - remove copying, uploadID:", uploadID) + zob.copyTracker.remove(uploadID) + return minio.ObjectInfo{ + Bucket: bucket, + Name: object, + }, nil + } mapLock.Lock() multiPartFile, ok := FileMap[uploadID] @@ -388,23 +392,6 @@ func (zob *zcnObjects) constructCompleteObject(bucket, uploadID, object, localSt break } - // func() { - // // Open the part file for reading - // partFile, err := os.Open(partFilename) - // if err != nil { - // log.Panicf("could not open part file: %v, err: %v", partFilename, err) - // } - // defer partFile.Close() - - // data, err := io.ReadAll(partFile) - // if err != nil { - // log.Panicf("read part: %v failed, err: %v", partNumber, err) - // } - - // multiPartFile.dataC <- data - // log.Println("^^^^^^^^^ uploading part:", partNumber, "size:", len(data)) - // }() - // Read the ETag of the part partETagBytes, err := os.ReadFile(partETagFilename) if err != nil { @@ -418,17 +405,6 @@ func (zob *zcnObjects) constructCompleteObject(bucket, uploadID, object, localSt // Get the concatenated ETag value eTag := strings.Join(partETags, "") - // Close the temporary file - // if err := tmpCompleteObjectFile.Close(); err != nil { - // return "", err - // } - - // Rename the temporary file to its final destination - // completeObjectFilename := filepath.Join(localStorageDir, bucket, object) - // if err := os.Rename(tmpCompleteObjectFilename, completeObjectFilename); err != nil { - // return "", err - // } - return eTag, nil } @@ -512,3 +488,85 @@ func (zob *zcnObjects) AbortMultipartUpload(ctx context.Context, bucket string, close(multiPartFile.cancelC) return cleanupPartFilesAndDirs(bucket, uploadID, object, localStorageDir) } + +// operationTracker keeps track of the operations in progress +// to avoid multiple uploads/copies of the same object. +// this is used when multipart operations get invovled. +type operationTracker struct { + mu sync.Mutex + ops map[string]*sync.Once +} + +func newOperationTracker() *operationTracker { + return &operationTracker{ + ops: make(map[string]*sync.Once), + } +} + +func (mt *operationTracker) doOnce(uploadID string, f func()) { + mt.mu.Lock() + once, ok := mt.ops[uploadID] + if !ok { + once = &sync.Once{} + mt.ops[uploadID] = once + } + mt.mu.Unlock() + + once.Do(f) +} + +func (mt *operationTracker) remove(uploadID string) { + mt.mu.Lock() + delete(mt.ops, uploadID) + mt.mu.Unlock() +} + +func (mt *operationTracker) isCopying(uploadID string) bool { + mt.mu.Lock() + _, ok := mt.ops[uploadID] + mt.mu.Unlock() + return ok +} + +func (zob *zcnObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string, partID int, startOffset, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (pi minio.PartInfo, err error) { + log.Println("copy object part, partID:", partID, "srcBucket:", srcBucket, "srcObject:", srcObject, "destBucket:", destBucket, "destObject:", destObject, "uploadID:", uploadID) + // Check if the source object exists + srcInfo, err = zob.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{}) + if err != nil { + return + } + + zob.copyTracker.doOnce(uploadID, func() { + log.Println("move zus object, srcBucket:", srcBucket, "srcObject:", srcObject, "destBucket:", destBucket, "destObject:", destObject) + _, err = zob.copyZusObject(srcBucket, srcObject, destBucket, destObject) + if err != nil { + return + } + }) + + if err != nil { + return pi, err + } + + // Mock the part copy action + pi = minio.PartInfo{ + PartNumber: partID, + LastModified: srcInfo.ModTime, + ETag: srcInfo.ETag, + Size: length, + } + + return pi, nil +} + +func (zob *zcnObjects) GetObjectTags(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (*tags.Tags, error) { + log.Println("get object tagging...") + return tags.NewTags(map[string]string{ + "zus": "storage", + }, object != "") +} + +// IsTaggingSupported returns whether object tagging is supported or not for this layer. +func (zob *zcnObjects) IsTaggingSupported() bool { + return true +} diff --git a/go.mod b/go.mod index 96fccfe2b..566359682 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( cloud.google.com/go/storage v1.27.0 github.com/0chain/errors v1.0.3 - github.com/0chain/gosdk v1.10.1-0.20231214200656-5a5e14d7d186 + github.com/0chain/gosdk v1.10.1-0.20231220114355-386555cc9d11 github.com/Azure/azure-pipeline-go v0.2.2 github.com/Azure/azure-storage-blob-go v0.10.0 github.com/Shopify/sarama v1.28.0 diff --git a/go.sum b/go.sum index df6596f5b..550628905 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/0chain/gosdk v1.10.1-0.20231217024042-6fa927150de8 h1:6CxvCuRCn1qR2A2 github.com/0chain/gosdk v1.10.1-0.20231217024042-6fa927150de8/go.mod h1:tNJnOciWF1KYaMpTlLBM5BluG8E0GizgN6biUNLwC6o= github.com/0chain/gosdk v1.10.1-0.20231217061908-572640e971a5 h1:iFwTLabwjafWpQIMJznRBPBEn0nx1uFC7QlQfy9x6l0= github.com/0chain/gosdk v1.10.1-0.20231217061908-572640e971a5/go.mod h1:tNJnOciWF1KYaMpTlLBM5BluG8E0GizgN6biUNLwC6o= +github.com/0chain/gosdk v1.10.1-0.20231220114355-386555cc9d11 h1:zfqy8rwbrAaAwMFiIKS9YGZiOOa7PPvhSYXuSUORzUM= +github.com/0chain/gosdk v1.10.1-0.20231220114355-386555cc9d11/go.mod h1:tNJnOciWF1KYaMpTlLBM5BluG8E0GizgN6biUNLwC6o= github.com/Azure/azure-amqp-common-go/v2 v2.1.0/go.mod h1:R8rea+gJRuJR6QxTir/XuEd+YuKoUiazDC/N96FiDEU= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= github.com/Azure/azure-pipeline-go v0.2.2 h1:6oiIS9yaG6XCCzhgAgKFfIWyo4LLCiDhZot6ltoThhY=