Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Changes before Tatum release are not documented in this file.

### @streamr/cli-tools

- Add `--partition` flag to `stream publish` (https://github.com/streamr-dev/network/pull/3262)

#### Added

#### Changed
Expand Down
10 changes: 7 additions & 3 deletions packages/cli-tools/bin/streamr-stream-publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import { StreamrClient } from '@streamr/sdk'
import { hexToBinary, wait } from '@streamr/utils'
import es from 'event-stream'
import { createClientCommand, Options as BaseOptions } from '../src/command'
import { createFnParseInt } from '../src/common'

interface Options extends BaseOptions {
partition?: number
partitionKeyField?: string
}

Expand All @@ -16,7 +18,8 @@ const isHexadecimal = (str: string): boolean => {
}

const publishStream = (
stream: string,
streamId: string,
partition: number | undefined,
partitionKeyField: string | undefined,
client: StreamrClient
): Writable => {
Expand All @@ -42,7 +45,7 @@ const publishStream = (
}
}
const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined
client.publish(stream, message, { partitionKey }).then(
client.publish({ id: streamId, partition }, message, { partitionKey }).then(
() => done(),
(err) => done(err)
)
Expand All @@ -52,7 +55,7 @@ const publishStream = (
}

createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => {
const ps = publishStream(streamId, options.partitionKeyField, client)
const ps = publishStream(streamId, options.partition, options.partitionKeyField, client)
return new Promise((resolve, reject) => {
process.stdin
.pipe(es.split())
Expand All @@ -73,6 +76,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
})
.arguments('<streamId>')
.description('publish to a stream by reading JSON messages from stdin line-by-line or hexadecimal strings for binary data')
.option('-p, --partition [partition]', 'partition', createFnParseInt('--partition'), undefined)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
Comment thread
teogeb marked this conversation as resolved.
Outdated
// eslint-disable-next-line max-len
.option('-k, --partition-key-field <string>', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)')
.parseAsync()
74 changes: 74 additions & 0 deletions packages/cli-tools/test/stream-publish.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import StreamrClient, { StreamPermission } from '@streamr/sdk'
import { createTestPrivateKey } from '@streamr/test-utils'
import { keyToArrayIndex, StreamID } from '@streamr/utils'
import range from 'lodash/range'
import { createTestClient, nextValue, runCommand } from './utils'
import { Wallet } from 'ethers'

const PARTITION_COUNT = 10

describe('stream-publish', () => {

let streamId: StreamID
let publisherPrivateKey: string
let subscriberPrivateKey: string

function createSubscriber(): StreamrClient {
return createTestClient(subscriberPrivateKey)
}

beforeEach(async () => {
publisherPrivateKey = await createTestPrivateKey({ gas: true })
subscriberPrivateKey = await createTestPrivateKey({ gas: true })
const client = createTestClient(await createTestPrivateKey({ gas: true }))
const stream = await client.createStream({ id: `/${Date.now()}`, partitions: PARTITION_COUNT })
await stream.grantPermissions({
userId: new Wallet(publisherPrivateKey).address,
permissions: [StreamPermission.PUBLISH]
}, {
userId: new Wallet(subscriberPrivateKey).address,
permissions: [StreamPermission.SUBSCRIBE]
})
streamId = stream.id
await client.destroy()
})

function publishViaCliCommand(additionalArgs: string[] = []) {
const args = [streamId, ...additionalArgs]
setImmediate(async () => {
await runCommand(`stream publish ${args.join(' ')}`, {
inputLines: [JSON.stringify({ foo: 123 })],
privateKey: publisherPrivateKey
})
})
}

it('happy path', async () => {
const subscriber = createSubscriber()
const subscriptions = await Promise.all(range(PARTITION_COUNT).map((partition) => subscriber.subscribe({ id: streamId, partition })))
publishViaCliCommand()
const receivedMessage = await Promise.race(subscriptions.map((s) => nextValue(s[Symbol.asyncIterator]())))
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})

it('explicit partition', async () => {
const PARTITION = 5
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION })
publishViaCliCommand([`--partition ${PARTITION}`])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})

it('partition key field', async () => {
const partition = keyToArrayIndex(PARTITION_COUNT, 123)
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition })
publishViaCliCommand(['--partition-key-field foo'])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})
})
Original file line number Diff line number Diff line change
@@ -1,74 +1,73 @@
import { StreamPermission } from '@streamr/sdk'
import StreamrClient, { StreamPermission } from '@streamr/sdk'
import { createTestPrivateKey } from '@streamr/test-utils'
import { collect } from '@streamr/utils'
import { collect, StreamID } from '@streamr/utils'
import { createTestClient, startCommand } from './utils'
import { Wallet } from 'ethers'
import { createTestClient, runCommand, startCommand } from './utils'

const TIMEOUT = 30 * 1000

describe('publish and subscribe', () => {
describe('stream-subscribe', () => {

let streamId: StreamID
let publisherPrivateKey: string
let subscriberPrivateKey: string
let streamId: string

beforeAll(async () => {
beforeEach(async () => {
publisherPrivateKey = await createTestPrivateKey({ gas: true })
subscriberPrivateKey = await createTestPrivateKey({ gas: true })
const client = createTestClient(publisherPrivateKey)
const client = createTestClient(await createTestPrivateKey({ gas: true }))
const stream = await client.createStream(`/${Date.now()}`)
await stream.grantPermissions({
userId: new Wallet(publisherPrivateKey).address,
permissions: [StreamPermission.PUBLISH]
}, {
userId: new Wallet(subscriberPrivateKey).address,
permissions: [StreamPermission.SUBSCRIBE]
})
streamId = stream.id
await client.destroy()
}, TIMEOUT)
})

function publishViaCliCommand() {
setImmediate(async () => {
await runCommand(`stream publish ${streamId}`, {
inputLines: [JSON.stringify({ foo: 123 })],
privateKey: publisherPrivateKey
})
})
async function publishTestMesssage(): Promise<StreamrClient> {
const publisher = createTestClient(publisherPrivateKey)
await publisher.publish(streamId, { foo: 123 })
return publisher
}

it('happy path', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId}`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(JSON.parse(receivedMessage)).toEqual({
foo: 123
})
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})

it('raw subscription', async () => {
it('raw', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --raw`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(receivedMessage).toMatch(/^[0-9a-fA-F]+$/)
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})

it('with metadata', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --with-metadata`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(JSON.parse(receivedMessage)).toMatchObject({
content: {
foo: 123
Expand All @@ -83,17 +82,18 @@ describe('publish and subscribe', () => {
msgChainId: expect.stringMatching(/[0-9a-zA-Z]+/)
}
})
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})

it('with metadata and raw', async () => {
it('with metadata, receive as raw', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --with-metadata --raw`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(JSON.parse(receivedMessage)).toMatchObject({
content: expect.stringMatching(/^[0-9a-fA-F]+$/),
metadata: {
Expand All @@ -106,5 +106,7 @@ describe('publish and subscribe', () => {
msgChainId: expect.stringMatching(/[0-9a-zA-Z]+/)
}
})
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})
})
5 changes: 5 additions & 0 deletions packages/cli-tools/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,8 @@ export const deployTestOperatorContract = async (opts: Omit<DeployOperatorContra
export const deployTestSponsorshipContract = async (opts: Omit<DeploySponsorshipContractOpts, 'environmentId'>): Promise<SponsorshipContract> => {
return _operatorContractUtils.deploySponsorshipContract({ ...opts, 'environmentId': 'dev2' })
}

export const nextValue = async <T>(source: AsyncIterator<T>): Promise<T | undefined> => {
const item = source.next()
return (await item).value
}