diff --git a/v2/client/client.go b/v2/client/client.go index 80051b95c..5a7e3380b 100644 --- a/v2/client/client.go +++ b/v2/client/client.go @@ -101,6 +101,7 @@ type ceClient struct { eventDefaulterFns []EventDefaulter pollGoroutines int blockingCallback bool + parallelGoroutines int ackMalformedEvent bool } @@ -238,6 +239,10 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { wg.Add(1) go func() { defer wg.Done() + var parallel chan struct{} + if c.parallelGoroutines > 0 { + parallel = make(chan struct{}, c.parallelGoroutines) + } for { var msg binding.Message var respFn protocol.ResponseFn @@ -265,16 +270,33 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error { } } + // if the blockingCallback option is set, we need to wait for the callback to finish if c.blockingCallback { + // Wait for the callback to finish before receiving the next message. callback() - } else { - // Do not block on the invoker. + continue + } + + // if the parallelGoroutines option is set, we need to limit the number of goroutines + if parallel != nil { wg.Add(1) + parallel <- struct{}{} go func() { - defer wg.Done() + defer func() { + <-parallel + wg.Done() + }() callback() }() + continue } + + // otherwise, we can just call the callback directly in a new goroutine + wg.Add(1) + go func() { + defer wg.Done() + callback() + }() } }() } diff --git a/v2/client/options.go b/v2/client/options.go index 44394be34..37d46ce8f 100644 --- a/v2/client/options.go +++ b/v2/client/options.go @@ -7,6 +7,7 @@ package client import ( "context" + "errors" "fmt" "github.com/cloudevents/sdk-go/v2/binding" @@ -127,6 +128,23 @@ func WithBlockingCallback() Option { } } +// WithParallelGoroutines enables the callback function of the passed-in StartReceiver to execute asynchronously +// with a fixed number of goroutines. +// WithBlockingCallback takes precedence over this configuration. +func WithParallelGoroutines(num int) Option { + return func(i interface{}) error { + if num <= 0 { + return errors.New("number of parallel goroutines must be greater than 0") + } + + if c, ok := i.(*ceClient); ok { + c.parallelGoroutines = num + } + + return nil + } +} + // WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged // rather than being permanently not-acknowledged. This can be useful when a protocol does not // provide a responder implementation and would otherwise cause the receiver to be partially or