From 32b322a20c35922cfcedd5d256c70f5c22ddff12 Mon Sep 17 00:00:00 2001 From: Tony HB Date: Wed, 18 Mar 2026 09:42:37 -0700 Subject: [PATCH] Split batch on "request_size is too large" errors We currently estimate the size of the batch and limit batch sizes to <= 10MB for GCP PubSub (#3005). This estimate can be wrong (by eg ~30kb). In these cases, we don't want data loss. Instead, if the batch is too large we split the batch in half then send both batches. This obviously assumes the estimate isn't off by a factor of 2, which we've never seen in prod. Also, ideally, we wouldn't even make the OG request and instead would inspect the size of the protobuf before sending over the wire. This requires changes to Google Cloud's apiv1 driver, so that's out of scope for gocloud.dev. In this case, we'll deal with a failing outbound request then attempt to recover. --- pubsub/gcppubsub/gcppubsub.go | 31 ++++++++++++++++++++++++++++++ pubsub/gcppubsub/gcppubsub_test.go | 21 ++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/pubsub/gcppubsub/gcppubsub.go b/pubsub/gcppubsub/gcppubsub.go index b51a606851..80978ac9e2 100644 --- a/pubsub/gcppubsub/gcppubsub.go +++ b/pubsub/gcppubsub/gcppubsub.go @@ -73,6 +73,7 @@ import ( "gocloud.dev/pubsub/driver" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/oauth" @@ -368,6 +369,16 @@ func openTopic(client *raw.PublisherClient, topicPath string) driver.Topic { return &topic{topicPath, client} } +// isRequestTooLarge reports whether err is a gRPC InvalidArgument error +// indicating the publish request exceeded the maximum allowed size. +func isRequestTooLarge(err error) bool { + s, ok := status.FromError(err) + if !ok { + return false + } + return s.Code() == codes.InvalidArgument && strings.Contains(s.Message(), "request_size") +} + // SendBatch implements driver.Topic.SendBatch. func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error { var ms []*pb.PubsubMessage @@ -387,6 +398,26 @@ func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error { } ms = append(ms, psm) } + err := t.sendBatch(ctx, dms, ms) + if err != nil && isRequestTooLarge(err) && len(ms) > 1 { + // we *estimate* the batch size, and cap batches at approx 9mb before sending. however, + // estimates can be off, and some production use cases can have ~30KB data over the 10MB limit + // when sending batches. + // + // in this case, if we ever get "message too large" errors, split the batch in half and send + // both batches. this assumes the approximation is not off by a factor of over 2. + mid := len(ms) / 2 + if err := t.sendBatch(ctx, dms[:mid], ms[:mid]); err != nil { + return err + } + return t.sendBatch(ctx, dms[mid:], ms[mid:]) + } + return err +} + +// sendBatch publishes ms and runs AfterSend callbacks for the +// corresponding driver messages. +func (t *topic) sendBatch(ctx context.Context, dms []*driver.Message, ms []*pb.PubsubMessage) error { req := &pb.PublishRequest{Topic: t.path, Messages: ms} pr, err := t.client.Publish(ctx, req) if err != nil { diff --git a/pubsub/gcppubsub/gcppubsub_test.go b/pubsub/gcppubsub/gcppubsub_test.go index 3ca2a7902a..253a7db80d 100644 --- a/pubsub/gcppubsub/gcppubsub_test.go +++ b/pubsub/gcppubsub/gcppubsub_test.go @@ -443,3 +443,24 @@ func TestOpenSubscriptionFromURL(t *testing.T) { } } } + +func TestIsRequestTooLarge(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"unrelated", fmt.Errorf("something"), false}, + {"wrong code", status.Errorf(codes.NotFound, "request_size too large"), false}, + {"wrong message", status.Errorf(codes.InvalidArgument, "bad field"), false}, + {"match", status.Errorf(codes.InvalidArgument, "The value for request_size is too large. You passed 10036929 in the request, but the maximum value is 10000000."), true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isRequestTooLarge(tt.err); got != tt.want { + t.Errorf("isRequestTooLarge() = %v, want %v", got, tt.want) + } + }) + } +}