diff --git a/.gitignore b/.gitignore index 840a6fd1e..38f95cd86 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ bazel-* # Output of the go coverage tool *.out coverage.txt +coverage.tmp # Jekyll stuff _site/ diff --git a/binding/format/protobuf/v2/go.mod b/binding/format/protobuf/v2/go.mod index b40901038..350b7d444 100644 --- a/binding/format/protobuf/v2/go.mod +++ b/binding/format/protobuf/v2/go.mod @@ -4,14 +4,15 @@ go 1.18 require ( github.com/cloudevents/sdk-go/v2 v2.5.0 - github.com/golang/protobuf v1.5.0 + github.com/golang/protobuf v1.5.3 github.com/stretchr/testify v1.8.0 - google.golang.org/protobuf v1.26.0 + google.golang.org/grpc v1.59.0 + google.golang.org/protobuf v1.31.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/uuid v1.1.1 // indirect + github.com/google/uuid v1.3.1 // indirect github.com/json-iterator/go v1.1.10 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect @@ -19,6 +20,10 @@ require ( go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect + golang.org/x/net v0.14.0 // indirect + golang.org/x/sys v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/binding/format/protobuf/v2/go.sum b/binding/format/protobuf/v2/go.sum index ccf8bc8e3..1f5f72d31 100644 --- a/binding/format/protobuf/v2/go.sum +++ b/binding/format/protobuf/v2/go.sum @@ -1,13 +1,14 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -32,12 +33,22 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/binding/format/protobuf/v2/pb/cloudevent.pb.go b/binding/format/protobuf/v2/pb/cloudevent.pb.go index 87d71c768..de0765c0e 100644 --- a/binding/format/protobuf/v2/pb/cloudevent.pb.go +++ b/binding/format/protobuf/v2/pb/cloudevent.pb.go @@ -1,16 +1,17 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v4.24.4 +// protoc-gen-go v1.31.0 +// protoc v3.12.4 // source: cloudevent.proto package pb import ( + any1 "github.com/golang/protobuf/ptypes/any" + empty "github.com/golang/protobuf/ptypes/empty" + timestamp "github.com/golang/protobuf/ptypes/timestamp" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - anypb "google.golang.org/protobuf/types/known/anypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -137,7 +138,7 @@ func (x *CloudEvent) GetTextData() string { return "" } -func (x *CloudEvent) GetProtoData() *anypb.Any { +func (x *CloudEvent) GetProtoData() *any1.Any { if x, ok := x.GetData().(*CloudEvent_ProtoData); ok { return x.ProtoData } @@ -165,7 +166,7 @@ type CloudEvent_ProtoData struct { // type. The datacontenttype attribute should be set to // application/protobuf and the dataschema attribute set to the message // type. - ProtoData *anypb.Any `protobuf:"bytes,8,opt,name=proto_data,json=protoData,proto3,oneof"` + ProtoData *any1.Any `protobuf:"bytes,8,opt,name=proto_data,json=protoData,proto3,oneof"` } func (*CloudEvent_BinaryData) isCloudEvent_Data() {} @@ -276,7 +277,7 @@ func (x *CloudEventAttributeValue) GetCeUriRef() string { return "" } -func (x *CloudEventAttributeValue) GetCeTimestamp() *timestamppb.Timestamp { +func (x *CloudEventAttributeValue) GetCeTimestamp() *timestamp.Timestamp { if x, ok := x.GetAttr().(*CloudEventAttributeValue_CeTimestamp); ok { return x.CeTimestamp } @@ -319,7 +320,7 @@ type CloudEventAttributeValue_CeUriRef struct { type CloudEventAttributeValue_CeTimestamp struct { // Timestamp value. - CeTimestamp *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=ce_timestamp,json=ceTimestamp,proto3,oneof"` + CeTimestamp *timestamp.Timestamp `protobuf:"bytes,7,opt,name=ce_timestamp,json=ceTimestamp,proto3,oneof"` } func (*CloudEventAttributeValue_CeBoolean) isCloudEventAttributeValue_Attr() {} @@ -336,64 +337,192 @@ func (*CloudEventAttributeValue_CeUriRef) isCloudEventAttributeValue_Attr() {} func (*CloudEventAttributeValue_CeTimestamp) isCloudEventAttributeValue_Attr() {} +type PublishRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required. The topic to which event should be published. + // Format is `myhome/groundfloor/livingroom/temperature`. + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Event *CloudEvent `protobuf:"bytes,2,opt,name=event,proto3" json:"event,omitempty"` +} + +func (x *PublishRequest) Reset() { + *x = PublishRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cloudevent_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishRequest) ProtoMessage() {} + +func (x *PublishRequest) ProtoReflect() protoreflect.Message { + mi := &file_cloudevent_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. +func (*PublishRequest) Descriptor() ([]byte, []int) { + return file_cloudevent_proto_rawDescGZIP(), []int{2} +} + +func (x *PublishRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + +func (x *PublishRequest) GetEvent() *CloudEvent { + if x != nil { + return x.Event + } + return nil +} + +type SubscriptionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required. The topic from which event should be pulled. + // Format is `myhome/groundfloor/livingroom/temperature`. + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` +} + +func (x *SubscriptionRequest) Reset() { + *x = SubscriptionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cloudevent_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscriptionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscriptionRequest) ProtoMessage() {} + +func (x *SubscriptionRequest) ProtoReflect() protoreflect.Message { + mi := &file_cloudevent_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscriptionRequest.ProtoReflect.Descriptor instead. +func (*SubscriptionRequest) Descriptor() ([]byte, []int) { + return file_cloudevent_proto_rawDescGZIP(), []int{3} +} + +func (x *SubscriptionRequest) GetTopic() string { + if x != nil { + return x.Topic + } + return "" +} + var File_cloudevent_proto protoreflect.FileDescriptor var file_cloudevent_proto_rawDesc = []byte{ 0x0a, 0x10, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x73, 0x2e, 0x76, 0x31, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0xa7, 0x03, 0x0a, 0x0a, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, - 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x70, 0x65, 0x63, - 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, - 0x73, 0x70, 0x65, 0x63, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, - 0x4d, 0x0a, 0x0a, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x18, 0x05, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x2e, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, - 0x72, 0x79, 0x52, 0x0a, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x21, - 0x0a, 0x0b, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, - 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x0a, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x44, 0x61, 0x74, - 0x61, 0x12, 0x1d, 0x0a, 0x09, 0x74, 0x65, 0x78, 0x74, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x07, - 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x74, 0x65, 0x78, 0x74, 0x44, 0x61, 0x74, 0x61, - 0x12, 0x35, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x08, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x48, 0x00, 0x52, 0x09, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x44, 0x61, 0x74, 0x61, 0x1a, 0x6a, 0x0a, 0x0f, 0x41, 0x74, 0x74, 0x72, 0x69, - 0x62, 0x75, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x41, 0x0a, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x69, 0x6f, - 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, - 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, - 0x75, 0x74, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x9a, 0x02, 0x0a, 0x18, - 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, - 0x75, 0x74, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0a, 0x63, 0x65, 0x5f, 0x62, - 0x6f, 0x6f, 0x6c, 0x65, 0x61, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x09, - 0x63, 0x65, 0x42, 0x6f, 0x6f, 0x6c, 0x65, 0x61, 0x6e, 0x12, 0x1f, 0x0a, 0x0a, 0x63, 0x65, 0x5f, - 0x69, 0x6e, 0x74, 0x65, 0x67, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, - 0x09, 0x63, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x09, 0x63, 0x65, - 0x5f, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, - 0x08, 0x63, 0x65, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x1b, 0x0a, 0x08, 0x63, 0x65, 0x5f, - 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x07, 0x63, - 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x17, 0x0a, 0x06, 0x63, 0x65, 0x5f, 0x75, 0x72, 0x69, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x63, 0x65, 0x55, 0x72, 0x69, 0x12, - 0x1e, 0x0a, 0x0a, 0x63, 0x65, 0x5f, 0x75, 0x72, 0x69, 0x5f, 0x72, 0x65, 0x66, 0x18, 0x06, 0x20, - 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x63, 0x65, 0x55, 0x72, 0x69, 0x52, 0x65, 0x66, 0x12, - 0x3f, 0x0a, 0x0c, 0x63, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, - 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x48, 0x00, 0x52, 0x0b, 0x63, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x42, 0x06, 0x0a, 0x04, 0x61, 0x74, 0x74, 0x72, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x73, 0x2f, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x6e, 0x64, 0x69, 0x6e, - 0x67, 0x2f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x73, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa7, + 0x03, 0x0a, 0x0a, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, + 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x70, 0x65, 0x63, 0x5f, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x70, 0x65, + 0x63, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x4d, 0x0a, 0x0a, + 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x2d, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, + 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, + 0x0a, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x21, 0x0a, 0x0b, 0x62, + 0x69, 0x6e, 0x61, 0x72, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, + 0x48, 0x00, 0x52, 0x0a, 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1d, + 0x0a, 0x09, 0x74, 0x65, 0x78, 0x74, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x07, 0x20, 0x01, 0x28, + 0x09, 0x48, 0x00, 0x52, 0x08, 0x74, 0x65, 0x78, 0x74, 0x44, 0x61, 0x74, 0x61, 0x12, 0x35, 0x0a, + 0x0a, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x08, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x48, 0x00, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x44, 0x61, 0x74, 0x61, 0x1a, 0x6a, 0x0a, 0x0f, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, + 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x41, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, + 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, + 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x42, 0x06, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x9a, 0x02, 0x0a, 0x18, 0x43, 0x6c, 0x6f, + 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1f, 0x0a, 0x0a, 0x63, 0x65, 0x5f, 0x62, 0x6f, 0x6f, 0x6c, + 0x65, 0x61, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x09, 0x63, 0x65, 0x42, + 0x6f, 0x6f, 0x6c, 0x65, 0x61, 0x6e, 0x12, 0x1f, 0x0a, 0x0a, 0x63, 0x65, 0x5f, 0x69, 0x6e, 0x74, + 0x65, 0x67, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x48, 0x00, 0x52, 0x09, 0x63, 0x65, + 0x49, 0x6e, 0x74, 0x65, 0x67, 0x65, 0x72, 0x12, 0x1d, 0x0a, 0x09, 0x63, 0x65, 0x5f, 0x73, 0x74, + 0x72, 0x69, 0x6e, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x08, 0x63, 0x65, + 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x1b, 0x0a, 0x08, 0x63, 0x65, 0x5f, 0x62, 0x79, 0x74, + 0x65, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x07, 0x63, 0x65, 0x42, 0x79, + 0x74, 0x65, 0x73, 0x12, 0x17, 0x0a, 0x06, 0x63, 0x65, 0x5f, 0x75, 0x72, 0x69, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x63, 0x65, 0x55, 0x72, 0x69, 0x12, 0x1e, 0x0a, 0x0a, + 0x63, 0x65, 0x5f, 0x75, 0x72, 0x69, 0x5f, 0x72, 0x65, 0x66, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, + 0x48, 0x00, 0x52, 0x08, 0x63, 0x65, 0x55, 0x72, 0x69, 0x52, 0x65, 0x66, 0x12, 0x3f, 0x0a, 0x0c, + 0x63, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x00, + 0x52, 0x0b, 0x63, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x06, 0x0a, + 0x04, 0x61, 0x74, 0x74, 0x72, 0x22, 0x5b, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x33, 0x0a, + 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x69, + 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, + 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x22, 0x2b, 0x0a, 0x13, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, + 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x32, + 0xb3, 0x01, 0x0a, 0x11, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x46, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x12, 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, + 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x26, 0x2e, 0x69, 0x6f, 0x2e, + 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x3d, 0x5a, 0x3b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, + 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x62, 0x69, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x2f, 0x66, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x76, + 0x32, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -408,24 +537,32 @@ func file_cloudevent_proto_rawDescGZIP() []byte { return file_cloudevent_proto_rawDescData } -var file_cloudevent_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_cloudevent_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_cloudevent_proto_goTypes = []interface{}{ (*CloudEvent)(nil), // 0: io.cloudevents.v1.CloudEvent (*CloudEventAttributeValue)(nil), // 1: io.cloudevents.v1.CloudEventAttributeValue - nil, // 2: io.cloudevents.v1.CloudEvent.AttributesEntry - (*anypb.Any)(nil), // 3: google.protobuf.Any - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp + (*PublishRequest)(nil), // 2: io.cloudevents.v1.PublishRequest + (*SubscriptionRequest)(nil), // 3: io.cloudevents.v1.SubscriptionRequest + nil, // 4: io.cloudevents.v1.CloudEvent.AttributesEntry + (*any1.Any)(nil), // 5: google.protobuf.Any + (*timestamp.Timestamp)(nil), // 6: google.protobuf.Timestamp + (*empty.Empty)(nil), // 7: google.protobuf.Empty } var file_cloudevent_proto_depIdxs = []int32{ - 2, // 0: io.cloudevents.v1.CloudEvent.attributes:type_name -> io.cloudevents.v1.CloudEvent.AttributesEntry - 3, // 1: io.cloudevents.v1.CloudEvent.proto_data:type_name -> google.protobuf.Any - 4, // 2: io.cloudevents.v1.CloudEventAttributeValue.ce_timestamp:type_name -> google.protobuf.Timestamp - 1, // 3: io.cloudevents.v1.CloudEvent.AttributesEntry.value:type_name -> io.cloudevents.v1.CloudEventAttributeValue - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 4, // 0: io.cloudevents.v1.CloudEvent.attributes:type_name -> io.cloudevents.v1.CloudEvent.AttributesEntry + 5, // 1: io.cloudevents.v1.CloudEvent.proto_data:type_name -> google.protobuf.Any + 6, // 2: io.cloudevents.v1.CloudEventAttributeValue.ce_timestamp:type_name -> google.protobuf.Timestamp + 0, // 3: io.cloudevents.v1.PublishRequest.event:type_name -> io.cloudevents.v1.CloudEvent + 1, // 4: io.cloudevents.v1.CloudEvent.AttributesEntry.value:type_name -> io.cloudevents.v1.CloudEventAttributeValue + 2, // 5: io.cloudevents.v1.CloudEventService.Publish:input_type -> io.cloudevents.v1.PublishRequest + 3, // 6: io.cloudevents.v1.CloudEventService.Subscribe:input_type -> io.cloudevents.v1.SubscriptionRequest + 7, // 7: io.cloudevents.v1.CloudEventService.Publish:output_type -> google.protobuf.Empty + 0, // 8: io.cloudevents.v1.CloudEventService.Subscribe:output_type -> io.cloudevents.v1.CloudEvent + 7, // [7:9] is the sub-list for method output_type + 5, // [5:7] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_cloudevent_proto_init() } @@ -458,6 +595,30 @@ func file_cloudevent_proto_init() { return nil } } + file_cloudevent_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cloudevent_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscriptionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_cloudevent_proto_msgTypes[0].OneofWrappers = []interface{}{ (*CloudEvent_BinaryData)(nil), @@ -479,9 +640,9 @@ func file_cloudevent_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cloudevent_proto_rawDesc, NumEnums: 0, - NumMessages: 3, + NumMessages: 5, NumExtensions: 0, - NumServices: 0, + NumServices: 1, }, GoTypes: file_cloudevent_proto_goTypes, DependencyIndexes: file_cloudevent_proto_depIdxs, diff --git a/binding/format/protobuf/v2/pb/cloudevent.proto b/binding/format/protobuf/v2/pb/cloudevent.proto index f6a972afa..6f91a4778 100644 --- a/binding/format/protobuf/v2/pb/cloudevent.proto +++ b/binding/format/protobuf/v2/pb/cloudevent.proto @@ -4,6 +4,7 @@ package io.cloudevents.v1; option go_package = "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"; +import "google/protobuf/empty.proto"; import "google/protobuf/any.proto"; import "google/protobuf/timestamp.proto"; @@ -59,3 +60,24 @@ message CloudEventAttributeValue { google.protobuf.Timestamp ce_timestamp = 7; } } + +message PublishRequest { + // Required. The topic to which event should be published. + // Format is `myhome/groundfloor/livingroom/temperature`. + string topic = 1; + CloudEvent event = 2; +} + +message SubscriptionRequest { + // Required. The topic from which event should be pulled. + // Format is `myhome/groundfloor/livingroom/temperature`. + string topic = 1; +} + +service CloudEventService { + rpc Publish(PublishRequest) returns (google.protobuf.Empty) { + } + + rpc Subscribe(SubscriptionRequest) returns (stream CloudEvent) { + } +} diff --git a/binding/format/protobuf/v2/pb/cloudevent_grpc.pb.go b/binding/format/protobuf/v2/pb/cloudevent_grpc.pb.go new file mode 100644 index 000000000..bddc84bde --- /dev/null +++ b/binding/format/protobuf/v2/pb/cloudevent_grpc.pb.go @@ -0,0 +1,175 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.12.4 +// source: cloudevent.proto + +package pb + +import ( + context "context" + empty "github.com/golang/protobuf/ptypes/empty" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + CloudEventService_Publish_FullMethodName = "/io.cloudevents.v1.CloudEventService/Publish" + CloudEventService_Subscribe_FullMethodName = "/io.cloudevents.v1.CloudEventService/Subscribe" +) + +// CloudEventServiceClient is the client API for CloudEventService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CloudEventServiceClient interface { + Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*empty.Empty, error) + Subscribe(ctx context.Context, in *SubscriptionRequest, opts ...grpc.CallOption) (CloudEventService_SubscribeClient, error) +} + +type cloudEventServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewCloudEventServiceClient(cc grpc.ClientConnInterface) CloudEventServiceClient { + return &cloudEventServiceClient{cc} +} + +func (c *cloudEventServiceClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, CloudEventService_Publish_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *cloudEventServiceClient) Subscribe(ctx context.Context, in *SubscriptionRequest, opts ...grpc.CallOption) (CloudEventService_SubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &CloudEventService_ServiceDesc.Streams[0], CloudEventService_Subscribe_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &cloudEventServiceSubscribeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type CloudEventService_SubscribeClient interface { + Recv() (*CloudEvent, error) + grpc.ClientStream +} + +type cloudEventServiceSubscribeClient struct { + grpc.ClientStream +} + +func (x *cloudEventServiceSubscribeClient) Recv() (*CloudEvent, error) { + m := new(CloudEvent) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// CloudEventServiceServer is the server API for CloudEventService service. +// All implementations must embed UnimplementedCloudEventServiceServer +// for forward compatibility +type CloudEventServiceServer interface { + Publish(context.Context, *PublishRequest) (*empty.Empty, error) + Subscribe(*SubscriptionRequest, CloudEventService_SubscribeServer) error + mustEmbedUnimplementedCloudEventServiceServer() +} + +// UnimplementedCloudEventServiceServer must be embedded to have forward compatible implementations. +type UnimplementedCloudEventServiceServer struct { +} + +func (UnimplementedCloudEventServiceServer) Publish(context.Context, *PublishRequest) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented") +} +func (UnimplementedCloudEventServiceServer) Subscribe(*SubscriptionRequest, CloudEventService_SubscribeServer) error { + return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") +} +func (UnimplementedCloudEventServiceServer) mustEmbedUnimplementedCloudEventServiceServer() {} + +// UnsafeCloudEventServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CloudEventServiceServer will +// result in compilation errors. +type UnsafeCloudEventServiceServer interface { + mustEmbedUnimplementedCloudEventServiceServer() +} + +func RegisterCloudEventServiceServer(s grpc.ServiceRegistrar, srv CloudEventServiceServer) { + s.RegisterService(&CloudEventService_ServiceDesc, srv) +} + +func _CloudEventService_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PublishRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CloudEventServiceServer).Publish(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: CloudEventService_Publish_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CloudEventServiceServer).Publish(ctx, req.(*PublishRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _CloudEventService_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscriptionRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(CloudEventServiceServer).Subscribe(m, &cloudEventServiceSubscribeServer{stream}) +} + +type CloudEventService_SubscribeServer interface { + Send(*CloudEvent) error + grpc.ServerStream +} + +type cloudEventServiceSubscribeServer struct { + grpc.ServerStream +} + +func (x *cloudEventServiceSubscribeServer) Send(m *CloudEvent) error { + return x.ServerStream.SendMsg(m) +} + +// CloudEventService_ServiceDesc is the grpc.ServiceDesc for CloudEventService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var CloudEventService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "io.cloudevents.v1.CloudEventService", + HandlerType: (*CloudEventServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Publish", + Handler: _CloudEventService_Publish_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Subscribe", + Handler: _CloudEventService_Subscribe_Handler, + ServerStreams: true, + }, + }, + Metadata: "cloudevent.proto", +} diff --git a/binding/format/protobuf/v2/pb/gen.go b/binding/format/protobuf/v2/pb/gen.go index 014567472..aa896543b 100644 --- a/binding/format/protobuf/v2/pb/gen.go +++ b/binding/format/protobuf/v2/pb/gen.go @@ -1,3 +1,3 @@ package pb -//go:generate protoc --go_out=. --go_opt=paths=source_relative cloudevent.proto +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative cloudevent.proto diff --git a/protocol/grpc/go.mod b/protocol/grpc/go.mod new file mode 100644 index 000000000..97674a99f --- /dev/null +++ b/protocol/grpc/go.mod @@ -0,0 +1,32 @@ +module github.com/cloudevents/sdk-go/protocol/grpc + +go 1.18 + +replace github.com/cloudevents/sdk-go/v2 => ../../v2 + +replace github.com/cloudevents/sdk-go/binding/format/protobuf/v2 => ../../binding/format/protobuf/v2 + +require ( + github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 + github.com/cloudevents/sdk-go/v2 v2.14.0 + github.com/stretchr/testify v1.8.4 + google.golang.org/grpc v1.59.0 + google.golang.org/protobuf v1.31.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.26.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/protocol/grpc/go.sum b/protocol/grpc/go.sum new file mode 100644 index 000000000..087a89c34 --- /dev/null +++ b/protocol/grpc/go.sum @@ -0,0 +1,49 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/protocol/grpc/message.go b/protocol/grpc/message.go new file mode 100644 index 000000000..01ab0000d --- /dev/null +++ b/protocol/grpc/message.go @@ -0,0 +1,209 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package grpc + +import ( + "bytes" + "context" + "fmt" + "net/url" + "strings" + + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/types" +) + +const ( + prefix = "ce-" + contenttype = "contenttype" + // dataSchema = "dataschema" + subject = "subject" + time = "time" +) + +var specs = spec.WithPrefix(prefix) + +// Message represents a gRPC message. +// This message *can* be read several times safely +type Message struct { + internal *pb.CloudEvent + version spec.Version + format format.Format +} + +// Check if Message implements binding.Message +var ( + _ binding.Message = (*Message)(nil) + _ binding.MessageMetadataReader = (*Message)(nil) +) + +func NewMessage(msg *pb.CloudEvent) *Message { + var f format.Format + var v spec.Version + if msg.Attributes != nil { + if contentType, ok := msg.Attributes[contenttype]; ok && format.IsFormat(contentType.GetCeString()) { + f = format.Lookup(contentType.GetCeString()) + } else if s := msg.SpecVersion; s != "" { + v = specs.Version(s) + } + } + return &Message{ + internal: msg, + version: v, + format: f, + } +} + +func (m *Message) ReadEncoding() binding.Encoding { + if m.version != nil { + return binding.EncodingBinary + } + if m.format != nil { + return binding.EncodingStructured + } + + return binding.EncodingUnknown +} + +func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { + if m.format == nil { + return binding.ErrNotStructured + } + + return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.GetBinaryData())) +} + +func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { + if m.version == nil { + return binding.ErrNotBinary + } + + if m.format != nil { + return binding.ErrNotBinary + } + + if m.internal.SpecVersion != "" { + err := encoder.SetAttribute(m.version.AttributeFromKind(spec.SpecVersion), m.internal.SpecVersion) + if err != nil { + return err + } + } + if m.internal.Id != "" { + err := encoder.SetAttribute(m.version.AttributeFromKind(spec.ID), m.internal.Id) + if err != nil { + return err + } + } + if m.internal.Source != "" { + err := encoder.SetAttribute(m.version.AttributeFromKind(spec.Source), m.internal.Source) + if err != nil { + return err + } + } + if m.internal.Type != "" { + err := encoder.SetAttribute(m.version.AttributeFromKind(spec.Type), m.internal.Type) + if err != nil { + return err + } + } + + for name, value := range m.internal.Attributes { + attrVal, err := valueFrom(value) + if err != nil { + return fmt.Errorf("failed to convert attribute %s: %s", name, err) + } + + if strings.HasPrefix(name, prefix) { + attr := m.version.Attribute(name) + if attr != nil { + err = encoder.SetAttribute(attr, attrVal) + if err != nil { + return err + } + } else { + err = encoder.SetExtension(strings.TrimPrefix(name, prefix), attrVal) + if err != nil { + return err + } + } + } else if name == contenttype { + err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), attrVal) + if err != nil { + return err + } + } + } + + if m.internal.GetBinaryData() != nil { + return encoder.SetData(bytes.NewBuffer(m.internal.GetBinaryData())) + } + + return nil +} + +func (m *Message) Finish(error) error { + return nil +} + +func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) { + attr := m.version.AttributeFromKind(k) + if attr != nil { + switch attr.Kind() { + case spec.SpecVersion: + return attr, m.internal.SpecVersion + case spec.Type: + return attr, m.internal.Type + case spec.Source: + return attr, m.internal.Source + case spec.ID: + return attr, m.internal.Id + case spec.DataContentType: + return attr, m.internal.Attributes[contenttype].GetCeString() + default: + return attr, m.internal.Attributes[prefix+attr.Name()] + } + } + + return nil, nil +} + +func (m *Message) GetExtension(name string) interface{} { + return m.internal.Attributes[prefix+name] +} + +func valueFrom(attr *pb.CloudEventAttributeValue) (interface{}, error) { + var v interface{} + switch vt := attr.Attr.(type) { + case *pb.CloudEventAttributeValue_CeBoolean: + v = vt.CeBoolean + case *pb.CloudEventAttributeValue_CeInteger: + v = vt.CeInteger + case *pb.CloudEventAttributeValue_CeString: + v = vt.CeString + case *pb.CloudEventAttributeValue_CeBytes: + v = vt.CeBytes + case *pb.CloudEventAttributeValue_CeUri: + uri, err := url.Parse(vt.CeUri) + if err != nil { + return nil, fmt.Errorf("failed to parse URI value %s: %s", vt.CeUri, err.Error()) + } + v = uri + case *pb.CloudEventAttributeValue_CeUriRef: + uri, err := url.Parse(vt.CeUriRef) + if err != nil { + return nil, fmt.Errorf("failed to parse URIRef value %s: %s", vt.CeUriRef, err.Error()) + } + v = types.URIRef{URL: *uri} + case *pb.CloudEventAttributeValue_CeTimestamp: + v = vt.CeTimestamp.AsTime() + default: + return nil, fmt.Errorf("unsupported attribute type: %T", vt) + } + return types.Validate(v) +} diff --git a/protocol/grpc/message_test.go b/protocol/grpc/message_test.go new file mode 100644 index 000000000..18680e67f --- /dev/null +++ b/protocol/grpc/message_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package grpc + +import ( + "context" + "testing" + + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" +) + +func TestReadStructured(t *testing.T) { + tests := []struct { + name string + msg *pb.CloudEvent + wantErr error + }{ + { + name: "nil format", + msg: &pb.CloudEvent{}, + wantErr: binding.ErrNotStructured, + }, + { + name: "json format", + msg: &pb.CloudEvent{ + Attributes: map[string]*pb.CloudEventAttributeValue{ + contenttype: { + Attr: &pb.CloudEventAttributeValue_CeString{ + CeString: event.ApplicationCloudEventsJSON, + }, + }, + }, + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + msg := NewMessage(tc.msg) + err := msg.ReadStructured(context.Background(), (*pbEventWriter)(tc.msg)) + if err != tc.wantErr { + t.Errorf("Error unexpected. got: %v, want: %v", err, tc.wantErr) + } + }) + } +} + +func TestReadBinary(t *testing.T) { + msg := &pb.CloudEvent{ + SpecVersion: "1.0", + Id: "ABC-123", + Source: "test-source", + Type: "binary.test", + Attributes: map[string]*pb.CloudEventAttributeValue{}, + Data: &pb.CloudEvent_BinaryData{ + BinaryData: []byte("{hello:world}"), + }, + } + + message := NewMessage(msg) + err := message.ReadBinary(context.Background(), (*pbEventWriter)(msg)) + if err != nil { + t.Errorf("Error unexpected. got: %v", err) + } +} diff --git a/protocol/grpc/option.go b/protocol/grpc/option.go new file mode 100644 index 000000000..4181c37af --- /dev/null +++ b/protocol/grpc/option.go @@ -0,0 +1,45 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package grpc + +import ( + "fmt" +) + +// Option is the function signature +type Option func(*Protocol) error + +// PublishOption +type PublishOption struct { + Topic string +} + +// SubscribeOption +type SubscribeOption struct { + Topics []string +} + +// WithPublishOption sets the Publish configuration for the client. This option is required if you want to send messages. +func WithPublishOption(publishOpt *PublishOption) Option { + return func(p *Protocol) error { + if publishOpt == nil { + return fmt.Errorf("the publish option must not be nil") + } + p.publishOption = publishOpt + return nil + } +} + +// WithSubscribeOption sets the Subscribe configuration for the client. This option is required if you want to receive messages. +func WithSubscribeOption(subscribeOpt *SubscribeOption) Option { + return func(p *Protocol) error { + if subscribeOpt == nil { + return fmt.Errorf("the subscribe option must not be nil") + } + p.subscribeOption = subscribeOpt + return nil + } +} diff --git a/protocol/grpc/protocol.go b/protocol/grpc/protocol.go new file mode 100644 index 000000000..b8e14aa1c --- /dev/null +++ b/protocol/grpc/protocol.go @@ -0,0 +1,166 @@ +/* + Copyright 2023 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package grpc + +import ( + "context" + "fmt" + "io" + "sync" + + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "google.golang.org/grpc" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" + + cecontext "github.com/cloudevents/sdk-go/v2/context" +) + +// protocol for grpc +// define protocol for grpc + +type Protocol struct { + client pb.CloudEventServiceClient + publishOption *PublishOption + subscribeOption *SubscribeOption + // receiver + incoming chan *pb.CloudEvent + // inOpen + openerMutex sync.Mutex + + closeChan chan struct{} +} + +var ( + _ protocol.Sender = (*Protocol)(nil) + _ protocol.Opener = (*Protocol)(nil) + _ protocol.Receiver = (*Protocol)(nil) + _ protocol.Closer = (*Protocol)(nil) +) + +// new create grpc protocol +func NewProtocol(clientConn grpc.ClientConnInterface, opts ...Option) (*Protocol, error) { + if clientConn == nil { + return nil, fmt.Errorf("the client connection must not be nil") + } + + // TODO: support clientID and error handling in grpc connection + p := &Protocol{ + client: pb.NewCloudEventServiceClient(clientConn), + // subClient: + incoming: make(chan *pb.CloudEvent), + closeChan: make(chan struct{}), + } + + if err := p.applyOptions(opts...); err != nil { + return nil, err + } + + return p, nil +} + +func (p *Protocol) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error { + if p.publishOption == nil { + return fmt.Errorf("the publish option must not be nil") + } + + var err error + defer m.Finish(err) + + msg := &pb.CloudEvent{} + err = WritePBMessage(ctx, m, msg, transformers...) + if err != nil { + return err + } + + topic := p.publishOption.Topic + if cecontext.TopicFrom(ctx) != "" { + topic = cecontext.TopicFrom(ctx) + cecontext.WithTopic(ctx, "") + } + + logger := cecontext.LoggerFrom(ctx) + logger.Infof("publishing event to topic: %v", topic) + _, err = p.client.Publish(ctx, &pb.PublishRequest{ + Topic: topic, + Event: msg, + }) + if err != nil { + return err + } + return err +} + +func (p *Protocol) OpenInbound(ctx context.Context) error { + if p.subscribeOption == nil { + return fmt.Errorf("the subscribe option must not be nil") + } + + if len(p.subscribeOption.Topics) == 0 { + return fmt.Errorf("the subscribe option topics must not be empty") + } + + p.openerMutex.Lock() + defer p.openerMutex.Unlock() + + logger := cecontext.LoggerFrom(ctx) + for _, topic := range p.subscribeOption.Topics { + subClient, err := p.client.Subscribe(ctx, &pb.SubscriptionRequest{ + Topic: topic, + }) + if err != nil { + return err + } + + logger.Infof("subscribing to topic: %v", topic) + go func() { + for { + msg, err := subClient.Recv() + if err != nil { + return + } + p.incoming <- msg + } + }() + } + + // Wait until external or internal context done + select { + case <-ctx.Done(): + case <-p.closeChan: + } + + return nil +} + +// Receive implements Receiver.Receive +func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { + select { + case m, ok := <-p.incoming: + if !ok { + return nil, io.EOF + } + msg := NewMessage(m) + return msg, nil + case <-ctx.Done(): + return nil, io.EOF + } +} + +func (p *Protocol) Close(ctx context.Context) error { + close(p.closeChan) + return nil +} diff --git a/protocol/grpc/write_message.go b/protocol/grpc/write_message.go new file mode 100644 index 000000000..d7ae9532c --- /dev/null +++ b/protocol/grpc/write_message.go @@ -0,0 +1,219 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package grpc + +import ( + "bytes" + "context" + "fmt" + "io" + + pb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/types" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// WritePBMessage fills the provided pubMessage with the message m. +// Using context you can tweak the encoding processing (more details on binding.Write documentation). +func WritePBMessage(ctx context.Context, m binding.Message, pbEvt *pb.CloudEvent, transformers ...binding.Transformer) error { + structuredWriter := (*pbEventWriter)(pbEvt) + binaryWriter := (*pbEventWriter)(pbEvt) + + _, err := binding.Write( + ctx, + m, + structuredWriter, + binaryWriter, + transformers..., + ) + return err +} + +type pbEventWriter pb.CloudEvent + +var ( + _ binding.StructuredWriter = (*pbEventWriter)(nil) + _ binding.BinaryWriter = (*pbEventWriter)(nil) +) + +func (b *pbEventWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error { + if b.Attributes == nil { + b.Attributes = make(map[string]*pb.CloudEventAttributeValue) + } + + b.Attributes[contenttype], _ = attributeFor(f.MediaType()) + + var buf bytes.Buffer + _, err := io.Copy(&buf, event) + if err != nil { + return err + } + + // TODO: check the data content type and set the right data format + b.Data = &pb.CloudEvent_BinaryData{ + BinaryData: buf.Bytes(), + } + + return nil +} + +func (b *pbEventWriter) Start(ctx context.Context) error { + if b.Attributes == nil { + b.Attributes = make(map[string]*pb.CloudEventAttributeValue) + } + + return nil +} + +func (b *pbEventWriter) End(ctx context.Context) error { + return nil +} + +func (b *pbEventWriter) SetData(reader io.Reader) error { + buf, ok := reader.(*bytes.Buffer) + if !ok { + buf = new(bytes.Buffer) + _, err := io.Copy(buf, reader) + if err != nil { + return err + } + } + + b.Data = &pb.CloudEvent_BinaryData{ + BinaryData: buf.Bytes(), + } + + return nil +} + +func (b *pbEventWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + switch attribute.Kind() { + case spec.SpecVersion: + val, ok := value.(string) + if !ok { + return fmt.Errorf("invalid SpecVersion type, expected string got %T", value) + } + b.SpecVersion = val + case spec.ID: + val, ok := value.(string) + if !ok { + return fmt.Errorf("invalid ID type, expected string got %T", value) + } + b.Id = val + case spec.Source: + val, ok := value.(string) + if !ok { + return fmt.Errorf("invalid Source type, expected string got %T", value) + } + b.Source = val + case spec.Type: + val, ok := value.(string) + if !ok { + return fmt.Errorf("invalid Type type, expected string got %T", value) + } + b.Type = val + case spec.DataContentType: + if value == nil { + delete(b.Attributes, contenttype) + } else { + attrVal, err := attributeFor(value) + if err != nil { + return err + } + b.Attributes[contenttype] = attrVal + } + case spec.Subject: + if value == nil { + delete(b.Attributes, prefix+subject) + } else { + attrVal, err := attributeFor(value) + if err != nil { + return err + } + b.Attributes[prefix+subject] = attrVal + } + case spec.Time: + if value == nil { + delete(b.Attributes, prefix+time) + } else { + attrVal, err := attributeFor(value) + if err != nil { + return err + } + b.Attributes[prefix+time] = attrVal + } + default: + if value == nil { + delete(b.Attributes, prefix+attribute.Name()) + } else { + attrVal, err := attributeFor(value) + if err != nil { + return err + } + b.Attributes[prefix+attribute.Name()] = attrVal + } + } + + return nil +} + +func (b *pbEventWriter) SetExtension(name string, value interface{}) error { + if value == nil { + delete(b.Attributes, prefix+name) + } else { + attrVal, err := attributeFor(value) + if err != nil { + return err + } + b.Attributes[prefix+name] = attrVal + } + + return nil +} + +func attributeFor(v interface{}) (*pb.CloudEventAttributeValue, error) { + vv, err := types.Validate(v) + if err != nil { + return nil, err + } + attr := &pb.CloudEventAttributeValue{} + switch vt := vv.(type) { + case bool: + attr.Attr = &pb.CloudEventAttributeValue_CeBoolean{ + CeBoolean: vt, + } + case int32: + attr.Attr = &pb.CloudEventAttributeValue_CeInteger{ + CeInteger: vt, + } + case string: + attr.Attr = &pb.CloudEventAttributeValue_CeString{ + CeString: vt, + } + case []byte: + attr.Attr = &pb.CloudEventAttributeValue_CeBytes{ + CeBytes: vt, + } + case types.URI: + attr.Attr = &pb.CloudEventAttributeValue_CeUri{ + CeUri: vt.String(), + } + case types.URIRef: + attr.Attr = &pb.CloudEventAttributeValue_CeUriRef{ + CeUriRef: vt.String(), + } + case types.Timestamp: + attr.Attr = &pb.CloudEventAttributeValue_CeTimestamp{ + CeTimestamp: timestamppb.New(vt.Time), + } + default: + return nil, fmt.Errorf("unsupported attribute type: %T", v) + } + return attr, nil +} diff --git a/protocol/grpc/write_message_test.go b/protocol/grpc/write_message_test.go new file mode 100644 index 000000000..ed2c743ec --- /dev/null +++ b/protocol/grpc/write_message_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package grpc + +import ( + "context" + "testing" + + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/cloudevents/sdk-go/v2/binding" + . "github.com/cloudevents/sdk-go/v2/binding/test" + "github.com/cloudevents/sdk-go/v2/event" + . "github.com/cloudevents/sdk-go/v2/test" + "github.com/stretchr/testify/require" +) + +func TestEncodeMessage(t *testing.T) { + ctx := context.Background() + tests := []struct { + name string + messageFactory func(e event.Event) binding.Message + expectedEncoding binding.Encoding + }{ + { + name: "Structured to Structured", + messageFactory: func(e event.Event) binding.Message { + return MustCreateMockStructuredMessage(t, e) + }, + expectedEncoding: binding.EncodingStructured, + }, + { + name: "Binary to Binary", + messageFactory: MustCreateMockBinaryMessage, + expectedEncoding: binding.EncodingBinary, + }, + } + + EachEvent(t, Events(), func(t *testing.T, e event.Event) { + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + eventIn := ConvertEventExtensionsToString(t, e.Clone()) + // convert the event to binding.Message with specific encoding + messageIn := tc.messageFactory(eventIn) + + // load the binding.Message into a protobuf event + pbEvt := &pb.CloudEvent{} + err := WritePBMessage(ctx, messageIn, pbEvt) + require.NoError(t, err) + + // convert the protobuf event back to binding.Message + messageOut := NewMessage(pbEvt) + require.Equal(t, tc.expectedEncoding, messageOut.ReadEncoding()) + + // convert the binding.Message back to event.Event + eventOut, err := binding.ToEvent(ctx, messageOut) + require.NoError(t, err) + + // check if the event is the same + AssertEventEquals(t, eventIn, *eventOut) + }) + } + }) +} diff --git a/protocol/kafka_sarama/v2/coverage.tmp b/protocol/kafka_sarama/v2/coverage.tmp deleted file mode 100644 index 79b28a0b6..000000000 --- a/protocol/kafka_sarama/v2/coverage.tmp +++ /dev/null @@ -1 +0,0 @@ -mode: atomic diff --git a/samples/grpc/README.md b/samples/grpc/README.md new file mode 100644 index 000000000..e42f62a0b --- /dev/null +++ b/samples/grpc/README.md @@ -0,0 +1,19 @@ +gRPC samples + +To run the samples, you need a running gRPC server. + +To run a sample gRPC server using the following command: + +```bash +go run ./server/main.go +``` + +Then run the sender and receiver samples in separate terminals: + +```bash +go run ./receiver/main.go +``` + +```bash +go run ./sender/main.go +``` diff --git a/samples/grpc/go.mod b/samples/grpc/go.mod new file mode 100644 index 000000000..6c3360636 --- /dev/null +++ b/samples/grpc/go.mod @@ -0,0 +1,31 @@ +module github.com/cloudevents/sdk-go/samples/grpc + +go 1.18 + +require ( + github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 + github.com/cloudevents/sdk-go/protocol/grpc v0.0.0-00010101000000-000000000000 + github.com/cloudevents/sdk-go/v2 v2.14.0 + github.com/google/uuid v1.3.1 + google.golang.org/grpc v1.59.0 + google.golang.org/protobuf v1.31.0 +) + +require ( + github.com/golang/protobuf v1.5.3 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.26.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect +) + +replace github.com/cloudevents/sdk-go/v2 => ../../v2 + +replace github.com/cloudevents/sdk-go/binding/format/protobuf/v2 => ../../binding/format/protobuf/v2 + +replace github.com/cloudevents/sdk-go/protocol/grpc => ../../protocol/grpc diff --git a/samples/grpc/go.sum b/samples/grpc/go.sum new file mode 100644 index 000000000..63b20220b --- /dev/null +++ b/samples/grpc/go.sum @@ -0,0 +1,46 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/samples/grpc/receiver/main.go b/samples/grpc/receiver/main.go new file mode 100644 index 000000000..8ab596702 --- /dev/null +++ b/samples/grpc/receiver/main.go @@ -0,0 +1,49 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "log" + + grpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + grpcprotocol "github.com/cloudevents/sdk-go/protocol/grpc" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func main() { + ctx := context.Background() + conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("failed to connect: %v", err) + } + defer conn.Close() + + p, err := grpcprotocol.NewProtocol(conn, grpcprotocol.WithSubscribeOption(&grpcprotocol.SubscribeOption{Topics: []string{"test-topic"}})) + if err != nil { + log.Fatalf("failed to create protocol: %v", err) + } + defer p.Close(ctx) + + c, err := cloudevents.NewClient(p) + if err != nil { + log.Fatalf("failed to create client: %v", err) + } + + log.Printf("Receiver start consuming messages from test-topic\n") + err = c.StartReceiver(ctx, receive) + if err != nil { + log.Fatalf("failed to start receiver: %v", err) + } else { + log.Printf("receiver stopped") + } +} + +func receive(ctx context.Context, event cloudevents.Event) { + log.Printf("received event:\n%s", event) +} diff --git a/samples/grpc/sender/main.go b/samples/grpc/sender/main.go new file mode 100644 index 000000000..24f0314e4 --- /dev/null +++ b/samples/grpc/sender/main.go @@ -0,0 +1,64 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "log" + "time" + + grpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + + grpcprotocol "github.com/cloudevents/sdk-go/protocol/grpc" +) + +const ( + count = 100 +) + +func main() { + ctx := context.Background() + conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("failed to connect: %v", err) + } + defer conn.Close() + + p, err := grpcprotocol.NewProtocol(conn, grpcprotocol.WithPublishOption(&grpcprotocol.PublishOption{Topic: "test-topic"})) + if err != nil { + log.Fatalf("failed to create protocol: %v", err) + } + defer p.Close(ctx) + + c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Fatalf("failed to create client: %v", err) + } + + for i := 0; i < count; i++ { + e := cloudevents.NewEvent() + e.SetID(uuid.New().String()) + e.SetType("com.cloudevents.sample.sent") + e.SetSource("https://github.com/cloudevents/sdk-go/samples/grpc/sender") + err = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "id": i, + "message": "Hello, World!", + }) + if err != nil { + log.Printf("failed to set data: %v", err) + } + if result := c.Send(ctx, e); cloudevents.IsUndelivered(result) { + log.Printf("failed to send event: %v", result) + } else { + log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) + } + time.Sleep(1 * time.Second) + } +} diff --git a/samples/grpc/server/server.go b/samples/grpc/server/server.go new file mode 100644 index 000000000..079748258 --- /dev/null +++ b/samples/grpc/server/server.go @@ -0,0 +1,73 @@ +/* +Copyright 2023 The CloudEvents Authors +SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + "net" + + cepbv2 "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" + cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + cloudevents "github.com/cloudevents/sdk-go/v2" + + "google.golang.org/grpc" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +const ( + MAX_EVENT_CACHE = 100 +) + +type cloudEventServer struct { + cepb.UnimplementedCloudEventServiceServer + EventChan chan *cloudevents.Event +} + +func newCloudEventServer() *cloudEventServer { + return &cloudEventServer{ + EventChan: make(chan *cloudevents.Event, MAX_EVENT_CACHE), + } +} + +func (svr *cloudEventServer) Publish(_ context.Context, pubReq *cepb.PublishRequest) (*emptypb.Empty, error) { + evt, err := cepbv2.FromProto(pubReq.Event) + if err != nil { + return nil, fmt.Errorf("failed to convert protobuf to cloudevent: %v", err) + } + + log.Printf("received event:\n%s", evt) + svr.EventChan <- evt + + return &emptypb.Empty{}, nil +} + +func (svc *cloudEventServer) Subscribe(subReq *cepb.SubscriptionRequest, subServer cepb.CloudEventService_SubscribeServer) error { + for evt := range svc.EventChan { + pbEvt, err := cepbv2.ToProto(evt) + if err != nil { + return fmt.Errorf("failed to convert cloudevent to protobuf: %v", err) + } + log.Printf("sending event:\n%s", evt) + if err := subServer.Send(pbEvt); err != nil { + return err + } + } + + return nil +} + +func main() { + lis, err := net.Listen("tcp", "0.0.0.0:50051") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + grpcServer := grpc.NewServer() + cepb.RegisterCloudEventServiceServer(grpcServer, newCloudEventServer()) + log.Println("Starting server on port :50051") + grpcServer.Serve(lis) +} diff --git a/samples/http/go.mod b/samples/http/go.mod index f22486d9d..afd559273 100644 --- a/samples/http/go.mod +++ b/samples/http/go.mod @@ -9,7 +9,7 @@ require ( github.com/cloudevents/sdk-go/observability/opentelemetry/v2 v2.5.0 github.com/cloudevents/sdk-go/v2 v2.5.0 github.com/gin-gonic/gin v1.8.2 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.3.1 github.com/gorilla/mux v1.7.3 github.com/kelseyhightower/envconfig v1.4.0 github.com/rs/zerolog v1.29.0 @@ -19,7 +19,7 @@ require ( go.opentelemetry.io/otel/exporters/jaeger v1.0.0 go.opentelemetry.io/otel/sdk v1.0.0 go.opentelemetry.io/otel/trace v1.18.0 - google.golang.org/protobuf v1.30.0 + google.golang.org/protobuf v1.31.0 ) require ( @@ -56,8 +56,8 @@ require ( golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect - google.golang.org/grpc v1.56.3 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/grpc v1.59.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/samples/http/go.sum b/samples/http/go.sum index d818c6867..46f94bec5 100644 --- a/samples/http/go.sum +++ b/samples/http/go.sum @@ -77,8 +77,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -275,12 +275,12 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= -google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= -google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= -google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -289,8 +289,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=