Skip to content

Commit ddb25d3

Browse files
authored
fix: stop retrying 429s caused by Fabric capacity limits (#2539)
* fix: stop retrying 429s caused by Fabric capacity limits

  Fabric returns HTTP 429 with body code "CapacityLimitExceeded" when serverless
  capacity is exhausted. Unlike transient OpenAI rate limits, these are
  non-retryable — retrying causes infinite loops and hangs.

  This change inspects 429 response bodies in sendWithRetries. If the body
  contains "CapacityLimitExceeded", the response is returned immediately instead
  of entering the retry/backoff loop. Normal rate-limit 429s continue to retry
  with exponential backoff as before.

  The response entity is buffered (BufferedHttpEntity) before inspection so
  callers can still read it after the method returns.

  Adds 3 regression tests:
  - CapacityLimitExceeded 429 returns immediately (no retry)
  - CapacityLimitExceeded 429 ignores Retry-After header
  - Non-capacity 429 with body still retries normally

* fix: address PR feedback — size guard, log truncation, test stability

  - Guard BufferedHttpEntity with Content-Length cap (1MB) to avoid buffering
    unexpectedly large 429 response payloads
  - Remove full response body from log warning to prevent data exposure
  - Remove tight elapsed-time assertion from capacity test (requestCount already
    validates no-retry); widen Retry-After test bound to 4s

* fix: handle chunked 429 responses in capacity check

  The Content-Length guard was skipping body inspection when getContentLength()
  returns -1 (chunked/unknown encoding). This meant capacity-exceeded 429s sent
  without Content-Length would still retry forever.

  Changed guard to only skip when Content-Length is known AND exceeds the 1MB
  cap — unknown-length responses are now inspected. Added regression test for
  chunked capacity-exceeded 429.
1 parent 0a2b0b4 commit ddb25d3

2 files changed

Lines changed: 155 additions & 10 deletions

File tree

core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPClients.scala

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
77
import org.apache.commons.io.IOUtils
88
import org.apache.http.client.config.RequestConfig
99
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpRequestBase}
10+
import org.apache.http.entity.BufferedHttpEntity
1011
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}
1112
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
1213
import org.apache.spark.injections.UDFUtils
@@ -104,17 +105,37 @@ object HandlingUtils extends SparkLogging {
104105
case 201 => true
105106
case 202 => true
106107
case 429 =>
107-
Option(response.getFirstHeader("Retry-After"))
108-
.foreach { h =>
109-
logInfo(s"waiting ${h.getValue} on ${
110-
request match {
111-
case p: HttpPost => p.getURI + " " +
112-
Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("")
113-
case _ => request.getURI
114-
}
115-
}")
108+
// Inspect body to distinguish capacity errors from transient rate limits.
109+
// Guard with Content-Length cap to avoid buffering unexpectedly large payloads.
110+
val MaxInspectionBytes = 1024 * 1024L
111+
val bodyStr = Option(response.getEntity).flatMap { entity =>
112+
val contentLength = entity.getContentLength
113+
if (contentLength > MaxInspectionBytes) {
114+
None
115+
} else {
116+
response.setEntity(new BufferedHttpEntity(entity))
117+
Option(response.getEntity)
118+
.flatMap(e => Try(IOUtils.toString(e.getContent, "UTF-8")).toOption)
116119
}
117-
false
120+
}.getOrElse("")
121+
if (bodyStr.contains("CapacityLimitExceeded")) {
122+
// Fabric capacity-exceeded 429s are NOT transient rate limits —
123+
// retrying will not help and causes hangs
124+
logWarning(s"Capacity limit exceeded (non-retryable 429) on ${request.getURI}")
125+
true
126+
} else {
127+
Option(response.getFirstHeader("Retry-After"))
128+
.foreach { h =>
129+
logInfo(s"waiting ${h.getValue} on ${
130+
request match {
131+
case p: HttpPost => p.getURI + " " +
132+
Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("")
133+
case _ => request.getURI
134+
}
135+
}")
136+
}
137+
false
138+
}
118139
case code =>
119140
logWarning(s"got error $code: ${response.getStatusLine.getReasonPhrase} on ${
120141
request match {

core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/VerifySendWithRetries.scala

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,130 @@ class VerifySendWithRetries extends TestBase {
273273
}
274274
}
275275

276+
test("429 with CapacityLimitExceeded body is not retried") {
277+
val port = getFreePort
278+
val requestCount = new AtomicInteger(0)
279+
val capacityBody =
280+
"""{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
281+
val server = startServer(port) { exchange =>
282+
val n = requestCount.incrementAndGet()
283+
if (n == 1) {
284+
respond(exchange, 429, capacityBody)
285+
} else {
286+
respond(exchange, 200, """{"ok":true}""")
287+
}
288+
}
289+
try {
290+
val client = HttpClients.createDefault()
291+
val request = new HttpGet(s"http://localhost:$port/test")
292+
val response = HandlingUtils.sendWithRetries(
293+
client, request, Array(100, 100, 100))
294+
val code = response.getStatusLine.getStatusCode
295+
response.close()
296+
client.close()
297+
298+
assert(code === 429, "Capacity-exceeded 429 should be returned immediately, not retried")
299+
assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded")
300+
} finally {
301+
server.stop(0)
302+
}
303+
}
304+
305+
test("429 with CapacityLimitExceeded ignores Retry-After header") {
306+
val port = getFreePort
307+
val requestCount = new AtomicInteger(0)
308+
val capacityBody =
309+
"""{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
310+
val server = startServer(port) { exchange =>
311+
val n = requestCount.incrementAndGet()
312+
if (n == 1) {
313+
respond(exchange, 429, capacityBody, headers = Map("Retry-After" -> "5"))
314+
} else {
315+
respond(exchange, 200, """{"ok":true}""")
316+
}
317+
}
318+
try {
319+
val client = HttpClients.createDefault()
320+
val request = new HttpGet(s"http://localhost:$port/test")
321+
val start = System.currentTimeMillis()
322+
val response = HandlingUtils.sendWithRetries(
323+
client, request, Array(100, 100, 100))
324+
val elapsed = System.currentTimeMillis() - start
325+
val code = response.getStatusLine.getStatusCode
326+
response.close()
327+
client.close()
328+
329+
assert(code === 429, "Capacity-exceeded should not retry even with Retry-After")
330+
assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded")
331+
// Verify we didn't sleep for the 5s Retry-After
332+
assert(elapsed < 4000, s"Should ignore Retry-After and return quickly, took ${elapsed}ms")
333+
} finally {
334+
server.stop(0)
335+
}
336+
}
337+
338+
test("429 with non-capacity error body still retries normally") {
339+
val port = getFreePort
340+
val requestCount = new AtomicInteger(0)
341+
val rateLimitBody = """{"error":{"code":"RateLimitExceeded","message":"Too many requests"}}"""
342+
val server = startServer(port) { exchange =>
343+
val n = requestCount.incrementAndGet()
344+
if (n <= 2) {
345+
respond(exchange, 429, rateLimitBody)
346+
} else {
347+
respond(exchange, 200, """{"ok":true}""")
348+
}
349+
}
350+
try {
351+
val client = HttpClients.createDefault()
352+
val request = new HttpGet(s"http://localhost:$port/test")
353+
val response = HandlingUtils.sendWithRetries(
354+
client, request, Array(100, 100, 100))
355+
val code = response.getStatusLine.getStatusCode
356+
response.close()
357+
client.close()
358+
359+
assert(code === 200, "Non-capacity 429 should still retry and eventually succeed")
360+
assert(requestCount.get() === 3, "Should have retried past the rate-limit 429s")
361+
} finally {
362+
server.stop(0)
363+
}
364+
}
365+
366+
test("429 with CapacityLimitExceeded and chunked encoding (no Content-Length) is not retried") {
367+
val port = getFreePort
368+
val requestCount = new AtomicInteger(0)
369+
val capacityBody =
370+
"""{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
371+
val server = startServer(port) { exchange =>
372+
val n = requestCount.incrementAndGet()
373+
if (n == 1) {
374+
// Send with Content-Length = 0 (chunked) to simulate no Content-Length header
375+
exchange.sendResponseHeaders(429, 0)
376+
val os = exchange.getResponseBody
377+
os.write(capacityBody.getBytes("UTF-8"))
378+
os.close()
379+
exchange.close()
380+
} else {
381+
respond(exchange, 200, """{"ok":true}""")
382+
}
383+
}
384+
try {
385+
val client = HttpClients.createDefault()
386+
val request = new HttpGet(s"http://localhost:$port/test")
387+
val response = HandlingUtils.sendWithRetries(
388+
client, request, Array(100, 100, 100))
389+
val code = response.getStatusLine.getStatusCode
390+
response.close()
391+
client.close()
392+
393+
assert(code === 429, "Chunked capacity-exceeded 429 should be returned immediately")
394+
assert(requestCount.get() === 1, "Should not retry on chunked CapacityLimitExceeded")
395+
} finally {
396+
server.stop(0)
397+
}
398+
}
399+
276400
test("429 with Retry-After 0 means retry immediately") {
277401
val port = getFreePort
278402
val requestCount = new AtomicInteger(0)

0 commit comments

Comments (0)