-
Notifications
You must be signed in to change notification settings - Fork 170
Support reporting statistics in spark datasource #8057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,38 +3,62 @@ | |
|
|
||
| package dev.vortex.spark.read; | ||
|
|
||
| import dev.vortex.api.DataSource; | ||
| import dev.vortex.api.Session; | ||
| import dev.vortex.spark.VortexSparkSession; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.OptionalLong; | ||
| import org.apache.spark.sql.connector.catalog.CatalogV2Util; | ||
| import org.apache.spark.sql.connector.catalog.Column; | ||
| import org.apache.spark.sql.connector.expressions.NamedReference; | ||
| import org.apache.spark.sql.connector.expressions.filter.Predicate; | ||
| import org.apache.spark.sql.connector.read.Batch; | ||
| import org.apache.spark.sql.connector.read.Scan; | ||
| import org.apache.spark.sql.connector.read.Statistics; | ||
| import org.apache.spark.sql.connector.read.SupportsReportStatistics; | ||
| import org.apache.spark.sql.connector.read.colstats.ColumnStatistics; | ||
| import org.apache.spark.sql.internal.SQLConf; | ||
| import org.apache.spark.sql.types.StructType; | ||
|
|
||
| /** Spark V2 {@link Scan} over a table of Vortex files. */ | ||
| public final class VortexScan implements Scan { | ||
| /** | ||
| * Spark V2 {@link Scan} over a table of Vortex files. | ||
| * | ||
| * <p>Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a | ||
| * Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by | ||
| * {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor | ||
| * and scaling by the pushed read schema's default size relative to the full table schema's default size. When the | ||
| * listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is | ||
| * applied. | ||
| */ | ||
| public final class VortexScan implements Scan, SupportsReportStatistics { | ||
|
|
||
| private final List<String> paths; | ||
| private final List<Column> tableColumns; | ||
| private final List<Column> readColumns; | ||
| private final Map<String, String> formatOptions; | ||
| private final Predicate[] pushedPredicates; | ||
|
|
||
| private volatile Statistics cachedStatistics; | ||
|
|
||
| /** | ||
| * Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing | ||
| * immutable collections; the constructor does not copy. | ||
| * | ||
| * @param paths the list of Vortex file paths to scan | ||
| * @param tableColumns the full table columns before projection pushdown | ||
| * @param readColumns the list of columns to read from the files | ||
| * @param pushedPredicates predicates pushed down by Spark; {@code null} or empty means no pushdown | ||
| */ | ||
| public VortexScan( | ||
| List<String> paths, | ||
| List<Column> tableColumns, | ||
| List<Column> readColumns, | ||
| Map<String, String> formatOptions, | ||
| Predicate[] pushedPredicates) { | ||
| Predicate[] pushedPredicates, | ||
| Map<String, String> formatOptions) { | ||
| this.paths = paths; | ||
| this.tableColumns = tableColumns; | ||
| this.readColumns = readColumns; | ||
| this.formatOptions = formatOptions; | ||
| this.pushedPredicates = pushedPredicates == null ? new Predicate[0] : pushedPredicates.clone(); | ||
|
|
@@ -83,4 +107,70 @@ public Batch toBatch() { | |
| public ColumnarSupportMode columnarSupportMode() { | ||
| return ColumnarSupportMode.SUPPORTED; | ||
| } | ||
|
|
||
| /** | ||
| * Reports the statistics Spark should use when planning this scan. | ||
| * | ||
| * <p>The result is computed on the first invocation, which opens the Vortex {@link DataSource}, and is then | ||
| * cached in {@code cachedStatistics}; later calls return the cached value. The computation runs while holding | ||
| * this scan's monitor, and the cache is re-checked inside the monitor before computing. | ||
| * | ||
| * <p>The row count is taken from the data source (sum of file-footer row counts; extrapolated from the first | ||
| * opened file when other files are deferred). {@link Statistics#sizeInBytes()} starts from the per-file sizes | ||
| * reported by the filesystem listing and is then adjusted by Spark's compression factor and by the ratio between | ||
| * the pushed read schema and the full table schema. When the listing did not return a size for some file the | ||
| * file-byte total is extrapolated; when no file size is known at all the value is left empty so Spark falls back | ||
| * to its default heuristic. | ||
| * | ||
| * @return statistics carrying the row-count and Spark scan-size estimates | ||
| */ | ||
| @Override | ||
| public Statistics estimateStatistics() { | ||
| Statistics stats = cachedStatistics; | ||
| if (stats == null) { | ||
| synchronized (this) { | ||
| // Re-read under the monitor: another caller may have stored a result while this one waited. | ||
| stats = cachedStatistics; | ||
| if (stats == null) { | ||
| stats = computeStatistics(); | ||
| cachedStatistics = stats; | ||
| } | ||
| } | ||
| } | ||
| return stats; | ||
| } | ||
|
|
||
| /** | ||
| * Builds the statistics for this scan by resolving its paths and opening the Vortex {@link DataSource}. | ||
| * | ||
| * @return statistics with both values empty when no path resolves; otherwise the data source's row count | ||
| * together with its byte size after {@link #scaleSizeInBytes(OptionalLong)} has been applied | ||
| */ | ||
| private Statistics computeStatistics() { | ||
| Session session = VortexSparkSession.get(formatOptions); | ||
| List<String> vortexFiles = VortexBatchExec.resolveVortexPaths(session, paths, formatOptions); | ||
| if (vortexFiles.isEmpty()) { | ||
| // Nothing to open, so neither a row count nor a size can be reported. | ||
| OptionalLong unknown = OptionalLong.empty(); | ||
| return new VortexStatistics(unknown, unknown); | ||
| } | ||
|
|
||
| DataSource dataSource = DataSource.open(session, vortexFiles, formatOptions); | ||
| OptionalLong rowCount = dataSource.rowCount().asOptional(); | ||
| OptionalLong scanBytes = scaleSizeInBytes(dataSource.byteSize().asOptional()); | ||
| return new VortexStatistics(rowCount, scanBytes); | ||
| } | ||
|
|
||
| /** | ||
| * Converts an on-storage byte total into the scan-size estimate reported to Spark. | ||
| * | ||
| * <p>The total is multiplied by the SQL file-compression factor, divided by the default size of the full table | ||
| * schema, and multiplied by the default size of the read schema, so that column pruning shrinks the estimate. | ||
| * | ||
| * @param fileBytes total size of the files on storage, or empty when unknown | ||
| * @return empty when {@code fileBytes} is empty; {@code fileBytes} unchanged when the table schema's default | ||
| * size is not positive; otherwise the scaled estimate truncated to a {@code long} | ||
| */ | ||
| private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) { | ||
| if (!fileBytes.isPresent()) { | ||
| return OptionalLong.empty(); | ||
| } | ||
|
|
||
| Column[] allColumns = tableColumns.toArray(new Column[0]); | ||
| StructType fullSchema = CatalogV2Util.v2ColumnsToStructType(allColumns); | ||
| StructType projectedSchema = readSchema(); | ||
| int fullSchemaSize = fullSchema.defaultSize(); | ||
| if (fullSchemaSize <= 0) { | ||
| // No meaningful ratio can be formed, so the raw file-byte total is passed through. | ||
| return fileBytes; | ||
| } | ||
|
|
||
| double compressionFactor = SQLConf.get().fileCompressionFactor(); | ||
| double expandedBytes = compressionFactor * fileBytes.getAsLong(); | ||
| double bytesPerSchemaUnit = expandedBytes / fullSchemaSize; | ||
| long estimate = (long) (bytesPerSchemaUnit * projectedSchema.defaultSize()); | ||
| return OptionalLong.of(estimate); | ||
| } | ||
|
|
||
| private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics { | ||
|
|
||
| @Override | ||
| public Map<NamedReference, ColumnStatistics> columnStats() { | ||
| return Map.of(); | ||
| } | ||
|
Comment on lines
+172
to
+174
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the future, this is where we'd report the file stats, right?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, but this doesn't happen anywhere in Spark right now, and you'd have to read all the footers here to produce this value, which might be bad. |
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this doing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reuses the logic Spark has for estimating the size of a relation: it scales the file size by the ratio of the read schema's default size to the full table schema's default size. Basically, it tries to take column pruning into account.