Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions apps/backend/__tests__/unit/repositories/nodes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,22 +370,22 @@ describe('Nodes Repository', () => {
expect(result?.piece_offset).toBe(100)
})

it('should remove nodes by root CID', async () => {
it('should remove encoded_node only for published nodes by root CID', async () => {
const rootCid = 'test-root-cid-remove'
const nodes: Node[] = [
{
cid: 'test-cid-remove-1',
cid: 'test-cid-remove-published',
root_cid: rootCid,
head_cid: 'test-head-cid-remove',
type: 'file',
encoded_node: 'test-encoded-node-remove-1',
piece_index: null,
piece_offset: null,
block_published_on: null,
block_published_on: 100,
tx_published_on: null,
},
{
cid: 'test-cid-remove-2',
cid: 'test-cid-remove-unpublished',
root_cid: rootCid,
head_cid: 'test-head-cid-remove',
type: 'file',
Expand All @@ -399,13 +399,16 @@ describe('Nodes Repository', () => {

await nodesRepository.saveNodes(nodes)
await nodesRepository.removeNodeDataByRootCid(rootCid)
const results = await nodesRepository.getNodesByRootCid(rootCid)
const fullNodes = await Promise.all(
results.map((r) => nodesRepository.getNode(r.cid)),

const publishedNode = await nodesRepository.getNode(
'test-cid-remove-published',
)
fullNodes.forEach((n) => {
expect(n?.encoded_node).toBeNull()
})
expect(publishedNode?.encoded_node).toBeNull()

const unpublishedNode = await nodesRepository.getNode(
'test-cid-remove-unpublished',
)
expect(unpublishedNode?.encoded_node).toBe('test-encoded-node-remove-2')
})

it('should get nodes by CIDs', async () => {
Expand Down
16 changes: 16 additions & 0 deletions apps/backend/src/core/objects/publishingRecovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ const processPublishingRecovery = async (): Promise<void> => {
const runRecoveryBatch = async (): Promise<void> => {
const maxPerCycle = config.publishingRecovery.maxObjectsPerCycle

// Surface objects that are stuck in an unrecoverable state — they have
// unpublished nodes whose encoded_node was already stripped (e.g. by
// archival before publishing completed). These cannot be recovered
// automatically and need operational attention.
const unrecoverable =
await nodesRepository.getUnrecoverablePublishingRootCids(maxPerCycle)
if (unrecoverable.length > 0) {
logger.warn(
'Found %d objects with unrecoverable unpublished nodes (data stripped before publishing): %s',
unrecoverable.length,
unrecoverable
.map((r) => `${r.root_cid} (${r.unrecoverable_count} nodes)`)
.join(', '),
)
}

const stuckRootCids = await nodesRepository.getStuckPublishingRootCids(
maxPerCycle,
config.publishingRecovery.stalenessThresholdBlocks,
Expand Down
56 changes: 55 additions & 1 deletion apps/backend/src/infrastructure/repositories/objects/nodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,22 @@ const setNodeArchivingData = async ({
})
}

/**
* Removes encoded_node data for all *published* nodes under a root_cid.
*
* Only nodes with `block_published_on IS NOT NULL` are stripped — their
* data is already immutably on-chain and the local copy is no longer
* needed. Unpublished nodes retain their encoded_node so the publishing
* recovery job can still re-enqueue them.
*/
const removeNodeDataByRootCid = async (rootCid: string) => {
const db = await getDatabase()

return db.query({
text: 'UPDATE nodes SET encoded_node = NULL WHERE root_cid = $1',
text: `UPDATE nodes
SET encoded_node = NULL
WHERE root_cid = $1
AND block_published_on IS NOT NULL`,
Comment thread
cursor[bot] marked this conversation as resolved.
values: [rootCid],
})
}
Expand Down Expand Up @@ -421,6 +432,10 @@ const getFullyArchivedHeadCids = async (
*
* The staleness filter prevents false positives on objects that are
* still being actively published in the normal pipeline.
*
* Objects where every unpublished node has `encoded_node IS NULL`
* (i.e. archived before publishing completed) are excluded — they
* are un-publishable and would only cause repeated failures.
*/
const getStuckPublishingRootCids = async (
limit: number,
Expand All @@ -436,6 +451,9 @@ const getStuckPublishingRootCids = async (
GROUP BY root_cid
HAVING COUNT(block_published_on) > 0
AND COUNT(block_published_on) < COUNT(*)
AND COUNT(*) FILTER (
WHERE block_published_on IS NULL AND encoded_node IS NOT NULL
) > 0
AND MAX(block_published_on) + $2 < (
SELECT MAX(block_published_on) FROM nodes
)
Expand All @@ -448,6 +466,8 @@ const getStuckPublishingRootCids = async (

/**
* Returns CIDs of unpublished nodes for a given root_cid.
* Only returns nodes that still have encoded_node data — nodes whose
* data was removed (e.g. by archival) are un-publishable and excluded.
*/
const getUnpublishedNodeCidsByRootCid = async (
rootCid: string,
Expand All @@ -461,12 +481,45 @@ const getUnpublishedNodeCidsByRootCid = async (
FROM nodes
WHERE root_cid = $1
AND block_published_on IS NULL
AND encoded_node IS NOT NULL
`,
values: [rootCid],
})
.then((e) => e.rows.map((r) => r.cid))
}

/**
* Returns root_cids that have nodes stuck in an unrecoverable state:
* `block_published_on IS NULL` (never published) AND `encoded_node IS NULL`
* (data already stripped, e.g. by a prior archival bug).
*
* These objects can never complete publishing from local state alone.
* The caller should surface them for operational attention.
*/
const getUnrecoverablePublishingRootCids = async (
limit: number,
): Promise<{ root_cid: string; unrecoverable_count: number }[]> => {
const db = await getDatabase()

return db
.query<{ root_cid: string; unrecoverable_count: number }>({
text: `
SELECT root_cid,
COUNT(*) FILTER (
WHERE block_published_on IS NULL AND encoded_node IS NULL
)::int AS unrecoverable_count
FROM nodes
GROUP BY root_cid
HAVING COUNT(*) FILTER (
WHERE block_published_on IS NULL AND encoded_node IS NULL
) > 0
LIMIT $1
`,
values: [limit],
})
.then((e) => e.rows)
}

export const nodesRepository = {
getNode,
getNodeCount,
Expand All @@ -493,4 +546,5 @@ export const nodesRepository = {
getFullyArchivedHeadCids,
getStuckPublishingRootCids,
getUnpublishedNodeCidsByRootCid,
getUnrecoverablePublishingRootCids,
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,30 @@ const publishNodes = async (cids: string[]) => {
const repeatedNodes = nodes.filter((node) => publishedCidSet.has(node.cid))
await NodesUseCases.handleRepeatedNodes(repeatedNodes)

// Nodes that are not known as published anywhere should be sent for publishing
const publishingNodes = nodes.filter((node) => !publishedCidSet.has(node.cid))
// Nodes that are not known as published anywhere should be sent for publishing.
// Filter out nodes whose encoded_node has been removed (e.g. after archival) —
// these are un-publishable and must not reach the Buffer.from() call.
const publishingNodes = nodes.filter(
(node) => !publishedCidSet.has(node.cid) && node.encoded_node != null,
)

const skippedNullCount = nodes.filter(
(node) => !publishedCidSet.has(node.cid) && node.encoded_node == null,
).length
if (skippedNullCount > 0) {
logger.warn(
'Skipped %d nodes with NULL encoded_node (archived before publishing)',
skippedNullCount,
)
}

if (publishingNodes.length === 0) {
logger.info('No publishable nodes remaining after filtering')
return
Comment thread
EmilFattakhov marked this conversation as resolved.
}

const transactions = publishingNodes.map((node) => {
const buffer = Buffer.from(node.encoded_node, 'base64')
const buffer = Buffer.from(node.encoded_node!, 'base64')

return {
module: 'system',
Expand Down
Loading