Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pubsub/gcppubsub/gcppubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import (
"gocloud.dev/pubsub/driver"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/oauth"
Expand Down Expand Up @@ -368,6 +369,16 @@ func openTopic(client *raw.PublisherClient, topicPath string) driver.Topic {
return &topic{topicPath, client}
}

// isRequestTooLarge reports whether err carries a gRPC status whose code is
// InvalidArgument and whose message mentions "request_size", which is how a
// publish request that exceeds the maximum allowed size is recognized here.
// Errors that cannot be converted to a gRPC status are never a match.
func isRequestTooLarge(err error) bool {
	st, isStatus := status.FromError(err)
	switch {
	case !isStatus:
		// Not a gRPC status error at all.
		return false
	case st.Code() != codes.InvalidArgument:
		return false
	default:
		return strings.Contains(st.Message(), "request_size")
	}
}

// SendBatch implements driver.Topic.SendBatch.
func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error {
var ms []*pb.PubsubMessage
Expand All @@ -387,6 +398,26 @@ func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error {
}
ms = append(ms, psm)
}
err := t.sendBatch(ctx, dms, ms)
if err != nil && isRequestTooLarge(err) && len(ms) > 1 {
// We only *estimate* the batch size and cap batches at approximately 9 MB before sending. However,
// estimates can be off: some production workloads have ended up ~30 KB over the 10 MB limit
// when sending batches.
//
// In this case, if we ever get a "message too large" error, split the batch in half and send
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any way to test the splitting logic?

// both batches. this assumes the approximation is not off by a factor of over 2.
mid := len(ms) / 2
if err := t.sendBatch(ctx, dms[:mid], ms[:mid]); err != nil {
return err
}
return t.sendBatch(ctx, dms[mid:], ms[mid:])
}
return err
}

// sendBatch publishes ms and runs AfterSend callbacks for the
// corresponding driver messages.
func (t *topic) sendBatch(ctx context.Context, dms []*driver.Message, ms []*pb.PubsubMessage) error {
req := &pb.PublishRequest{Topic: t.path, Messages: ms}
pr, err := t.client.Publish(ctx, req)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pubsub/gcppubsub/gcppubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,24 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
}
}
}

// TestIsRequestTooLarge exercises isRequestTooLarge with a nil error, a plain
// non-status error, and gRPC status errors that differ in code and message.
// Only an InvalidArgument status mentioning "request_size" is expected to match.
func TestIsRequestTooLarge(t *testing.T) {
	type testCase struct {
		name string
		err  error
		want bool
	}
	cases := []testCase{
		{name: "nil", err: nil, want: false},
		{name: "unrelated", err: fmt.Errorf("something"), want: false},
		{name: "wrong code", err: status.Errorf(codes.NotFound, "request_size too large"), want: false},
		{name: "wrong message", err: status.Errorf(codes.InvalidArgument, "bad field"), want: false},
		{name: "match", err: status.Errorf(codes.InvalidArgument, "The value for request_size is too large. You passed 10036929 in the request, but the maximum value is 10000000."), want: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := isRequestTooLarge(tc.err)
			if got != tc.want {
				t.Errorf("isRequestTooLarge() = %v, want %v", got, tc.want)
			}
		})
	}
}