diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index 7f97273d2760..6d3effa1b397 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -111,6 +111,30 @@ runs: key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} restore-keys: | ${{ runner.os }}-maven- + - name: Cache and Restore Maven wrapper + id: cache-maven-wrapper + if: ${{ format('{0}', inputs.cache) == 'true' }} + uses: actions/cache@v5 + with: + # Note: must be same set of paths as for cache:restore mode + path: ~/.m2/wrapper/dists + key: ${{ runner.os }}-maven-wrapper-${{ hashFiles('.mvn/wrapper/maven-wrapper.properties') }} + restore-keys: | + ${{ runner.os }}-maven-wrapper- + - name: Restore Maven wrapper + id: cache_restore-maven-wrapper + if: ${{ format('{0}', inputs.cache) == 'restore' }} + uses: actions/cache/restore@v5 + with: + # Note: must be same set of paths as for cache:true mode + path: ~/.m2/wrapper/dists + key: ${{ runner.os }}-maven-wrapper-${{ hashFiles('.mvn/wrapper/maven-wrapper.properties') }} + restore-keys: | + ${{ runner.os }}-maven-wrapper- + - name: Verify Maven wrapper + if: ${{ inputs.java-version != '' }} + shell: bash + run: .github/bin/retry ./mvnw -v - name: Configure Problem Matchers if: ${{ inputs.java-version != '' }} shell: bash diff --git a/core/trino-spi/src/test/java/io/trino/spi/variant/TestMetadata.java b/core/trino-spi/src/test/java/io/trino/spi/variant/TestMetadata.java index 7455470b0f3e..49ec3acf06aa 100644 --- a/core/trino-spi/src/test/java/io/trino/spi/variant/TestMetadata.java +++ b/core/trino-spi/src/test/java/io/trino/spi/variant/TestMetadata.java @@ -18,6 +18,7 @@ import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.Variants; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -35,6 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +@Isolated // this test uses lots of memory class TestMetadata { // from the Iceberg implementation, an empty metadata has 3 bytes: header + offsetCount + 0th offset diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java index dcfa0c88f506..c5e3f6f6bfbc 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/AbstractTestTrinoFileSystem.java @@ -1056,25 +1056,28 @@ public void testPreSignedUris() abort("Generating pre-signed URI is not supported"); } - Optional directLocation = getFileSystem() - .preSignedUri(location, new Duration(30, SECONDS)); + UriLocation directLocation = assertThat(getFileSystem() + .preSignedUri(location, new Duration(30, SECONDS))) + .get().actual(); - Optional expiredDirectLocation = getFileSystem() - .preSignedUri(location, new Duration(1, SECONDS)); + UriLocation expiredDirectLocation = assertThat(getFileSystem() + .preSignedUri(location, new Duration(3, SECONDS))) + .get().actual(); - assertThat(directLocation).isPresent(); - - assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation.get())) + assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation)) .isEqualTo(TEST_BLOB_CONTENT_PREFIX + location)); // Check if it can be retrieved more than once - assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation.get())) + assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation)) .isEqualTo(TEST_BLOB_CONTENT_PREFIX + location)); // Check if after a timeout the pre-signed URI is no longer valid - assertEventually(new Duration(5, SECONDS), new Duration(1, SECONDS), () -> assertThatThrownBy(() -> retrieveUri(expiredDirectLocation.get())) - .isInstanceOf(IOException.class) - .hasMessageContaining("Failed to retrieve")); + assertEventually( + new Duration(10, SECONDS), + new Duration(1, SECONDS), + () -> assertThatThrownBy(() -> retrieveUri(expiredDirectLocation)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Failed to retrieve")); } } diff --git a/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/Hadoop.java b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/Hadoop.java index 6008c0730461..d58e49f4df13 100644 --- a/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/Hadoop.java +++ b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/Hadoop.java @@ -16,11 +16,14 @@ import io.airlift.log.Logger; import io.trino.testing.containers.BaseTestContainer; import io.trino.testing.containers.PrintingLogConsumer; +import org.junit.jupiter.api.extension.AfterTestExecutionCallback; +import org.testcontainers.containers.Container; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import static com.google.common.base.StandardSystemProperty.USER_NAME; import static io.trino.testing.TestingProperties.getDockerImagesVersion; @@ -64,6 +67,12 @@ public void start() { super.start(); executeInContainerFailOnError("hadoop", "fs", "-rm", "-r", "/*"); + executeInContainerFailOnError("bash", "-e", "-c", + """ + printf 'ready' | hadoop fs -put - /_trino_hdfs_ready + hadoop fs -cat /_trino_hdfs_ready + hadoop fs -rm -f /_trino_hdfs_ready + """); log.info("Hadoop container started with HDFS endpoint: %s", getHdfsUri()); } @@ -71,4 +80,52 @@ public String getHdfsUri() { return "hdfs://%s/".formatted(getMappedHostAndPortForExposedPort(HDFS_PORT)); } + + public void printDiagnostics() + { + log.warn("Printing Hadoop container diagnostics"); + printDiagnostic("supervisor status", "supervisorctl", "status"); + printDiagnostic("container disk usage", "df", "-h"); + printDiagnostic("container inode usage", "df", "-i"); + printDiagnostic("HDFS report", "hdfs", "dfsadmin", "-report"); + printDiagnostic("HDFS disk usage", "hadoop", "fs", "-df", "-h", "/"); + printDiagnostic("HDFS directory usage", "hadoop", "fs", "-du", "-h", "/"); + printDiagnostic("HDFS fsck", "hdfs", "fsck", "/", "-files", "-blocks", "-locations"); + printDiagnostic("NameNode log", "bash", "-c", "tail -n 500 /var/log/hadoop-hdfs/*namenode*.log 2>&1 || true"); + printDiagnostic("DataNode log", "bash", "-c", "tail -n 500 /var/log/hadoop-hdfs/*datanode*.log 2>&1 || true"); + } + + private void printDiagnostic(String name, String... command) + { + try { + Container.ExecResult result = executeInContainer(command); + log.warn( + "Hadoop diagnostic: %s%ncommand: %s%nexit code: %s%nstdout:%n%s%nstderr:%n%s", + name, + String.join(" ", command), + result.getExitCode(), + result.getStdout(), + result.getStderr()); + } + catch (RuntimeException e) { + log.warn(e, "Failed to print Hadoop diagnostic: %s", name); + } + } + + private static boolean isIncompleteExecution(Throwable failure) + { + for (Class failureClass = failure.getClass(); failureClass != null; failureClass = failureClass.getSuperclass()) { + if (failureClass.getName().equals("org.opentest4j.IncompleteExecutionException")) { + return true; + } + } + return false; + } + + public static AfterTestExecutionCallback printDiagnosticsOnFailure(Supplier hadoop) + { + return context -> context.getExecutionException() + .filter(failure -> !isIncompleteExecution(failure)) + .ifPresent(_ -> hadoop.get().printDiagnostics()); + } } diff --git a/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystemHdfs.java b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystemHdfs.java index 8948a0727025..52268d9e4b63 100644 --- a/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystemHdfs.java +++ b/lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/TestHdfsFileSystemHdfs.java @@ -32,6 +32,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.AfterTestExecutionCallback; +import org.junit.jupiter.api.extension.RegisterExtension; import java.io.IOException; import java.io.UncheckedIOException; @@ -48,6 +50,9 @@ public class TestHdfsFileSystemHdfs private HdfsContext hdfsContext; private TrinoFileSystem fileSystem; + @RegisterExtension + final AfterTestExecutionCallback hadoopDiagnostics = Hadoop.printDiagnosticsOnFailure(() -> hadoop); + @BeforeAll void beforeAll() { diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java index 19a93489f85e..42e18e561aab 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java @@ -52,6 +52,7 @@ import java.util.function.Function; import static com.google.common.base.Strings.nullToEmpty; +import static com.google.common.base.Throwables.getCausalChain; import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset; import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor; @@ -83,6 +84,12 @@ public abstract class BaseBigQueryConnectorTest extends BaseConnectorTest { + private static final RetryPolicy BIGQUERY_TRANSIENT_FAILURE_RETRY_POLICY = RetryPolicy.builder() + .handleIf(BaseBigQueryConnectorTest::isTransientBigQueryFailure) + .withDelay(java.time.Duration.ofSeconds(10)) + .withMaxAttempts(3) + .build(); + protected BigQuerySqlExecutor bigQuerySqlExecutor; private String gcpStorageBucket; private String bigQueryConnectionId; @@ -1573,6 +1580,13 @@ protected void verifyColumnNameLengthFailurePermissible(Throwable e) private void onBigQuery(@Language("SQL") String sql) { - bigQuerySqlExecutor.execute(sql); + Failsafe.with(BIGQUERY_TRANSIENT_FAILURE_RETRY_POLICY) + .run(() -> bigQuerySqlExecutor.execute(sql)); + } + + private static boolean isTransientBigQueryFailure(Throwable throwable) + { + return getCausalChain(throwable).stream().anyMatch(cause -> + nullToEmpty(cause.getMessage()).contains("Visibility check was unavailable")); } } diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraCreateAndInsertDataSetup.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraCreateAndInsertDataSetup.java index 9f59c84ee67f..f3cd2a7adb01 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraCreateAndInsertDataSetup.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/CassandraCreateAndInsertDataSetup.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.json.JsonCodec; +import io.trino.testing.QueryRunner; import io.trino.testing.datatype.ColumnSetup; import io.trino.testing.datatype.DataSetup; import io.trino.testing.sql.SqlExecutor; @@ -49,13 +50,15 @@ public class CassandraCreateAndInsertDataSetup private final String tableNamePrefix; private final String keyspaceName; private final CassandraServer cassandraServer; + private final QueryRunner queryRunner; - public CassandraCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String tableNamePrefix, CassandraServer cassandraServer) + public CassandraCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String tableNamePrefix, CassandraServer cassandraServer, QueryRunner queryRunner) { this.sqlExecutor = requireNonNull(sqlExecutor, "sqlExecutor is null"); this.tableNamePrefix = requireNonNull(tableNamePrefix, "tableNamePrefix is null"); keyspaceName = verifyTableNamePrefixAndGetKeyspaceName(tableNamePrefix); this.cassandraServer = requireNonNull(cassandraServer, "cassandraServer is null"); + this.queryRunner = requireNonNull(queryRunner, "queryRunner is null"); } private static String verifyTableNamePrefixAndGetKeyspaceName(String tableNamePrefix) @@ -105,9 +108,8 @@ private void refreshSizeEstimates(String keyspaceName, String tableName) private void waitForTableVisibility(String keyspaceName, String tableName) { - assertEventually(() -> assertThat(cassandraServer.getSession() - .execute(format("SELECT table_name FROM system_schema.tables WHERE keyspace_name = '%s' AND table_name = '%s'", keyspaceName, tableName)).all()) - .isNotEmpty()); + assertEventually(() -> assertThat(queryRunner.execute(format("SELECT * FROM %s.%s", keyspaceName, tableName)).getRowCount()) + .isEqualTo(1)); } private TestTable createTestTable(List inputs) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTable.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTable.java index f5e1be35e02e..af39c432562e 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTable.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTable.java @@ -14,6 +14,7 @@ package io.trino.plugin.cassandra; import com.google.common.collect.ImmutableList; +import io.airlift.units.Duration; import io.trino.testing.QueryRunner; import io.trino.testing.sql.SqlExecutor; @@ -25,6 +26,7 @@ import static io.trino.testing.assertions.Assert.assertEventually; import static java.lang.String.format; import static java.lang.String.join; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.joining; import static org.assertj.core.api.Assertions.assertThat; @@ -65,8 +67,11 @@ public TestCassandraTable( } // Ensure that the currently created table is visible to Trino - assertEventually(() -> assertThat(queryRunner.execute("SELECT * FROM %s.%s".formatted(keyspace, tableName)).getRowCount()) - .isEqualTo(rowsToInsert.size())); + assertEventually( + new Duration(1, MINUTES), + () -> assertThat(queryRunner.execute("SELECT * FROM %s.%s".formatted(keyspace, tableName)) + .getRowCount()) + .isEqualTo(rowsToInsert.size())); } catch (Exception e) { try (TestCassandraTable ignored = this) { diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java index fd5cbbbd1558..c5abbd9a440b 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraTypeMapping.java @@ -689,7 +689,7 @@ private DataSetup trinoCreateAndInsert(Session session, String tableNamePrefix) private DataSetup cassandraCreateAndInsert(String tableNamePrefix) { - return new CassandraCreateAndInsertDataSetup(session::execute, tableNamePrefix, server); + return new CassandraCreateAndInsertDataSetup(session::execute, tableNamePrefix, server, getQueryRunner()); } private TestCassandraTable testTable(String namePrefix, List columnDefinitions, List rowsToInsert) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java index 6869582b32fc..82bafb046191 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java @@ -22,10 +22,12 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.glue.model.Column; import software.amazon.awssdk.services.glue.model.CreateTableRequest; import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; import software.amazon.awssdk.services.glue.model.GetTableRequest; import software.amazon.awssdk.services.glue.model.GetTableResponse; import software.amazon.awssdk.services.glue.model.SerDeInfo; @@ -36,10 +38,12 @@ import java.util.Map; import static io.trino.plugin.hive.metastore.glue.TestingGlueHiveMetastore.createTestingGlueHiveMetastore; +import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; /** * Tests metadata operations on a schema which has a mix of Hive and Delta Lake tables. @@ -48,6 +52,7 @@ * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default */ @TestInstance(PER_CLASS) +@Execution(SAME_THREAD) // Tests share a Glue schema and assert exact table listings public class TestDeltaLakeSharedGlueMetastoreWithTableRedirections extends BaseDeltaLakeSharedMetastoreWithTableRedirectionsTest { @@ -124,7 +129,7 @@ protected String getExpectedDeltaLakeCreateSchema(String catalogName) @Test public void testUnsupportedHiveTypeRedirect() { - String tableName = "unsupported_types"; + String tableName = "unsupported_types_" + randomNameSuffix(); // Use another complete table location so `SHOW CREATE TABLE` doesn't fail on reading metadata String location; try (GlueClient glueClient = GlueClient.create()) { @@ -168,21 +173,34 @@ public void testUnsupportedHiveTypeRedirect() try (GlueClient glueClient = GlueClient.create()) { glueClient.createTable(createTableRequest); - String tableDefinition = (String) computeScalar("SHOW CREATE TABLE hive_with_redirections." + schema + "." + tableName); - String expected = - """ - CREATE TABLE delta_with_redirections.%s.%s ( - a_varchar varchar - ) - WITH ( - location = '%s' - )"""; - assertThat(tableDefinition).isEqualTo(expected.formatted(schema, tableName, location)); + try { + String tableDefinition = (String) computeScalar( + "SHOW CREATE TABLE hive_with_redirections." + schema + "." + tableName); + String expected = + """ + CREATE TABLE delta_with_redirections.%s.%s ( + a_varchar varchar + ) + WITH ( + location = '%s' + )"""; + assertThat(tableDefinition).isEqualTo(expected.formatted(schema, tableName, location)); + } + finally { + deleteTableIfExists(glueClient, tableInput.name()); + } + } + } + private void deleteTableIfExists(GlueClient glueClient, String tableName) + { + try { glueClient.deleteTable(DeleteTableRequest.builder() .databaseName(schema) - .name(tableInput.name()) + .name(tableName) .build()); } + catch (EntityNotFoundException _) { + } } } diff --git a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java index a5f0a83da5b5..5449a827a31a 100644 --- a/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java +++ b/plugin/trino-google-sheets/src/main/java/io/trino/plugin/google/sheets/SheetsClient.java @@ -24,6 +24,7 @@ import com.google.api.services.sheets.v4.Sheets; import com.google.api.services.sheets.v4.SheetsScopes; import com.google.api.services.sheets.v4.model.ValueRange; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -73,12 +74,7 @@ public class SheetsClient public static final String RANGE_SEPARATOR = "#"; private static final Logger log = Logger.get(SheetsClient.class); - public static final ExponentialBackOff BACKOFF = new ExponentialBackOff.Builder() - .setInitialIntervalMillis(500) - .setMaxIntervalMillis(10_000) - .setMaxElapsedTimeMillis(60_000) - .setMultiplier(1.5) - .build(); + private static final int HTTP_TOO_MANY_REQUESTS = 429; private static final String APPLICATION_NAME = "trino google sheets integration"; private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance(); @@ -344,8 +340,27 @@ private HttpRequestInitializer setTimeout(HttpRequestInitializer requestInitiali httpRequest.setConnectTimeout(toIntExact(config.getConnectionTimeout().toMillis())); httpRequest.setReadTimeout(toIntExact(config.getReadTimeout().toMillis())); httpRequest.setWriteTimeout(toIntExact(config.getWriteTimeout().toMillis())); - httpRequest.setUnsuccessfulResponseHandler(new HttpBackOffUnsuccessfulResponseHandler(BACKOFF)); - httpRequest.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(BACKOFF)); + httpRequest.setUnsuccessfulResponseHandler(newUnsuccessfulResponseHandler()); + httpRequest.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(newBackOff())); }; } + + @VisibleForTesting + static HttpBackOffUnsuccessfulResponseHandler newUnsuccessfulResponseHandler() + { + return new HttpBackOffUnsuccessfulResponseHandler(newBackOff()) + .setBackOffRequired(response -> response.getStatusCode() == HTTP_TOO_MANY_REQUESTS || + (response.getStatusCode() >= 500 && response.getStatusCode() <= 599)); + } + + @VisibleForTesting + static ExponentialBackOff newBackOff() + { + return new ExponentialBackOff.Builder() + .setInitialIntervalMillis(500) + .setMaxIntervalMillis(10_000) + .setMaxElapsedTimeMillis(60_000) + .setMultiplier(1.5) + .build(); + } } diff --git a/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheets.java b/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheets.java index 7772fb7830ca..5372a3521f4c 100644 --- a/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheets.java +++ b/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheets.java @@ -15,7 +15,6 @@ import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.http.HttpBackOffIOExceptionHandler; -import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.sheets.v4.Sheets; @@ -33,20 +32,26 @@ import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ResourceLock; import java.io.FileInputStream; import java.io.IOException; import java.util.concurrent.TimeUnit; import static com.google.api.client.googleapis.javanet.GoogleNetHttpTransport.newTrustedTransport; -import static io.trino.plugin.google.sheets.SheetsClient.BACKOFF; +import static io.trino.plugin.google.sheets.SheetsClient.newBackOff; +import static io.trino.plugin.google.sheets.SheetsClient.newUnsuccessfulResponseHandler; import static io.trino.plugin.google.sheets.TestSheetsPlugin.DATA_SHEET_ID; import static io.trino.plugin.google.sheets.TestSheetsPlugin.getTestCredentialsPath; import static io.trino.testing.assertions.Assert.assertEventually; import static java.lang.Math.toIntExact; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @Disabled // TODO https://github.com/trinodb/trino/issues/29407 Fix 'Invalid JWT Signature' failure +@Execution(SAME_THREAD) +@ResourceLock("google-sheets") public class TestGoogleSheets extends AbstractTestQueryFramework { @@ -335,8 +340,8 @@ private static HttpRequestInitializer setTimeout(HttpRequestInitializer requestI requestInitializer.initialize(httpRequest); httpRequest.setConnectTimeout(toIntExact(TimeUnit.MINUTES.toMillis(1))); httpRequest.setReadTimeout(toIntExact(TimeUnit.MINUTES.toMillis(1))); - httpRequest.setUnsuccessfulResponseHandler(new HttpBackOffUnsuccessfulResponseHandler(BACKOFF)); - httpRequest.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(BACKOFF)); + httpRequest.setUnsuccessfulResponseHandler(newUnsuccessfulResponseHandler()); + httpRequest.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(newBackOff())); }; } } diff --git a/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheetsWithoutMetadataSheetId.java b/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheetsWithoutMetadataSheetId.java index fa1e332d662a..1259c9e73fa2 100644 --- a/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheetsWithoutMetadataSheetId.java +++ b/plugin/trino-google-sheets/src/test/java/io/trino/plugin/google/sheets/TestGoogleSheetsWithoutMetadataSheetId.java @@ -17,9 +17,14 @@ import io.trino.testing.QueryRunner; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ResourceLock; import static io.trino.plugin.google.sheets.TestSheetsPlugin.DATA_SHEET_ID; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; +@Execution(SAME_THREAD) +@ResourceLock("google-sheets") public class TestGoogleSheetsWithoutMetadataSheetId extends AbstractTestQueryFramework { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/Hive3MinioDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/Hive3MinioDataLake.java index 840e18d55bf9..ce1ea43150c5 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/Hive3MinioDataLake.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/Hive3MinioDataLake.java @@ -51,6 +51,7 @@ public void start() { super.start(); hiveHadoop.start(); + hiveHadoop.waitForHiveServer(); state = STARTED; } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveHadoop.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveHadoop.java index 4b691e6a637d..09e4c3711116 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveHadoop.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/containers/HiveHadoop.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.net.HostAndPort; import io.airlift.log.Logger; +import io.airlift.units.Duration; import io.trino.testing.TestingProperties; import io.trino.testing.containers.BaseTestContainer; import io.trino.testing.containers.PrintingLogConsumer; @@ -27,6 +28,10 @@ import java.util.Optional; import java.util.Set; +import static io.trino.testing.assertions.Assert.assertEventually; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; + public class HiveHadoop extends BaseTestContainer { @@ -86,6 +91,11 @@ public String runOnHive(String query) return executeInContainerFailOnError("beeline", "-u", "jdbc:hive2://localhost:10000/default", "-n", "hive", "-e", query); } + public void waitForHiveServer() + { + assertEventually(new Duration(2, MINUTES), new Duration(1, SECONDS), () -> runOnHive("SELECT 1")); + } + public String runOnMetastore(String query) { return executeInContainerFailOnError("mysql", "-D", "metastore", "-uroot", "-proot", "--batch", "--column-names=false", "-e", query).replaceAll("\n$", ""); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 8088e982b8a4..9e8f28c6fa5f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -144,11 +144,6 @@ public void testDeleteRowsConcurrently() ExecutorService executor = newFixedThreadPool(threads); List rows = ImmutableList.of("(1, 0, 0, 0)", "(0, 1, 0, 0)", "(0, 0, 1, 0)", "(0, 0, 0, 1)"); - String[] expectedErrors = { - "Failed to commit the transaction during write:", - "Failed to replace table due to concurrent updates:", - "Failed to commit during write:", - }; try (TestTable table = newTrinoTable( "test_concurrent_delete", "(col0 INTEGER, col1 INTEGER, col2 INTEGER, col3 INTEGER)")) { @@ -164,7 +159,7 @@ public void testDeleteRowsConcurrently() return true; } catch (Exception e) { - assertThat(e.getMessage()).containsAnyOf(expectedErrors); + verifyConcurrentDeleteFailurePermissible(e); return false; } })) @@ -186,6 +181,14 @@ public void testDeleteRowsConcurrently() } } + protected void verifyConcurrentDeleteFailurePermissible(Exception e) + { + assertThat(e.getMessage()).containsAnyOf( + "Failed to commit the transaction during write:", + "Failed to replace table due to concurrent updates:", + "Failed to commit during write:"); + } + @Test public void testCreateOrReplaceTable() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java index 28cb18139e4b..e58b2c12f95d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg.catalog.nessie; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.trino.filesystem.FileIterator; @@ -30,6 +31,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.nessie.NessieCatalog; import org.junit.jupiter.api.AfterAll; @@ -42,6 +44,7 @@ import java.nio.file.Path; import java.util.Optional; +import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.iceberg.IcebergTestUtils.FILE_IO_FACTORY; @@ -124,6 +127,23 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) }; } + @Override + protected void verifyConcurrentDeleteFailurePermissible(Exception e) + { + if (!nullToEmpty(e.getMessage()).contains("Failed to commit during write:")) { + super.verifyConcurrentDeleteFailurePermissible(e); + return; + } + + assertThat(e) + .hasMessageContaining("Failed to commit during write:") + .hasMessageContaining("Cannot commit: ref hash is out of date"); + assertThat(Throwables.getCausalChain(e)) + .anySatisfy(throwable -> assertThat(throwable) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Cannot commit: ref hash is out of date")); + } + @Test @Override public void testView() diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestingRedshiftServer.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestingRedshiftServer.java index 56bdae94d481..f49457b74a92 100644 --- a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestingRedshiftServer.java +++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/TestingRedshiftServer.java @@ -19,9 +19,11 @@ import org.jdbi.v3.core.HandleConsumer; import org.jdbi.v3.core.Jdbi; +import java.net.ConnectException; import java.net.SocketTimeoutException; import java.time.Duration; +import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.base.Throwables.getCausalChain; import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty; @@ -67,12 +69,17 @@ public static T executeWithRedshift(HandleCallback e instanceof SocketTimeoutException)); + if (exception == null) { + return false; + } + + String message = nullToEmpty(exception.getMessage()); + return message.matches(".* concurrent transaction.*") + || message.matches(".*deadlock detected.*") + || message.matches(".*could not open relation with OID.*") + || message.matches(".*The connection attempt failed.*") + || message.matches(".*Connection to .* refused.*") + || getCausalChain(exception).stream() + .anyMatch(e -> e instanceof ConnectException || e instanceof SocketTimeoutException); } } diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java index 737bae7d6e30..0a50f9050f18 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/TestingSqlServer.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.function.BiConsumer; +import static com.google.common.base.Strings.nullToEmpty; import static com.google.common.base.Throwables.getCausalChain; import static io.airlift.testing.Closeables.closeAllSuppress; import static io.trino.testing.containers.TestContainers.startOrReuse; @@ -55,7 +56,7 @@ public final class TestingSqlServer .withBackoff(1, 5, ChronoUnit.SECONDS) .withMaxRetries(5) .handleIf(throwable -> getCausalChain(throwable).stream() - .anyMatch(SQLException.class::isInstance) || throwable.getMessage().contains("Container exited with code")) + .anyMatch(TestingSqlServer::isRetryableContainerStartupFailure)) .onRetry(event -> log.warn( "Query failed on attempt %s, will retry. Exception: %s", event.getAttemptCount(), @@ -159,6 +160,16 @@ public String getUsername() } } + private static boolean isRetryableContainerStartupFailure(Throwable throwable) + { + if (throwable instanceof SQLException) { + return true; + } + + String message = nullToEmpty(throwable.getMessage()); + return message.contains("Container exited with code") || message.contains("Can't get Docker image"); + } + private static void setUpDatabase(SqlExecutor executor, String databaseName, BiConsumer databaseSetUp) { executor.execute("CREATE DATABASE " + databaseName); diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeGcs.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeGcs.java index a3aa9f5ae124..ccf13c35a6c9 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeGcs.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeGcs.java @@ -88,7 +88,7 @@ public void extendEnvironment(Environment.Builder builder) String gcpCredentials = new String(gcpCredentialsBytes, UTF_8); File gcpCredentialsFile; try { - gcpCredentialsFile = Files.createTempFile("gcp-credentials", ".xml", PosixFilePermissions.asFileAttribute(fromString("rw-r--r--"))).toFile(); + gcpCredentialsFile = Files.createTempFile("gcp-credentials", ".json", PosixFilePermissions.asFileAttribute(fromString("rw-r--r--"))).toFile(); gcpCredentialsFile.deleteOnExit(); Files.write(gcpCredentialsFile.toPath(), gcpCredentialsBytes); } @@ -96,7 +96,7 @@ public void extendEnvironment(Environment.Builder builder) throw new UncheckedIOException(e); } - String containerGcpCredentialsFile = CONTAINER_TRINO_ETC + "gcp-credentials.json"; + String containerGcpCredentialsFile = CONTAINER_TRINO_ETC + "/gcp-credentials.json"; builder.configureContainer(HADOOP, container -> { container.withCopyFileToContainer( forHostPath(getCoreSiteOverrideXml(containerGcpCredentialsFile)), diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/jdk/TarDownloadingJdkProvider.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/jdk/TarDownloadingJdkProvider.java index e2b3d4bdbebe..dc63b4c2af08 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/jdk/TarDownloadingJdkProvider.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/jdk/TarDownloadingJdkProvider.java @@ -16,10 +16,13 @@ import com.github.dockerjava.api.model.AccessMode; import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.Volume; +import dev.failsafe.Failsafe; +import dev.failsafe.RetryPolicy; import io.airlift.log.Logger; import io.trino.testing.containers.TestContainers.DockerArchitecture; import io.trino.testing.containers.TestContainers.DockerArchitectureInfo; import io.trino.tests.product.launcher.env.DockerContainer; +import io.trino.tests.product.launcher.util.UriDownloader; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; @@ -40,10 +43,10 @@ import static com.google.common.base.Verify.verify; import static io.trino.testing.containers.TestContainers.getDockerArchitectureInfo; import static io.trino.tests.product.launcher.util.DirectoryUtils.getOnlyDescendant; -import static io.trino.tests.product.launcher.util.UriDownloader.download; import static java.nio.file.Files.exists; import static java.nio.file.Files.isDirectory; import static java.nio.file.Files.newInputStream; +import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElse; @@ -96,7 +99,11 @@ public DockerContainer applyTo(DockerContainer container) } else if (!exists(extractPath)) { // Distribution not extracted and not downloaded yet log.info("Downloading %s from %s to %s", fullName, downloadUri, targetDownloadPath); - download(downloadUri, targetDownloadPath, new EveryNthPercentProgress(progress -> log.info("Downloading %s %d%%...", fullName, progress), 5)); + downloadWithRetries( + fullName, + downloadUri, + targetDownloadPath, + new EveryNthPercentProgress(progress -> log.info("Downloading %s %d%%...", fullName, progress), 5)); log.info("Downloaded %s to %s", fullName, targetDownloadPath); } @@ -177,6 +184,51 @@ private void ensureDownloadPathExists() verify(isDirectory(downloadPath), "--jdk-tmp-download-path '%s' is not a directory", downloadPath); } + private RetryPolicy downloadRetryPolicy(String fullName, String downloadUri) + { + return RetryPolicy.builder() + .handle(UncheckedIOException.class) + .withMaxAttempts(5) + .withBackoff(1, 10, SECONDS) + .onFailedAttempt(event -> log.warn(event.getLastException(), "Could not download %s from %s", fullName, downloadUri)) + .onRetry(event -> log.info("Retrying download of %s from %s, %d failed attempt(s)", fullName, downloadUri, event.getAttemptCount())) + .build(); + } + + private void downloadWithRetries( + String fullName, + String downloadUri, + Path targetDownloadPath, + Consumer progressListener) + { + try { + Failsafe.with(downloadRetryPolicy(fullName, downloadUri)) + .run(() -> { + deleteDownload(targetDownloadPath); + UriDownloader.download(downloadUri, targetDownloadPath, progressListener); + }); + } + catch (RuntimeException e) { + try { + deleteDownload(targetDownloadPath); + } + catch (RuntimeException deleteException) { + e.addSuppressed(deleteException); + } + throw e; + } + } + + private static void deleteDownload(Path targetDownloadPath) + { + try { + Files.deleteIfExists(targetDownloadPath); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private static class EveryNthPercentProgress implements Consumer { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java index 064ed4e7caef..5cf49dd61735 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java @@ -53,10 +53,12 @@ public final class DeltaLakeTestUtils "|SocketTimeoutException: Read timed out" + "|Error while closing operation" + "|HTTP request failed by code: 50[02]" + - "|HTTP response code: 503"; + "|HTTP response code: 503" + + "|The cluster is temporarily unavailable" + + "|The current cluster state is Terminated"; + private static final Pattern DATABRICKS_COMMUNICATION_FAILURE_PATTERN = Pattern.compile(DATABRICKS_COMMUNICATION_FAILURE_MATCH); private static final RetryPolicy DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY = RetryPolicy.builder() - .handleIf(throwable -> Throwables.getRootCause(throwable) instanceof SQLException) - .handleIf(throwable -> Pattern.compile(DATABRICKS_COMMUNICATION_FAILURE_MATCH).matcher(Throwables.getRootCause(throwable).getMessage()).find()) + .handleIf(DeltaLakeTestUtils::isDatabricksCommunicationFailure) .withBackoff(1, 10, ChronoUnit.SECONDS) .withMaxRetries(3) .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount())) @@ -71,6 +73,12 @@ public final class DeltaLakeTestUtils private DeltaLakeTestUtils() {} + static boolean isDatabricksCommunicationFailure(Throwable throwable) + { + return Throwables.getRootCause(throwable) instanceof SQLException && + DATABRICKS_COMMUNICATION_FAILURE_PATTERN.matcher(Throwables.getStackTraceAsString(throwable)).find(); + } + public static Optional getDatabricksRuntimeVersion() { String version = (String) Failsafe.with(DATABRICKS_COMMUNICATION_FAILURE_RETRY_POLICY) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveConnectorKerberosSmokeTest.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveConnectorKerberosSmokeTest.java index 04af8d4ebeb1..df0b49338861 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveConnectorKerberosSmokeTest.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveConnectorKerberosSmokeTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.nullToEmpty; import static io.trino.tests.product.TestGroups.HIVE_KERBEROS; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.utils.QueryExecutors.onTrino; @@ -36,6 +37,9 @@ public class TestHiveConnectorKerberosSmokeTest { + private static final String CANCELLATION_MESSAGE = "explicitly cancelled for test without failure"; + private static final String CANCELLATION_FAILURE_SUFFIX = "Query killed. Message: " + CANCELLATION_MESSAGE; + private Closer closer; private ExecutorService executor; @@ -69,7 +73,8 @@ public void kerberosTicketExpiryTest() // 2x of ticket_lifetime as configured in hadoop-kerberos krb5.conf, sufficient to cause at-least 1 ticket expiry SECONDS.sleep(60L); - cancelQueryIfRunning(sql); + failIfQueryFinishedBeforeCancellation(queryExecution, null); + cancelQueryIfRunning(sql, queryExecution); try { queryExecution.get(30, SECONDS); @@ -81,17 +86,55 @@ public void kerberosTicketExpiryTest() } catch (ExecutionException expected) { assertThat(expected.getCause()) - .hasMessageEndingWith("Message: explicitly cancelled for test without failure"); + .hasMessageEndingWith(CANCELLATION_FAILURE_SUFFIX); } } - private void cancelQueryIfRunning(String sql) + private void cancelQueryIfRunning(String sql, Future queryExecution) + throws Exception { QueryResult queryResult = onTrino().executeQuery("SELECT query_id FROM system.runtime.queries WHERE query = '%s' AND state = 'RUNNING' LIMIT 2".formatted(sql)); checkState(queryResult.getRowsCount() < 2, "Found multiple queries"); - if (queryResult.getRowsCount() == 1) { - String queryId = (String) queryResult.getOnlyValue(); - onTrino().executeQuery("CALL system.runtime.kill_query(query_id => '%s', message => 'explicitly cancelled for test without failure')".formatted(queryId)); + if (queryResult.getRowsCount() == 0) { + failIfQueryFinishedBeforeCancellation(queryExecution, null); + return; + } + + String queryId = (String) queryResult.getOnlyValue(); + try { + onTrino().executeQuery("CALL system.runtime.kill_query(query_id => '%s', message => '%s')".formatted(queryId, CANCELLATION_MESSAGE)); + } + catch (RuntimeException e) { + if (!nullToEmpty(e.getMessage()).contains("Target query is not running")) { + throw e; + } + failIfQueryFinishedBeforeCancellation(queryExecution, e); + throw e; + } + } + + private static void failIfQueryFinishedBeforeCancellation(Future queryExecution, RuntimeException cancellationFailure) + throws Exception + { + if (!queryExecution.isDone()) { + return; + } + + try { + queryExecution.get(); + } + catch (ExecutionException e) { + AssertionError failure = new AssertionError("Query failed before it could be cancelled; Kerberos ticket refresh was not verified", e.getCause()); + if (cancellationFailure != null) { + failure.addSuppressed(cancellationFailure); + } + throw failure; + } + + AssertionError failure = new AssertionError("Query completed before it could be cancelled; increase the input size to exercise Kerberos ticket refresh"); + if (cancellationFailure != null) { + failure.addSuppressed(cancellationFailure); } + throw failure; } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java index 455914138f8a..d3edb9b70ba0 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/utils/QueryExecutors.java @@ -127,14 +127,7 @@ public static QueryExecutor onDelta() RetryPolicy databricksRetryPolicy = RetryPolicy.builder() // Retry on 503 may lead to unexpected test results: https://github.com/trinodb/trino/pull/14392#issuecomment-1264041917 - .handleIf(throwable -> { - String stackTrace = Throwables.getStackTraceAsString(throwable); - // Don't retry if the error occurred during statement close — the statement already executed successfully - if (stackTrace.contains("closeStatement") || stackTrace.contains("CloseOperation")) { - return false; - } - return stackTrace.contains("HTTP Response code: 502") || stackTrace.contains("The current cluster state is Pending") || stackTrace.contains("The current cluster state is Terminated"); - }) + .handleIf(QueryExecutors::isDatabricksTransientFailure) .withDelay(Duration.of(30, ChronoUnit.SECONDS)) .withMaxRetries(60) .onRetry(event -> log.warn(event.getLastException(), "Query failed on attempt %d, will retry.", event.getAttemptCount())) @@ -166,6 +159,20 @@ public void close() }; } + static boolean isDatabricksTransientFailure(Throwable throwable) + { + String stackTrace = Throwables.getStackTraceAsString(throwable); + // Don't retry if the error occurred during statement close — the statement already executed successfully + if (stackTrace.contains("closeStatement") || stackTrace.contains("CloseOperation")) { + return false; + } + return stackTrace.contains("HTTP Response code: 502") || + stackTrace.contains("The current cluster state is Pending") || + stackTrace.contains("The current cluster state is Terminated") || + stackTrace.contains("The cluster is temporarily unavailable") || + stackTrace.contains("AWSCatalogMetastoreClient.isCompatibleWith"); + } + public static QueryExecutor onHudi() { return testContext().getDependency(QueryExecutor.class, "hudi"); diff --git a/testing/trino-product-tests/src/test/java/io/trino/tests/product/deltalake/util/TestDeltaLakeTestUtils.java b/testing/trino-product-tests/src/test/java/io/trino/tests/product/deltalake/util/TestDeltaLakeTestUtils.java new file mode 100644 index 000000000000..2f8284b43e52 --- /dev/null +++ b/testing/trino-product-tests/src/test/java/io/trino/tests/product/deltalake/util/TestDeltaLakeTestUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.deltalake.util; + +import org.junit.jupiter.api.Test; + +import java.sql.SQLException; + +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.isDatabricksCommunicationFailure; +import static org.assertj.core.api.Assertions.assertThat; + +class TestDeltaLakeTestUtils +{ + @Test + void testDatabricksCommunicationFailureClassification() + { + SQLException httpFailure = new SQLException("[Databricks][JDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: HTTP request failed by code: 502"); + SQLException clusterUnavailable = new SQLException("[Databricks][JDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: The cluster is temporarily unavailable"); + SQLException terminatedCluster = new SQLException("[Databricks][JDBCDriver](500593) Communication link failure. Failed to connect to server. Reason: The current cluster state is Terminated"); + + assertThat(isDatabricksCommunicationFailure(httpFailure)).isTrue(); + assertThat(isDatabricksCommunicationFailure(clusterUnavailable)).isTrue(); + assertThat(isDatabricksCommunicationFailure(terminatedCluster)).isTrue(); + + assertThat(isDatabricksCommunicationFailure(new SQLException("The current cluster state is Running"))).isFalse(); + assertThat(isDatabricksCommunicationFailure(new SQLException("syntax error"))).isFalse(); + assertThat(isDatabricksCommunicationFailure(new RuntimeException("HTTP request failed by code: 502"))).isFalse(); + } +} diff --git a/testing/trino-product-tests/src/test/java/io/trino/tests/product/utils/TestQueryExecutors.java b/testing/trino-product-tests/src/test/java/io/trino/tests/product/utils/TestQueryExecutors.java new file mode 100644 index 000000000000..7c82595a6aa0 --- /dev/null +++ b/testing/trino-product-tests/src/test/java/io/trino/tests/product/utils/TestQueryExecutors.java @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.utils; + +import org.junit.jupiter.api.Test; + +import static io.trino.tests.product.utils.QueryExecutors.isDatabricksTransientFailure; +import static org.assertj.core.api.Assertions.assertThat; + +class TestQueryExecutors +{ + @Test + void testDatabricksTransientFailureClassification() + { + assertThat(isDatabricksTransientFailure(new RuntimeException("HTTP Response code: 502"))).isTrue(); + assertThat(isDatabricksTransientFailure(new RuntimeException("The current cluster state is Pending"))).isTrue(); + assertThat(isDatabricksTransientFailure(new RuntimeException("The current cluster state is Terminated"))).isTrue(); + assertThat(isDatabricksTransientFailure(new RuntimeException("The cluster is temporarily unavailable"))).isTrue(); + assertThat(isDatabricksTransientFailure(new RuntimeException("AWSCatalogMetastoreClient.isCompatibleWith"))).isTrue(); + + assertThat(isDatabricksTransientFailure(new RuntimeException("HTTP Response code: 503"))).isFalse(); + assertThat(isDatabricksTransientFailure(new RuntimeException("closeStatement failed after HTTP Response code: 502"))).isFalse(); + assertThat(isDatabricksTransientFailure(new RuntimeException("CloseOperation failed while The current cluster state is Terminated"))).isFalse(); + } +} diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestEngineOnlyQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestEngineOnlyQueries.java index fe18dcb1dd52..7428a3bcaaa2 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestEngineOnlyQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestEngineOnlyQueries.java @@ -6281,6 +6281,21 @@ public void testShowTablesLikeWithEscape() assertThat(result).contains("table_privileges").allMatch(schemaName -> ((String) schemaName).contains("_")); } + @Test + public void testShowSchemasLikeWithEscape() + { + assertQueryFails("SHOW SCHEMAS LIKE '%$_%' ESCAPE", "line 1:32: mismatched input ''. Expecting: "); + assertQueryFails("SHOW SCHEMAS LIKE 't$_%' ESCAPE ''", "Escape string must be a single character"); + assertQueryFails("SHOW SCHEMAS LIKE 't$_%' ESCAPE '$$'", "Escape string must be a single character"); + + Set allSchemas = computeActual("SHOW SCHEMAS").getOnlyColumnAsSet(); + assertThat(allSchemas).isEqualTo(computeActual("SHOW SCHEMAS LIKE '%_%'").getOnlyColumnAsSet()); + Set result = computeActual("SHOW SCHEMAS LIKE '%$_%' ESCAPE '$'").getOnlyColumnAsSet(); + assertThat(allSchemas) + .isNotEqualTo(result); + assertThat(result).contains("information_schema").allMatch(schemaName -> ((String) schemaName).contains("_")); + } + @Test public void testShowCatalogs() { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java index 82159508dc18..8cd26695fa8f 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java @@ -22,14 +22,12 @@ import java.util.List; import java.util.Set; -import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.connector.informationschema.InformationSchemaTable.INFORMATION_SCHEMA; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.QueryAssertions.assertContains; -import static io.trino.testing.assertions.Assert.assertEventually; import static io.trino.tpch.TpchTable.NATION; import static io.trino.tpch.TpchTable.ORDERS; import static io.trino.tpch.TpchTable.REGION; @@ -269,26 +267,6 @@ public void testShowSchemasLike() assertThat(result.getOnlyColumnAsSet()).isEqualTo(ImmutableSet.of(getSession().getSchema().get())); } - @Test - public void testShowSchemasLikeWithEscape() - { - assertQueryFails("SHOW SCHEMAS LIKE 't$_%' ESCAPE ''", "Escape string must be a single character"); - assertQueryFails("SHOW SCHEMAS LIKE 't$_%' ESCAPE '$$'", "Escape string must be a single character"); - - // Using eventual assertion because set of schemas may change concurrently. - assertEventually(() -> { - Set allSchemas = computeActual("SHOW SCHEMAS").getOnlyColumnAsSet(); - assertThat(allSchemas).isEqualTo(computeActual("SHOW SCHEMAS LIKE '%_%'").getOnlyColumnAsSet()); - Set result = computeActual("SHOW SCHEMAS LIKE '%$_%' ESCAPE '$'").getOnlyColumnAsSet(); - verify(allSchemas.stream().anyMatch(schema -> !((String) schema).contains("_")), - "This test expects at least one schema without underscore in it's name. Satisfy this assumption or override the test."); - assertThat(result) - .isSubsetOf(allSchemas) - .isNotEqualTo(allSchemas); - assertThat(result).contains("information_schema").allMatch(schemaName -> ((String) schemaName).contains("_")); - }); - } - @Test public void testShowTables() { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index c4b35a8bff3b..0c0cd4f231f0 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -4186,28 +4186,32 @@ public void testRenameSchemaToLongName() skipTestUnless(hasBehavior(SUPPORTS_RENAME_SCHEMA)); String sourceSchemaName = "test_rename_source_" + randomNameSuffix(); - assertUpdate(createSchemaSql(sourceSchemaName)); - String baseSchemaName = "test_rename_target_" + randomNameSuffix(); - int maxLength = maxSchemaNameLength() // Assume 2^16 is enough for most use cases. Add a bit more to ensure 2^16 isn't actual limit. .orElse(65536 + 5); String validTargetSchemaName = baseSchemaName + "z".repeat(maxLength - baseSchemaName.length()); - assertUpdate("ALTER SCHEMA " + sourceSchemaName + " RENAME TO " + validTargetSchemaName); - assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(validTargetSchemaName); - assertUpdate("DROP SCHEMA " + validTargetSchemaName); + try { + assertUpdate(createSchemaSql(sourceSchemaName)); + assertUpdate("ALTER SCHEMA " + sourceSchemaName + " RENAME TO " + validTargetSchemaName); + assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).contains(validTargetSchemaName); + assertUpdate("DROP SCHEMA " + validTargetSchemaName); - if (maxSchemaNameLength().isEmpty()) { - return; - } + if (maxSchemaNameLength().isEmpty()) { + return; + } - assertUpdate(createSchemaSql(sourceSchemaName)); - String invalidTargetSchemaName = validTargetSchemaName + "z"; - assertThatThrownBy(() -> assertUpdate("ALTER SCHEMA " + sourceSchemaName + " RENAME TO " + invalidTargetSchemaName)) - .satisfies(this::verifySchemaNameLengthFailurePermissible); - assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).doesNotContain(invalidTargetSchemaName); + assertUpdate(createSchemaSql(sourceSchemaName)); + String invalidTargetSchemaName = validTargetSchemaName + "z"; + assertThatThrownBy(() -> assertUpdate("ALTER SCHEMA " + sourceSchemaName + " RENAME TO " + invalidTargetSchemaName)) + .satisfies(this::verifySchemaNameLengthFailurePermissible); + assertThat(computeActual("SHOW SCHEMAS").getOnlyColumnAsSet()).doesNotContain(invalidTargetSchemaName); + } + finally { + assertUpdate("DROP SCHEMA IF EXISTS " + sourceSchemaName); + assertUpdate("DROP SCHEMA IF EXISTS " + validTargetSchemaName); + } } protected OptionalInt maxSchemaNameLength()