Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
dd063f1
Initial approach
dtspence Jun 16, 2025
55adf22
Check for committer initialized before singleton initialize
dtspence Jun 18, 2025
0f07837
Refactor SplitsCache implementers to require default constructor and …
dtspence Jun 18, 2025
cc28943
SplitsFileTest fixes for test failures
dtspence Jun 24, 2025
f0a5da7
SplitsFile as default implementation setup
dtspence Jun 24, 2025
6397aed
MultiTableRangePartitioner fix for holder variable
dtspence Jun 24, 2025
0e9e944
Splits size method
dtspence Jun 24, 2025
76e49ed
Specifically look for missing splits and splits that are before split…
dtspence Jun 24, 2025
36a0afd
JUnit5 test update
dtspence Jun 24, 2025
c51e2fd
More JUnit5 updates
dtspence Jun 24, 2025
6c36e10
Splits cache factory reset for tests
dtspence Jun 24, 2025
7c910ee
Update to use JUnit 5 semantics
dtspence Jun 24, 2025
4695c26
Formatting changes
dtspence Aug 4, 2025
47225b4
Adding in unit test qol updates from TX Dylan
avgAGB Mar 30, 2026
bf3a6f9
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB Apr 1, 2026
1ea5bd4
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB Apr 1, 2026
ca0e08d
Fixing import warnings
avgAGB Apr 2, 2026
278cdb9
Working through some unit testing
avgAGB Apr 2, 2026
1c9e55a
Adding in getInstance
avgAGB Apr 2, 2026
c3c7842
Working on MROF unit tests
avgAGB Apr 3, 2026
ea87a82
More ut tweaks
avgAGB Apr 3, 2026
5fcb830
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB Apr 6, 2026
e7904d2
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB Apr 6, 2026
7ae18ba
Making tweaks to move the getsplits deeper in the stack
avgAGB Apr 20, 2026
d8c255d
Formatting tweak
avgAGB Apr 20, 2026
c53b46a
Pruning out unused method
avgAGB Apr 22, 2026
8ea0fca
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB Apr 23, 2026
f0957f2
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB Apr 24, 2026
8152fcb
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB Apr 30, 2026
0cc68aa
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB May 4, 2026
3263c21
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB May 8, 2026
3d62c73
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB May 13, 2026
7d788a5
Merge remote-tracking branch 'origin/integration' into feature/splits…
avgAGB May 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,6 @@ public int run(String[] args) throws Exception {
log.info("InputFormat: " + job.getInputFormatClass().getName());
log.info("Mapper: " + job.getMapperClass().getName());
log.info("Reduce tasks: " + (useMapOnly ? 0 : reduceTasks));
log.info("Split File: " + conf.get(TableSplitsCache.SPLITS_CACHE_DIR) + "/"
+ conf.get(TableSplitsCache.SPLITS_CACHE_FILE, TableSplitsCache.DEFAULT_SPLITS_CACHE_FILE));

// Note that if we run any other jobs in the same vm (such as a sampler), then we may
// need to catch and throw away an exception here
Expand Down Expand Up @@ -665,7 +663,7 @@ protected Configuration parseArguments(String[] args, Configuration conf) throws
} else if (args[i].equals("-disableRefreshSplits")) {
conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, false);
} else if (args[i].equals("-splitsCacheDir")) {
conf.set(TableSplitsCache.SPLITS_CACHE_DIR, args[++i]);
conf.set(SplitsConstants.SPLITS_CACHE_DIR, args[++i]);
} else if (args[i].equals("-multipleNumShardsCacheDir")) {
conf.set(NumShards.MULTIPLE_NUMSHARDS_CACHE_PATH, args[++i]);
} else if (args[i].equals("-enableAccumuloConfigCache")) {
Expand Down Expand Up @@ -849,7 +847,7 @@ protected void configureBulkPartitionerAndOutputFormatter(Job job, AccumuloHelpe
conf.setInt("splits.num.reduce", this.reduceTasks);
// used by the output formatter and the sharded partitioner
long before = System.currentTimeMillis();
SplitsFile.setupFile(job, conf);
SplitsCache.getInstance(conf).setupJob(job);
long after = System.currentTimeMillis();

log.info("Sharded splits files setup time: " + (after - before) + "ms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class MultiRFileOutputFormatter extends FileOutputFormat<BulkIngestKey,Va
protected Configuration conf;
protected Map<String,ConfigurationCopy> tableConfigs;
protected Set<String> tableIds = null;
protected SplitsCache splitsCache;
protected long maxRFileSize = 0;
protected int maxRFileEntries = 0;
protected boolean generateMapFileRowKeys = false;
Expand Down Expand Up @@ -274,9 +275,10 @@ protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration t

var builder = org.apache.accumulo.core.client.rfile.RFile.newWriter().to(filename).withFileSystem(fs).withTableProperties(tableConf);
if(this.loadPlanningEnabled) {
var splits = SplitsFile.getSplits(conf, table);
if(splits != null && !splits.isEmpty()) {
LoadPlan.SplitResolver splitResolver = row->findContainingSplits(row, splits);
SplitsCache splits = SplitsCache.getInstance(conf);
List<Text> splitsFile = splits.getSplits(conf, table);
if(splitsFile != null && !splitsFile.isEmpty()) {
LoadPlan.SplitResolver splitResolver = row->SplitsFile.findContainingSplits(row, splitsFile);
builder = builder.withSplitResolver(splitResolver);
}
}
Expand Down Expand Up @@ -363,27 +365,6 @@ private void writeLoadPlans(TaskAttemptContext context) {
log.debug("Finished writing bulk load plans to disk");
}

/**
* Finds two contiguous table splits that contain the specified row should reside
*
* @param lookupRow
* Row value to be mapped
* @param tableSplits
* Splits for the table in question
* @return KeyExtent mapping for the given row
*/
static LoadPlan.TableSplits findContainingSplits(Text lookupRow, List<Text> tableSplits) {
int position = Collections.binarySearch(tableSplits, lookupRow);
if (position < 0) {
position = -1 * (position + 1);
}

Text prevRow = position == 0 ? null : tableSplits.get(position - 1);
Text endRow = position == tableSplits.size() ? null : tableSplits.get(position);

return new LoadPlan.TableSplits(prevRow, endRow);
}

public static class SizeTrackingWriter {
private RFileWriter delegate;
long size = 0;
Expand Down Expand Up @@ -550,6 +531,7 @@ public RecordWriter<BulkIngestKey,Value> getRecordWriter(final TaskAttemptContex
FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
workDir = committer.getWorkPath();
conf = context.getConfiguration();
splitsCache = SplitsCache.getInstance(conf);

setTableIdsAndConfigs();

Expand Down Expand Up @@ -743,15 +725,10 @@ private SizeTrackingWriter getOrCreateWriter(TaskAttemptContext context, String
if (generateMapFilePerShardLocation) {
// Look up the shard location (tablet server serving shard ID rowKey)
// If we don't have a location, then just use the rowKey itself.
Map<Text,String> shardLocs = getShardLocations(tableName);
shardLocation = shardLocs.containsKey(rowKey) ? shardLocs.get(rowKey) : null;
if (shardLocation == null) {
// in this case we have a shard id that has no split. Lets put this in one "extra" file
shardLocation = "extra";
} else {
// Ensure there's no colon
shardLocation = shardLocation.replace(":", "_");
}
String fallback = conf.get("shard.fallback.name." + rowKey.toString(), "extra");
shardLocation = splitsCache.getExactLocation(tableName, rowKey, () -> fallback);
// in the case we have a shard id that has no split. Lets put this in one "extra" file
shardLocation = shardLocation.replace(":", "_");
}
// Combine table name with shard location so that we end up
// with all of the shard map files under directories that can be
Expand All @@ -777,27 +754,4 @@ private SizeTrackingWriter getOrCreateWriter(TaskAttemptContext context, String
}
};
}

/**
* Read in the sequence file (that was created at job startup) for the given table that contains a list of shard IDs and the corresponding tablet server to
* which that shard is assigned.
*
* @param tableName
* the table name
* @return a mapping of the shard ids and tablet server
* @throws IOException
* if there is an issue with read or write
*/
protected Map<Text,String> getShardLocations(String tableName) throws IOException {
// Create the Map of sharded table name to [shardId -> server]
if (this.tableShardLocations == null) {
this.tableShardLocations = new HashMap<>();
}

if (null == this.tableShardLocations.get(tableName)) {
this.tableShardLocations.put(tableName, SplitsFile.getSplitsAndLocations(conf, tableName));
}

return tableShardLocations.get(tableName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package datawave.ingest.mapreduce.job;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public interface SplitsCache extends AutoCloseable {
static SplitsCache getInstance(final Configuration conf) {
return SplitsCacheFactory.getSplitsCache(conf);
}

void init(Configuration conf);

void setupJob(final Job job) throws IOException;

boolean hasSplits();

int getSplitsCount(String table);

int getExactIndex(String table, Text key);

int getExactPartition(String table, Text key);

int getNearestPartition(String table, Text key);

String getExactLocation(String table, Text key, Supplier<String> defaultFn);

List<Text> getSplits(Configuration conf, String tableName) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package datawave.ingest.mapreduce.job;

import java.lang.reflect.InvocationTargetException;

import org.apache.hadoop.conf.Configuration;

public class SplitsCacheFactory {
public static final String SPLITS_CACHE_IMPL = "datawave.ingest.splits.cache.impl";

static volatile SplitsCache INSTANCE;

public static SplitsCache getSplitsCache(final Configuration conf) {
if (INSTANCE == null) {
synchronized (SplitsCacheFactory.class) {
if (INSTANCE == null) {
try {
String splitsCacheImpl = conf.get(SPLITS_CACHE_IMPL);
// noinspection unchecked
Class<? extends SplitsCache> clazz = splitsCacheImpl != null ? (Class<? extends SplitsCache>) Class.forName(splitsCacheImpl)
: SplitsFile.class;
INSTANCE = clazz.getDeclaredConstructor().newInstance();
INSTANCE.init(conf);
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException ex) {
throw new RuntimeException(ex);
}
}
}
}
return INSTANCE;
}

public static void clearInstance() {
INSTANCE = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package datawave.ingest.mapreduce.job;

public class SplitsConstants {
public static final String SPLITS_CACHE_DIR = "datawave.ingest.splits.cache.dir";
public static final String SPLITS_CACHE_FILE = "datawave.ingest.splits.cache.file";
public static final String DEFAULT_SPLITS_CACHE_DIR = "/data/splitsCache";
}
Loading
Loading