Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,30 @@ import (
"go.opentelemetry.io/otel"
)

type Consumer interface {
type Consumer[In any] interface {
ConsumeOnce()
Consume()
Work()

setErrorQueue(err job.Queue[error])
setWorkerPoolSize(int)
setSpanName(string)
Input() job.Queue[In]
Error() job.Queue[error]

WithInput(job.Queue[In]) Consumer[In]
WithErrorQueue(job.Queue[error]) Consumer[In]
WithWorkerPoolSize(int) Consumer[In]
WithSpanName(string) Consumer[In]
}

func New[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...Option) (Consumer, job.Queue[error]) {
func New[In any](name string, fn func(context.Context, In) error) Consumer[In] {
c := &consumer[In]{
name: name,
spanName: "consume job",
Comment thread
imperfect-fourth marked this conversation as resolved.
fn: fn,
in: in,
in: make(job.Queue[In]),
err: make(job.Queue[error]),
workerPoolSize: 1,
}
for _, opt := range opts {
opt(c)
}
return c, c.err
return c
}

type consumer[In any] struct {
Expand Down Expand Up @@ -74,14 +75,26 @@ func (c consumer[In]) Work() {
c.Consume()
}

func (c *consumer[In]) setErrorQueue(err job.Queue[error]) {
c.err = err
func (c *consumer[In]) Input() job.Queue[In] {
return c.in
}
func (c *consumer[In]) Error() job.Queue[error] {
return c.err
}

func (c *consumer[In]) setWorkerPoolSize(n int) {
func (c *consumer[In]) WithInput(in job.Queue[In]) Consumer[In] {
c.in = in
return c
}
func (c *consumer[In]) WithErrorQueue(err job.Queue[error]) Consumer[In] {
c.err = err
return c
}
func (c *consumer[In]) WithWorkerPoolSize(n int) Consumer[In] {
c.workerPoolSize = n
return c
}

func (c *consumer[In]) setSpanName(name string) {
func (c *consumer[In]) WithSpanName(name string) Consumer[In] {
c.spanName = name
return c
}
23 changes: 0 additions & 23 deletions consumer/consumer_opts.go

This file was deleted.

29 changes: 19 additions & 10 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,25 @@ import (
"go.opentelemetry.io/otel/trace"
)

type Producer interface {
type Producer[Out any] interface {
ProduceOnce()
Produce()
Work()
Output() job.Queue[Out]
Error() job.Queue[error]

setCooldown(time.Duration)
setErrorQueue(job.Queue[error])
WithCooldown(time.Duration) Producer[Out]
WithErrorQueue(job.Queue[error]) Producer[Out]
}

func New[Out any](name string, fn func() ([]Out, error), opts ...Option) (Producer, job.Queue[Out], job.Queue[error]) {
func New[Out any](name string, fn func() ([]Out, error)) Producer[Out] {
p := &producer[Out]{
name: name,
fn: fn,
out: make(chan job.Job[Out]),
err: make(chan job.Job[error]),
}
for _, opt := range opts {
opt(p)
}
return p, p.out, p.err
return p
}

type producer[Out any] struct {
Expand Down Expand Up @@ -79,12 +78,22 @@ func (p producer[Out]) Work() {
p.Produce()
}

func (p *producer[Out]) setCooldown(t time.Duration) {
func (p producer[Out]) Output() job.Queue[Out] {
return p.out
}

func (p producer[Out]) Error() job.Queue[error] {
return p.err
}

func (p *producer[Out]) WithCooldown(t time.Duration) Producer[Out] {
p.cooldown = t
return p
}

func (p *producer[Out]) setErrorQueue(err job.Queue[error]) {
func (p *producer[Out]) WithErrorQueue(err job.Queue[error]) Producer[Out] {
p.err = err
return p
}

type ContextualOutput interface {
Expand Down
21 changes: 0 additions & 21 deletions producer/producer_opts.go

This file was deleted.

24 changes: 12 additions & 12 deletions test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"time"

"github.com/imperfect-fourth/work"
"github.com/imperfect-fourth/work/consumer"
"github.com/imperfect-fourth/work/producer"
"github.com/imperfect-fourth/work/transformer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
Expand Down Expand Up @@ -54,23 +51,26 @@ func main() {
}
}()

producer, producedIntChan, _ := work.NewProducer(
producer := work.NewProducer(
"int producer",
producerFn,
producer.WithCooldown(10*time.Second),
)
).WithCooldown(10 * time.Second)
go producer.Work()

transformer, transformedIntChan, _ := work.NewTransformer(
transformer := work.NewTransformer(
"int transformer",
producedIntChan,
transformerFn,
transformer.WithWorkerPoolSize(1),
transformer.WithSpanName("sleeping one and adding one"),
)
).
WithInput(producer.Output()).
WithWorkerPoolSize(1).
WithSpanName("sleeping one and adding one")

go transformer.Work()

c, _ := work.NewConsumer("int consumer", transformedIntChan, consumerFn, consumer.WithSpanName("sleeping one and printing"))
c := work.NewConsumer("int consumer", consumerFn).
WithInput(transformer.Output()).
WithSpanName("sleeping one and printing")

c.Work()
}

Expand Down
47 changes: 32 additions & 15 deletions transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import (
"go.opentelemetry.io/otel"
)

type Transformer interface {
type Transformer[In, Out any] interface {
TransformOnce()
Transform()
Work()

setErrorQueue(job.Queue[error])
setWorkerPoolSize(int)
setSpanName(string)
Input() job.Queue[In]
Output() job.Queue[Out]
Error() job.Queue[error]

WithInput(job.Queue[In]) Transformer[In, Out]
WithErrorQueue(job.Queue[error]) Transformer[In, Out]
WithWorkerPoolSize(int) Transformer[In, Out]
WithSpanName(string) Transformer[In, Out]
}

func New[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...Option) (Transformer, job.Queue[Out], job.Queue[error]) {
func New[In any, Out any](name string, fn func(context.Context, In) (Out, error)) Transformer[In, Out] {
t := &transformer[In, Out]{
name: name,
spanName: "transform job",
fn: fn,
in: in,
in: make(job.Queue[In]),
out: make(job.Queue[Out]),
err: make(job.Queue[error]),
workerPoolSize: 1,
}
for _, opt := range opts {
opt(t)
}
return t, t.out, t.err
return t
}

type transformer[In any, Out any] struct {
Expand Down Expand Up @@ -80,14 +82,29 @@ func (t transformer[In, Out]) Work() {
t.Transform()
}

func (t *transformer[In, Out]) setErrorQueue(err job.Queue[error]) {
t.err = err
func (t *transformer[In, Out]) Input() job.Queue[In] {
return t.in
}
func (t *transformer[In, Out]) Output() job.Queue[Out] {
return t.out
}
func (t *transformer[In, Out]) Error() job.Queue[error] {
return t.err
}

func (t *transformer[In, Out]) setWorkerPoolSize(n int) {
func (t *transformer[In, Out]) WithInput(in job.Queue[In]) Transformer[In, Out] {
t.in = in
return t
}
func (t *transformer[In, Out]) WithErrorQueue(err job.Queue[error]) Transformer[In, Out] {
t.err = err
return t
}
func (t *transformer[In, Out]) WithWorkerPoolSize(n int) Transformer[In, Out] {
t.workerPoolSize = n
return t
}

func (t *transformer[In, Out]) setSpanName(name string) {
func (t *transformer[In, Out]) WithSpanName(name string) Transformer[In, Out] {
t.spanName = name
return t
}
23 changes: 0 additions & 23 deletions transformer/transformer_opts.go

This file was deleted.

13 changes: 6 additions & 7 deletions work.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@ import (
"context"

"github.com/imperfect-fourth/work/consumer"
"github.com/imperfect-fourth/work/job"
"github.com/imperfect-fourth/work/producer"
"github.com/imperfect-fourth/work/transformer"
)

func NewProducer[Out any](name string, fn func() ([]Out, error), opts ...producer.Option) (producer.Producer, job.Queue[Out], job.Queue[error]) {
return producer.New(name, fn, opts...)
func NewProducer[Out any](name string, fn func() ([]Out, error)) producer.Producer[Out] {
return producer.New(name, fn)
}

func NewTransformer[In any, Out any](name string, in job.Queue[In], fn func(context.Context, In) (Out, error), opts ...transformer.Option) (transformer.Transformer, job.Queue[Out], job.Queue[error]) {
return transformer.New(name, in, fn, opts...)
func NewTransformer[In any, Out any](name string, fn func(context.Context, In) (Out, error)) transformer.Transformer[In, Out] {
return transformer.New(name, fn)
}

func NewConsumer[In any](name string, in job.Queue[In], fn func(context.Context, In) error, opts ...consumer.Option) (consumer.Consumer, job.Queue[error]) {
return consumer.New(name, in, fn, opts...)
func NewConsumer[In any](name string, fn func(context.Context, In) error) consumer.Consumer[In] {
return consumer.New(name, fn)
}