Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions packages/cli-tools/bin/streamr-stream-publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import { StreamrClient } from '@streamr/sdk'
import { hexToBinary, wait } from '@streamr/utils'
import es from 'event-stream'
import { createClientCommand, Options as BaseOptions } from '../src/command'
import { createFnParseInt } from '../src/common'

interface Options extends BaseOptions {
partition?: number
partitionKeyField?: string
}

Expand All @@ -16,7 +18,8 @@ const isHexadecimal = (str: string): boolean => {
}

const publishStream = (
stream: string,
streamId: string,
partition: number | undefined,
partitionKeyField: string | undefined,
client: StreamrClient
): Writable => {
Expand All @@ -42,7 +45,7 @@ const publishStream = (
}
}
const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined
client.publish(stream, message, { partitionKey }).then(
client.publish({ id: streamId, partition }, message, { partitionKey }).then(
() => done(),
(err) => done(err)
)
Expand All @@ -52,7 +55,7 @@ const publishStream = (
}

createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => {
const ps = publishStream(streamId, options.partitionKeyField, client)
const ps = publishStream(streamId, options.partition, options.partitionKeyField, client)
return new Promise((resolve, reject) => {
process.stdin
.pipe(es.split())
Expand All @@ -73,6 +76,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
})
.arguments('<streamId>')
.description('publish to a stream by reading JSON messages from stdin line-by-line or hexadecimal strings for binary data')
.option('-p, --partition [partition]', 'partition', createFnParseInt('--partition'), undefined)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
Comment thread
teogeb marked this conversation as resolved.
Outdated
// eslint-disable-next-line max-len
.option('-k, --partition-key-field <string>', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)')
.parseAsync()
74 changes: 74 additions & 0 deletions packages/cli-tools/test/stream-publish.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import StreamrClient, { StreamPermission } from '@streamr/sdk'
import { createTestPrivateKey } from '@streamr/test-utils'
import { keyToArrayIndex, StreamID } from '@streamr/utils'
import range from 'lodash/range'
import { createTestClient, nextValue, runCommand } from './utils'
import { Wallet } from 'ethers'

const PARTITION_COUNT = 10

describe('stream-publish', () => {

let streamId: StreamID
let publisherPrivateKey: string
let subscriberPrivateKey: string

function createSubscriber(): StreamrClient {
return createTestClient(subscriberPrivateKey)
}

beforeEach(async () => {
publisherPrivateKey = await createTestPrivateKey({ gas: true })
subscriberPrivateKey = await createTestPrivateKey({ gas: true })
const client = createTestClient(await createTestPrivateKey({ gas: true }))
const stream = await client.createStream({ id: `/${Date.now()}`, partitions: PARTITION_COUNT })
await stream.grantPermissions({
userId: new Wallet(publisherPrivateKey).address,
permissions: [StreamPermission.PUBLISH]
}, {
userId: new Wallet(subscriberPrivateKey).address,
permissions: [StreamPermission.SUBSCRIBE]
})
streamId = stream.id
await client.destroy()
})

function publishViaCliCommand(additionalArgs: string[] = []) {
const args = [streamId, ...additionalArgs]
setImmediate(async () => {
await runCommand(`stream publish ${args.join(' ')}`, {
inputLines: [JSON.stringify({ foo: 123 })],
privateKey: publisherPrivateKey
})
})
}

it('happy path', async () => {
const subscriber = createSubscriber()
const subscriptions = await Promise.all(range(PARTITION_COUNT).map((partition) => subscriber.subscribe({ id: streamId, partition })))
publishViaCliCommand()
const receivedMessage = await Promise.race(subscriptions.map((s) => nextValue(s[Symbol.asyncIterator]())))
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})

it('explicit partition', async () => {
const PARTITION = 5
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition: PARTITION })
publishViaCliCommand([`--partition ${PARTITION}`])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})

it('partition key field', async () => {
const partition = keyToArrayIndex(PARTITION_COUNT, 123)
const subscriber = createSubscriber()
const subscription = await subscriber.subscribe({ id: streamId, partition })
publishViaCliCommand(['--partition-key-field foo'])
const receivedMessage = await nextValue(subscription[Symbol.asyncIterator]())
expect(receivedMessage!.content).toEqual({ foo: 123 })
await subscriber.destroy()
})
})
Original file line number Diff line number Diff line change
@@ -1,74 +1,73 @@
import { StreamPermission } from '@streamr/sdk'
import StreamrClient, { StreamPermission } from '@streamr/sdk'
import { createTestPrivateKey } from '@streamr/test-utils'
import { collect } from '@streamr/utils'
import { collect, StreamID } from '@streamr/utils'
import { createTestClient, startCommand } from './utils'
import { Wallet } from 'ethers'
import { createTestClient, runCommand, startCommand } from './utils'

const TIMEOUT = 30 * 1000

describe('publish and subscribe', () => {
describe('stream-subscribe', () => {

let streamId: StreamID
let publisherPrivateKey: string
let subscriberPrivateKey: string
let streamId: string

beforeAll(async () => {
beforeEach(async () => {
publisherPrivateKey = await createTestPrivateKey({ gas: true })
subscriberPrivateKey = await createTestPrivateKey({ gas: true })
const client = createTestClient(publisherPrivateKey)
const client = createTestClient(await createTestPrivateKey({ gas: true }))
const stream = await client.createStream(`/${Date.now()}`)
await stream.grantPermissions({
userId: new Wallet(publisherPrivateKey).address,
permissions: [StreamPermission.PUBLISH]
}, {
userId: new Wallet(subscriberPrivateKey).address,
permissions: [StreamPermission.SUBSCRIBE]
})
streamId = stream.id
await client.destroy()
}, TIMEOUT)
})

function publishViaCliCommand() {
setImmediate(async () => {
await runCommand(`stream publish ${streamId}`, {
inputLines: [JSON.stringify({ foo: 123 })],
privateKey: publisherPrivateKey
})
})
async function publishTestMesssage(): Promise<StreamrClient> {
const publisher = createTestClient(publisherPrivateKey)
await publisher.publish(streamId, { foo: 123 })
return publisher
}

it('happy path', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId}`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(JSON.parse(receivedMessage)).toEqual({
foo: 123
})
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})

it('raw subscription', async () => {
it('raw', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --raw`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(receivedMessage).toMatch(/^[0-9a-fA-F]+$/)
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})

it('with metadata', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --with-metadata`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(JSON.parse(receivedMessage)).toMatchObject({
content: {
foo: 123
Expand All @@ -83,17 +82,18 @@ describe('publish and subscribe', () => {
msgChainId: expect.stringMatching(/[0-9a-zA-Z]+/)
}
})
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})

it('with metadata and raw', async () => {
it('with metadata, receive as raw', async () => {
const subscriberAbortController = new AbortController()
const subscriberOutputIterable = startCommand(`stream subscribe ${streamId} --with-metadata --raw`, {
privateKey: subscriberPrivateKey,
abortSignal: subscriberAbortController.signal,
privateKey: subscriberPrivateKey
})
publishViaCliCommand()
const publisher = await publishTestMesssage()
const receivedMessage = (await collect(subscriberOutputIterable, 1))[0]
subscriberAbortController.abort()
expect(JSON.parse(receivedMessage)).toMatchObject({
content: expect.stringMatching(/^[0-9a-fA-F]+$/),
metadata: {
Expand All @@ -106,5 +106,7 @@ describe('publish and subscribe', () => {
msgChainId: expect.stringMatching(/[0-9a-zA-Z]+/)
}
})
}, TIMEOUT)
await publisher.destroy()
subscriberAbortController.abort()
})
})
5 changes: 5 additions & 0 deletions packages/cli-tools/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,8 @@ export const deployTestOperatorContract = async (opts: Omit<DeployOperatorContra
export const deployTestSponsorshipContract = async (opts: Omit<DeploySponsorshipContractOpts, 'environmentId'>): Promise<SponsorshipContract> => {
return _operatorContractUtils.deploySponsorshipContract({ ...opts, 'environmentId': 'dev2' })
}

export const nextValue = async <T>(source: AsyncIterator<T>): Promise<T | undefined> => {
const item = source.next()
return (await item).value
}