Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Changes before Tatum release are not documented in this file.
### @streamr/cli-tools

- Add `--partition` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3262)
- Add `--with-metadata` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3265)

#### Added

Expand Down
44 changes: 27 additions & 17 deletions packages/cli-tools/bin/streamr-stream-publish.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#!/usr/bin/env node
import '../src/logLevel'

import { Writable } from 'stream'
import { StreamrClient } from '@streamr/sdk'
import { hexToBinary, wait } from '@streamr/utils'
import { PublishMetadata, StreamrClient } from '@streamr/sdk'
import { hexToBinary, merge, wait } from '@streamr/utils'
import es from 'event-stream'
import { createClientCommand, Options as BaseOptions } from '../src/command'
import { Writable } from 'stream'
import { Options as BaseOptions, createClientCommand } from '../src/command'
import { createFnParseInt } from '../src/common'

interface Options extends BaseOptions {
partition?: number
partitionKeyField?: string
withMetadata: boolean
}

const isHexadecimal = (str: string): boolean => {
Expand All @@ -21,31 +22,39 @@ const publishStream = (
streamId: string,
partition: number | undefined,
partitionKeyField: string | undefined,
withMetadata: boolean,
client: StreamrClient
): Writable => {
const writable = new Writable({
objectMode: true,
write: (data: any, _: any, done: any) => {
let message = null
let content: any
let metadata: PublishMetadata
// ignore newlines, etc
if (!data || String(data).trim() === '') {
done()
return
}
const trimmedData = String(data).trim()
if (isHexadecimal(trimmedData)) {
message = hexToBinary(trimmedData)
} else {
try {
message = JSON.parse(trimmedData)
} catch (e) {
console.error(data.toString())
done(e)
return
try {
if (withMetadata) {
const payload = JSON.parse(trimmedData)
if (payload.content === undefined) {
throw new Error('invalid input: no content')
}
content = isHexadecimal(payload.content) ? hexToBinary(payload.content) : payload.content
metadata = payload.metadata ?? {}
} else {
content = isHexadecimal(trimmedData) ? hexToBinary(trimmedData) : JSON.parse(trimmedData)
metadata = {}
}
} catch (e) {
console.error(data.toString())
done(e)
return
}
const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined
client.publish({ streamId, partition }, message, { partitionKey }).then(
const partitionKey = (partitionKeyField !== undefined && typeof content === 'object') ? content[partitionKeyField] : undefined
client.publish({ streamId, partition }, content, merge(metadata, { partitionKey })).then(
() => done(),
(err) => done(err)
)
Expand All @@ -59,7 +68,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
console.error('Invalid combination of "partition" and "partition-key-field"')
process.exit(1)
}
const ps = publishStream(streamId, options.partition, options.partitionKeyField, client)
const ps = publishStream(streamId, options.partition, options.partitionKeyField, options.withMetadata, client)
return new Promise((resolve, reject) => {
process.stdin
.pipe(es.split())
Expand All @@ -83,4 +92,5 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
.option('-p, --partition <partition>', 'partition', createFnParseInt('--partition'))
// eslint-disable-next-line max-len
.option('-k, --partition-key-field <string>', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)')
.option('-m, --with-metadata', 'each input contains both the content and the metadata', false)
.parseAsync()
13 changes: 12 additions & 1 deletion packages/cli-tools/jest.config.ts
Original file line number Diff line number Diff line change
@@ -1 +1,12 @@
export { default } from '../../jest.config'
import type { Config } from '@jest/types'
import defaultConfig from '../../jest.config'

const config: Config.InitialOptions = {
...defaultConfig,
setupFilesAfterEnv: [
...defaultConfig.setupFilesAfterEnv,
'@streamr/test-utils/setupCustomMatchers'
]
}

export default config
48 changes: 41 additions & 7 deletions packages/cli-tools/test/stream-publish.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import StreamrClient, { StreamPermission } from '@streamr/sdk'
import { createTestPrivateKey } from '@streamr/test-utils'
import { keyToArrayIndex, StreamID } from '@streamr/utils'
import { binaryToHex, keyToArrayIndex, StreamID } from '@streamr/utils'
import range from 'lodash/range'
import { createTestClient, nextValue, runCommand } from './utils'
import { Wallet } from 'ethers'
Expand Down Expand Up @@ -33,11 +33,11 @@ describe('stream-publish', () => {
await client.destroy()
})

function publishViaCliCommand(additionalArgs: string[] = []) {
function publishViaCliCommand(inputLine: string, additionalArgs: string[] = []) {
const args = [streamId, ...additionalArgs]
setImmediate(async () => {
await runCommand(`stream publish ${args.join(' ')}`, {
inputLines: [JSON.stringify({ foo: 123 })],
inputLines: [inputLine],
privateKey: publisherPrivateKey
})
})
Expand All @@ -46,17 +46,27 @@ describe('stream-publish', () => {
it('happy path', async () => {
const subscriber = createSubscriber()
const subscriptions = await Promise.all(range(PARTITION_COUNT).map((partition) => subscriber.subscribe({ id: streamId, partition })))
publishViaCliCommand()
publishViaCliCommand(JSON.stringify({ foo: 123 }))
const receivedMessage = await Promise.race(subscriptions.map((s) => nextValue(s[Symbol.asyncIterator]())))
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})

it('hex content', async () => {
const PARTITION = 5
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION })
publishViaCliCommand(binaryToHex(new Uint8Array([4, 5, 6]), false), [`--partition ${PARTITION}`])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqualBinary(new Uint8Array([4, 5, 6]))
await subscriber.destroy()
})

it('explicit partition', async () => {
const PARTITION = 5
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION })
publishViaCliCommand([`--partition ${PARTITION}`])
publishViaCliCommand(JSON.stringify({ foo: 123 }), [`--partition ${PARTITION}`])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
Expand All @@ -66,9 +76,33 @@ describe('stream-publish', () => {
const partition = keyToArrayIndex(PARTITION_COUNT, 123)
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition })
publishViaCliCommand(['--partition-key-field foo'])
publishViaCliCommand(JSON.stringify({ foo: 123 }), ['--partition-key-field foo'])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})
})

it('with metadata', async () => {
const PARTITION = 5
const PAYLOAD = { content: { foo: 123 }, metadata: { msgChainId: 'testMsgChainId' } }
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION })
publishViaCliCommand(JSON.stringify(PAYLOAD), ['--with-metadata', `--partition ${PARTITION}`])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual({ foo: 123 })
expect(receivedMessage!.msgChainId).toEqual('testMsgChainId')
await subscriber.destroy()
})

it('with metadata, hex content', async () => {
const PARTITION = 5
const PAYLOAD = { content: binaryToHex(new Uint8Array([4, 5, 6]), false), metadata: { msgChainId: 'testMsgChainId' } }
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION })
publishViaCliCommand(JSON.stringify(PAYLOAD), ['--with-metadata', `--partition ${PARTITION}`])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual(new Uint8Array([4, 5, 6]))
expect(receivedMessage!.msgChainId).toEqual('testMsgChainId')
await subscriber.destroy()
})
})
5 changes: 5 additions & 0 deletions packages/cli-tools/tsconfig.jest.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
{
"extends": "../../tsconfig.jest.json",
"compilerOptions": {
"noEmit": true,
"types": ["node", "jest", "@streamr/test-utils/customMatcherTypes"],
"noImplicitOverride": false
},
"include": [
"src/**/*",
"bin/**/*",
Expand Down