Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions apps/backend/__tests__/unit/core/s3.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,127 @@ describe('S3UseCases', () => {
)
})
})

describe('listObjects', () => {
// Row limit for delimiter listings, written out as the formula
// min(maxKeys * 10 + 100, 10_000). For maxKeys=2 that is 120 rows, which is
// small enough to build a complete batch of test data for.
const DELIMITER_DB_LIMIT = (maxKeys: number) => {
  const uncapped = maxKeys * 10 + 100
  return Math.min(uncapped, 10_000)
}

// Builds one listing row for the mocked repository. Only the key varies;
// cid, size and lastModified are fixed placeholder values.
const makeListing = (key: string) => {
  const epoch = new Date(0)
  return { key, cid: 'cid', size: 0n, lastModified: epoch }
}

it('advances continuation token past a folded CommonPrefix when the DB batch is exhausted inside one prefix group', async () => {
  // Regression test for Cursor Bugbot finding on PR #696 / #709.
  //
  // Scenario: maxKeys=2, delimiter='/', and a single virtual directory
  // ('big/') contains more keys than fit in one DB batch. Every fetched
  // row folds into the same CommonPrefix, so the in-loop maxKeys cap is
  // never hit and the loop exhausts the batch with isTruncated=false.
  // The fallback branch must then set the continuation token to a value
  // that sorts *after* every key in 'big/' — otherwise the next page
  // re-scans the rest of that directory and emits 'big/' again.
  const maxKeys = 2
  const dbLimit = DELIMITER_DB_LIMIT(maxKeys)

  // Expected token: the folded prefix followed by the U+FFFF sentinel. It is
  // spelled as an escape sequence on purpose — U+FFFF is a Unicode
  // noncharacter, and a raw occurrence in the source file is invisible in
  // most editors and can be silently replaced by tooling.
  const expectedToken = 'big/\uffff'

  // Fill the entire DB batch with keys that all fold into 'big/'.
  const fullBatch = Array.from({ length: dbLimit }, (_, i) =>
    makeListing(`big/${String(i).padStart(6, '0')}`),
  )

  jest
    .spyOn(s3ObjectMappingsRepository, 'listObjects')
    .mockResolvedValue(fullBatch as any)

  const result = await S3UseCases.listObjects({
    bucket: 'my-bucket',
    prefix: '',
    delimiter: '/',
    maxKeys,
    continuationToken: null,
  })

  expect(result.commonPrefixes).toEqual(['big/'])
  expect(result.objects).toEqual([])
  expect(result.isTruncated).toBe(true)
  // Token must start with the folded prefix and sort strictly after every
  // key inside it.
  expect(result.nextContinuationToken).toBe(expectedToken)
  // Sanity: the token sorts after every key that was in the batch, not just
  // the last one. (This is a JavaScript string comparison, i.e. by UTF-16
  // code unit; it does not prove anything about the database's ordering.)
  expect(fullBatch.every((row) => row.key < expectedToken)).toBe(true)
})

it('uses the raw last key as the token when the last scanned key did not fold into a prefix', async () => {
  // A full DB batch whose final key contains no delimiter after the prefix
  // leaves no CommonPrefix to skip past, so the expected token is simply the
  // raw last key (the behaviour from before the sentinel fix).
  const maxKeys = 2
  const dbLimit = DELIMITER_DB_LIMIT(maxKeys)

  // All rows but the last fold into 'folder/'; the LAST row is a top-level
  // key with no delimiter after the prefix.
  const foldedRows = Array.from({ length: dbLimit - 1 }, (_, idx) =>
    makeListing(`folder/${String(idx).padStart(6, '0')}`),
  )
  const topLevelRow = makeListing('zzz-top-level')
  const batch = [...foldedRows, topLevelRow]

  jest
    .spyOn(s3ObjectMappingsRepository, 'listObjects')
    .mockResolvedValue(batch as any)

  const listing = await S3UseCases.listObjects({
    bucket: 'my-bucket',
    prefix: '',
    delimiter: '/',
    maxKeys,
    continuationToken: null,
  })

  expect(listing.isTruncated).toBe(true)
  // No CommonPrefix was formed from the last key, so the token is the raw
  // key with no sentinel appended.
  expect(listing.nextContinuationToken).toBe('zzz-top-level')
})

it('uses the raw last key as the token when no delimiter is set', async () => {
  // Without a delimiter the dbLimit is maxKeys + 1 and no CommonPrefixes are
  // produced, so there is nothing to skip past — the token is a plain key.
  const maxKeys = 2
  const dbLimit = maxKeys + 1 // = 3

  const batch = ['a.txt', 'b.txt', 'c.txt'].map((name) => makeListing(name))

  jest
    .spyOn(s3ObjectMappingsRepository, 'listObjects')
    .mockResolvedValue(batch as any)

  const listing = await S3UseCases.listObjects({
    bucket: 'my-bucket',
    prefix: '',
    delimiter: null,
    maxKeys,
    continuationToken: null,
  })

  // maxKeys=2 ⇒ the first two keys are returned and the third signals
  // truncation, so the token is the last key actually returned.
  const returnedKeys = listing.objects.map((obj) => obj.key)
  expect(returnedKeys).toEqual(['a.txt', 'b.txt'])
  expect(listing.isTruncated).toBe(true)
  expect(listing.nextContinuationToken).toBe('b.txt')
  // The batch is exactly dbLimit rows long, so the fallback's length
  // condition is met; the token above shows the fallback still did not
  // override it (no sentinel, not the last scanned key 'c.txt').
  expect(batch.length).toBe(dbLimit)
})
})
})
64 changes: 53 additions & 11 deletions apps/backend/__tests__/unit/repositories/nodes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import {
nodesRepository,
Node,
} from '../../../src/infrastructure/repositories/objects/nodes.js'
import { metadataRepository } from '../../../src/infrastructure/repositories/objects/metadata.js'
import { dbMigration } from '../../utils/dbMigrate.js'
import { MetadataType } from '@autonomys/auto-dag-data'
import { MetadataType, OffchainMetadata } from '@autonomys/auto-dag-data'

describe('Nodes Repository', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -370,22 +371,22 @@ describe('Nodes Repository', () => {
expect(result?.piece_offset).toBe(100)
})

it('should remove nodes by root CID', async () => {
it('should remove encoded_node only for published nodes by root CID', async () => {
const rootCid = 'test-root-cid-remove'
const nodes: Node[] = [
{
cid: 'test-cid-remove-1',
cid: 'test-cid-remove-published',
root_cid: rootCid,
head_cid: 'test-head-cid-remove',
type: 'file',
encoded_node: 'test-encoded-node-remove-1',
piece_index: null,
piece_offset: null,
block_published_on: null,
block_published_on: 100,
tx_published_on: null,
},
{
cid: 'test-cid-remove-2',
cid: 'test-cid-remove-unpublished',
root_cid: rootCid,
head_cid: 'test-head-cid-remove',
type: 'file',
Expand All @@ -399,13 +400,16 @@ describe('Nodes Repository', () => {

await nodesRepository.saveNodes(nodes)
await nodesRepository.removeNodeDataByRootCid(rootCid)
const results = await nodesRepository.getNodesByRootCid(rootCid)
const fullNodes = await Promise.all(
results.map((r) => nodesRepository.getNode(r.cid)),

const publishedNode = await nodesRepository.getNode(
'test-cid-remove-published',
)
fullNodes.forEach((n) => {
expect(n?.encoded_node).toBeNull()
})
expect(publishedNode?.encoded_node).toBeNull()

const unpublishedNode = await nodesRepository.getNode(
'test-cid-remove-unpublished',
)
expect(unpublishedNode?.encoded_node).toBe('test-encoded-node-remove-2')
})

it('should get nodes by CIDs', async () => {
Expand Down Expand Up @@ -466,6 +470,44 @@ describe('Nodes Repository', () => {

expect(result?.block_published_on).toBe(12345)
expect(result?.tx_published_on).toBe('tx-hash')
expect(result?.encoded_node).toBe('test-encoded-node-published')
})

it('should clear encoded_node on publish when metadata is already archived', async () => {
  const rootCid = 'test-root-cid-publish-archived'
  const headCid = 'test-head-cid-publish-archived'
  const publishedBlock = 99999
  const publishedTx = 'tx-recovery'

  // Step 1: store metadata for the object and mark it archived *before* any
  // of its nodes has been published.
  const metadata: OffchainMetadata = {
    totalSize: 100n,
    type: 'file',
    dataCid: 'test-data-cid-publish-archived',
    totalChunks: 1,
    chunks: [],
    name: 'test-file-publish-archived',
  }
  await metadataRepository.setMetadata(rootCid, headCid, metadata)
  await metadataRepository.markAsArchived(headCid)

  // Step 2: save a still-unpublished node under the same head CID, with its
  // encoded payload present.
  const unpublishedNode: Node = {
    cid: 'test-cid-publish-after-archive',
    root_cid: rootCid,
    head_cid: headCid,
    type: 'file',
    encoded_node: 'data-that-should-be-cleared',
    piece_index: null,
    piece_offset: null,
    block_published_on: null,
    tx_published_on: null,
  }
  await nodesRepository.saveNode(unpublishedNode)

  // Step 3: publish the node, then read it back.
  await nodesRepository.updateNodePublishedOn(
    unpublishedNode.cid,
    publishedBlock,
    publishedTx,
  )
  const stored = await nodesRepository.getNode(unpublishedNode.cid)

  // The publish info is recorded and the payload is gone.
  expect(stored?.block_published_on).toBe(99999)
  expect(stored?.tx_published_on).toBe('tx-recovery')
  expect(stored?.encoded_node).toBeNull()
})

it('should get uploaded nodes by root CID', async () => {
Expand Down
16 changes: 16 additions & 0 deletions apps/backend/src/core/objects/publishingRecovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ const processPublishingRecovery = async (): Promise<void> => {
const runRecoveryBatch = async (): Promise<void> => {
const maxPerCycle = config.publishingRecovery.maxObjectsPerCycle

// Surface objects that are stuck in an unrecoverable state — they have
// unpublished nodes whose encoded_node was already stripped (e.g. by
// archival before publishing completed). These cannot be recovered
// automatically and need operational attention.
const unrecoverable =
await nodesRepository.getUnrecoverablePublishingRootCids(maxPerCycle)
if (unrecoverable.length > 0) {
logger.warn(
'Found %d objects with unrecoverable unpublished nodes (data stripped before publishing): %s',
unrecoverable.length,
unrecoverable
.map((r) => `${r.root_cid} (${r.unrecoverable_count} nodes)`)
.join(', '),
)
}

const stuckRootCids = await nodesRepository.getStuckPublishingRootCids(
maxPerCycle,
config.publishingRecovery.stalenessThresholdBlocks,
Expand Down
26 changes: 25 additions & 1 deletion apps/backend/src/core/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,31 @@ const listObjects = async (
// harmless, but silently dropping data is not.
if (!isTruncated && allMatching.length === dbLimit) {
isTruncated = true
nextContinuationToken = allMatching[allMatching.length - 1].key
const lastKey = allMatching[allMatching.length - 1].key

// If the last scanned key folded into a CommonPrefix, the naive choice
// `lastKey` would land *inside* a virtual directory we've already
// represented in commonPrefixes. The next page's `key > token` query
// would then return the remaining keys in that directory, which would
// re-fold into — and re-emit — the same CommonPrefix entry.
//
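// Advance the token past every key that could possibly fold into that
// prefix by appending a high-sort sentinel: U+FFFF (`\uffff`, encoded as
// 0xEF 0xBF 0xBF in UTF-8). It sorts after every Basic Multilingual Plane
// character, so `key > token` skips the rest of the directory and resumes
// at the first key that falls outside it.
// NOTE(review): under byte-wise UTF-8 ordering, supplementary-plane
// characters (U+10000 and above, e.g. emoji) start with a lead byte of
// 0xF0-0xF4 and therefore sort *after* the sentinel, so a key whose first
// character after the prefix is such a character would still be scanned and
// re-fold into the same CommonPrefix. Confirm the collation used by the
// `key > token` query.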
if (delimiter) {
const afterPrefix = lastKey.slice(prefix.length)
const delimIdx = afterPrefix.indexOf(delimiter)
if (delimIdx >= 0) {
const lastFoldedPrefix = prefix + afterPrefix.slice(0, delimIdx + 1)
nextContinuationToken = lastFoldedPrefix + '￿'
} else {
nextContinuationToken = lastKey
}
} else {
nextContinuationToken = lastKey
}
}

return {
Expand Down
Loading
Loading