Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/object-storage/file-system-azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ system support:
for all requests sent to Azure Storage. Defaults to `Trino`.
* - `azure.multipart-write-enabled`
- Enable multipart writes for large files. Defaults to `false`.
* - `azure.max-error-retries`
- Maximum [integer](prop-type-integer) number of retry attempts for transient
  Azure HTTP request failures. Retries use exponential backoff. Defaults to `4`.
:::

(azure-user-assigned-managed-identity-authentication)=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.azure.storage.blob.sas.BlobSasPermission;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import com.azure.storage.common.sas.SasProtocol;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
Expand Down Expand Up @@ -94,6 +96,7 @@ public class AzureFileSystem
private final int maxWriteConcurrency;
private final long maxSingleUploadSizeBytes;
private final boolean multipartWriteEnabled;
private final int maxErrorRetries;

public AzureFileSystem(
HttpClient httpClient,
Expand All @@ -106,7 +109,8 @@ public AzureFileSystem(
DataSize writeBlockSize,
int maxWriteConcurrency,
DataSize maxSingleUploadSize,
boolean multipartWriteEnabled)
boolean multipartWriteEnabled,
int maxErrorRetries)
{
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.concurrencyPolicy = requireNonNull(concurrencyPolicy, "concurrencyPolicy is null");
Expand All @@ -120,6 +124,7 @@ public AzureFileSystem(
this.maxWriteConcurrency = maxWriteConcurrency;
this.maxSingleUploadSizeBytes = maxSingleUploadSize.toBytes();
this.multipartWriteEnabled = multipartWriteEnabled;
this.maxErrorRetries = maxErrorRetries;
}

@Override
Expand Down Expand Up @@ -684,6 +689,7 @@ private BlobContainerClient createBlobContainerClient(AzureLocation location, Op
BlobContainerClientBuilder builder = new BlobContainerClientBuilder()
.httpClient(httpClient)
.addPolicy(concurrencyPolicy)
.retryOptions(new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, maxErrorRetries, (Integer) null, null, null, null))
.clientOptions(new ClientOptions().setTracingOptions(tracingOptions))
.endpoint("https://%s.blob.%s".formatted(location.account(), validatedEndpoint(location)));

Expand All @@ -701,6 +707,7 @@ private DataLakeFileSystemClient createFileSystemClient(AzureLocation location,
DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder()
.httpClient(httpClient)
.addPolicy(concurrencyPolicy)
.retryOptions(new RequestRetryOptions(RetryPolicyType.EXPONENTIAL, maxErrorRetries, (Integer) null, null, null, null))
.clientOptions(new ClientOptions().setTracingOptions(tracingOptions))
.endpoint("https://%s.dfs.%s".formatted(location.account(), validatedEndpoint(location)));
key.ifPresent(encryption -> builder.customerProvidedKey(lakeCustomerProvidedKey(encryption)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public enum AuthType
private Duration httpRequestTimeout = new Duration(10, TimeUnit.MINUTES);
private String applicationId = "Trino";
private boolean multipartWriteEnabled;
private int maxErrorRetries = 4;

@NotNull
public AuthType getAuthType()
Expand Down Expand Up @@ -214,4 +215,18 @@ public AzureFileSystemConfig setMultipartWriteEnabled(boolean multipartWriteEnab
this.multipartWriteEnabled = multipartWriteEnabled;
return this;
}

@Min(0)
public int getMaxErrorRetries()
{
return maxErrorRetries;
}

@Config("azure.max-error-retries")
@ConfigDescription("Maximum number of retries for transient Azure HTTP request failures")
public AzureFileSystemConfig setMaxErrorRetries(int maxErrorRetries)
{
this.maxErrorRetries = maxErrorRetries;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class AzureFileSystemFactory
private final EventLoopGroup eventLoopGroup;
private final boolean multipart;
private final HttpPipelinePolicy concurrencyPolicy;
private final int maxErrorRetries;

@Inject
public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, AzureFileSystemConfig config)
Expand All @@ -77,7 +78,8 @@ public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth,
config.getConnectionPoolMaxIdleTime(),
config.getHttpRequestTimeout(),
config.getApplicationId(),
config.isMultipartWriteEnabled());
config.isMultipartWriteEnabled(),
config.getMaxErrorRetries());
}

public AzureFileSystemFactory(
Expand All @@ -93,7 +95,8 @@ public AzureFileSystemFactory(
Duration connectionPoolMaxIdleTime,
Duration httpRequestTimeout,
String applicationId,
boolean multipart)
boolean multipart,
int maxErrorRetries)
{
this.auth = requireNonNull(azureAuth, "azureAuth is null");
this.endpoint = requireNonNull(endpoint, "endpoint is null");
Expand All @@ -116,6 +119,7 @@ public AzureFileSystemFactory(
clientOptions.setMaximumConnectionPoolSize(maxHttpConnections);
httpClient = createAzureHttpClient(connectionProvider, eventLoopGroup, clientOptions);
this.multipart = multipart;
this.maxErrorRetries = maxErrorRetries;
this.concurrencyPolicy = new ConcurrencyLimitHttpPipelinePolicy(
maxHttpRequests,
httpRequestTimeout.toJavaTime());
Expand Down Expand Up @@ -147,7 +151,7 @@ public void destroy()
public TrinoFileSystem create(ConnectorIdentity identity)
{
AzureAuth effectiveAuth = getEffectiveAuth(identity);
return new AzureFileSystem(httpClient, concurrencyPolicy, uploadExecutor, tracingOptions, effectiveAuth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize, multipart);
return new AzureFileSystem(httpClient, concurrencyPolicy, uploadExecutor, tracingOptions, effectiveAuth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize, multipart, maxErrorRetries);
}

private AzureAuth getEffectiveAuth(ConnectorIdentity identity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ void testDefaults()
.setConnectionPoolMaxIdleTime(new Duration(5, MINUTES))
.setHttpRequestTimeout(new Duration(10, MINUTES))
.setApplicationId("Trino")
.setMultipartWriteEnabled(false));
.setMultipartWriteEnabled(false)
.setMaxErrorRetries(4));
}

@Test
Expand All @@ -64,6 +65,7 @@ public void testExplicitPropertyMappings()
.put("azure.http-request-timeout", "1m")
.put("azure.application-id", "application id")
.put("azure.multipart-write-enabled", "true")
.put("azure.max-error-retries", "10")
.buildOrThrow();

AzureFileSystemConfig expected = new AzureFileSystemConfig()
Expand All @@ -78,7 +80,8 @@ public void testExplicitPropertyMappings()
.setConnectionPoolMaxIdleTime(new Duration(1, MINUTES))
.setHttpRequestTimeout(new Duration(1, MINUTES))
.setApplicationId("application id")
.setMultipartWriteEnabled(true);
.setMultipartWriteEnabled(true)
.setMaxErrorRetries(10);

assertFullMapping(properties, expected);
}
Expand Down
Loading