Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/actions/setup/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,30 @@ runs:
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- name: Cache and Restore Maven wrapper
id: cache-maven-wrapper
if: ${{ format('{0}', inputs.cache) == 'true' }}
uses: actions/cache@v5
with:
# Note: must be same set of paths as for cache:restore mode
path: ~/.m2/wrapper/dists
key: ${{ runner.os }}-maven-wrapper-${{ hashFiles('.mvn/wrapper/maven-wrapper.properties') }}
restore-keys: |
${{ runner.os }}-maven-wrapper-
- name: Restore Maven wrapper
id: cache_restore-maven-wrapper
if: ${{ format('{0}', inputs.cache) == 'restore' }}
uses: actions/cache/restore@v5
with:
# Note: must be same set of paths as for cache:true mode
path: ~/.m2/wrapper/dists
key: ${{ runner.os }}-maven-wrapper-${{ hashFiles('.mvn/wrapper/maven-wrapper.properties') }}
restore-keys: |
${{ runner.os }}-maven-wrapper-
- name: Verify Maven wrapper
if: ${{ inputs.java-version != '' }}
shell: bash
run: .github/bin/retry ./mvnw -v
- name: Configure Problem Matchers
if: ${{ inputs.java-version != '' }}
shell: bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.Variants;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand All @@ -35,6 +36,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@Isolated // this test uses lots of memory
class TestMetadata
{
// from the Iceberg implementation, an empty metadata has 3 bytes: header + offsetCount + 0th offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1056,25 +1056,28 @@ public void testPreSignedUris()
abort("Generating pre-signed URI is not supported");
}

Optional<UriLocation> directLocation = getFileSystem()
.preSignedUri(location, new Duration(30, SECONDS));
UriLocation directLocation = assertThat(getFileSystem()
.preSignedUri(location, new Duration(30, SECONDS)))
.get().actual();
// NOTE(review): resolved PR comment-thread marker from the page extraction — not part of the source

Optional<UriLocation> expiredDirectLocation = getFileSystem()
.preSignedUri(location, new Duration(1, SECONDS));
UriLocation expiredDirectLocation = assertThat(getFileSystem()
.preSignedUri(location, new Duration(3, SECONDS)))
.get().actual();
// NOTE(review): resolved PR comment-thread marker from the page extraction — not part of the source

assertThat(directLocation).isPresent();

assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation.get()))
assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation))
.isEqualTo(TEST_BLOB_CONTENT_PREFIX + location));

// Check if it can be retrieved more than once
assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation.get()))
assertEventually(new Duration(5, SECONDS), () -> assertThat(retrieveUri(directLocation))
.isEqualTo(TEST_BLOB_CONTENT_PREFIX + location));

// Check if after a timeout the pre-signed URI is no longer valid
assertEventually(new Duration(5, SECONDS), new Duration(1, SECONDS), () -> assertThatThrownBy(() -> retrieveUri(expiredDirectLocation.get()))
.isInstanceOf(IOException.class)
.hasMessageContaining("Failed to retrieve"));
assertEventually(
new Duration(10, SECONDS),
new Duration(1, SECONDS),
() -> assertThatThrownBy(() -> retrieveUri(expiredDirectLocation))
.isInstanceOf(IOException.class)
.hasMessageContaining("Failed to retrieve"));
}
}

Expand Down
57 changes: 57 additions & 0 deletions lib/trino-hdfs/src/test/java/io/trino/filesystem/hdfs/Hadoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
import io.airlift.log.Logger;
import io.trino.testing.containers.BaseTestContainer;
import io.trino.testing.containers.PrintingLogConsumer;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.testcontainers.containers.Container;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.StandardSystemProperty.USER_NAME;
import static io.trino.testing.TestingProperties.getDockerImagesVersion;
Expand Down Expand Up @@ -64,11 +67,65 @@ public void start()
{
super.start();
executeInContainerFailOnError("hadoop", "fs", "-rm", "-r", "/*");
executeInContainerFailOnError("bash", "-e", "-c",
"""
printf 'ready' | hadoop fs -put - /_trino_hdfs_ready
hadoop fs -cat /_trino_hdfs_ready
hadoop fs -rm -f /_trino_hdfs_ready
""");
log.info("Hadoop container started with HDFS endpoint: %s", getHdfsUri());
}

/**
 * Returns the HDFS endpoint URI for this container, using the host-mapped
 * port for the container's exposed HDFS port.
 */
public String getHdfsUri()
{
    String hostAndPort = getMappedHostAndPortForExposedPort(HDFS_PORT);
    return String.format("hdfs://%s/", hostAndPort);
}

/**
 * Dumps container-level and HDFS-level state to the warn log so that a CI log
 * alone is enough to diagnose the Hadoop container's condition after a failure
 * (registered via {@code printDiagnosticsOnFailure}).
 */
public void printDiagnostics()
{
    log.warn("Printing Hadoop container diagnostics");
    // Container-level health: supervised processes, disk space, and inode usage.
    printDiagnostic("supervisor status", "supervisorctl", "status");
    printDiagnostic("container disk usage", "df", "-h");
    printDiagnostic("container inode usage", "df", "-i");
    // HDFS-level health: datanode report, filesystem usage, and block integrity.
    printDiagnostic("HDFS report", "hdfs", "dfsadmin", "-report");
    printDiagnostic("HDFS disk usage", "hadoop", "fs", "-df", "-h", "/");
    printDiagnostic("HDFS directory usage", "hadoop", "fs", "-du", "-h", "/");
    printDiagnostic("HDFS fsck", "hdfs", "fsck", "/", "-files", "-blocks", "-locations");
    // Tail the daemon logs last; the trailing '|| true' keeps the command from
    // failing when no matching log file exists.
    printDiagnostic("NameNode log", "bash", "-c", "tail -n 500 /var/log/hadoop-hdfs/*namenode*.log 2>&1 || true");
    printDiagnostic("DataNode log", "bash", "-c", "tail -n 500 /var/log/hadoop-hdfs/*datanode*.log 2>&1 || true");
}

/**
 * Runs a single diagnostic command inside the container and logs its exit code
 * and output. Best-effort: any runtime failure is logged and swallowed so that
 * diagnostics never mask the original test failure.
 */
private void printDiagnostic(String name, String... command)
{
    try {
        String renderedCommand = String.join(" ", command);
        Container.ExecResult execResult = executeInContainer(command);
        log.warn(
                "Hadoop diagnostic: %s%ncommand: %s%nexit code: %s%nstdout:%n%s%nstderr:%n%s",
                name,
                renderedCommand,
                execResult.getExitCode(),
                execResult.getStdout(),
                execResult.getStderr());
    }
    catch (RuntimeException e) {
        log.warn(e, "Failed to print Hadoop diagnostic: %s", name);
    }
}

/**
 * Returns true when the failure is (a subclass of) opentest4j's
 * IncompleteExecutionException, e.g. an aborted or skipped test. The class
 * hierarchy is walked by name to avoid a hard dependency on opentest4j.
 */
private static boolean isIncompleteExecution(Throwable failure)
{
    Class<?> candidate = failure.getClass();
    while (candidate != null) {
        if ("org.opentest4j.IncompleteExecutionException".equals(candidate.getName())) {
            return true;
        }
        candidate = candidate.getSuperclass();
    }
    return false;
}

public static AfterTestExecutionCallback printDiagnosticsOnFailure(Supplier<Hadoop> hadoop)
{
return context -> context.getExecutionException()
.filter(failure -> !isIncompleteExecution(failure))
.ifPresent(_ -> hadoop.get().printDiagnostics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -48,6 +50,9 @@ public class TestHdfsFileSystemHdfs
private HdfsContext hdfsContext;
private TrinoFileSystem fileSystem;

@RegisterExtension
final AfterTestExecutionCallback hadoopDiagnostics = Hadoop.printDiagnosticsOnFailure(() -> hadoop);

@BeforeAll
void beforeAll()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.function.Function;

import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.base.Throwables.getCausalChain;
import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
Expand Down Expand Up @@ -83,6 +84,12 @@
public abstract class BaseBigQueryConnectorTest
extends BaseConnectorTest
{
private static final RetryPolicy<Object> BIGQUERY_TRANSIENT_FAILURE_RETRY_POLICY = RetryPolicy.builder()
.handleIf(BaseBigQueryConnectorTest::isTransientBigQueryFailure)
.withDelay(java.time.Duration.ofSeconds(10))
.withMaxAttempts(3)
.build();

protected BigQuerySqlExecutor bigQuerySqlExecutor;
private String gcpStorageBucket;
private String bigQueryConnectionId;
Expand Down Expand Up @@ -1573,6 +1580,13 @@ protected void verifyColumnNameLengthFailurePermissible(Throwable e)

private void onBigQuery(@Language("SQL") String sql)
{
bigQuerySqlExecutor.execute(sql);
Failsafe.with(BIGQUERY_TRANSIENT_FAILURE_RETRY_POLICY)
.run(() -> bigQuerySqlExecutor.execute(sql));
}

/**
 * Returns true when any throwable in the causal chain carries BigQuery's
 * transient marker message, making the operation worth retrying.
 */
private static boolean isTransientBigQueryFailure(Throwable throwable)
{
    for (Throwable cause : getCausalChain(throwable)) {
        if (nullToEmpty(cause.getMessage()).contains("Visibility check was unavailable")) {
            return true;
        }
    }
    return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.trino.testing.QueryRunner;
import io.trino.testing.datatype.ColumnSetup;
import io.trino.testing.datatype.DataSetup;
import io.trino.testing.sql.SqlExecutor;
Expand Down Expand Up @@ -49,13 +50,15 @@ public class CassandraCreateAndInsertDataSetup
private final String tableNamePrefix;
private final String keyspaceName;
private final CassandraServer cassandraServer;
private final QueryRunner queryRunner;

public CassandraCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String tableNamePrefix, CassandraServer cassandraServer)
public CassandraCreateAndInsertDataSetup(SqlExecutor sqlExecutor, String tableNamePrefix, CassandraServer cassandraServer, QueryRunner queryRunner)
{
this.sqlExecutor = requireNonNull(sqlExecutor, "sqlExecutor is null");
this.tableNamePrefix = requireNonNull(tableNamePrefix, "tableNamePrefix is null");
keyspaceName = verifyTableNamePrefixAndGetKeyspaceName(tableNamePrefix);
this.cassandraServer = requireNonNull(cassandraServer, "cassandraServer is null");
this.queryRunner = requireNonNull(queryRunner, "queryRunner is null");
}

private static String verifyTableNamePrefixAndGetKeyspaceName(String tableNamePrefix)
Expand Down Expand Up @@ -105,9 +108,8 @@ private void refreshSizeEstimates(String keyspaceName, String tableName)

private void waitForTableVisibility(String keyspaceName, String tableName)
{
assertEventually(() -> assertThat(cassandraServer.getSession()
.execute(format("SELECT table_name FROM system_schema.tables WHERE keyspace_name = '%s' AND table_name = '%s'", keyspaceName, tableName)).all())
.isNotEmpty());
assertEventually(() -> assertThat(queryRunner.execute(format("SELECT * FROM %s.%s", keyspaceName, tableName)).getRowCount())
.isEqualTo(1));
}

private TestTable createTestTable(List<ColumnSetup> inputs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.cassandra;

import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.SqlExecutor;

Expand All @@ -25,6 +26,7 @@
import static io.trino.testing.assertions.Assert.assertEventually;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.joining;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -65,8 +67,11 @@ public TestCassandraTable(
}

// Ensure that the currently created table is visible to Trino
assertEventually(() -> assertThat(queryRunner.execute("SELECT * FROM %s.%s".formatted(keyspace, tableName)).getRowCount())
.isEqualTo(rowsToInsert.size()));
assertEventually(
new Duration(1, MINUTES),
() -> assertThat(queryRunner.execute("SELECT * FROM %s.%s".formatted(keyspace, tableName))
.getRowCount())
.isEqualTo(rowsToInsert.size()));
}
catch (Exception e) {
try (TestCassandraTable ignored = this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ private DataSetup trinoCreateAndInsert(Session session, String tableNamePrefix)

private DataSetup cassandraCreateAndInsert(String tableNamePrefix)
{
return new CassandraCreateAndInsertDataSetup(session::execute, tableNamePrefix, server);
return new CassandraCreateAndInsertDataSetup(session::execute, tableNamePrefix, server, getQueryRunner());
}

private TestCassandraTable testTable(String namePrefix, List<ColumnDefinition> columnDefinitions, List<String> rowsToInsert)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetTableRequest;
import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.SerDeInfo;
Expand All @@ -36,10 +38,12 @@
import java.util.Map;

import static io.trino.plugin.hive.metastore.glue.TestingGlueHiveMetastore.createTestingGlueHiveMetastore;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;

/**
* Tests metadata operations on a schema which has a mix of Hive and Delta Lake tables.
Expand All @@ -48,6 +52,7 @@
* See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default
*/
@TestInstance(PER_CLASS)
@Execution(SAME_THREAD) // Tests share a Glue schema and assert exact table listings
public class TestDeltaLakeSharedGlueMetastoreWithTableRedirections
extends BaseDeltaLakeSharedMetastoreWithTableRedirectionsTest
{
Expand Down Expand Up @@ -124,7 +129,7 @@ protected String getExpectedDeltaLakeCreateSchema(String catalogName)
@Test
public void testUnsupportedHiveTypeRedirect()
{
String tableName = "unsupported_types";
String tableName = "unsupported_types_" + randomNameSuffix();
// Use another complete table location so `SHOW CREATE TABLE` doesn't fail on reading metadata
String location;
try (GlueClient glueClient = GlueClient.create()) {
Expand Down Expand Up @@ -168,21 +173,34 @@ public void testUnsupportedHiveTypeRedirect()
try (GlueClient glueClient = GlueClient.create()) {
glueClient.createTable(createTableRequest);

String tableDefinition = (String) computeScalar("SHOW CREATE TABLE hive_with_redirections." + schema + "." + tableName);
String expected =
"""
CREATE TABLE delta_with_redirections.%s.%s (
a_varchar varchar
)
WITH (
location = '%s'
)""";
assertThat(tableDefinition).isEqualTo(expected.formatted(schema, tableName, location));
try {
String tableDefinition = (String) computeScalar(
"SHOW CREATE TABLE hive_with_redirections." + schema + "." + tableName);
String expected =
"""
CREATE TABLE delta_with_redirections.%s.%s (
a_varchar varchar
)
WITH (
location = '%s'
)""";
assertThat(tableDefinition).isEqualTo(expected.formatted(schema, tableName, location));
}
finally {
deleteTableIfExists(glueClient, tableInput.name());
}
}
}

private void deleteTableIfExists(GlueClient glueClient, String tableName)
{
try {
glueClient.deleteTable(DeleteTableRequest.builder()
.databaseName(schema)
.name(tableInput.name())
.name(tableName)
.build());
}
catch (EntityNotFoundException _) {
// NOTE(review): resolved PR comment-thread marker from the page extraction — the catch
// body intentionally ignores EntityNotFoundException (table is already gone)
}
}
}
Loading