diff --git a/src/config.ts b/src/config.ts index 7b573c1c7..de9823ccb 100644 --- a/src/config.ts +++ b/src/config.ts @@ -65,6 +65,7 @@ type StorageConfigType = { storageFileEtagAlgorithm: 'mtime' | 'md5' storageS3InternalTracesEnabled?: boolean storageS3MaxSockets: number + storageS3BatchDeleteEnabled: boolean storageS3DisableChecksum: boolean storageS3UploadQueueSize: number storageS3Bucket: string @@ -364,6 +365,8 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { getOptionalConfigFromEnv('STORAGE_S3_MAX_SOCKETS', 'GLOBAL_S3_MAX_SOCKETS') || '200', 10 ), + storageS3BatchDeleteEnabled: + getOptionalConfigFromEnv('STORAGE_S3_BATCH_DELETE_ENABLED') !== 'false', storageS3DisableChecksum: getOptionalConfigFromEnv('STORAGE_S3_DISABLE_CHECKSUM') === 'true', storageS3UploadQueueSize: envNumber(getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_QUEUE_SIZE')) ?? 2, diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index b75f88cab..0af546525 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -36,6 +36,7 @@ const { storageS3ForcePathStyle, storageS3Region, storageS3ClientTimeout, + storageS3BatchDeleteEnabled, tusUrlExpiryMs, tusPath, tusPartSize, @@ -117,6 +118,7 @@ function createTusServer( maxRetries: 10, retryDelayMs: 250, renewalIntervalMs: 10 * 1000, // 10 seconds + batchDeleteEnabled: storageS3BatchDeleteEnabled, s3Client: new S3Client({ requestHandler: new NodeHttpHandler({ ...agent, diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index 5c8fc4736..2c3d91196 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -330,7 +330,12 @@ export class S3Backend implements StorageBackendAdapter { * @param prefixes */ async deleteObjects(bucket: string, prefixes: string[]): Promise { + const { storageS3BatchDeleteEnabled } = getConfig() + try { + if (!storageS3BatchDeleteEnabled) { + throw Object.assign(new Error('NotImplemented'), { name: 'NotImplemented' }) + } const s3Prefixes = prefixes.map((ele) => { return { Key: ele } }) @@ -343,6 +348,26 @@ export class S3Backend implements StorageBackendAdapter { }) await this.client.send(command) } catch (e) { + // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; fall back to individual deletes + const code = (e as { Code?: string; name?: string })?.Code ?? (e as { name?: string })?.name + if (code === 'NotImplemented') { + const results = await Promise.allSettled( + prefixes.map((key) => + this.client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + throw StorageBackendError.fromError(result.reason) + } + } + } + return + } throw StorageBackendError.fromError(e) } } diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index 7e75a4901..8ff0cc502 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -19,6 +19,7 @@ export interface S3LockerOptions { renewalIntervalMs?: number maxRetries?: number retryDelayMs?: number + batchDeleteEnabled?: boolean logger?: Pick } @@ -37,6 +38,7 @@ export class S3Locker implements Locker { private readonly renewalIntervalMs: number private readonly maxRetries: number private readonly retryDelayMs: number + private readonly batchDeleteEnabled: boolean private readonly logger: Pick private readonly notifier: LockNotifier @@ -49,6 +51,7 @@ export class S3Locker implements Locker { this.renewalIntervalMs = options.renewalIntervalMs || 10000 // 10 seconds this.maxRetries = options.maxRetries || 10 this.retryDelayMs = options.retryDelayMs || 500 + this.batchDeleteEnabled = options.batchDeleteEnabled !== false // default true this.logger = options.logger || console // Validate configuration @@ -245,6 +248,9 @@ export class S3Locker implements Locker { const batch = expiredLocks.slice(i, i + 1000) try { + if (!this.batchDeleteEnabled) { + throw Object.assign(new Error('NotImplemented'), { name: 'NotImplemented' }) + } await this.s3Client.send( new DeleteObjectsCommand({ Bucket: this.bucket, @@ -255,8 +261,27 @@ export class S3Locker implements Locker { }) ) this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) - } catch (error) { - this.logger.warn(`Failed to delete batch of expired locks:`, error) + } catch (error: any) { + const code = error?.Code ?? error?.name + if (code === 'NotImplemented') { + const results = await Promise.allSettled( + batch.map((key) => + this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) + ) + ) + for (const result of results) { + if (result.status === 'rejected') { + const errCode = + (result.reason as { Code?: string })?.Code ?? + (result.reason as { name?: string })?.name + if (errCode !== 'NoSuchKey') { + this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) + } + } + } + } else { + this.logger.warn(`Failed to delete batch of expired locks:`, error) + } } } } diff --git a/src/test/s3-adapter.test.ts b/src/test/s3-adapter.test.ts index 0d5288e5f..9e153697d 100644 --- a/src/test/s3-adapter.test.ts +++ b/src/test/s3-adapter.test.ts @@ -2,6 +2,7 @@ import { S3Client } from '@aws-sdk/client-s3' import { Readable } from 'stream' +import * as config from '../config' import { S3Backend } from '../storage/backend/s3/adapter' jest.mock('@aws-sdk/client-s3', () => { @@ -20,9 +21,9 @@ describe('S3Backend', () => { beforeEach(() => { jest.clearAllMocks() mockSend = jest.fn() - ;(S3Client as jest.Mock).mockImplementation(() => ({ - send: mockSend, - })) + ; (S3Client as jest.Mock).mockImplementation(() => ({ + send: mockSend, + })) }) describe('getObject', () => { @@ -74,4 +75,109 @@ describe('S3Backend', () => { expect(result.metadata.mimetype).toBe('image/png') }) }) + + describe('deleteObjects', () => { + test('should use batch DeleteObjectsCommand when backend supports it', async () => { + mockSend.mockResolvedValue({ + Deleted: [{ Key: 'file1.txt' }, { Key: 'file2.txt' }], + $metadata: { httpStatusCode: 200 }, + }) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + + expect(mockSend).toHaveBeenCalledTimes(1) + expect(mockSend.mock.calls[0][0].constructor.name).toBe('DeleteObjectsCommand') + }) + + test('should fall back to individual DeleteObjectCommands when backend returns NotImplemented', async () => { + const err = Object.assign(new Error('NotImplemented'), { Code: 'NotImplemented' }) + mockSend + .mockRejectedValueOnce(err) + .mockResolvedValue({ $metadata: { httpStatusCode: 204 } }) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + + expect(mockSend).toHaveBeenCalledTimes(3) + expect(mockSend.mock.calls[0][0].constructor.name).toBe('DeleteObjectsCommand') + expect(mockSend.mock.calls[1][0].constructor.name).toBe('DeleteObjectCommand') + expect(mockSend.mock.calls[2][0].constructor.name).toBe('DeleteObjectCommand') + }) + + test('should ignore NoSuchKey errors in the individual fallback', async () => { + const notImplemented = Object.assign(new Error('NotImplemented'), { Code: 'NotImplemented' }) + const noSuchKey = Object.assign(new Error('NoSuchKey'), { Code: 'NoSuchKey' }) + mockSend + .mockRejectedValueOnce(notImplemented) + .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) + .mockRejectedValueOnce(noSuchKey) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect( + backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + ).resolves.toBeUndefined() + }) + + test('should throw when an individual fallback delete fails with a real error', async () => { + const notImplemented = Object.assign(new Error('NotImplemented'), { Code: 'NotImplemented' }) + const accessDenied = Object.assign(new Error('AccessDenied'), { Code: 'AccessDenied' }) + mockSend + .mockRejectedValueOnce(notImplemented) + .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) + .mockRejectedValueOnce(accessDenied) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect( + backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + ).rejects.toThrow() + }) + + test('should rethrow errors that are not NotImplemented', async () => { + const err = Object.assign(new Error('AccessDenied'), { Code: 'AccessDenied' }) + mockSend.mockRejectedValue(err) + + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect(backend.deleteObjects('test-bucket', ['file1.txt'])).rejects.toThrow() + expect(mockSend).toHaveBeenCalledTimes(1) + }) + + test('should skip DeleteObjectsCommand and use individual deletes when batchDeleteEnabled is false', async () => { + const getConfigSpy = jest + .spyOn(config, 'getConfig') + .mockReturnValue({ storageS3BatchDeleteEnabled: false } as any) + mockSend.mockResolvedValue({ $metadata: { httpStatusCode: 204 } }) + + try { + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + + // No DeleteObjectsCommand call — straight to individual deletes + expect(mockSend).toHaveBeenCalledTimes(2) + expect(mockSend.mock.calls[0][0].constructor.name).toBe('DeleteObjectCommand') + expect(mockSend.mock.calls[1][0].constructor.name).toBe('DeleteObjectCommand') + } finally { + getConfigSpy.mockRestore() + } + }) + + test('should ignore NoSuchKey when batchDeleteEnabled is false', async () => { + const getConfigSpy = jest + .spyOn(config, 'getConfig') + .mockReturnValue({ storageS3BatchDeleteEnabled: false } as any) + const noSuchKey = Object.assign(new Error('NoSuchKey'), { Code: 'NoSuchKey' }) + mockSend + .mockResolvedValueOnce({ $metadata: { httpStatusCode: 204 } }) + .mockRejectedValueOnce(noSuchKey) + + try { + const backend = new S3Backend({ region: 'us-east-1', endpoint: 'http://localhost:9000' }) + await expect( + backend.deleteObjects('test-bucket', ['file1.txt', 'file2.txt']) + ).resolves.toBeUndefined() + } finally { + getConfigSpy.mockRestore() + } + }) + }) }) diff --git a/src/test/s3-locker.test.ts b/src/test/s3-locker.test.ts index 8954d045e..cd8c1a445 100644 --- a/src/test/s3-locker.test.ts +++ b/src/test/s3-locker.test.ts @@ -782,4 +782,163 @@ describe('S3Locker', () => { await lock.unlock() }) }) + + describe('cleanupZombieLocks – NotImplemented fallback', () => { + // Helper: place a pre-expired lock object in S3 + async function putExpiredLock(key: string) { + await s3Client.send( + new PutObjectCommand({ + Bucket: testBucket, + Key: key, + Body: JSON.stringify({ + lockId: key, + expiresAt: Date.now() - 10000, + createdAt: Date.now() - 20000, + renewedAt: Date.now() - 20000, + }), + ContentType: 'application/json', + }) + ) + } + + test('falls back to individual deletes when DeleteObjectsCommand returns NotImplemented', async () => { + const lockKey = 'test-locks/not-impl-fallback-lock.lock' + await putExpiredLock(lockKey) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + // Only reject the batch delete; let everything else through + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + return originalSend(command) + }) + + try { + // Should not throw – fallback handles the error + await expect(locker.cleanupZombieLocks()).resolves.not.toThrow() + } finally { + sendSpy.mockRestore() + // Cleanup in case individual delete didn't run + try { + await originalSend( + // @ts-ignore - manual cleanup + new (await import('@aws-sdk/client-s3')).DeleteObjectCommand({ + Bucket: testBucket, + Key: lockKey, + }) + ) + } catch { + // ignore + } + } + }) + + test('ignores NoSuchKey errors for individual deletes in NotImplemented fallback', async () => { + const lockKey = 'test-locks/not-impl-nosuchkey-lock.lock' + await putExpiredLock(lockKey) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + if (command.constructor.name === 'DeleteObjectCommand') { + const err: any = new Error('NoSuchKey') + err.name = 'NoSuchKey' + throw err + } + return originalSend(command) + }) + + try { + // NoSuchKey on individual deletes must be swallowed; method must not throw + await expect(locker.cleanupZombieLocks()).resolves.not.toThrow() + } finally { + sendSpy.mockRestore() + } + }) + + test('warns (but does not throw) on real individual-delete errors in fallback', async () => { + const lockKey = 'test-locks/not-impl-real-err-lock.lock' + await putExpiredLock(lockKey) + + const mockWarn = jest.fn() + const warnLocker = new S3Locker({ + s3Client, + bucket: testBucket, + notifier: mockNotifier, + keyPrefix: 'test-locks/', + lockTtlMs: 5000, + renewalIntervalMs: 1000, + logger: { log: jest.fn(), warn: mockWarn, error: jest.fn() }, + }) + + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + const err: any = new Error('NotImplemented') + err.name = 'NotImplemented' + throw err + } + if (command.constructor.name === 'DeleteObjectCommand') { + const err: any = new Error('AccessDenied') + err.name = 'AccessDenied' + throw err + } + return originalSend(command) + }) + + try { + await expect(warnLocker.cleanupZombieLocks()).resolves.not.toThrow() + // The individual failure should be logged as a warning + expect(mockWarn).toHaveBeenCalledWith( + expect.stringContaining('Failed to delete expired lock in fallback:'), + expect.anything() + ) + } finally { + sendSpy.mockRestore() + } + }) + + test('skips DeleteObjectsCommand entirely when batchDeleteEnabled is false', async () => { + const lockKey = 'test-locks/batch-disabled-lock.lock' + await putExpiredLock(lockKey) + + const deletedKeys: string[] = [] + const originalSend = s3Client.send.bind(s3Client) + const sendSpy = jest.spyOn(s3Client, 'send').mockImplementation(async (command: any) => { + if (command.constructor.name === 'DeleteObjectsCommand') { + throw new Error('Should not have called DeleteObjectsCommand when batch is disabled') + } + if (command.constructor.name === 'DeleteObjectCommand') { + deletedKeys.push(command.input.Key) + } + return originalSend(command) + }) + + const batchDisabledLocker = new S3Locker({ + s3Client, + bucket: testBucket, + notifier: mockNotifier, + keyPrefix: 'test-locks/', + lockTtlMs: 5000, + renewalIntervalMs: 1000, + batchDeleteEnabled: false, + logger: { log: jest.fn(), warn: jest.fn(), error: jest.fn() }, + }) + + try { + await expect(batchDisabledLocker.cleanupZombieLocks()).resolves.not.toThrow() + // The expired lock should have been deleted via an individual DeleteObjectCommand + expect(deletedKeys).toContain(lockKey) + } finally { + sendSpy.mockRestore() + } + }) + }) })