From d10dce5dfc0ea0844c9f36d1a74a91b0f43e7d0c Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 12:40:07 +0200 Subject: [PATCH 01/17] add --partition flag to stream publish command --- packages/cli-tools/bin/streamr-stream-publish.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index 53f323e30a..b27bba6b3c 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -6,8 +6,10 @@ import { StreamrClient } from '@streamr/sdk' import { hexToBinary, wait } from '@streamr/utils' import es from 'event-stream' import { createClientCommand, Options as BaseOptions } from '../src/command' +import { createFnParseInt } from '../src/common' interface Options extends BaseOptions { + partition?: number partitionKeyField?: string } @@ -16,7 +18,8 @@ const isHexadecimal = (str: string): boolean => { } const publishStream = ( - stream: string, + streamId: string, + partition: number | undefined, partitionKeyField: string | undefined, client: StreamrClient ): Writable => { @@ -42,7 +45,7 @@ const publishStream = ( } } const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined - client.publish(stream, message, { partitionKey }).then( + client.publish({ id: streamId, partition }, message, { partitionKey }).then( () => done(), (err) => done(err) ) @@ -52,7 +55,7 @@ const publishStream = ( } createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => { - const ps = publishStream(streamId, options.partitionKeyField, client) + const ps = publishStream(streamId, options.partition, options.partitionKeyField, client) return new Promise((resolve, reject) => { process.stdin .pipe(es.split()) @@ -73,6 +76,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt }) .arguments('') .description('publish to a stream by reading JSON messages from stdin line-by-line or hexadecimal strings for binary data') + .option('-p, --partition [partition]', 'partition', createFnParseInt('--partition'), undefined) // eslint-disable-next-line max-len .option('-k, --partition-key-field ', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)') .parseAsync() From d93a213a4ac7ba799193f0b713fb9b0916412902 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 12:40:07 +0200 Subject: [PATCH 02/17] add tests --- .../cli-tools/test/stream-publish.test.ts | 74 +++++++++++++++++++ packages/cli-tools/test/utils.ts | 5 ++ 2 files changed, 79 insertions(+) create mode 100644 packages/cli-tools/test/stream-publish.test.ts diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts new file mode 100644 index 0000000000..571c4af7e8 --- /dev/null +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -0,0 +1,74 @@ +import StreamrClient, { StreamPermission } from '@streamr/sdk' +import { createTestPrivateKey } from '@streamr/test-utils' +import { keyToArrayIndex, StreamID } from '@streamr/utils' +import range from 'lodash/range' +import { createTestClient, nextValue, runCommand } from './utils' +import { Wallet } from 'ethers' + +const PARTITION_COUNT = 10 + +describe('stream-publish', () => { + + let streamId: StreamID + let publisherPrivateKey: string + let subscriberPrivateKey: string + + function createSubscriber(): StreamrClient { + return createTestClient(subscriberPrivateKey) + } + + beforeEach(async () => { + publisherPrivateKey = await createTestPrivateKey({ gas: true }) + subscriberPrivateKey = await createTestPrivateKey({ gas: true }) + const client = createTestClient(await createTestPrivateKey({ gas: true })) + const stream = await client.createStream({ id: `/${Date.now()}`, partitions: PARTITION_COUNT }) + await stream.grantPermissions({ + userId: new Wallet(publisherPrivateKey).address, + permissions: [StreamPermission.PUBLISH] + }, { + userId: new Wallet(subscriberPrivateKey).address, + permissions: [StreamPermission.SUBSCRIBE] + }) + streamId = stream.id + await client.destroy() + }) + + function publishViaCliCommand(additionalArgs: string[] = []) { + const args = [streamId, ...additionalArgs] + setImmediate(async () => { + await runCommand(`stream publish ${args.join(' ')}`, { + inputLines: [JSON.stringify({ foo: 123 })], + privateKey: publisherPrivateKey + }) + }) + } + + it('happy path', async () => { + const subscriber = createSubscriber() + const subscriptions = await Promise.all(range(PARTITION_COUNT).map((partition) => subscriber.subscribe({ id: streamId, partition }))) + publishViaCliCommand() + const receivedMessage = await Promise.race(subscriptions.map((s) => nextValue(s[Symbol.asyncIterator]()))) + expect(receivedMessage!.content).toEqual({ foo: 123 }) + await subscriber.destroy() + }) + + it('explicit partition', async () => { + const PARTITION = 5 + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) + publishViaCliCommand([`--partition ${PARTITION}`]) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqual({ foo: 123 }) + await subscriber.destroy() + }) + + it('partition key field', async () => { + const partition = keyToArrayIndex(PARTITION_COUNT, 123) + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition }) + publishViaCliCommand(['--partition-key-field foo']) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqual({ foo: 123 }) + await subscriber.destroy() + }) +}) diff --git a/packages/cli-tools/test/utils.ts b/packages/cli-tools/test/utils.ts index 6d14c1eefb..f06414b4a1 100644 --- a/packages/cli-tools/test/utils.ts +++ b/packages/cli-tools/test/utils.ts @@ -96,3 +96,8 @@ export const deployTestOperatorContract = async (opts: Omit): Promise => { return _operatorContractUtils.deploySponsorshipContract({ ...opts, 'environmentId': 'dev2' }) } + +export const nextValue = async (source: AsyncIterator): Promise => { + const item = source.next() + return (await item).value +} From d0ce3acfbef1db0edd38e4ad983c84b00cb6599a Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 12:40:07 +0200 Subject: [PATCH 03/17] convert publish-subscribe test to subscribe test --- ...cribe.test.ts => stream-subscribe.test.ts} | 74 ++++++++++--------- 1 file changed, 38 insertions(+), 36 deletions(-) rename packages/cli-tools/test/{stream-publish-subscribe.test.ts => stream-subscribe.test.ts} (69%) diff --git a/packages/cli-tools/test/stream-publish-subscribe.test.ts b/packages/cli-tools/test/stream-subscribe.test.ts similarity index 69% rename from packages/cli-tools/test/stream-publish-subscribe.test.ts rename to packages/cli-tools/test/stream-subscribe.test.ts index 724cd9b606..872c18cbfe 100644 --- a/packages/cli-tools/test/stream-publish-subscribe.test.ts +++ b/packages/cli-tools/test/stream-subscribe.test.ts @@ -1,74 +1,73 @@ -import { StreamPermission } from '@streamr/sdk' +import StreamrClient, { StreamPermission } from '@streamr/sdk' import { createTestPrivateKey } from '@streamr/test-utils' -import { collect } from '@streamr/utils' +import { collect, StreamID } from '@streamr/utils' +import { createTestClient, startCommand } from './utils' import { Wallet } from 'ethers' -import { createTestClient, runCommand, startCommand } from './utils' -const TIMEOUT = 30 * 1000 - -describe('publish and subscribe', () => { +describe('stream-subscribe', () => { + let streamId: StreamID let publisherPrivateKey: string let subscriberPrivateKey: string - let streamId: string - beforeAll(async () => { + beforeEach(async () => { publisherPrivateKey = await createTestPrivateKey({ gas: true }) subscriberPrivateKey = await createTestPrivateKey({ gas: true }) - const client = createTestClient(publisherPrivateKey) + const client = createTestClient(await createTestPrivateKey({ gas: true })) const stream = await client.createStream(`/${Date.now()}`) await stream.grantPermissions({ + userId: new Wallet(publisherPrivateKey).address, + permissions: [StreamPermission.PUBLISH] + }, { userId: new Wallet(subscriberPrivateKey).address, permissions: [StreamPermission.SUBSCRIBE] }) streamId = stream.id await client.destroy() - }, TIMEOUT) + }) - function publishViaCliCommand() { - setImmediate(async () => { - await runCommand(`stream publish ${streamId}`, { - inputLines: [JSON.stringify({ foo: 123 })], - privateKey: publisherPrivateKey - }) - }) + async function publishTestMesssage(): Promise { + const publisher = createTestClient(publisherPrivateKey) + await publisher.publish(streamId, { foo: 123 }) + return publisher } it('happy path', async () => { const subscriberAbortController = new AbortController() const subscriberOutputIterable = startCommand(`stream subscribe ${streamId}`, { - privateKey: subscriberPrivateKey, - abortSignal: subscriberAbortController.signal + abortSignal: subscriberAbortController.signal, + privateKey: subscriberPrivateKey }) - publishViaCliCommand() + const publisher = await publishTestMesssage() const receivedMessage = (await collect(subscriberOutputIterable, 1))[0] - subscriberAbortController.abort() expect(JSON.parse(receivedMessage)).toEqual({ foo: 123 }) - }, TIMEOUT) + await publisher.destroy() + subscriberAbortController.abort() + }) - it('raw subscription', async () => { + it('raw', async () => { const subscriberAbortController = new AbortController() const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --raw`, { - privateKey: subscriberPrivateKey, abortSignal: subscriberAbortController.signal, + privateKey: subscriberPrivateKey }) - publishViaCliCommand() + const publisher = await publishTestMesssage() const receivedMessage = (await collect(subscriberOutputIterable, 1))[0] - subscriberAbortController.abort() expect(receivedMessage).toMatch(/^[0-9a-fA-F]+$/) - }, TIMEOUT) + await publisher.destroy() + subscriberAbortController.abort() + }) it('with metadata', async () => { const subscriberAbortController = new AbortController() const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --with-metadata`, { - privateKey: subscriberPrivateKey, abortSignal: subscriberAbortController.signal, + privateKey: subscriberPrivateKey }) - publishViaCliCommand() + const publisher = await publishTestMesssage() const receivedMessage = (await collect(subscriberOutputIterable, 1))[0] - subscriberAbortController.abort() expect(JSON.parse(receivedMessage)).toMatchObject({ content: { foo: 123 @@ -83,17 +82,18 @@ describe('publish and subscribe', () => { msgChainId: expect.stringMatching(/[0-9a-zA-Z]+/) } }) - }, TIMEOUT) + await publisher.destroy() + subscriberAbortController.abort() + }) - it('with metadata and raw', async () => { + it('with metadata, receive as raw', async () => { const subscriberAbortController = new AbortController() const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --with-metadata --raw`, { - privateKey: subscriberPrivateKey, abortSignal: subscriberAbortController.signal, + privateKey: subscriberPrivateKey }) - publishViaCliCommand() + const publisher = await publishTestMesssage() const receivedMessage = (await collect(subscriberOutputIterable, 1))[0] - subscriberAbortController.abort() expect(JSON.parse(receivedMessage)).toMatchObject({ content: expect.stringMatching(/^[0-9a-fA-F]+$/), metadata: { @@ -106,5 +106,7 @@ describe('publish and subscribe', () => { msgChainId: expect.stringMatching(/[0-9a-zA-Z]+/) } }) - }, TIMEOUT) + await publisher.destroy() + subscriberAbortController.abort() + }) }) From 55a7622763dea5300a5faa6fce850a417af42777 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 13:00:11 +0200 Subject: [PATCH 04/17] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d48257255..3942af91c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,8 @@ Changes before Tatum release are not documented in this file. ### @streamr/cli-tools +- Add `--partition` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3262) + #### Added #### Changed From 3c60e4cb965676409f5bb84cd22dacfa6b98877b Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 13:40:03 +0200 Subject: [PATCH 05/17] fix default value handling --- packages/cli-tools/bin/streamr-stream-publish.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index b27bba6b3c..d6ef0c3065 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -76,7 +76,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt }) .arguments('') .description('publish to a stream by reading JSON messages from stdin line-by-line or hexadecimal strings for binary data') - .option('-p, --partition [partition]', 'partition', createFnParseInt('--partition'), undefined) + .option('-p, --partition ', 'partition', createFnParseInt('--partition')) // eslint-disable-next-line max-len .option('-k, --partition-key-field ', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)') .parseAsync() From f3e97634a18b2c51fe854f137e32f2dd57e69fb9 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 13:52:53 +0200 Subject: [PATCH 06/17] simplify --- packages/cli-tools/bin/streamr-stream-publish.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index d6ef0c3065..40c79aba5e 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -45,7 +45,7 @@ const publishStream = ( } } const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined - client.publish({ id: streamId, partition }, message, { partitionKey }).then( + client.publish({ streamId, partition }, message, { partitionKey }).then( () => done(), (err) => done(err) ) From 3cf70088e223f9f69555c47fcbb53e58f637a64e Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 14:23:01 +0200 Subject: [PATCH 07/17] add validation --- packages/cli-tools/bin/streamr-stream-publish.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index 40c79aba5e..aa0cba3426 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -55,6 +55,10 @@ const publishStream = ( } createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => { + if ((options.partition !== undefined) && (options.partitionKeyField !== undefined)) { + console.error('Invalid combination of "partition" and "partition-key-field"') + process.exit(1) + } const ps = publishStream(streamId, options.partition, options.partitionKeyField, client) return new Promise((resolve, reject) => { process.stdin From 1ed9a779fbb9b12399c01beb1f25908ef2d47a73 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 14:36:02 +0200 Subject: [PATCH 08/17] rename variable --- packages/cli-tools/bin/streamr-stream-publish.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index aa0cba3426..7297bab8ec 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -26,7 +26,7 @@ const publishStream = ( const writable = new Writable({ objectMode: true, write: (data: any, _: any, done: any) => { - let message = null + let content = null // ignore newlines, etc if (!data || String(data).trim() === '') { done() @@ -34,18 +34,18 @@ const publishStream = ( } const trimmedData = String(data).trim() if (isHexadecimal(trimmedData)) { - message = hexToBinary(trimmedData) + content = hexToBinary(trimmedData) } else { try { - message = JSON.parse(trimmedData) + content = JSON.parse(trimmedData) } catch (e) { console.error(data.toString()) done(e) return } } - const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined - client.publish({ streamId, partition }, message, { partitionKey }).then( + const partitionKey = (partitionKeyField !== undefined && typeof content === 'object') ? content[partitionKeyField] : undefined + client.publish({ streamId, partition }, content, { partitionKey }).then( () => done(), (err) => done(err) ) From fd1c728e7aaab7c76828f070236e7b605f8c89f7 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 14:36:02 +0200 Subject: [PATCH 09/17] no need to initialize to null --- packages/cli-tools/bin/streamr-stream-publish.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index 7297bab8ec..c7975eff32 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -26,7 +26,7 @@ const publishStream = ( const writable = new Writable({ objectMode: true, write: (data: any, _: any, done: any) => { - let content = null + let content // ignore newlines, etc if (!data || String(data).trim() === '') { done() From 2751b4200e70052d7b1ae8a9a987c43f5af2bfa3 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 14:36:02 +0200 Subject: [PATCH 10/17] add type --- packages/cli-tools/bin/streamr-stream-publish.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index c7975eff32..5fe728ff80 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -26,7 +26,7 @@ const publishStream = ( const writable = new Writable({ objectMode: true, write: (data: any, _: any, done: any) => { - let content + let content: any // ignore newlines, etc if (!data || String(data).trim() === '') { done() From 30e738e9688269b5c081d5c4ace5970aa875f63c Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 14:36:02 +0200 Subject: [PATCH 11/17] add --with-metadata flag --- .../cli-tools/bin/streamr-stream-publish.ts | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index 5fe728ff80..4fe56908ee 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -1,16 +1,17 @@ #!/usr/bin/env node import '../src/logLevel' -import { Writable } from 'stream' -import { StreamrClient } from '@streamr/sdk' -import { hexToBinary, wait } from '@streamr/utils' +import { PublishMetadata, StreamrClient } from '@streamr/sdk' +import { hexToBinary, merge, wait } from '@streamr/utils' import es from 'event-stream' -import { createClientCommand, Options as BaseOptions } from '../src/command' +import { Writable } from 'stream' +import { Options as BaseOptions, createClientCommand } from '../src/command' import { createFnParseInt } from '../src/common' interface Options extends BaseOptions { partition?: number partitionKeyField?: string + withMetadata: boolean } const isHexadecimal = (str: string): boolean => { @@ -21,12 +22,14 @@ const publishStream = ( streamId: string, partition: number | undefined, partitionKeyField: string | undefined, + withMetadata: boolean, client: StreamrClient ): Writable => { const writable = new Writable({ objectMode: true, write: (data: any, _: any, done: any) => { let content: any + let metadata: PublishMetadata // ignore newlines, etc if (!data || String(data).trim() === '') { done() @@ -34,10 +37,24 @@ const publishStream = ( } const trimmedData = String(data).trim() if (isHexadecimal(trimmedData)) { + if (withMetadata) { + throw new Error('hex input is not supported when publishing with metadata') + } content = hexToBinary(trimmedData) + metadata = {} } else { try { - content = JSON.parse(trimmedData) + const payload = JSON.parse(trimmedData) + if (withMetadata) { + content = payload.content + if (content === undefined) { + throw new Error('invalid input: no content') + } + metadata = payload.metadata ?? {} + } else { + content = payload + metadata = {} + } } catch (e) { console.error(data.toString()) done(e) @@ -45,7 +62,7 @@ const publishStream = ( } } const partitionKey = (partitionKeyField !== undefined && typeof content === 'object') ? content[partitionKeyField] : undefined - client.publish({ streamId, partition }, content, { partitionKey }).then( + client.publish({ streamId, partition }, content, merge(metadata, { partitionKey })).then( () => done(), (err) => done(err) ) @@ -59,7 +76,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt console.error('Invalid combination of "partition" and "partition-key-field"') process.exit(1) } - const ps = publishStream(streamId, options.partition, options.partitionKeyField, client) + const ps = publishStream(streamId, options.partition, options.partitionKeyField, options.withMetadata, client) return new Promise((resolve, reject) => { process.stdin .pipe(es.split()) @@ -83,4 +100,5 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt .option('-p, --partition ', 'partition', createFnParseInt('--partition')) // eslint-disable-next-line max-len .option('-k, --partition-key-field ', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)') + .option('-m, --with-metadata', 'each input contains both the content and the metadata', false) .parseAsync() From c78d89ccf21e283b2eabf60f2582279e3478c466 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 14:36:02 +0200 Subject: [PATCH 12/17] add test --- .../cli-tools/test/stream-publish.test.ts | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts index 571c4af7e8..52482fec57 100644 --- a/packages/cli-tools/test/stream-publish.test.ts +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -33,11 +33,11 @@ describe('stream-publish', () => { await client.destroy() }) - function publishViaCliCommand(additionalArgs: string[] = []) { + function publishViaCliCommand(payload: any, additionalArgs: string[] = []) { const args = [streamId, ...additionalArgs] setImmediate(async () => { await runCommand(`stream publish ${args.join(' ')}`, { - inputLines: [JSON.stringify({ foo: 123 })], + inputLines: [JSON.stringify(payload)], privateKey: publisherPrivateKey }) }) @@ -46,7 +46,7 @@ describe('stream-publish', () => { it('happy path', async () => { const subscriber = createSubscriber() const subscriptions = await Promise.all(range(PARTITION_COUNT).map((partition) => subscriber.subscribe({ id: streamId, partition }))) - publishViaCliCommand() + publishViaCliCommand({ foo: 123 }) const receivedMessage = await Promise.race(subscriptions.map((s) => nextValue(s[Symbol.asyncIterator]()))) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() @@ -56,7 +56,7 @@ describe('stream-publish', () => { const PARTITION = 5 const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) - publishViaCliCommand([`--partition ${PARTITION}`]) + publishViaCliCommand({ foo: 123 }, [`--partition ${PARTITION}`]) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() @@ -66,9 +66,21 @@ describe('stream-publish', () => { const partition = keyToArrayIndex(PARTITION_COUNT, 123) const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition }) - publishViaCliCommand(['--partition-key-field foo']) + publishViaCliCommand({ foo: 123 }, ['--partition-key-field foo']) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() - }) + }) + + it('with metadata', async () => { + const PARTITION = 5 + const CONTENT = { content: { foo: 123 }, metadata: { msgChainId: 'testMsgChainId' } } + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) + publishViaCliCommand(CONTENT, ['--with-metadata', `--partition ${PARTITION}`]) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqual({ foo: 123 }) + expect(receivedMessage!.msgChainId).toEqual('testMsgChainId') + await subscriber.destroy() + }) }) From 879dca8e852ea84fd654ff9be695394819ba6b3d Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 15:10:26 +0200 Subject: [PATCH 13/17] add test-utils custom matchers --- packages/cli-tools/jest.config.ts | 13 ++++++++++++- packages/cli-tools/tsconfig.jest.json | 5 +++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/cli-tools/jest.config.ts b/packages/cli-tools/jest.config.ts index 550da64839..1c1d1df1b3 100644 --- a/packages/cli-tools/jest.config.ts +++ b/packages/cli-tools/jest.config.ts @@ -1 +1,12 @@ -export { default } from '../../jest.config' +import type { Config } from '@jest/types' +import defaultConfig from '../../jest.config' + +const config: Config.InitialOptions = { + ...defaultConfig, + setupFilesAfterEnv: [ + ...defaultConfig.setupFilesAfterEnv, + '@streamr/test-utils/setupCustomMatchers' + ] +} + +export default config diff --git a/packages/cli-tools/tsconfig.jest.json b/packages/cli-tools/tsconfig.jest.json index 34a50f031c..9f81e203cf 100644 --- a/packages/cli-tools/tsconfig.jest.json +++ b/packages/cli-tools/tsconfig.jest.json @@ -1,5 +1,10 @@ { "extends": "../../tsconfig.jest.json", + "compilerOptions": { + "noEmit": true, + "types": ["node", "jest", "@streamr/test-utils/customMatcherTypes"], + "noImplicitOverride": false + }, "include": [ "src/**/*", "bin/**/*", From eec1aefb55937c654a00764825719894eb391b44 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 15:09:08 +0200 Subject: [PATCH 14/17] add test --- .../cli-tools/test/stream-publish.test.ts | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts index 52482fec57..952a342fc9 100644 --- a/packages/cli-tools/test/stream-publish.test.ts +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -1,6 +1,6 @@ import StreamrClient, { StreamPermission } from '@streamr/sdk' import { createTestPrivateKey } from '@streamr/test-utils' -import { keyToArrayIndex, StreamID } from '@streamr/utils' +import { binaryToHex, keyToArrayIndex, StreamID } from '@streamr/utils' import range from 'lodash/range' import { createTestClient, nextValue, runCommand } from './utils' import { Wallet } from 'ethers' @@ -33,11 +33,11 @@ describe('stream-publish', () => { await client.destroy() }) - function publishViaCliCommand(payload: any, additionalArgs: string[] = []) { + function publishViaCliCommand(inputLine: string, additionalArgs: string[] = []) { const args = [streamId, ...additionalArgs] setImmediate(async () => { await runCommand(`stream publish ${args.join(' ')}`, { - inputLines: [JSON.stringify(payload)], + inputLines: [inputLine], privateKey: publisherPrivateKey }) }) @@ -46,17 +46,27 @@ describe('stream-publish', () => { it('happy path', async () => { const subscriber = createSubscriber() const subscriptions = await Promise.all(range(PARTITION_COUNT).map((partition) => subscriber.subscribe({ id: streamId, partition }))) - publishViaCliCommand({ foo: 123 }) + publishViaCliCommand(JSON.stringify({ foo: 123 })) const receivedMessage = await Promise.race(subscriptions.map((s) => nextValue(s[Symbol.asyncIterator]()))) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() }) + it('hex content', async () => { + const PARTITION = 5 + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) + publishViaCliCommand(binaryToHex(new Uint8Array([4, 5, 6]), false), [`--partition ${PARTITION}`]) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqualBinary(new Uint8Array([4, 5, 6])) + await subscriber.destroy() + }) + it('explicit partition', async () => { const PARTITION = 5 const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) - publishViaCliCommand({ foo: 123 }, [`--partition ${PARTITION}`]) + publishViaCliCommand(JSON.stringify({ foo: 123 }), [`--partition ${PARTITION}`]) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() @@ -66,7 +76,7 @@ describe('stream-publish', () => { const partition = keyToArrayIndex(PARTITION_COUNT, 123) const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition }) - publishViaCliCommand({ foo: 123 }, ['--partition-key-field foo']) + publishViaCliCommand(JSON.stringify({ foo: 123 }), ['--partition-key-field foo']) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() @@ -77,7 +87,7 @@ describe('stream-publish', () => { const CONTENT = { content: { foo: 123 }, metadata: { msgChainId: 'testMsgChainId' } } const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) - publishViaCliCommand(CONTENT, ['--with-metadata', `--partition ${PARTITION}`]) + publishViaCliCommand(JSON.stringify(CONTENT), ['--with-metadata', `--partition ${PARTITION}`]) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) expect(receivedMessage!.msgChainId).toEqual('testMsgChainId') From c4c6614ea5d22b641bf558655d2e5c6d5e0b44ae Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 15:15:11 +0200 Subject: [PATCH 15/17] rename variable --- packages/cli-tools/test/stream-publish.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts index 952a342fc9..b5b40a17c9 100644 --- a/packages/cli-tools/test/stream-publish.test.ts +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -84,10 +84,10 @@ describe('stream-publish', () => { it('with metadata', async () => { const PARTITION = 5 - const CONTENT = { content: { foo: 123 }, metadata: { msgChainId: 'testMsgChainId' } } + const PAYLOAD = { content: { foo: 123 }, metadata: { msgChainId: 'testMsgChainId' } } const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) - publishViaCliCommand(JSON.stringify(CONTENT), ['--with-metadata', `--partition ${PARTITION}`]) + publishViaCliCommand(JSON.stringify(PAYLOAD), ['--with-metadata', `--partition ${PARTITION}`]) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) expect(receivedMessage!.msgChainId).toEqual('testMsgChainId') From 5aa8fa1db29a0488595e2378b0c22779a7bee9da Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 15:39:36 +0200 Subject: [PATCH 16/17] support hex content also when --with-metadata is used --- .../cli-tools/bin/streamr-stream-publish.ts | 32 +++++++------------ .../cli-tools/test/stream-publish.test.ts | 12 +++++++ 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index 4fe56908ee..1f88cff359 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -36,30 +36,22 @@ const publishStream = ( return } const trimmedData = String(data).trim() - if (isHexadecimal(trimmedData)) { + try { if (withMetadata) { - throw new Error('hex input is not supported when publishing with metadata') - } - content = hexToBinary(trimmedData) - metadata = {} - } else { - try { const payload = JSON.parse(trimmedData) - if (withMetadata) { - content = payload.content - if (content === undefined) { - throw new Error('invalid input: no content') - } - metadata = payload.metadata ?? {} - } else { - content = payload - metadata = {} + if (payload.content === undefined) { + throw new Error('invalid input: no content') } - } catch (e) { - console.error(data.toString()) - done(e) - return + content = isHexadecimal(payload.content) ? hexToBinary(payload.content) : payload.content + metadata = payload.metadata ?? {} + } else { + content = isHexadecimal(trimmedData) ? hexToBinary(trimmedData) : JSON.parse(trimmedData) + metadata = {} } + } catch (e) { + console.error(data.toString()) + done(e) + return } const partitionKey = (partitionKeyField !== undefined && typeof content === 'object') ? content[partitionKeyField] : undefined client.publish({ streamId, partition }, content, merge(metadata, { partitionKey })).then( diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts index b5b40a17c9..a2921d5bd6 100644 --- a/packages/cli-tools/test/stream-publish.test.ts +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -93,4 +93,16 @@ describe('stream-publish', () => { expect(receivedMessage!.msgChainId).toEqual('testMsgChainId') await subscriber.destroy() }) + + it('with metadata, hex content', async () => { + const PARTITION = 5 + const PAYLOAD = { content: binaryToHex(new Uint8Array([4, 5, 6]), false), metadata: { msgChainId: 'testMsgChainId' } } + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) + publishViaCliCommand(JSON.stringify(PAYLOAD), ['--with-metadata', `--partition ${PARTITION}`]) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqual(new Uint8Array([4, 5, 6])) + expect(receivedMessage!.msgChainId).toEqual('testMsgChainId') + await subscriber.destroy() + }) }) From 517bb64418e980f96271d3a2eede66f941ca6743 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 8 Dec 2025 15:43:48 +0200 Subject: [PATCH 17/17] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3942af91c2..b34ca31428 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ Changes before Tatum release are not documented in this file. ### @streamr/cli-tools - Add `--partition` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3262) +- Add `--with-metadata` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3265) #### Added