diff --git a/pubsub/gcppubsub/gcppubsub.go b/pubsub/gcppubsub/gcppubsub.go
index b51a606851..80978ac9e2 100644
--- a/pubsub/gcppubsub/gcppubsub.go
+++ b/pubsub/gcppubsub/gcppubsub.go
@@ -73,6 +73,7 @@ import (
 	"gocloud.dev/pubsub/driver"
 	"google.golang.org/api/option"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/credentials/oauth"
@@ -368,6 +369,16 @@ func openTopic(client *raw.PublisherClient, topicPath string) driver.Topic {
 	return &topic{topicPath, client}
 }
 
+// isRequestTooLarge reports whether err carries a gRPC InvalidArgument status
+// whose message mentions "request_size", i.e. the publish request was too big.
+func isRequestTooLarge(err error) bool {
+	s, ok := status.FromError(err)
+	if !ok {
+		return false
+	}
+	return s.Code() == codes.InvalidArgument && strings.Contains(s.Message(), "request_size")
+}
+
 // SendBatch implements driver.Topic.SendBatch.
 func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error {
 	var ms []*pb.PubsubMessage
@@ -387,6 +398,26 @@ func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error {
 		}
 		ms = append(ms, psm)
 	}
+	err := t.sendBatch(ctx, dms, ms)
+	if err != nil && isRequestTooLarge(err) && len(ms) > 1 {
+		// Batch sizes are only *estimated*, and batches are capped at approximately
+		// 9 MB before sending. Estimates can be off: some production use cases end up
+		// ~30 KB over the 10 MB limit when sending batches.
+		//
+		// If the service reports the request as too large, split the batch in half and
+		// send both halves. This assumes the estimate is off by less than a factor of 2.
+		mid := len(ms) / 2
+		if err := t.sendBatch(ctx, dms[:mid], ms[:mid]); err != nil {
+			return err
+		}
+		return t.sendBatch(ctx, dms[mid:], ms[mid:])
+	}
+	return err
+}
+
+// sendBatch publishes ms in a single Publish request and runs AfterSend
+// callbacks for the corresponding driver messages.
+func (t *topic) sendBatch(ctx context.Context, dms []*driver.Message, ms []*pb.PubsubMessage) error {
 	req := &pb.PublishRequest{Topic: t.path, Messages: ms}
 	pr, err := t.client.Publish(ctx, req)
 	if err != nil {
diff --git a/pubsub/gcppubsub/gcppubsub_test.go b/pubsub/gcppubsub/gcppubsub_test.go
index 3ca2a7902a..253a7db80d 100644
--- a/pubsub/gcppubsub/gcppubsub_test.go
+++ b/pubsub/gcppubsub/gcppubsub_test.go
@@ -443,3 +443,24 @@ func TestOpenSubscriptionFromURL(t *testing.T) {
 		}
 	}
 }
+
+func TestIsRequestTooLarge(t *testing.T) {
+	tests := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{"nil", nil, false},
+		{"unrelated", fmt.Errorf("something"), false},
+		{"wrong code", status.Errorf(codes.NotFound, "request_size too large"), false},
+		{"wrong message", status.Errorf(codes.InvalidArgument, "bad field"), false},
+		{"match", status.Errorf(codes.InvalidArgument, "The value for request_size is too large. You passed 10036929 in the request, but the maximum value is 10000000."), true},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := isRequestTooLarge(tt.err); got != tt.want {
+				t.Errorf("isRequestTooLarge() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}