diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ab061c19d..93b1c55672 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ Changes before Tatum release are not documented in this file. ### @streamr/cli-tools - Add `--partition` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3262) +- Add `--with-metadata` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3265) #### Added diff --git a/packages/cli-tools/bin/streamr-stream-publish.ts b/packages/cli-tools/bin/streamr-stream-publish.ts index aa0cba3426..1f88cff359 100755 --- a/packages/cli-tools/bin/streamr-stream-publish.ts +++ b/packages/cli-tools/bin/streamr-stream-publish.ts @@ -1,16 +1,17 @@ #!/usr/bin/env node import '../src/logLevel' -import { Writable } from 'stream' -import { StreamrClient } from '@streamr/sdk' -import { hexToBinary, wait } from '@streamr/utils' +import { PublishMetadata, StreamrClient } from '@streamr/sdk' +import { hexToBinary, merge, wait } from '@streamr/utils' import es from 'event-stream' -import { createClientCommand, Options as BaseOptions } from '../src/command' +import { Writable } from 'stream' +import { Options as BaseOptions, createClientCommand } from '../src/command' import { createFnParseInt } from '../src/common' interface Options extends BaseOptions { partition?: number partitionKeyField?: string + withMetadata: boolean } const isHexadecimal = (str: string): boolean => { @@ -21,31 +22,39 @@ const publishStream = ( streamId: string, partition: number | undefined, partitionKeyField: string | undefined, + withMetadata: boolean, client: StreamrClient ): Writable => { const writable = new Writable({ objectMode: true, write: (data: any, _: any, done: any) => { - let message = null + let content: any + let metadata: PublishMetadata // ignore newlines, etc if (!data || String(data).trim() === '') { done() return } const trimmedData = String(data).trim() - if (isHexadecimal(trimmedData)) { - message = hexToBinary(trimmedData) - } else { - try { - message = JSON.parse(trimmedData) - } catch (e) { - console.error(data.toString()) - done(e) - return + try { + if (withMetadata) { + const payload = JSON.parse(trimmedData) + if (payload.content === undefined) { + throw new Error('invalid input: no content') + } + content = isHexadecimal(payload.content) ? hexToBinary(payload.content) : payload.content + metadata = payload.metadata ?? {} + } else { + content = isHexadecimal(trimmedData) ? hexToBinary(trimmedData) : JSON.parse(trimmedData) + metadata = {} } + } catch (e) { + console.error(data.toString()) + done(e) + return } - const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined - client.publish({ streamId, partition }, message, { partitionKey }).then( + const partitionKey = (partitionKeyField !== undefined && typeof content === 'object') ? content[partitionKeyField] : undefined + client.publish({ streamId, partition }, content, merge(metadata, { partitionKey })).then( () => done(), (err) => done(err) ) @@ -59,7 +68,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt console.error('Invalid combination of "partition" and "partition-key-field"') process.exit(1) } - const ps = publishStream(streamId, options.partition, options.partitionKeyField, client) + const ps = publishStream(streamId, options.partition, options.partitionKeyField, options.withMetadata, client) return new Promise((resolve, reject) => { process.stdin .pipe(es.split()) @@ -83,4 +92,5 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt .option('-p, --partition ', 'partition', createFnParseInt('--partition')) // eslint-disable-next-line max-len .option('-k, --partition-key-field ', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)') + .option('-m, --with-metadata', 'each input contains both the content and the metadata', false) .parseAsync() diff --git a/packages/cli-tools/jest.config.ts b/packages/cli-tools/jest.config.ts index 550da64839..1c1d1df1b3 100644 --- a/packages/cli-tools/jest.config.ts +++ b/packages/cli-tools/jest.config.ts @@ -1 +1,12 @@ -export { default } from '../../jest.config' +import type { Config } from '@jest/types' +import defaultConfig from '../../jest.config' + +const config: Config.InitialOptions = { + ...defaultConfig, + setupFilesAfterEnv: [ + ...defaultConfig.setupFilesAfterEnv, + '@streamr/test-utils/setupCustomMatchers' + ] +} + +export default config diff --git a/packages/cli-tools/test/stream-publish.test.ts b/packages/cli-tools/test/stream-publish.test.ts index 571c4af7e8..a2921d5bd6 100644 --- a/packages/cli-tools/test/stream-publish.test.ts +++ b/packages/cli-tools/test/stream-publish.test.ts @@ -1,6 +1,6 @@ import StreamrClient, { StreamPermission } from '@streamr/sdk' import { createTestPrivateKey } from '@streamr/test-utils' -import { keyToArrayIndex, StreamID } from '@streamr/utils' +import { binaryToHex, keyToArrayIndex, StreamID } from '@streamr/utils' import range from 'lodash/range' import { createTestClient, nextValue, runCommand } from './utils' import { Wallet } from 'ethers' @@ -33,11 +33,11 @@ describe('stream-publish', () => { await client.destroy() }) - function publishViaCliCommand(additionalArgs: string[] = []) { + function publishViaCliCommand(inputLine: string, additionalArgs: string[] = []) { const args = [streamId, ...additionalArgs] setImmediate(async () => { await runCommand(`stream publish ${args.join(' ')}`, { - inputLines: [JSON.stringify({ foo: 123 })], + inputLines: [inputLine], privateKey: publisherPrivateKey }) }) @@ -46,17 +46,27 @@ describe('stream-publish', () => { it('happy path', async () => { const subscriber = createSubscriber() const subscriptions = await Promise.all(range(PARTITION_COUNT).map((partition) => subscriber.subscribe({ id: streamId, partition }))) - publishViaCliCommand() + publishViaCliCommand(JSON.stringify({ foo: 123 })) const receivedMessage = await Promise.race(subscriptions.map((s) => nextValue(s[Symbol.asyncIterator]()))) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() }) + it('hex content', async () => { + const PARTITION = 5 + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) + publishViaCliCommand(binaryToHex(new Uint8Array([4, 5, 6]), false), [`--partition ${PARTITION}`]) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqualBinary(new Uint8Array([4, 5, 6])) + await subscriber.destroy() + }) + it('explicit partition', async () => { const PARTITION = 5 const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) - publishViaCliCommand([`--partition ${PARTITION}`]) + publishViaCliCommand(JSON.stringify({ foo: 123 }), [`--partition ${PARTITION}`]) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() @@ -66,9 +76,33 @@ describe('stream-publish', () => { const partition = keyToArrayIndex(PARTITION_COUNT, 123) const subscriber = createSubscriber() const subscription = await subscriber.subscribe({ id: streamId, partition }) - publishViaCliCommand(['--partition-key-field foo']) + publishViaCliCommand(JSON.stringify({ foo: 123 }), ['--partition-key-field foo']) const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) expect(receivedMessage!.content).toEqual({ foo: 123 }) await subscriber.destroy() - }) + }) + + it('with metadata', async () => { + const PARTITION = 5 + const PAYLOAD = { content: { foo: 123 }, metadata: { msgChainId: 'testMsgChainId' } } + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) + publishViaCliCommand(JSON.stringify(PAYLOAD), ['--with-metadata', `--partition ${PARTITION}`]) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqual({ foo: 123 }) + expect(receivedMessage!.msgChainId).toEqual('testMsgChainId') + await subscriber.destroy() + }) + + it('with metadata, hex content', async () => { + const PARTITION = 5 + const PAYLOAD = { content: binaryToHex(new Uint8Array([4, 5, 6]), false), metadata: { msgChainId: 'testMsgChainId' } } + const subscriber = createSubscriber() + const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION }) + publishViaCliCommand(JSON.stringify(PAYLOAD), ['--with-metadata', `--partition ${PARTITION}`]) + const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]()) + expect(receivedMessage!.content).toEqual(new Uint8Array([4, 5, 6])) + expect(receivedMessage!.msgChainId).toEqual('testMsgChainId') + await subscriber.destroy() + }) }) diff --git a/packages/cli-tools/tsconfig.jest.json b/packages/cli-tools/tsconfig.jest.json index 34a50f031c..9f81e203cf 100644 --- a/packages/cli-tools/tsconfig.jest.json +++ b/packages/cli-tools/tsconfig.jest.json @@ -1,5 +1,10 @@ { "extends": "../../tsconfig.jest.json", + "compilerOptions": { + "noEmit": true, + "types": ["node", "jest", "@streamr/test-utils/customMatcherTypes"], + "noImplicitOverride": false + }, "include": [ "src/**/*", "bin/**/*",