diff --git a/api/src/services/platform-service.interface.ts b/api/src/services/platform-service.interface.ts index 572f862f91..d806dc4c20 100644 --- a/api/src/services/platform-service.interface.ts +++ b/api/src/services/platform-service.interface.ts @@ -15,3 +15,40 @@ export interface ISubmissionHistoryRow { createDate: string; submissionId?: number; } + +export interface UploadPart { + partNumber: number; + url: string; + partSizeBytes: number; +} + +export interface UploadPartByteRange extends UploadPart { + start: number; + end: number; +} + +export interface SubmissionUploadInitiateResult { + uploadId: string; + s3UploadId: string; + key: string; + presignedUrls: UploadPart[]; + partCount: number; + submissionId: string; + submissionUploadId: string; +} + +export interface SubmissionUploadInitiateResponse extends SubmissionUploadInitiateResult { + uploadArchiveId: string; +} + +export interface UploadTarFilePartsOptions { + concurrencyLimit?: number; +} + +/** + * Interface for multipart upload result + */ +export interface UploadResult { + PartNumber: number; + ETag: string; +} diff --git a/api/src/services/platform-service.test.ts b/api/src/services/platform-service.test.ts index 02b6d0a3b7..56df337b1e 100644 --- a/api/src/services/platform-service.test.ts +++ b/api/src/services/platform-service.test.ts @@ -1,6 +1,8 @@ import axios from 'axios'; import chai, { expect } from 'chai'; +import fs from 'fs'; import { describe } from 'mocha'; +import { Readable } from 'node:stream'; import sinon from 'sinon'; import sinonChai from 'sinon-chai'; import { ApiError, ApiErrorType } from '../errors/api-error'; @@ -213,7 +215,7 @@ describe('PlatformService', () => { uploadId: 'upload-123-456-789', s3UploadId: 's3-upload-id', key: 's3-key', - presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/presigned-url' }], + presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/presigned-url', partSizeBytes: 55 }], partCount: 1, submissionId: submissionIdFromApi, submissionUploadId: '660e8400-e29b-41d4-a716-446655440001' @@ -286,7 +288,7 @@ describe('PlatformService', () => { uploadId: 'multipart-session-upload-id', s3UploadId: 's3-upload-id', key: 's3-key', - presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/presigned-url' }], + presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/presigned-url', partSizeBytes: 55 }], partCount: 1, submissionId: existingSubmissionUuid, submissionUploadId: '660e8400-e29b-41d4-a716-446655440002' @@ -593,7 +595,7 @@ describe('PlatformService', () => { uploadId: 'upload-123-456-789', s3UploadId: 's3-upload-id', key: 's3-key', - presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/presigned-url' }], + presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/presigned-url', partSizeBytes: 55 }], partCount: 1, submissionId: '550e8400-e29b-41d4-a716-446655440001', submissionUploadId: '660e8400-e29b-41d4-a716-446655440003' @@ -704,8 +706,6 @@ describe('PlatformService', () => { const mockDBConnection = getMockDBConnection(); const platformService = new PlatformService(mockDBConnection); - // Test that helper methods exist and can be called - // Full integration testing would require mocking fs which is non-configurable expect(platformService._addMetadataFile).to.be.a('function'); expect(platformService._addJsonFiles).to.be.a('function'); expect(platformService._addFileToArchive).to.be.a('function'); @@ -758,45 +758,76 @@ describe('PlatformService', () => { }); }); - describe('_splitFileIntoChunks', () => { + 
describe('_buildPartByteRanges', () => { afterEach(() => { sinon.restore(); }); - it('should split file into single chunk when numChunks is 1', () => { + it('should build a single range when one part matches the full file size', () => { const mockDBConnection = getMockDBConnection(); const platformService = new PlatformService(mockDBConnection); - const fileBuffer = Buffer.from('test file content'); - const chunks = platformService._splitFileIntoChunks(fileBuffer, 1); + const ranges = platformService._buildPartByteRanges(17, [ + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 17 } + ]); - expect(chunks).to.have.length(1); - expect(chunks[0]).to.deep.equal(fileBuffer); + expect(ranges).to.deep.equal([ + { + partNumber: 1, + url: 'https://s3.amazonaws.com/url1', + partSizeBytes: 17, + start: 0, + end: 16 + } + ]); }); - it('should split file into multiple chunks', () => { + it('should build multiple ranges using exact part sizes', () => { const mockDBConnection = getMockDBConnection(); const platformService = new PlatformService(mockDBConnection); - const fileBuffer = Buffer.from('1234567890'); - const chunks = platformService._splitFileIntoChunks(fileBuffer, 3); + const ranges = platformService._buildPartByteRanges(10, [ + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 4 }, + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 4 }, + { partNumber: 3, url: 'https://s3.amazonaws.com/url3', partSizeBytes: 2 } + ]); - expect(chunks).to.have.length(3); - expect(chunks[0].toString()).to.equal('1234'); - expect(chunks[1].toString()).to.equal('5678'); - expect(chunks[2].toString()).to.equal('90'); + expect(ranges).to.deep.equal([ + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 4, start: 0, end: 3 }, + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 4, start: 4, end: 7 }, + { partNumber: 3, url: 'https://s3.amazonaws.com/url3', partSizeBytes: 2, start: 8, end: 9 } + ]); }); - it('should handle uneven division correctly', () => { + it('should throw when no parts are provided', () => { const mockDBConnection = getMockDBConnection(); const platformService = new PlatformService(mockDBConnection); - const fileBuffer = Buffer.from('12345'); - const chunks = platformService._splitFileIntoChunks(fileBuffer, 2); + expect(() => platformService._buildPartByteRanges(5, [])).to.throw('Part count must be positive'); + }); + + it('should throw when part instructions do not match file size', () => { + const mockDBConnection = getMockDBConnection(); + const platformService = new PlatformService(mockDBConnection); - expect(chunks).to.have.length(2); - expect(chunks[0].toString()).to.equal('123'); - expect(chunks[1].toString()).to.equal('45'); + expect(() => + platformService._buildPartByteRanges(5, [ + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 2 }, + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 2 } + ]) + ).to.throw('Part instructions do not match file size.'); + }); + + it('should throw when a part size is invalid', () => { + const mockDBConnection = getMockDBConnection(); + const platformService = new PlatformService(mockDBConnection); + + expect(() => + platformService._buildPartByteRanges(5, [ + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 0 }, + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 5 } + ]) + ).to.throw('Invalid part size for part 1.'); }); }); @@ -810,22 +841,21 @@ describe('PlatformService', () 
=> { const platformService = new PlatformService(mockDBConnection); const mockResponse = { + status: 200, + statusText: 'OK', headers: { etag: '"abc123def456"' } }; const axiosPutStub = sinon.stub(axios, 'put').resolves(mockResponse); + const chunkStream = Readable.from([Buffer.from('chunk')]); - const result = await platformService._uploadChunkToS3( - 'https://s3.amazonaws.com/presigned-url', - Buffer.from('chunk'), - 1 - ); + const result = await platformService._uploadChunkToS3('https://s3.amazonaws.com/presigned-url', chunkStream, 1); expect(axiosPutStub).to.have.been.calledOnce; expect(axiosPutStub.getCall(0).args[0]).to.equal('https://s3.amazonaws.com/presigned-url'); - expect(axiosPutStub.getCall(0).args[1]).to.deep.equal(Buffer.from('chunk')); + expect(axiosPutStub.getCall(0).args[1]).to.equal(chunkStream); expect(axiosPutStub.getCall(0).args[2]?.headers?.['Content-Type']).to.equal('application/x-tar'); expect(result).to.deep.equal({ @@ -839,18 +869,17 @@ describe('PlatformService', () => { const platformService = new PlatformService(mockDBConnection); const mockResponse = { + status: 200, + statusText: 'OK', headers: { ETag: '"xyz789"' } }; sinon.stub(axios, 'put').resolves(mockResponse); + const chunkStream = Readable.from([Buffer.from('chunk')]); - const result = await platformService._uploadChunkToS3( - 'https://s3.amazonaws.com/presigned-url', - Buffer.from('chunk'), - 2 - ); + const result = await platformService._uploadChunkToS3('https://s3.amazonaws.com/presigned-url', chunkStream, 2); expect(result.ETag).to.equal('xyz789'); }); @@ -860,17 +889,41 @@ describe('PlatformService', () => { const platformService = new PlatformService(mockDBConnection); const mockResponse = { + status: 200, + statusText: 'OK', headers: {} }; sinon.stub(axios, 'put').resolves(mockResponse); + const chunkStream = Readable.from([Buffer.from('chunk')]); + + try { + await platformService._uploadChunkToS3('https://s3.amazonaws.com/presigned-url', chunkStream, 1); + expect.fail('Should have thrown an error'); + } catch (error) { + expect((error as Error).message).to.include('Failed to upload part 1 to S3'); + } + }); + + it('should throw error when upload returns non-200 status', async () => { + const mockDBConnection = getMockDBConnection(); + const platformService = new PlatformService(mockDBConnection); + + const mockResponse = { + status: 500, + statusText: 'Internal Server Error', + headers: { + etag: '"abc123def456"' + } + }; + + sinon.stub(axios, 'put').resolves(mockResponse); + const chunkStream = Readable.from([Buffer.from('chunk')]); try { - await platformService._uploadChunkToS3('https://s3.amazonaws.com/presigned-url', Buffer.from('chunk'), 1); + await platformService._uploadChunkToS3('https://s3.amazonaws.com/presigned-url', chunkStream, 1); expect.fail('Should have thrown an error'); } catch (error) { - // The error gets caught and re-thrown, but the original error message should be in the error chain - // We check for the wrapped error message expect((error as Error).message).to.include('Failed to upload part 1 to S3'); } }); @@ -880,9 +933,10 @@ describe('PlatformService', () => { const platformService = new PlatformService(mockDBConnection); sinon.stub(axios, 'put').rejects(new Error('Network error')); + const chunkStream = Readable.from([Buffer.from('chunk')]); try { - await platformService._uploadChunkToS3('https://s3.amazonaws.com/presigned-url', Buffer.from('chunk'), 1); + await platformService._uploadChunkToS3('https://s3.amazonaws.com/presigned-url', chunkStream, 1); 
expect.fail('Should have thrown an error'); } catch (error) { expect((error as Error).message).to.include('Failed to upload part 1 to S3'); @@ -899,27 +953,120 @@ describe('PlatformService', () => { const mockDBConnection = getMockDBConnection(); const platformService = new PlatformService(mockDBConnection); - const fs = require('node:fs'); - sinon.stub(fs, 'readFileSync').returns(Buffer.from('1234567890')); + sinon.stub(fs, 'statSync').returns({ size: 10 } as fs.Stats); + const createReadStreamStub = sinon.stub(fs, 'createReadStream').returns({} as fs.ReadStream); const presignedUrls = [ - { partNumber: 1, url: 'https://s3.amazonaws.com/url1' }, - { partNumber: 2, url: 'https://s3.amazonaws.com/url2' }, - { partNumber: 3, url: 'https://s3.amazonaws.com/url3' } + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 5 }, + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 3 }, + { partNumber: 3, url: 'https://s3.amazonaws.com/url3', partSizeBytes: 2 } ]; - const uploadChunkStub = sinon.stub(PlatformService.prototype, '_uploadChunkToS3'); - uploadChunkStub.onCall(0).resolves({ PartNumber: 2, ETag: 'etag2' }); - uploadChunkStub.onCall(1).resolves({ PartNumber: 1, ETag: 'etag1' }); + const uploadChunkStub = sinon.stub(platformService, '_uploadChunkToS3'); + uploadChunkStub.onCall(0).resolves({ PartNumber: 1, ETag: 'etag1' }); + uploadChunkStub.onCall(1).resolves({ PartNumber: 2, ETag: 'etag2' }); uploadChunkStub.onCall(2).resolves({ PartNumber: 3, ETag: 'etag3' }); - const result = await platformService._uploadTarFileParts('/path/to/file.tar', presignedUrls, 3); + const result = await platformService._uploadTarFileParts( + '/path/to/file.tar', + presignedUrls, + presignedUrls.length + ); expect(uploadChunkStub).to.have.been.calledThrice; - expect(result).to.have.length(3); - expect(result[0].PartNumber).to.equal(1); - expect(result[1].PartNumber).to.equal(2); - expect(result[2].PartNumber).to.equal(3); + expect(createReadStreamStub.callCount).to.equal(3); + expect(createReadStreamStub.getCall(0).args).to.deep.equal(['/path/to/file.tar', { start: 0, end: 2 }]); + expect(createReadStreamStub.getCall(1).args).to.deep.equal(['/path/to/file.tar', { start: 3, end: 7 }]); + expect(createReadStreamStub.getCall(2).args).to.deep.equal(['/path/to/file.tar', { start: 8, end: 9 }]); + + expect(uploadChunkStub.getCall(0).args[0]).to.equal('https://s3.amazonaws.com/url1'); + expect(uploadChunkStub.getCall(0).args[1]).to.equal(createReadStreamStub.getCall(0).returnValue); + expect(uploadChunkStub.getCall(0).args[2]).to.equal(1); + expect(uploadChunkStub.getCall(0).args[3]).to.equal(3); + + expect(uploadChunkStub.getCall(1).args[0]).to.equal('https://s3.amazonaws.com/url2'); + expect(uploadChunkStub.getCall(1).args[1]).to.equal(createReadStreamStub.getCall(1).returnValue); + expect(uploadChunkStub.getCall(1).args[2]).to.equal(2); + expect(uploadChunkStub.getCall(1).args[3]).to.equal(5); + + expect(uploadChunkStub.getCall(2).args[0]).to.equal('https://s3.amazonaws.com/url3'); + expect(uploadChunkStub.getCall(2).args[1]).to.equal(createReadStreamStub.getCall(2).returnValue); + expect(uploadChunkStub.getCall(2).args[2]).to.equal(3); + expect(uploadChunkStub.getCall(2).args[3]).to.equal(2); + + expect(result).to.deep.equal([ + { PartNumber: 1, ETag: 'etag1' }, + { PartNumber: 2, ETag: 'etag2' }, + { PartNumber: 3, ETag: 'etag3' } + ]); + }); + + it('should respect concurrencyLimit batching', async () => { + const mockDBConnection = getMockDBConnection(); + const platformService 
= new PlatformService(mockDBConnection); + + sinon.stub(fs, 'statSync').returns({ size: 10 } as fs.Stats); + sinon.stub(fs, 'createReadStream').returns({} as fs.ReadStream); + + const presignedUrls = [ + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 2 }, + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 2 }, + { partNumber: 3, url: 'https://s3.amazonaws.com/url3', partSizeBytes: 2 }, + { partNumber: 4, url: 'https://s3.amazonaws.com/url4', partSizeBytes: 2 }, + { partNumber: 5, url: 'https://s3.amazonaws.com/url5', partSizeBytes: 2 } + ]; + + const uploadChunkStub = sinon + .stub(platformService, '_uploadChunkToS3') + .callsFake(async (_url, _chunk, partNumber, _partSizeBytes) => { + return { PartNumber: partNumber, ETag: `etag${partNumber}` }; + }); + + const result = await platformService._uploadTarFileParts( + '/path/to/file.tar', + presignedUrls, + presignedUrls.length, + { + concurrencyLimit: 2 + } + ); + + expect(uploadChunkStub.callCount).to.equal(5); + expect(result).to.deep.equal([ + { PartNumber: 1, ETag: 'etag1' }, + { PartNumber: 2, ETag: 'etag2' }, + { PartNumber: 3, ETag: 'etag3' }, + { PartNumber: 4, ETag: 'etag4' }, + { PartNumber: 5, ETag: 'etag5' } + ]); + }); + + it('should throw when presigned URL count does not match expected part count', async () => { + const mockDBConnection = getMockDBConnection(); + const platformService = new PlatformService(mockDBConnection); + + const presignedUrls = [{ partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 5 }]; + + try { + await platformService._uploadTarFileParts('/path/to/file.tar', presignedUrls, 2); + expect.fail('Should have thrown an error'); + } catch (error) { + expect((error as Error).message).to.equal('Presigned URL count (1) does not match expected part count (2)'); + } + }); + + it('should throw when concurrencyLimit is invalid', async () => { + const mockDBConnection = getMockDBConnection(); + const platformService = new PlatformService(mockDBConnection); + + const presignedUrls = [{ partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 5 }]; + + try { + await platformService._uploadTarFileParts('/path/to/file.tar', presignedUrls, 1, { concurrencyLimit: 0 }); + expect.fail('Should have thrown an error'); + } catch (error) { + expect((error as Error).message).to.equal('concurrencyLimit must be a positive integer'); + } }); }); @@ -946,8 +1093,8 @@ describe('PlatformService', () => { partSizeBytes: 5242880, partCount: 2, presignedUrls: [ - { partNumber: 1, url: 'https://s3.amazonaws.com/url1' }, - { partNumber: 2, url: 'https://s3.amazonaws.com/url2' } + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 512 }, + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 512 } ] } }; @@ -1000,8 +1147,8 @@ describe('PlatformService', () => { partSizeBytes: 5242880, partCount: 2, presignedUrls: [ - { partNumber: 1, url: 'https://s3.amazonaws.com/url1' }, - { partNumber: 2, url: 'https://s3.amazonaws.com/url2' } + { partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 512 }, + { partNumber: 2, url: 'https://s3.amazonaws.com/url2', partSizeBytes: 512 } ] } }; @@ -1050,7 +1197,7 @@ describe('PlatformService', () => { key: 's3-key', partSizeBytes: 5242880, partCount: 2, - presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/url1' }] + presignedUrls: [{ partNumber: 1, url: 'https://s3.amazonaws.com/url1', partSizeBytes: 1024 }] } }; diff --git a/api/src/services/platform-service.ts 
b/api/src/services/platform-service.ts
index 953ceb4e1f..a777c3c2f7 100644
--- a/api/src/services/platform-service.ts
+++ b/api/src/services/platform-service.ts
@@ -25,7 +25,13 @@ import { ObservationService } from './observation-services/observation-service';
 import {
   IBioHubSubmissionHistoryRow,
   IBioHubWrappedSubmissionHistoryResponse,
-  ISubmissionHistoryRow
+  ISubmissionHistoryRow,
+  SubmissionUploadInitiateResponse,
+  SubmissionUploadInitiateResult,
+  UploadPart,
+  UploadPartByteRange,
+  UploadResult,
+  UploadTarFilePartsOptions
 } from './platform-service.interface';
 import { SamplePeriodService } from './sample-period-service';
 import { SampleSiteService } from './sample-site-service';
@@ -865,7 +871,7 @@
    * @param {PostSurveySubmissionToBioHubObject} surveyDataPackage - Survey data package
    * @param {string} submissionComment - Comment for the submission
    * @param {string | null} existingSubmissionUuid - When set, initiate upload for this existing submission (re-publish)
-   * @return {*} {Promise<{uploadId: string, s3UploadId: string, key: string, presignedUrls: Array<{partNumber: number, url: string}>, partCount: number, submissionId: string, submissionUploadId: string}>}
+   * @return {*} {Promise<SubmissionUploadInitiateResult>}
    * @memberof PlatformService
    */
   async _initiateSubmissionUpload(
@@ -874,15 +880,7 @@
     surveyDataPackage: PostSurveySubmissionToBioHubObject,
     submissionComment: string,
     existingSubmissionUuid: string | null
-  ): Promise<{
-    uploadId: string;
-    s3UploadId: string;
-    key: string;
-    presignedUrls: Array<{ partNumber: number; url: string }>;
-    partCount: number;
-    submissionId: string;
-    submissionUploadId: string;
-  }> {
+  ): Promise<SubmissionUploadInitiateResult> {
     defaultLog.debug({
       label: '_initiateSubmissionUpload',
       tarFileSize,
@@ -916,17 +914,7 @@
     };
 
     try {
-      const response = await axios.post<{
-        submissionId: string;
-        submissionUploadId: string;
-        uploadId: string;
-        s3UploadId: string;
-        uploadArchiveId: string;
-        key: string;
-        partSizeBytes: number;
-        partCount: number;
-        presignedUrls: Array<{ partNumber: number; url: string }>;
-      }>(backboneSubmissionUploadUrl, requestBody, {
+      const response = await axios.post<SubmissionUploadInitiateResponse>(backboneSubmissionUploadUrl, requestBody, {
         headers: {
           authorization: `Bearer ${token}`,
           'Content-Type': 'application/json'
@@ -972,60 +960,80 @@
   }
 
   /**
-   * Splits a file buffer into chunks for multipart upload.
+   * Builds byte ranges for multipart upload directly from backend part instructions.
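+   * The backend dictates every part's size; this helper only converts those
+   * sizes into inclusive [start, end] offsets suitable for fs.createReadStream.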
    *
-   * @param {Buffer} fileBuffer - The file buffer to split
-   * @param {number} numChunks - Number of chunks to create
-   * @return {*} {Buffer[]} Array of buffer chunks
+   * @param {number} fileSize - TAR file size in bytes
+   * @param {UploadPart[]} orderedPresignedParts - Presigned parts sorted by part number
+   * @return {*} {UploadPartByteRange[]}
    * @memberof PlatformService
    */
-  _splitFileIntoChunks(fileBuffer: Buffer, numChunks: number): Buffer[] {
-    if (numChunks === 1) {
-      return [fileBuffer];
+  _buildPartByteRanges(fileSize: number, orderedPresignedParts: UploadPart[]): UploadPartByteRange[] {
+    if (!orderedPresignedParts.length) {
+      throw new Error('Part count must be positive');
     }
 
-    const chunkSize = Math.ceil(fileBuffer.length / numChunks);
-    const chunks: Buffer[] = [];
+    const expectedBytes = orderedPresignedParts.reduce((sum, part) => sum + part.partSizeBytes, 0);
+    if (expectedBytes !== fileSize) {
+      throw new Error('Part instructions do not match file size.');
+    }
+
+    const partRanges: UploadPartByteRange[] = [];
+    let start = 0;
 
-    for (let i = 0; i < numChunks; i++) {
-      const start = i * chunkSize;
-      const end = Math.min(start + chunkSize, fileBuffer.length);
-      chunks.push(fileBuffer.slice(start, end));
+    for (const part of orderedPresignedParts) {
+      if (!Number.isFinite(part.partSizeBytes) || part.partSizeBytes <= 0) {
+        throw new Error(`Invalid part size for part ${part.partNumber}.`);
+      }
+
+      const end = start + part.partSizeBytes - 1;
+      partRanges.push({
+        ...part,
+        start,
+        end
+      });
+      start += part.partSizeBytes;
     }
 
-    return chunks;
+    return partRanges;
   }
 
   /**
    * Uploads a single chunk to S3 using a presigned URL.
    *
    * @param {string} presignedUrl - The presigned S3 URL
-   * @param {Buffer} chunk - The chunk data to upload
+   * @param {NodeJS.ReadableStream} chunk - The chunk data stream to upload
    * @param {number} partNumber - The part number (1-indexed)
-   * @return {*} {Promise<{PartNumber: number, ETag: string}>}
+   * @param {number} [partSizeBytes] - Expected chunk size in bytes; sent as the Content-Length header when provided
+   * @return {*} {Promise<UploadResult>}
    * @memberof PlatformService
    */
   async _uploadChunkToS3(
     presignedUrl: string,
-    chunk: Buffer,
-    partNumber: number
-  ): Promise<{ PartNumber: number; ETag: string }> {
+    chunk: NodeJS.ReadableStream,
+    partNumber: number,
+    partSizeBytes?: number
+  ): Promise<UploadResult> {
     try {
       const response = await axios.put(presignedUrl, chunk, {
         headers: {
-          'Content-Type': 'application/x-tar'
+          'Content-Type': 'application/x-tar',
+          ...(partSizeBytes ? { 'Content-Length': String(partSizeBytes) } : {})
        },
         maxBodyLength: Infinity,
         maxContentLength: Infinity
       });
 
-      // Get ETag from response headers (case-insensitive)
+      if (response.status !== 200) {
+        throw new Error(`Upload failed with status ${response.status}: ${response.statusText}`);
+      }
+
       const etag = response.headers.etag || response.headers.ETag;
       if (!etag) {
         throw new Error(`No ETag returned from S3 upload for part ${partNumber}`);
       }
 
-      // Remove quotes from ETag
       const cleanEtag = etag.replaceAll('"', '');
 
       defaultLog.debug({
@@ -1046,6 +1054,7 @@
         partNumber,
         error: formatAxiosError(error)
       });
+
       throw new Error(`Failed to upload part ${partNumber} to S3`);
     }
   }
 
@@ -1053,49 +1062,65 @@
   /**
    * Uploads TAR file parts to S3 using presigned URLs with concurrency control.
    *
+   * Part boundaries are derived from `presignedUrls[].partSizeBytes`, so each
+   * uploaded chunk aligns with the backend-issued multipart layout.
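+   * Parts may arrive in any order; they are sorted by part number before byte
+   * ranges are computed, and at most `concurrencyLimit` uploads run in parallel.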
+   *
    * @param {string} tarFilePath - Path to the TAR file
-   * @param {Array<{partNumber: number, url: string}>} presignedUrls - Array of presigned URLs
+   * @param {UploadPart[]} presignedUrls - Array of presigned URLs with per-part sizes
    * @param {number} partCount - Total number of parts
-   * @return {*} {Promise<Array<{PartNumber: number, ETag: string}>>} Array of uploaded parts with ETags
+   * @param {UploadTarFilePartsOptions} options - Optional upload settings
+   * @return {*} {Promise<UploadResult[]>} Array of uploaded parts with ETags
    * @memberof PlatformService
    */
   async _uploadTarFileParts(
     tarFilePath: string,
-    presignedUrls: Array<{ partNumber: number; url: string }>,
-    partCount: number
-  ): Promise<Array<{ PartNumber: number; ETag: string }>> {
-    defaultLog.debug({ label: '_uploadTarFileParts', tarFilePath, partCount });
-
-    // Read TAR file as Buffer
-    const fileBuffer = fs.readFileSync(tarFilePath);
+    presignedUrls: UploadPart[],
+    partCount: number,
+    options: UploadTarFilePartsOptions = {}
+  ): Promise<UploadResult[]> {
+    const { concurrencyLimit = 4 } = options;
+    if (!Number.isInteger(concurrencyLimit) || concurrencyLimit <= 0) {
+      throw new Error('concurrencyLimit must be a positive integer');
+    }
 
-    // Split into chunks
-    const chunks = this._splitFileIntoChunks(fileBuffer, partCount);
+    defaultLog.debug({
+      label: '_uploadTarFileParts',
+      tarFilePath,
+      partCount,
+      presignedUrlCount: presignedUrls.length
+    });
 
-    // Upload with concurrency limit (4 parallel uploads)
-    const concurrencyLimit = 4;
-    const parts: Array<{ PartNumber: number; ETag: string }> = [];
+    if (presignedUrls.length !== partCount) {
+      throw new Error(
+        `Presigned URL count (${presignedUrls.length}) does not match expected part count (${partCount})`
+      );
+    }
 
-    for (let i = 0; i < presignedUrls.length; i += concurrencyLimit) {
-      const batch = presignedUrls.slice(i, i + concurrencyLimit);
-      const chunkBatch = chunks.slice(i, i + concurrencyLimit);
+    const orderedPresignedUrls = [...presignedUrls].sort((a, b) => a.partNumber - b.partNumber);
+    const fileSize = fs.statSync(tarFilePath).size;
+    const partRanges = this._buildPartByteRanges(fileSize, orderedPresignedUrls);
+    const results: UploadResult[] = [];
 
-      defaultLog.debug({
-        label: '_uploadTarFileParts',
-        message: 'Uploading batch',
-        batchStart: i,
-        batchSize: batch.length
-      });
+    for (let i = 0; i < partRanges.length; i += concurrencyLimit) {
+      const partBatch = partRanges.slice(i, i + concurrencyLimit);
 
       const batchResults = await Promise.all(
-        batch.map((urlObj, idx) => this._uploadChunkToS3(urlObj.url, chunkBatch[idx], urlObj.partNumber))
+        partBatch.map((part) =>
+          this._uploadChunkToS3(
+            part.url,
+            fs.createReadStream(tarFilePath, { start: part.start, end: part.end }),
+            part.partNumber,
+            part.partSizeBytes
+          )
+        )
       );
 
-      parts.push(...batchResults);
+      results.push(...batchResults);
     }
 
-    // Sort parts by PartNumber
-    parts.sort((a, b) => a.PartNumber - b.PartNumber);
+    const parts = results.sort((a, b) => a.PartNumber - b.PartNumber);
 
     defaultLog.info({
       label: '_uploadTarFileParts',
@@ -1113,7 +1138,7 @@
    * @param {string} uploadId - Upload ID from initiate response
    * @param {string} s3UploadId - S3 upload ID from initiate response
    * @param {string} key - S3 key from initiate response
-   * @param {Array<{PartNumber: number, ETag: string}>} parts - Array of uploaded parts with ETags
+   * @param {UploadResult[]} parts - Array of uploaded parts with ETags
    * @return {*} {Promise<void>}
    * @memberof PlatformService
    */
@@ -1122,7 +1147,7 @@
     uploadId: string,
     s3UploadId: string,
     key: string,
-    parts: Array<{ PartNumber: number; ETag: string }>
+    parts: UploadResult[]
   ): Promise<void> {
     defaultLog.debug({ label: '_completeSubmissionUpload', uploadId, partCount: parts.length });
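A condensed sketch of the upload path introduced above, assuming only the `UploadPart` and `UploadResult` shapes from `platform-service.interface.ts`; `uploadOne` is a hypothetical stand-in for `_uploadChunkToS3` and is illustrative only, not part of this diff:

```ts
import fs from 'fs';

interface UploadPart {
  partNumber: number;
  url: string;
  partSizeBytes: number;
}

interface UploadResult {
  PartNumber: number;
  ETag: string;
}

// Derive inclusive [start, end] offsets from the backend-issued part sizes.
// fs.createReadStream treats `end` as inclusive, hence the `- 1`.
function toByteRanges(parts: UploadPart[]): Array<UploadPart & { start: number; end: number }> {
  let start = 0;
  return parts.map((part) => {
    const range = { ...part, start, end: start + part.partSizeBytes - 1 };
    start += part.partSizeBytes;
    return range;
  });
}

// Upload every range, with at most `concurrencyLimit` PUTs in flight per batch.
async function uploadInBatches(
  tarFilePath: string,
  parts: UploadPart[],
  uploadOne: (url: string, chunk: NodeJS.ReadableStream, partNumber: number, size: number) => Promise<UploadResult>,
  concurrencyLimit = 4
): Promise<UploadResult[]> {
  const ranges = toByteRanges([...parts].sort((a, b) => a.partNumber - b.partNumber));
  const results: UploadResult[] = [];

  for (let i = 0; i < ranges.length; i += concurrencyLimit) {
    const batch = ranges.slice(i, i + concurrencyLimit);
    results.push(
      ...(await Promise.all(
        batch.map((range) =>
          uploadOne(
            range.url,
            // Stream only this part's bytes; the TAR file is never buffered whole.
            fs.createReadStream(tarFilePath, { start: range.start, end: range.end }),
            range.partNumber,
            range.partSizeBytes
          )
        )
      ))
    );
  }

  // S3 CompleteMultipartUpload expects parts ordered by PartNumber.
  return results.sort((a, b) => a.PartNumber - b.PartNumber);
}
```

With backend part sizes [3, 5, 2] for a 10-byte TAR, `toByteRanges` yields the inclusive ranges [0, 2], [3, 7] and [8, 9], matching the stream offsets asserted in the `_uploadTarFileParts` tests.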