Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# Go channels at horizontal scale [![Build Status](https://travis-ci.org/matryer/vice.svg?branch=master)](https://travis-ci.org/matryer/vice)

* Use Go channels transparently over a [messaging queue technology of your choice](https://github.com/matryer/vice/tree/master/queues) (Currently [NATS](http://nats.io), [Redis](http://redis.io) or [NSQ](http://nsq.io), [Amazon SQS](https://aws.amazon.com/sqs/))
* Use Go channels transparently over a [messaging queue technology of your choice](https://github.com/matryer/vice/tree/master/queues) (Currently [NATS](http://nats.io), [Redis](http://redis.io), [NSQ](http://nsq.io), [Amazon SQS](https://aws.amazon.com/sqs/), or [Emitter](https://emitter.io))
* Swap `vice.Transport` to change underlying queueing technologies transparently
* Write idiomatic Go code instead of learning queue specific APIs
* Develop against in-memory implementation before putting it into the wild
Expand Down
2 changes: 1 addition & 1 deletion docs/writing-transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package my_transport

import (
"testing"
"github.com/matryer/vice/vicetest"
"github.com/matryer/vice/v2/vicetest"
)

func TestTransport(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions example/emitter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Emitter: Example Vice service

## Running

1. In a shell, install Emitter [https://emitter.io/download/](https://emitter.io/download/)
- `go install github.com/emitter-io/emitter@latest && emitter`
- The first run of emitter will create an `emitter.conf` file and
print a new license key and a new secret key to stdout.
2. In the same shell, save the new license in an environment variable:
- `export EMITTER_LICENSE=...`
3. In the same shell, start `emitter` again:
- `emitter |& tee emitter.log`
4. In a new shell, save the new secret key in an environment variable:
- `export EMITTER_SECRET_KEY=...`
then navigate to `example/emitter/server` and execute it with `go run main.go`
5. In another new shell, save the new secret key in an environment variable:
- `export EMITTER_SECRET_KEY=...`
then navigate to `example/emitter/client` and execute it with `go run main.go`
6. Type in names into the client, and see the responses coming from the service

Note that there is no reason why you can start up _multiple_ servers and
_multiple_ clients, but realize that a message from a client will only be
received (and responded to) by one of the running servers and likewise the
response will be randomly received by one of the clients.
58 changes: 58 additions & 0 deletions example/emitter/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// client is a simple Greeter client that uses the emitter transport.

package main

import (
"bufio"
"context"
"fmt"
"log"
"os"
"os/signal"

"github.com/matryer/vice/v2/queues/emitter"
)

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
defer func() {
signal.Stop(c)
cancel()
}()
go func() {
select {
case <-c:
cancel()
case <-ctx.Done():
}
}()
transport := emitter.New()
names := transport.Send("names")
greetings := transport.Receive("greetings")
go func() {
for greeting := range greetings {
fmt.Println(string(greeting))
}
}()
go func() {
fmt.Println("Type some names to send through the |names| channel:")
s := bufio.NewScanner(os.Stdin)
for s.Scan() {
b := s.Bytes()
if len(b) == 0 {
continue
}
names <- b
}
if err := s.Err(); err != nil {
log.Println(err)
}
}()
<-ctx.Done()
transport.Stop()
<-transport.Done()
log.Println("transport stopped")
}
52 changes: 52 additions & 0 deletions example/emitter/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// server is a simple Greeter server that uses the emitter transport.

package main

import (
"context"
"log"
"time"

"github.com/matryer/vice/v2/queues/emitter"
)

// To run this, install emitter and start it with the "emitter" command.
// Run this program:
// $ export EMITTER_SECRET_KEY=...
// $ go run main.go
// Then use the "example/emitter/client" program to send names to the server.

const runDuration = 1 * time.Minute

// Greeter is a service that greets people.
func Greeter(ctx context.Context, names <-chan []byte, greetings chan<- []byte, errs <-chan error) {
for {
select {
case <-ctx.Done():
log.Println("finished")
return
case err := <-errs:
log.Fatalf("an error occurred: %v", err)
case name := <-names:
greeting := "Hello " + string(name)
greetings <- []byte(greeting)
log.Printf("Sent greeting: %v", greeting)
}
}
}

func main() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, runDuration)
defer cancel()
transport := emitter.New()
defer func() {
transport.Stop()
<-transport.Done()
}()
names := transport.Receive("names")
greetings := transport.Send("greetings")
log.Printf("Greeter server listening on 'names' channel and responding on 'greetings' channel...")
log.Printf("Note that this server will automatically shut down in %v", runDuration)
Greeter(ctx, names, greetings, transport.ErrChan())
}
2 changes: 1 addition & 1 deletion example/greeter/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"os"
"os/signal"

"github.com/matryer/vice/queues/nsq"
"github.com/matryer/vice/v2/queues/nsq"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion example/greeter/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"time"

"github.com/matryer/vice/queues/nsq"
"github.com/matryer/vice/v2/queues/nsq"
)

// To run this, install NSQ and start it with nsqd command.
Expand Down
10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,27 @@ go 1.17

require (
github.com/aws/aws-sdk-go v1.44.16
github.com/emitter-io/go/v2 v2.0.9
github.com/go-redis/redis v6.15.9+incompatible
github.com/gofrs/uuid v4.2.0+incompatible
github.com/matryer/is v1.4.0
github.com/matryer/vice v1.0.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.5.0
github.com/nats-io/nats.go v1.15.0
github.com/nats-io/stan.go v0.10.2
github.com/nsqio/go-nsq v1.1.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.7.1
github.com/stretchr/testify v1.8.0
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
honnef.co/go/tools v0.3.2
)

require (
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eclipse/paho.mqtt.golang v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/nats-io/nats-server/v2 v2.8.2 // indirect
Expand All @@ -35,8 +37,10 @@ require (
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/tools v0.1.11-0.20220513221640-090b14e8501f // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
17 changes: 15 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/eclipse/paho.mqtt.golang v1.4.1 h1:tUSpviiL5G3P9SZZJPC4ZULZJsxQKXxfENpMvdbAXAI=
github.com/eclipse/paho.mqtt.golang v1.4.1/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/emitter-io/go/v2 v2.0.9 h1:qA+cnG3kS2uLzo5ETFY6zbHBGl+FmNj0cGf3da7foA4=
github.com/emitter-io/go/v2 v2.0.9/go.mod h1:St++epE1u/6ueCVw47xhu4shpkGNxKRVtkWv4Xi33mg=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down Expand Up @@ -46,6 +51,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.1.0 h1:QsGcniKx5/LuX2eYoeL+Np3UKYPNaN7YKpTh29h8rbw=
Expand Down Expand Up @@ -146,11 +153,13 @@ github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4Qn
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -180,6 +189,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
Expand All @@ -195,6 +206,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -267,7 +279,8 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.3.2 h1:ytYb4rOqyp1TSa2EPvNVwtPQJctSELKaMyLfqNP4+34=
honnef.co/go/tools v0.3.2/go.mod h1:jzwdWgg7Jdq75wlfblQxO4neNaFFSvgc1tD5Wv8U0Yw=
8 changes: 8 additions & 0 deletions queues/emitter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Emitter

[Emitter](https://emitter.io) is a real-time communication service
for connecting online devices. The Publish-Subscribe messaging API
is built for speed and security.

Kelindar has made a number of excellent videos about Emitter on
his [YouTube channel](https://www.youtube.com/c/Kelindar/videos).
23 changes: 23 additions & 0 deletions queues/emitter/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package emitter

import (
"errors"
"fmt"

eio "github.com/emitter-io/go/v2"
)

func (t *Transport) newClient() (*eio.Client, error) {
if t.emitterAddress == "" {
return nil, errors.New("missing emitter address")
}
if t.secretKey == "" {
return nil, errors.New("missing emitter secret key")
}

c := eio.NewClient(eio.WithBrokers(t.emitterAddress), eio.WithAutoReconnect(true))
if err := c.Connect(); err != nil {
return nil, fmt.Errorf("emitter.Connect(%q): %w - is the emitter service running?", t.emitterAddress, err)
}
return c, nil
}
Loading