-
-
Notifications
You must be signed in to change notification settings - Fork 281
fix: fall back to individual deletes when S3 backend returns NotImplemented for DeleteObjects #899
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 3 commits
f7db348
cf308f1
6d4e96c
80d7e22
d736718
077adc0
bb23880
2d1e61a
50c59ad
b36e5d8
de221c1
7940539
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -36,6 +36,7 @@ const { | |||||||||||||||||||||||||||||||||
| storageS3ForcePathStyle, | ||||||||||||||||||||||||||||||||||
| storageS3Region, | ||||||||||||||||||||||||||||||||||
| storageS3ClientTimeout, | ||||||||||||||||||||||||||||||||||
| storageS3BatchDeleteEnabled, | ||||||||||||||||||||||||||||||||||
| tusUrlExpiryMs, | ||||||||||||||||||||||||||||||||||
| tusPath, | ||||||||||||||||||||||||||||||||||
| tusPartSize, | ||||||||||||||||||||||||||||||||||
|
|
@@ -117,6 +118,7 @@ function createTusServer( | |||||||||||||||||||||||||||||||||
| maxRetries: 10, | ||||||||||||||||||||||||||||||||||
| retryDelayMs: 250, | ||||||||||||||||||||||||||||||||||
| renewalIntervalMs: 10 * 1000, // 10 seconds | ||||||||||||||||||||||||||||||||||
| batchDeleteEnabled: storageS3BatchDeleteEnabled, | ||||||||||||||||||||||||||||||||||
| s3Client: new S3Client({ | ||||||||||||||||||||||||||||||||||
| requestHandler: new NodeHttpHandler({ | ||||||||||||||||||||||||||||||||||
| ...agent, | ||||||||||||||||||||||||||||||||||
|
|
@@ -256,14 +258,14 @@ const authenticatedRoutes = fastifyPlugin( | |||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| fastify.addHook('preHandler', async (req) => { | ||||||||||||||||||||||||||||||||||
| ;(req.raw as MultiPartRequest).log = req.log | ||||||||||||||||||||||||||||||||||
| ;(req.raw as MultiPartRequest).upload = { | ||||||||||||||||||||||||||||||||||
| storage: req.storage, | ||||||||||||||||||||||||||||||||||
| owner: req.owner, | ||||||||||||||||||||||||||||||||||
| tenantId: req.tenantId, | ||||||||||||||||||||||||||||||||||
| db: req.db, | ||||||||||||||||||||||||||||||||||
| isUpsert: req.headers['x-upsert'] === 'true', | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| ; (req.raw as MultiPartRequest).log = req.log | ||||||||||||||||||||||||||||||||||
| ; (req.raw as MultiPartRequest).upload = { | ||||||||||||||||||||||||||||||||||
| storage: req.storage, | ||||||||||||||||||||||||||||||||||
| owner: req.owner, | ||||||||||||||||||||||||||||||||||
| tenantId: req.tenantId, | ||||||||||||||||||||||||||||||||||
| db: req.db, | ||||||||||||||||||||||||||||||||||
| isUpsert: req.headers['x-upsert'] === 'true', | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| fastify.post( | ||||||||||||||||||||||||||||||||||
|
|
@@ -358,14 +360,14 @@ const publicRoutes = fastifyPlugin( | |||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| fastify.addHook('preHandler', async (req) => { | ||||||||||||||||||||||||||||||||||
| ;(req.raw as MultiPartRequest).log = req.log | ||||||||||||||||||||||||||||||||||
| ;(req.raw as MultiPartRequest).upload = { | ||||||||||||||||||||||||||||||||||
| storage: req.storage, | ||||||||||||||||||||||||||||||||||
| owner: req.owner, | ||||||||||||||||||||||||||||||||||
| tenantId: req.tenantId, | ||||||||||||||||||||||||||||||||||
| db: req.db, | ||||||||||||||||||||||||||||||||||
| isUpsert: req.headers['x-upsert'] === 'true', | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| ; (req.raw as MultiPartRequest).log = req.log | ||||||||||||||||||||||||||||||||||
| ; (req.raw as MultiPartRequest).upload = { | ||||||||||||||||||||||||||||||||||
| storage: req.storage, | ||||||||||||||||||||||||||||||||||
| owner: req.owner, | ||||||||||||||||||||||||||||||||||
| tenantId: req.tenantId, | ||||||||||||||||||||||||||||||||||
| db: req.db, | ||||||||||||||||||||||||||||||||||
| isUpsert: req.headers['x-upsert'] === 'true', | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| ; (req.raw as MultiPartRequest).log = req.log | |
| ; (req.raw as MultiPartRequest).upload = { | |
| storage: req.storage, | |
| owner: req.owner, | |
| tenantId: req.tenantId, | |
| db: req.db, | |
| isUpsert: req.headers['x-upsert'] === 'true', | |
| } | |
| ;(req.raw as MultiPartRequest).log = req.log | |
| ;(req.raw as MultiPartRequest).upload = { | |
| storage: req.storage, | |
| owner: req.owner, | |
| tenantId: req.tenantId, | |
| db: req.db, | |
| isUpsert: req.headers['x-upsert'] === 'true', | |
| } |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -19,6 +19,8 @@ export interface S3LockerOptions { | |||||
| renewalIntervalMs?: number | ||||||
| maxRetries?: number | ||||||
| retryDelayMs?: number | ||||||
| /** When false, skips DeleteObjectsCommand in zombie-lock cleanup and uses individual deletes. Default: true */ | ||||||
| batchDeleteEnabled?: boolean | ||||||
| logger?: Pick<Console, 'log' | 'warn' | 'error'> | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -37,6 +39,7 @@ export class S3Locker implements Locker { | |||||
| private readonly renewalIntervalMs: number | ||||||
| private readonly maxRetries: number | ||||||
| private readonly retryDelayMs: number | ||||||
| private readonly batchDeleteEnabled: boolean | ||||||
| private readonly logger: Pick<Console, 'log' | 'warn' | 'error'> | ||||||
| private readonly notifier: LockNotifier | ||||||
|
|
||||||
|
|
@@ -49,6 +52,7 @@ export class S3Locker implements Locker { | |||||
| this.renewalIntervalMs = options.renewalIntervalMs || 10000 // 10 seconds | ||||||
| this.maxRetries = options.maxRetries || 10 | ||||||
| this.retryDelayMs = options.retryDelayMs || 500 | ||||||
| this.batchDeleteEnabled = options.batchDeleteEnabled !== false // default true | ||||||
| this.logger = options.logger || console | ||||||
|
|
||||||
| // Validate configuration | ||||||
|
|
@@ -244,6 +248,13 @@ export class S3Locker implements Locker { | |||||
| for (let i = 0; i < expiredLocks.length; i += 1000) { | ||||||
| const batch = expiredLocks.slice(i, i + 1000) | ||||||
|
|
||||||
| if (!this.batchDeleteEnabled) { | ||||||
| // Batch delete explicitly disabled — use individual deletes directly | ||||||
| await this.deleteLocksIndividually(batch) | ||||||
| this.logger.log(`Cleaned up ${batch.length} expired locks (individual, batch disabled)`) | ||||||
| continue | ||||||
| } | ||||||
|
|
||||||
| try { | ||||||
| await this.s3Client.send( | ||||||
| new DeleteObjectsCommand({ | ||||||
|
|
@@ -255,8 +266,18 @@ export class S3Locker implements Locker { | |||||
| }) | ||||||
| ) | ||||||
| this.logger.log(`Cleaned up ${batch.length} expired locks in batch`) | ||||||
| } catch (error) { | ||||||
| this.logger.warn(`Failed to delete batch of expired locks:`, error) | ||||||
| } catch (error: any) { | ||||||
| // Some S3-compatible backends (e.g. GCS) do not support DeleteObjects; | ||||||
| // fall back to individual deletes so zombie-lock cleanup still works. | ||||||
| const code = error?.Code ?? error?.name | ||||||
| if (code === 'NotImplemented') { | ||||||
| await this.deleteLocksIndividually(batch) | ||||||
| this.logger.log( | ||||||
| `Cleaned up ${batch.length} expired locks in batch (individual fallback)` | ||||||
| ) | ||||||
| } else { | ||||||
| this.logger.warn(`Failed to delete batch of expired locks:`, error) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
@@ -282,6 +303,28 @@ export class S3Locker implements Locker { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Deletes a batch of lock keys one-by-one in parallel. | ||||||
| * NoSuchKey is ignored (lock already gone). Other errors are logged as warnings. | ||||||
| */ | ||||||
| private async deleteLocksIndividually(keys: string[]): Promise<void> { | ||||||
| const results = await Promise.allSettled( | ||||||
| keys.map((key) => | ||||||
| this.s3Client.send(new DeleteObjectCommand({ Bucket: this.bucket, Key: key })) | ||||||
| ) | ||||||
| ) | ||||||
| for (const result of results) { | ||||||
| if (result.status === 'rejected') { | ||||||
| const errCode = | ||||||
| (result.reason as { Code?: string })?.Code ?? | ||||||
| (result.reason as { name?: string })?.name | ||||||
| if (errCode !== 'NoSuchKey') { | ||||||
| this.logger.warn(`Failed to delete expired lock in fallback:`, result.reason) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private async checkAndCleanupExpiredLock(lockKey: string, signal: AbortSignal): Promise<boolean> { | ||||||
| if (signal.aborted) { | ||||||
| return false | ||||||
|
|
@@ -350,7 +393,7 @@ export class S3Lock implements Lock { | |||||
| private readonly id: string, | ||||||
| private readonly locker: S3Locker, | ||||||
| private readonly notifier: LockNotifier | ||||||
| ) {} | ||||||
| ) { } | ||||||
|
||||||
| ) { } | |
| ) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This introduces a new env var/config flag (
STORAGE_S3_BATCH_DELETE_ENABLED). The PR description says "No new env vars, config flags" and "No configuration required"; please either update the PR/docs to reflect the new flag (and its default), or remove the flag and rely solely on runtime NotImplemented detection.