Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ values. Typical usage does not require you to configure them.
- Number of threads used for retrieving checkpoint files of each table. Currently, only
retrievals of V2 Checkpoint's sidecar files are parallelized.
- `4`
* - `delta.load-metadata-from-checksum-file`
- Speed up query planning by reading table metadata and protocol
entries from the Delta version checksum file (`<version>.crc`) when
available. Falls back to scanning the transaction log if the checksum
file is missing, incomplete, or malformed. The equivalent catalog
session property is `load_metadata_from_checksum_file`.
- `true`
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
raunaqmorarka marked this conversation as resolved.
:::

### Catalog session properties
Expand Down Expand Up @@ -234,6 +241,12 @@ The following table describes {ref}`catalog session properties
- Read only projected fields from row columns while performing `SELECT`
queries.
- `true`
* - `load_metadata_from_checksum_file`
- Speed up query planning by reading table metadata and protocol
entries from the Delta version checksum file (`<version>.crc`) when
available. Falls back to scanning the transaction log if the checksum
file is missing, incomplete, or malformed.
- `true`
:::

(delta-lake-fte-support)=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import com.azure.storage.file.datalake.models.PathItem;
import com.azure.storage.file.datalake.options.DataLakePathDeleteOptions;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.AbstractTestTrinoFileSystem;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInput;
Expand Down Expand Up @@ -291,4 +294,27 @@ public void testDirectoryExists()
}
super.testDirectoryExists();
}

@Test
void testListFilesStartingFromConsecutiveSlashesInLocation()
        throws IOException
{
    // Only relevant for ADLS Gen2 with hierarchical namespace, which canonicalizes runs of
    // slashes. The listing must still succeed because EmulatedListFilesStartingFromIterator
    // falls back to the slash-collapsed prefix.
    if (!isHierarchical()) {
        abort("Only ADLS Gen2 hierarchical canonicalizes `//` and routes through EmulatedListFilesStartingFromIterator");
    }
    try (Closer resources = Closer.create()) {
        Location firstFile = createBlob(resources, "level0/level1-file1");
        Location secondFile = createBlob(resources, "level0/level1-file2");

        // List the directory through a location that ends in consecutive slashes
        Location directoryWithExtraSlashes = createLocation("level0").appendSuffix("//");
        FileIterator listing = getFileSystem().listFilesStartingFrom(directoryWithExtraSlashes, "level1-file1");

        ImmutableList.Builder<Location> listedLocations = ImmutableList.builder();
        while (listing.hasNext()) {
            Location listedLocation = listing.next().location();
            listedLocations.add(listedLocation);
        }
        assertThat(listedLocations.build()).containsExactlyInAnyOrder(firstFile, secondFile);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@

Comment thread
raunaqmorarka marked this conversation as resolved.
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public final class EmulatedListFilesStartingFromIterator
implements FileIterator
{
private static final Pattern CONSECUTIVE_SLASHES = Pattern.compile("/+");

private final FileIterator delegate;
private final String locationPath;
private final String collapsedLocationPath;
private final String startingFrom;
private FileEntry nextEntry;

Expand All @@ -33,6 +37,7 @@ public EmulatedListFilesStartingFromIterator(FileIterator delegate, Location loc

String locationPath = location.path();
this.locationPath = (locationPath.isEmpty() || locationPath.endsWith("/")) ? locationPath : locationPath + "/";
this.collapsedLocationPath = CONSECUTIVE_SLASHES.matcher(this.locationPath).replaceAll("/");
Comment thread
raunaqmorarka marked this conversation as resolved.

this.startingFrom = requireNonNull(startingFrom, "startingFrom is null");
}
Expand Down Expand Up @@ -69,9 +74,22 @@ private void loadNextEntry()
while (delegate.hasNext()) {
FileEntry entry = delegate.next();
String entryPath = entry.location().path();
checkState(entryPath.startsWith(locationPath), "Expected listed file to start with directory path '%s': %s", locationPath, entry.location());

String entryTail = entryPath.substring(locationPath.length());
// LocalFileSystem, AlluxioFileSystem, and ADLS Gen2 hierarchical canonicalize runs of
// slashes in returned paths. Try the original prefix first to preserve blob-store keys
// where `//` is meaningful; fall back to the slash-collapsed form.
String prefix;
if (entryPath.startsWith(locationPath)) {
prefix = locationPath;
}
else if (entryPath.startsWith(collapsedLocationPath)) {
prefix = collapsedLocationPath;
}
else {
throw new IllegalStateException(format("Expected listed file to start with directory path '%s': %s", locationPath, entry.location()));
}

String entryTail = entryPath.substring(prefix.length());
if (entryTail.compareTo(startingFrom) >= 0) {
nextEntry = entry;
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ void testListFilesStartingFromDoubleSlashPathComponent()
Location.of("file:///double/a"));
}

@Test
void testListFilesStartingFromHierarchicalLocationNormalization()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test also succeeds without the production code changes in EmulatedListFilesStartingFromIterator.java.
I would have assumed that it was supposed to fail.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified by reverting the iterator change locally: the test fails with IllegalStateException: Expected listed file to start with directory path 'dir//sub/_delta_log/': abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000000.json.

throws IOException
{
// FS canonicalizes `//` to `/`; iterator falls back to slash-collapsed prefix.
assertThat(listFilesStartingFrom(
Location.of("abfs://container@account.dfs.core.windows.net/dir//sub/_delta_log/"),
"00000000000000000000",
List.of(
entry("abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000000.json"),
entry("abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000001.checkpoint.parquet"))))
.containsExactly(
Location.of("abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000000.json"),
Location.of("abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000001.checkpoint.parquet"));

// startingFrom filtering still applies to the slash-collapsed remainder.
assertThat(listFilesStartingFrom(
Location.of("abfs://container@account.dfs.core.windows.net/dir//sub/_delta_log/"),
"00000000000000000001",
List.of(
entry("abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000000.json"),
entry("abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000001.checkpoint.parquet"))))
.containsExactly(
Location.of("abfs://container@account.dfs.core.windows.net/dir/sub/_delta_log/00000000000000000001.checkpoint.parquet"));
}

@Test
void testListFilesStartingFromIncludesAllNonAsciiFilenames()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
*/
package io.trino.filesystem.local;

import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import io.trino.filesystem.AbstractTestTrinoFileSystem;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -119,6 +122,26 @@ protected void verifyFileSystemIsEmpty()
}
}

@Test
void testListFilesStartingFromConsecutiveSlashesInLocation()
        throws IOException
{
    // LocalFileSystem (Java NIO) canonicalizes runs of slashes, so listed paths do not carry
    // the doubled slash. The listing must still succeed because
    // EmulatedListFilesStartingFromIterator falls back to the slash-collapsed prefix.
    try (Closer resources = Closer.create()) {
        Location firstFile = createBlob(resources, "level0/level1-file1");
        Location secondFile = createBlob(resources, "level0/level1-file2");

        // List the directory through a location that ends in consecutive slashes
        Location directoryWithExtraSlashes = createLocation("level0").appendSuffix("//");
        FileIterator listing = getFileSystem().listFilesStartingFrom(directoryWithExtraSlashes, "level1-file1");

        ImmutableList.Builder<Location> listedLocations = ImmutableList.builder();
        while (listing.hasNext()) {
            Location listedLocation = listing.next().location();
            listedLocations.add(listedLocation);
        }
        assertThat(listedLocations.build()).containsExactlyInAnyOrder(firstFile, secondFile);
    }
}

@Test
void testPathsOutOfBounds()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class DeltaLakeConfig
private boolean deltaLogFileSystemCacheDisabled;
private int metadataParallelism = 8;
private int checkpointProcessingParallelism = 4;
private boolean loadMetadataFromChecksumFile = true;
Comment thread
raunaqmorarka marked this conversation as resolved.

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -587,4 +588,17 @@ public DeltaLakeConfig setCheckpointProcessingParallelism(int checkpointProcessi
this.checkpointProcessingParallelism = checkpointProcessingParallelism;
return this;
}

/**
 * Returns the current value of the {@code loadMetadataFromChecksumFile} setting.
 *
 * @return {@code true} when loading metadata from the checksum file is enabled
 */
public boolean isLoadMetadataFromChecksumFile()
{
    return this.loadMetadataFromChecksumFile;
}

/**
 * Sets whether table metadata and protocol are read from the Delta version checksum file
 * when available, falling back to the transaction log otherwise. Bound to the
 * {@code delta.load-metadata-from-checksum-file} configuration property.
 *
 * @param loadMetadataFromChecksumFile the new value of the setting
 * @return this config instance, so calls can be chained
 */
@Config("delta.load-metadata-from-checksum-file")
Comment thread
adam-richardson-openai marked this conversation as resolved.
@ConfigDescription("Read table metadata and protocol from the Delta version checksum file when available, falling back to the transaction log")
public DeltaLakeConfig setLoadMetadataFromChecksumFile(boolean loadMetadataFromChecksumFile)
{
this.loadMetadataFromChecksumFile = loadMetadataFromChecksumFile;
return this;
}
}
Loading
Loading