diff --git a/README.md b/README.md index 2ef9dfa..646aa73 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ # Go channels at horizontal scale [![Build Status](https://travis-ci.org/matryer/vice.svg?branch=master)](https://travis-ci.org/matryer/vice) -* Use Go channels transparently over a [messaging queue technology of your choice](https://github.com/matryer/vice/tree/master/queues) (Currently [NATS](http://nats.io), [Redis](http://redis.io) or [NSQ](http://nsq.io), [Amazon SQS](https://aws.amazon.com/sqs/)) +* Use Go channels transparently over a [messaging queue technology of your choice](https://github.com/matryer/vice/tree/master/queues) (Currently [NATS](http://nats.io), [Redis](http://redis.io), [NSQ](http://nsq.io), [Amazon SQS](https://aws.amazon.com/sqs/), or [Emitter](https://emitter.io)) * Swap `vice.Transport` to change underlying queueing technologies transparently * Write idiomatic Go code instead of learning queue specific APIs * Develop against in-memory implementation before putting it into the wild diff --git a/docs/writing-transports.md b/docs/writing-transports.md index dcfbd49..8e048c0 100644 --- a/docs/writing-transports.md +++ b/docs/writing-transports.md @@ -15,7 +15,7 @@ package my_transport import ( "testing" - "github.com/matryer/vice/vicetest" + "github.com/matryer/vice/v2/vicetest" ) func TestTransport(t *testing.T) { diff --git a/example/emitter/README.md b/example/emitter/README.md new file mode 100644 index 0000000..4cec8db --- /dev/null +++ b/example/emitter/README.md @@ -0,0 +1,24 @@ +# Emitter: Example Vice service + +## Running + +1. In a shell, install Emitter [https://emitter.io/download/](https://emitter.io/download/) + - `go install github.com/emitter-io/emitter@latest && emitter` + - The first run of emitter will create an `emitter.conf` file and + print a new license key and a new secret key to stdout. +2. In the same shell, save the new license in an environment variable: + - `export EMITTER_LICENSE=...` +3. In the same shell, start `emitter` again: + - `emitter |& tee emitter.log` +4. In a new shell, save the new secret key in an environment variable: + - `export EMITTER_SECRET_KEY=...` + then navigate to `example/emitter/server` and execute it with `go run main.go` +5. In another new shell, save the new secret key in an environment variable: + - `export EMITTER_SECRET_KEY=...` + then navigate to `example/emitter/client` and execute it with `go run main.go` +6. Type in names into the client, and see the responses coming from the service + +Note that there is no reason why you can start up _multiple_ servers and +_multiple_ clients, but realize that a message from a client will only be +received (and responded to) by one of the running servers and likewise the +response will be randomly received by one of the clients. diff --git a/example/emitter/client/main.go b/example/emitter/client/main.go new file mode 100644 index 0000000..f4f8856 --- /dev/null +++ b/example/emitter/client/main.go @@ -0,0 +1,58 @@ +// client is a simple Greeter client that uses the emitter transport. + +package main + +import ( + "bufio" + "context" + "fmt" + "log" + "os" + "os/signal" + + "github.com/matryer/vice/v2/queues/emitter" +) + +func main() { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + defer func() { + signal.Stop(c) + cancel() + }() + go func() { + select { + case <-c: + cancel() + case <-ctx.Done(): + } + }() + transport := emitter.New() + names := transport.Send("names") + greetings := transport.Receive("greetings") + go func() { + for greeting := range greetings { + fmt.Println(string(greeting)) + } + }() + go func() { + fmt.Println("Type some names to send through the |names| channel:") + s := bufio.NewScanner(os.Stdin) + for s.Scan() { + b := s.Bytes() + if len(b) == 0 { + continue + } + names <- b + } + if err := s.Err(); err != nil { + log.Println(err) + } + }() + <-ctx.Done() + transport.Stop() + <-transport.Done() + log.Println("transport stopped") +} diff --git a/example/emitter/server/main.go b/example/emitter/server/main.go new file mode 100644 index 0000000..6d99fbc --- /dev/null +++ b/example/emitter/server/main.go @@ -0,0 +1,52 @@ +// server is a simple Greeter server that uses the emitter transport. + +package main + +import ( + "context" + "log" + "time" + + "github.com/matryer/vice/v2/queues/emitter" +) + +// To run this, install emitter and start it with the "emitter" command. +// Run this program: +// $ export EMITTER_SECRET_KEY=... +// $ go run main.go +// Then use the "example/emitter/client" program to send names to the server. + +const runDuration = 1 * time.Minute + +// Greeter is a service that greets people. +func Greeter(ctx context.Context, names <-chan []byte, greetings chan<- []byte, errs <-chan error) { + for { + select { + case <-ctx.Done(): + log.Println("finished") + return + case err := <-errs: + log.Fatalf("an error occurred: %v", err) + case name := <-names: + greeting := "Hello " + string(name) + greetings <- []byte(greeting) + log.Printf("Sent greeting: %v", greeting) + } + } +} + +func main() { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, runDuration) + defer cancel() + transport := emitter.New() + defer func() { + transport.Stop() + <-transport.Done() + }() + names := transport.Receive("names") + greetings := transport.Send("greetings") + log.Printf("Greeter server listening on 'names' channel and responding on 'greetings' channel...") + log.Printf("Note that this server will automatically shut down in %v", runDuration) + Greeter(ctx, names, greetings, transport.ErrChan()) +} diff --git a/example/greeter/client/main.go b/example/greeter/client/main.go index c7cdc6f..ddf9beb 100644 --- a/example/greeter/client/main.go +++ b/example/greeter/client/main.go @@ -8,7 +8,7 @@ import ( "os" "os/signal" - "github.com/matryer/vice/queues/nsq" + "github.com/matryer/vice/v2/queues/nsq" ) func main() { diff --git a/example/greeter/service/main.go b/example/greeter/service/main.go index dc0e655..fd88d98 100644 --- a/example/greeter/service/main.go +++ b/example/greeter/service/main.go @@ -5,7 +5,7 @@ import ( "log" "time" - "github.com/matryer/vice/queues/nsq" + "github.com/matryer/vice/v2/queues/nsq" ) // To run this, install NSQ and start it with nsqd command. diff --git a/go.mod b/go.mod index 09af8b2..a7dee50 100644 --- a/go.mod +++ b/go.mod @@ -4,16 +4,16 @@ go 1.17 require ( github.com/aws/aws-sdk-go v1.44.16 + github.com/emitter-io/go/v2 v2.0.9 github.com/go-redis/redis v6.15.9+incompatible github.com/gofrs/uuid v4.2.0+incompatible github.com/matryer/is v1.4.0 - github.com/matryer/vice v1.0.0 github.com/maxbrunsfeld/counterfeiter/v6 v6.5.0 github.com/nats-io/nats.go v1.15.0 github.com/nats-io/stan.go v0.10.2 github.com/nsqio/go-nsq v1.1.0 github.com/streadway/amqp v1.0.0 - github.com/stretchr/testify v1.7.1 + github.com/stretchr/testify v1.8.0 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 honnef.co/go/tools v0.3.2 ) @@ -21,8 +21,10 @@ require ( require ( github.com/BurntSushi/toml v0.4.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eclipse/paho.mqtt.golang v1.4.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.1 // indirect + github.com/gorilla/websocket v1.4.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/nats-io/nats-server/v2 v2.8.2 // indirect @@ -35,8 +37,10 @@ require ( golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect + golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/tools v0.1.11-0.20220513221640-090b14e8501f // indirect google.golang.org/protobuf v1.28.0 // indirect - gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f499d38..9fa9800 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,11 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI= +github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/emitter-io/go/v2 v2.0.9 h1:qA+cnG3kS2uLzo5ETFY6zbHBGl+FmNj0cGf3da7foA4= +github.com/emitter-io/go/v2 v2.0.9/go.mod h1:St++epE1u/6ueCVw47xhu4shpkGNxKRVtkWv4Xi33mg= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -46,6 +51,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-hclog v1.1.0 h1:QsGcniKx5/LuX2eYoeL+Np3UKYPNaN7YKpTh29h8rbw= @@ -146,11 +153,13 @@ github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4Qn github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -180,6 +189,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -195,6 +206,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -267,7 +279,8 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.3.2 h1:ytYb4rOqyp1TSa2EPvNVwtPQJctSELKaMyLfqNP4+34= honnef.co/go/tools v0.3.2/go.mod h1:jzwdWgg7Jdq75wlfblQxO4neNaFFSvgc1tD5Wv8U0Yw= diff --git a/queues/emitter/README.md b/queues/emitter/README.md new file mode 100644 index 0000000..9fafe11 --- /dev/null +++ b/queues/emitter/README.md @@ -0,0 +1,8 @@ +# Emitter + +[Emitter](https://emitter.io) is a real-time communication service +for connecting online devices. The Publish-Subscribe messaging API +is built for speed and security. + +Kelindar has made a number of excellent videos about Emitter on +his [YouTube channel](https://www.youtube.com/c/Kelindar/videos). diff --git a/queues/emitter/client.go b/queues/emitter/client.go new file mode 100644 index 0000000..eb79d19 --- /dev/null +++ b/queues/emitter/client.go @@ -0,0 +1,23 @@ +package emitter + +import ( + "errors" + "fmt" + + eio "github.com/emitter-io/go/v2" +) + +func (t *Transport) newClient() (*eio.Client, error) { + if t.emitterAddress == "" { + return nil, errors.New("missing emitter address") + } + if t.secretKey == "" { + return nil, errors.New("missing emitter secret key") + } + + c := eio.NewClient(eio.WithBrokers(t.emitterAddress), eio.WithAutoReconnect(true)) + if err := c.Connect(); err != nil { + return nil, fmt.Errorf("emitter.Connect(%q): %w - is the emitter service running?", t.emitterAddress, err) + } + return c, nil +} diff --git a/queues/emitter/emitter.go b/queues/emitter/emitter.go new file mode 100644 index 0000000..1866cc4 --- /dev/null +++ b/queues/emitter/emitter.go @@ -0,0 +1,152 @@ +// Package emitter provides a Vice implementation for Emitter.io. +package emitter + +import ( + "log" + "os" + "sync" + + "github.com/matryer/vice/v2" +) + +const ( + defaultAddress = "tcp://127.0.0.1:8080" + emitterEnvVar = "EMITTER_SECRET_KEY" +) + +// Transport is a vice.Transport for Emitter.io. +type Transport struct { + secretKey string + sendChans map[string]chan []byte + receiveChans map[string]chan []byte + + sync.Mutex + wg sync.WaitGroup + + errChan chan error + stopchan chan struct{} + stopPubChan chan struct{} + stopSubChan chan struct{} + + sharedGroup string + emitterAddress string + ttl int +} + +// make sure Transport satisfies vice.Transport interface. +var _ vice.Transport = (*Transport)(nil) + +// Option is a function that modifies a Transport. +type Option func(*Transport) + +// New returns a new Transport. +// The environment variable "EMITTER_SECRET_KEY" is used for the secret key. +// The default emitter address is "tcp://127.0.0.1:8080". +func New(opts ...Option) *Transport { + t := &Transport{ + secretKey: os.Getenv(emitterEnvVar), + sendChans: make(map[string]chan []byte), + receiveChans: make(map[string]chan []byte), + errChan: make(chan error, 10), + stopchan: make(chan struct{}), + stopPubChan: make(chan struct{}), + stopSubChan: make(chan struct{}), + sharedGroup: "vice", + emitterAddress: defaultAddress, + } + + for _, o := range opts { + o(t) + } + + if t.secretKey == "" { + log.Printf("WARNING: env var %q missing, calls will fail.", emitterEnvVar) + } + + return t +} + +// WithAddress overrides the default emitter.io address. +func WithAddress(address string) Option { + return func(t *Transport) { t.emitterAddress = address } +} + +// WithSecretKey overrides the emitter secret key. +// The default is to use the environment variable "EMITTER_SECRET_KEY". +func WithSecretKey(secretKey string) Option { + return func(t *Transport) { t.secretKey = secretKey } +} + +// WithSharedGroup overrides the default MQTT shared group name of "vice". +// Each individual shared group delivers its message to exactly one subscriber. +// +// If you instead want all subscribers to receive a copy of each message, +// then you can disable the use of shared groups by setting the groupName to "". +func WithSharedGroup(groupName string) Option { + return func(t *Transport) { t.sharedGroup = groupName } +} + +// WithTTL sets the ttl (time to live) value (in seconds) for each message. +// The default value is 0 which means it does not expire (although the +// default is that no messages are saved). +func WithTTL(ttl int) Option { + return func(t *Transport) { t.ttl = ttl } +} + +// Receive gets a channel on which to receive messages +// with the specified name. +func (t *Transport) Receive(name string) <-chan []byte { + t.Lock() + defer t.Unlock() + + ch, ok := t.receiveChans[name] + if ok { + return ch + } + + ch, err := t.makeSubscriber(name) + if err != nil { + t.errChan <- &vice.Err{Name: name, Err: err} + return make(chan []byte) + } + + t.receiveChans[name] = ch + return ch +} + +// Send gets a channel on which messages with the +// specified name may be sent. +func (t *Transport) Send(name string) chan<- []byte { + t.Lock() + defer t.Unlock() + + ch, ok := t.sendChans[name] + if ok { + return ch + } + + ch, err := t.makePublisher(name) + if err != nil { + t.errChan <- &vice.Err{Name: name, Err: err} + return make(chan []byte) + } + t.sendChans[name] = ch + return ch +} + +// ErrChan gets a channel through which errors +// are sent. +func (t *Transport) ErrChan() <-chan error { return t.errChan } + +// Stop stops the transport. The channel returned from Done() will be closed +// when the transport has stopped. +func (t *Transport) Stop() { + close(t.stopSubChan) + close(t.stopPubChan) + t.wg.Wait() + close(t.stopchan) +} + +// Done gets a channel which is closed when the +// transport has successfully stopped. +func (t *Transport) Done() chan struct{} { return t.stopchan } diff --git a/queues/emitter/emitter_test.go b/queues/emitter/emitter_test.go new file mode 100644 index 0000000..3289c8b --- /dev/null +++ b/queues/emitter/emitter_test.go @@ -0,0 +1,13 @@ +package emitter + +import ( + "testing" + + "github.com/matryer/vice/v2" + "github.com/matryer/vice/v2/vicetest" +) + +func TestTransport(t *testing.T) { + f := func() vice.Transport { return New() } + vicetest.Transport(t, f) +} diff --git a/queues/emitter/publisher.go b/queues/emitter/publisher.go new file mode 100644 index 0000000..ea58692 --- /dev/null +++ b/queues/emitter/publisher.go @@ -0,0 +1,53 @@ +package emitter + +import ( + "fmt" + "strings" + "time" + + eio "github.com/emitter-io/go/v2" + "github.com/matryer/vice/v2" +) + +func (t *Transport) makePublisher(name string) (chan []byte, error) { + c, err := t.newClient() + if err != nil { + return nil, err + } + + // Note that the $share syntax is not used on the Publish side. + // See: https://youtu.be/Vl7iGKEQrTg for details. + channelName := name + if !strings.HasSuffix(channelName, "/") { + channelName += "/" // emitter channel names end with a slash. + } + + key, err := c.GenerateKey(t.secretKey, channelName, "w", t.ttl) + if err != nil { + return nil, fmt.Errorf("emitter.GenerateKey(%q,'w',%v): %w", channelName, t.ttl, err) + } + + ch := make(chan []byte) + t.wg.Add(1) + go func() { + defer t.wg.Done() + for { + select { + case <-t.stopPubChan: + // uncomment the following code if using buffered channel + /* + if len(ch) != 0 { + continue + } + */ + c.Disconnect(100 * time.Millisecond) + return + case msg := <-ch: + if err := c.Publish(key, name, msg, eio.WithoutEcho()); err != nil { + t.errChan <- &vice.Err{Message: msg, Name: name, Err: err} + } + } + } + }() + return ch, nil +} diff --git a/queues/emitter/subscriber.go b/queues/emitter/subscriber.go new file mode 100644 index 0000000..217ae2b --- /dev/null +++ b/queues/emitter/subscriber.go @@ -0,0 +1,60 @@ +package emitter + +import ( + "fmt" + "strings" + "time" + + eio "github.com/emitter-io/go/v2" +) + +func (t *Transport) makeSubscriber(name string) (chan []byte, error) { + c, err := t.newClient() + if err != nil { + return nil, err + } + + channelName := name + if t.sharedGroup != "" { + channelName = fmt.Sprintf("$share/%v/%v", t.sharedGroup, name) + } + if !strings.HasSuffix(channelName, "/") { + channelName += "/" // emitter channel names end with a slash. + } + + key, err := c.GenerateKey(t.secretKey, channelName, "r", t.ttl) + if err != nil { + return nil, fmt.Errorf("emitter.GenerateKey(%q,'r',%v): %w", channelName, t.ttl, err) + } + + msgs := make(chan []byte, 1024) + ch := make(chan []byte) + t.wg.Add(1) + go func() { + defer t.wg.Done() + for { + select { + case d := <-msgs: + ch <- d + case <-t.stopSubChan: + c.Disconnect(100 * time.Millisecond) + return + } + } + }() + f := func(_ *eio.Client, msg eio.Message) { + msgs <- msg.Payload() + } + + if t.sharedGroup != "" { + if err := c.SubscribeWithGroup(key, name, t.sharedGroup, f, eio.WithoutEcho()); err != nil { + return nil, fmt.Errorf("emitter.SubscribeWithGroup(%q): %w", name, err) + } + } else { + if err := c.Subscribe(key, name, f, eio.WithoutEcho()); err != nil { + return nil, fmt.Errorf("emitter.Subscribe(%q): %w", name, err) + } + } + + return ch, nil +} diff --git a/queues/nsq/nsq.go b/queues/nsq/nsq.go index ab690ce..ee156a1 100644 --- a/queues/nsq/nsq.go +++ b/queues/nsq/nsq.go @@ -5,8 +5,8 @@ import ( "sync" "time" - "github.com/matryer/vice/backoff" "github.com/matryer/vice/v2" + "github.com/matryer/vice/v2/backoff" "github.com/nsqio/go-nsq" )