diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java index 27fa88ccb11..462c0d5adc1 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java @@ -362,8 +362,6 @@ public int run(String[] args) throws Exception { log.info("InputFormat: " + job.getInputFormatClass().getName()); log.info("Mapper: " + job.getMapperClass().getName()); log.info("Reduce tasks: " + (useMapOnly ? 0 : reduceTasks)); - log.info("Split File: " + conf.get(TableSplitsCache.SPLITS_CACHE_DIR) + "/" - + conf.get(TableSplitsCache.SPLITS_CACHE_FILE, TableSplitsCache.DEFAULT_SPLITS_CACHE_FILE)); // Note that if we run any other jobs in the same vm (such as a sampler), then we may // need to catch and throw away an exception here @@ -665,7 +663,7 @@ protected Configuration parseArguments(String[] args, Configuration conf) throws } else if (args[i].equals("-disableRefreshSplits")) { conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); } else if (args[i].equals("-splitsCacheDir")) { - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, args[++i]); + conf.set(SplitsConstants.SPLITS_CACHE_DIR, args[++i]); } else if (args[i].equals("-multipleNumShardsCacheDir")) { conf.set(NumShards.MULTIPLE_NUMSHARDS_CACHE_PATH, args[++i]); } else if (args[i].equals("-enableAccumuloConfigCache")) { @@ -849,7 +847,7 @@ protected void configureBulkPartitionerAndOutputFormatter(Job job, AccumuloHelpe conf.setInt("splits.num.reduce", this.reduceTasks); // used by the output formatter and the sharded partitioner long before = System.currentTimeMillis(); - SplitsFile.setupFile(job, conf); + SplitsCache.getInstance(conf).setupJob(job); long after = System.currentTimeMillis(); log.info("Sharded splits files setup time: " + (after - before) + "ms"); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java index b3e2e3c2a8e..8c2c54068f6 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatter.java @@ -112,6 +112,7 @@ public class MultiRFileOutputFormatter extends FileOutputFormat tableConfigs; protected Set tableIds = null; + protected SplitsCache splitsCache; protected long maxRFileSize = 0; protected int maxRFileEntries = 0; protected boolean generateMapFileRowKeys = false; @@ -274,9 +275,10 @@ protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration t var builder = org.apache.accumulo.core.client.rfile.RFile.newWriter().to(filename).withFileSystem(fs).withTableProperties(tableConf); if(this.loadPlanningEnabled) { - var splits = SplitsFile.getSplits(conf, table); - if(splits != null && !splits.isEmpty()) { - LoadPlan.SplitResolver splitResolver = row->findContainingSplits(row, splits); + SplitsCache splits = SplitsCache.getInstance(conf); + List splitsFile = splits.getSplits(conf, table); + if(splitsFile != null && !splitsFile.isEmpty()) { + LoadPlan.SplitResolver splitResolver = row->SplitsFile.findContainingSplits(row, splitsFile); builder = builder.withSplitResolver(splitResolver); } } @@ -363,27 +365,6 @@ private void writeLoadPlans(TaskAttemptContext context) { log.debug("Finished writing bulk load plans to disk"); } - /** - * Finds two contiguous table splits that contain the specified row should reside - * - * @param lookupRow - * Row value to be mapped - * @param tableSplits - * Splits for the table in question - * @return KeyExtent mapping for the given row - */ - static LoadPlan.TableSplits findContainingSplits(Text lookupRow, List tableSplits) { - int position = Collections.binarySearch(tableSplits, lookupRow); - if (position < 0) { - position = -1 * (position + 1); - } - - Text prevRow = position == 0 ? null : tableSplits.get(position - 1); - Text endRow = position == tableSplits.size() ? null : tableSplits.get(position); - - return new LoadPlan.TableSplits(prevRow, endRow); - } - public static class SizeTrackingWriter { private RFileWriter delegate; long size = 0; @@ -550,6 +531,7 @@ public RecordWriter getRecordWriter(final TaskAttemptContex FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context); workDir = committer.getWorkPath(); conf = context.getConfiguration(); + splitsCache = SplitsCache.getInstance(conf); setTableIdsAndConfigs(); @@ -743,15 +725,10 @@ private SizeTrackingWriter getOrCreateWriter(TaskAttemptContext context, String if (generateMapFilePerShardLocation) { // Look up the shard location (tablet server serving shard ID rowKey) // If we don't have a location, then just use the rowKey itself. - Map shardLocs = getShardLocations(tableName); - shardLocation = shardLocs.containsKey(rowKey) ? shardLocs.get(rowKey) : null; - if (shardLocation == null) { - // in this case we have a shard id that has no split. Lets put this in one "extra" file - shardLocation = "extra"; - } else { - // Ensure there's no colon - shardLocation = shardLocation.replace(":", "_"); - } + String fallback = conf.get("shard.fallback.name." + rowKey.toString(), "extra"); + shardLocation = splitsCache.getExactLocation(tableName, rowKey, () -> fallback); + // in the case we have a shard id that has no split. Lets put this in one "extra" file + shardLocation = shardLocation.replace(":", "_"); } // Combine table name with shard location so that we end up // with all of the shard map files under directories that can be @@ -777,27 +754,4 @@ private SizeTrackingWriter getOrCreateWriter(TaskAttemptContext context, String } }; } - - /** - * Read in the sequence file (that was created at job startup) for the given table that contains a list of shard IDs and the corresponding tablet server to - * which that shard is assigned. - * - * @param tableName - * the table name - * @return a mapping of the shard ids and tablet server - * @throws IOException - * if there is an issue with read or write - */ - protected Map getShardLocations(String tableName) throws IOException { - // Create the Map of sharded table name to [shardId -> server] - if (this.tableShardLocations == null) { - this.tableShardLocations = new HashMap<>(); - } - - if (null == this.tableShardLocations.get(tableName)) { - this.tableShardLocations.put(tableName, SplitsFile.getSplitsAndLocations(conf, tableName)); - } - - return tableShardLocations.get(tableName); - } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsCache.java new file mode 100644 index 00000000000..42cb83ab0e6 --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsCache.java @@ -0,0 +1,33 @@ +package datawave.ingest.mapreduce.job; + +import java.io.IOException; +import java.util.List; +import java.util.function.Supplier; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; + +public interface SplitsCache extends AutoCloseable { + static SplitsCache getInstance(final Configuration conf) { + return SplitsCacheFactory.getSplitsCache(conf); + } + + void init(Configuration conf); + + void setupJob(final Job job) throws IOException; + + boolean hasSplits(); + + int getSplitsCount(String table); + + int getExactIndex(String table, Text key); + + int getExactPartition(String table, Text key); + + int getNearestPartition(String table, Text key); + + String getExactLocation(String table, Text key, Supplier defaultFn); + + List getSplits(Configuration conf, String tableName) throws IOException; +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsCacheFactory.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsCacheFactory.java new file mode 100644 index 00000000000..480724d76be --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsCacheFactory.java @@ -0,0 +1,35 @@ +package datawave.ingest.mapreduce.job; + +import java.lang.reflect.InvocationTargetException; + +import org.apache.hadoop.conf.Configuration; + +public class SplitsCacheFactory { + public static final String SPLITS_CACHE_IMPL = "datawave.ingest.splits.cache.impl"; + + static volatile SplitsCache INSTANCE; + + public static SplitsCache getSplitsCache(final Configuration conf) { + if (INSTANCE == null) { + synchronized (SplitsCacheFactory.class) { + if (INSTANCE == null) { + try { + String splitsCacheImpl = conf.get(SPLITS_CACHE_IMPL); + // noinspection unchecked + Class clazz = splitsCacheImpl != null ? (Class) Class.forName(splitsCacheImpl) + : SplitsFile.class; + INSTANCE = clazz.getDeclaredConstructor().newInstance(); + INSTANCE.init(conf); + } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException ex) { + throw new RuntimeException(ex); + } + } + } + } + return INSTANCE; + } + + public static void clearInstance() { + INSTANCE = null; + } +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsConstants.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsConstants.java new file mode 100644 index 00000000000..fe89dcff99e --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsConstants.java @@ -0,0 +1,7 @@ +package datawave.ingest.mapreduce.job; + +public class SplitsConstants { + public static final String SPLITS_CACHE_DIR = "datawave.ingest.splits.cache.dir"; + public static final String SPLITS_CACHE_FILE = "datawave.ingest.splits.cache.file"; + public static final String DEFAULT_SPLITS_CACHE_DIR = "/data/splitsCache"; +} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java index 3a10934ccb5..518c7752350 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/SplitsFile.java @@ -1,14 +1,21 @@ package datawave.ingest.mapreduce.job; +import static datawave.ingest.mapreduce.job.SplitsConstants.SPLITS_CACHE_DIR; +import static datawave.ingest.mapreduce.job.SplitsConstants.SPLITS_CACHE_FILE; + import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; -import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; @@ -17,14 +24,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import datawave.ingest.mapreduce.handler.shard.ShardIdFactory; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; import datawave.util.time.DateHelper; -public class SplitsFile { - private static final Logger log = Logger.getLogger(SplitsFile.class); +public class SplitsFile implements SplitsCache { + private static final Logger log = LoggerFactory.getLogger(SplitsFile.class); public static final String SPLIT_WORK_DIR = "split.work.dir"; public static final String MAX_SHARDS_PER_TSERVER = "shardedMap.max.shards.per.tserver"; @@ -33,7 +41,25 @@ public class SplitsFile { public static final String CONFIGURED_SHARDED_TABLE_NAMES = ShardedDataTypeHandler.SHARDED_TNAMES + ".configured"; public static final String DIST_CACHE_LABEL = "splitsFile"; - public static void setupFile(Job job, Configuration conf) throws IOException, URISyntaxException, AccumuloSecurityException, AccumuloException { + private static final long NOW = System.currentTimeMillis(); + private static final String TODAY = formatDay(0); + + private ConcurrentHashMap> shardPartitionsByTable; + private TableSplitsCache instance; + private Configuration conf; + + public SplitsFile() { + this.shardPartitionsByTable = new ConcurrentHashMap<>(); + } + + @Override + public void init(Configuration conf) { + this.conf = conf; + this.instance = TableSplitsCache.getCurrentCache(conf); + } + + @Override + public void setupJob(Job job) throws IOException { Path baseSplitsPath = TableSplitsCache.getSplitsPath(conf); FileSystem sourceFs = baseSplitsPath.getFileSystem(conf); @@ -45,12 +71,11 @@ public static void setupFile(Job job, Configuration conf) throws IOException, UR try { log.info("Base splits: " + baseSplitsPath); - Path destSplits = new Path( - conf.get(SPLIT_WORK_DIR) + "/" + conf.get(TableSplitsCache.SPLITS_CACHE_FILE, TableSplitsCache.DEFAULT_SPLITS_CACHE_FILE)); + Path destSplits = new Path(conf.get(SPLIT_WORK_DIR) + "/" + conf.get(SPLITS_CACHE_FILE, TableSplitsCache.DEFAULT_SPLITS_CACHE_FILE)); log.info("Dest splits: " + destSplits); FileUtil.copy(sourceFs, baseSplitsPath, destFs, destSplits, false, conf); - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, conf.get(SPLIT_WORK_DIR)); + conf.set(SPLITS_CACHE_DIR, conf.get(SPLIT_WORK_DIR)); // // if we want the freshest splits, go ahead and update them in the work dir // if (TableSplitsCache.shouldRefreshSplits(conf)) { @@ -64,22 +89,110 @@ public static void setupFile(Job job, Configuration conf) throws IOException, UR } } catch (Exception e) { - log.error("Unable to use splits file because " + e.getMessage()); - throw e; + log.error("Unable to use splits file because {}", e.getMessage(), e); + throw new IOException(e); + } + } + + @Override + public void close() { + // no code + } + + @Override + public boolean hasSplits() { + try { + return !instance.getSplits().isEmpty(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int getSplitsCount(String table) { + int count = 0; + try { + List splits = instance.getSplits(table); + count = splits == null ? -1 : splits.size(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return count; + } + + @Override + public String getExactLocation(String table, Text key, Supplier defaultFn) { + String location; + try { + Map locationByTable = instance.getSplitsAndLocationByTable(table); + String keyValue = locationByTable.get(key); + location = keyValue == null ? defaultFn.get() : keyValue; + } catch (IOException e) { + throw new UncheckedIOException(e); } + return location; } - private static void validate(Configuration conf) throws IOException { + @Override + public int getExactIndex(String table, Text key) { + List splits; + try { + splits = instance.getSplits(table); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return splits != null ? Collections.binarySearch(splits, key) : -1; + } + + @Override + public int getExactPartition(String tableName, Text shardId) { + Map assignments; + try { + assignments = lazilyCreateAssignments(tableName); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + Integer partitionId = assignments.get(shardId); + if (partitionId != null) { + return partitionId; + } + return -1; + } + + @Override + public int getNearestPartition(String tableName, Text shardId) { + Map assignments = null; + try { + assignments = lazilyCreateAssignments(tableName); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + Integer partitionId = assignments.get(shardId); + if (partitionId != null) { + return partitionId; + } + + List keys = new ArrayList<>(assignments.keySet()); + Collections.sort(keys); + int closestAssignment = Collections.binarySearch(keys, shardId); + if (closestAssignment >= 0) { + log.warn("Something is screwy, found {} on the second try", shardId); + return assignments.get(shardId); + } + // insertion point in the index of the key greater + Text shardString = keys.get(Math.abs(closestAssignment + 1)); + return assignments.get(shardString); + } + + public void validate(Configuration conf) throws IOException { TableSplitsCache cache = TableSplitsCache.getCurrentCache(conf); int daysToVerify = conf.getInt(SHARDS_BALANCED_DAYS_TO_VERIFY, 2) - 1; - for (String table : conf.getStrings(ShardedDataTypeHandler.SHARDED_TNAMES)) { validateShardIdLocations(conf, table, daysToVerify, cache.getSplitsAndLocationByTable(table)); } - } - public static void validateShardIdLocations(Configuration conf, String tableName, int daysToVerify, Map shardIdToLocation) { + public void validateShardIdLocations(Configuration conf, String tableName, int daysToVerify, Map shardIdToLocation) { ShardIdFactory shardIdFactory = new ShardIdFactory(conf); // assume true unless proven otherwise boolean isValid = true; @@ -107,7 +220,125 @@ public static void validateShardIdLocations(Configuration conf, String tableName } } - private static boolean prefixMatches(byte[] prefixBytes, byte[] keyBytes, int keyLen) { + /** + * For a given tablename, provides the mapping from {@code shard id -> partition} + * + * @param tableName + * the name of the table + * @return a list of the shard mappings + * @throws IOException + * if there is an issue with read or write + */ + private Map lazilyCreateAssignments(String tableName) throws IOException { + Map assignments = this.shardPartitionsByTable.get(tableName); + if (null == assignments) { + assignments = getPartitionsByShardId(tableName); + this.shardPartitionsByTable.put(tableName, assignments); + } + return assignments; + } + + /** + * Loads the splits file for the table name and uses it to assign partitions. + * + * @param tableName + * name of the table + * @return a map of the partitions + * @throws IOException + * if there is an issue with read or write + */ + private HashMap getPartitionsByShardId(String tableName) throws IOException { + if (log.isDebugEnabled()) + log.debug("Loading splits data for " + tableName); + + List sortedSplits = instance.getSplits(tableName); + Map shardIdToLocation = instance.getSplitsAndLocationByTable(tableName); + + if (log.isDebugEnabled()) + log.debug("Assigning partitioners for each shard in " + tableName); + return assignPartitionsForEachShard(sortedSplits, shardIdToLocation); + } + + /** + * 1. sorts the the tablet assignments by shard id, starting with the most recent going backwards
+ * 2. assigns partitions to each tservers, starting from the beginning, but skipping future dates
+ * 3. assigns partitions to each shardid by looking up its tserver's assignments
+ * 4. returns the {@code shard id -> partition} map + *

+ * e.g.,
+ * 1. sorted assignments
+ * 2. tserver map
+ * 3. shard map: {@code future->tserver7 *no change* future->2 shard4->tserver2 tserver2->0 shard4->0 + * shard3->tserver3 tserver3->1 shard3->1 shard2->tserver2 *no change* shard2->0 shard1->tserver7 tserver7->2 shard1->2} + * + * @param shardIdToLocations + * the map of shard ids and their location + * @return shardId to + */ + private HashMap assignPartitionsForEachShard(List sortedShardIds, Map shardIdToLocations) { + int totalNumUniqueTServers = calculateNumberOfUniqueTservers(shardIdToLocations); + + HashMap partitionsByTServer = getTServerAssignments(totalNumUniqueTServers, sortedShardIds, shardIdToLocations); + HashMap partitionsByShardId = getShardIdAssignments(shardIdToLocations, partitionsByTServer); + + if (log.isDebugEnabled()) + log.debug("Number of shardIds assigned: " + partitionsByShardId.size()); + + return partitionsByShardId; + } + + private int calculateNumberOfUniqueTservers(Map shardIdToLocations) { + + int totalNumUniqueTServers = new HashSet(shardIdToLocations.values()).size(); + if (log.isDebugEnabled()) + log.debug("Total TServers involved: " + totalNumUniqueTServers); + return totalNumUniqueTServers; + } + + private HashMap getTServerAssignments(int totalNumTServers, List sortedShardIds, Map shardIdsToTservers) { + HashMap partitionsByTServer = new HashMap<>(totalNumTServers); + int nextAvailableSlot = 0; + boolean alreadySkippedFutureShards = false; + for (Text shard : sortedShardIds) { + if (alreadySkippedFutureShards || !isFutureShard(shard)) { // short circuiting for performance + alreadySkippedFutureShards = true; + String location = shardIdsToTservers.get(shard); + Integer assignedPartition = partitionsByTServer.get(location); + if (null == assignedPartition) { + assignedPartition = nextAvailableSlot; + partitionsByTServer.put(location, assignedPartition); + nextAvailableSlot++; + } + if (partitionsByTServer.size() == totalNumTServers) { + // all the tservers have been assigned partitions, so we can stop + return partitionsByTServer; + } + } + } + return partitionsByTServer; + } + + private boolean isFutureShard(Text shardId) { + String shardIdStr = shardId.toString().intern(); + if (shardIdStr.length() < 8) { + return true; + } + return shardIdStr.substring(0, 8).compareTo(TODAY) > 0; + } + + private static String formatDay(int numDaysBack) { + return DateHelper.format(NOW - (DateUtils.MILLIS_PER_DAY * numDaysBack)); + } + + private HashMap getShardIdAssignments(Map shardIdsToTservers, HashMap partitionsByTServer) { + HashMap partitionsByShardId = new HashMap<>(); + for (Map.Entry entry : shardIdsToTservers.entrySet()) { + partitionsByShardId.put(entry.getKey(), partitionsByTServer.get(entry.getValue())); + } + return partitionsByShardId; + } + + private boolean prefixMatches(byte[] prefixBytes, byte[] keyBytes, int keyLen) { // if key length is less than prefix size, no use comparing if (prefixBytes.length > keyLen) { return false; @@ -132,7 +363,7 @@ private static boolean prefixMatches(byte[] prefixBytes, byte[] keyBytes, int ke * that should exist * @return if the number of shards for the given date are as expected */ - private static boolean shardsExistForDate(Map locations, String datePrefix, int expectedNumberOfShards) { + private boolean shardsExistForDate(Map locations, String datePrefix, int expectedNumberOfShards) { int count = 0; byte[] prefixBytes = datePrefix.getBytes(); for (Text key : locations.keySet()) { @@ -152,7 +383,7 @@ private static boolean shardsExistForDate(Map locations, String dat * to check * @return if the shards are distributed in a balanced fashion */ - private static boolean shardsAreBalanced(Map locations, String datePrefix, int maxShardsPerTserver) { + private boolean shardsAreBalanced(Map locations, String datePrefix, int maxShardsPerTserver) { // assume true unless proven wrong boolean dateIsBalanced = true; @@ -173,7 +404,7 @@ private static boolean shardsAreBalanced(Map locations, String date // if shard is assigned to more tservers than allowed, then the shards are not balanced if (cnt.intValue() > maxShardsPerTserver) { - log.warn(cnt.toInteger() + " Shards for " + datePrefix + " assigned to tablet " + value); + log.warn("{} Shards for {} assigned to tablet {}", cnt.toInteger(), datePrefix, value); dateIsBalanced = false; } @@ -184,16 +415,32 @@ private static boolean shardsAreBalanced(Map locations, String date return dateIsBalanced; } - public static Map> getSplits(Configuration conf) throws IOException { - return TableSplitsCache.getCurrentCache(conf).getSplits(); - + public Map getSplitsAndLocations(Configuration conf, String tableName) throws IOException { + return instance.getSplitsAndLocationByTable(tableName); } - public static Map getSplitsAndLocations(Configuration conf, String tableName) throws IOException { - return TableSplitsCache.getCurrentCache(conf).getSplitsAndLocationByTable(tableName); + public List getSplits(Configuration conf, String tableName) throws IOException { + return instance.getSplits(tableName); } - public static List getSplits(Configuration conf, String tableName) throws IOException { - return TableSplitsCache.getCurrentCache(conf).getSplits(tableName); + /** + * Finds two contiguous table splits that contain the specified row should reside + * + * @param lookupRow + * Row value to be mapped + * @param tableSplits + * Splits for the table in question + * @return KeyExtent mapping for the given row + */ + static LoadPlan.TableSplits findContainingSplits(Text lookupRow, List tableSplits) { + int position = Collections.binarySearch(tableSplits, lookupRow); + if (position < 0) { + position = -1 * (position + 1); + } + + Text prevRow = position == 0 ? null : tableSplits.get(position - 1); + Text endRow = position == tableSplits.size() ? null : tableSplits.get(position); + + return new LoadPlan.TableSplits(prevRow, endRow); } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java index 192702e1075..b4d12bc11b7 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java @@ -1,5 +1,8 @@ package datawave.ingest.mapreduce.job; +import static datawave.ingest.mapreduce.job.SplitsConstants.SPLITS_CACHE_DIR; +import static datawave.ingest.mapreduce.job.SplitsConstants.SPLITS_CACHE_FILE; + import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; @@ -51,14 +54,12 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil { public static final String REFRESH_SPLITS = "datawave.ingest.refresh.splits"; - public static final String SPLITS_CACHE_DIR = "datawave.ingest.splits.cache.dir"; - public static final String SPLITS_CACHE_FILE = "datawave.ingest.splits.cache.fileName"; + public static final String DEFAULT_SPLITS_CACHE_FILE = "all-splits.txt"; - public static final String MAX_SPLIT_DECREASE = "datawave.ingest.splits.max.decrease.number"; - public static final String MAX_SPLIT_PERCENTAGE_DECREASE = "datawave.ingest.splits.max.decrease.percentage"; + private static final String MAX_SPLIT_DECREASE = "datawave.ingest.splits.max.decrease.number"; + private static final String MAX_SPLIT_PERCENTAGE_DECREASE = "datawave.ingest.splits.max.decrease.percentage"; private static final Logger log = Logger.getLogger(TableSplitsCache.class); private static final String DEFAULT_SPLITS_CACHE_DIR = "/data/splitsCache"; - public static final String DEFAULT_SPLITS_CACHE_FILE = "all-splits.txt"; private static final short DEFAULT_MAX_SPLIT_DECREASE = 42; private static final double DEFAULT_MAX_SPLIT_PERCENTAGE_DECREASE = .5; private static final boolean DEFAULT_REFRESH_SPLITS = true; diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/RepartitionerJob.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/RepartitionerJob.java index 2b13c25d7be..ba8e694766f 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/RepartitionerJob.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/RepartitionerJob.java @@ -41,7 +41,7 @@ import datawave.ingest.mapreduce.job.DelegatingPartitioner; import datawave.ingest.mapreduce.job.IngestJob; import datawave.ingest.mapreduce.job.RFileInputFormat; -import datawave.ingest.mapreduce.job.SplitsFile; +import datawave.ingest.mapreduce.job.SplitsCache; import datawave.ingest.mapreduce.job.reduce.BulkIngestKeyDedupeCombiner; import datawave.ingest.mapreduce.job.writer.AbstractContextWriter; import datawave.ingest.mapreduce.job.writer.ContextWriter; @@ -103,7 +103,7 @@ private Job setupJob() throws URISyntaxException, IOException, AccumuloException Configuration config = j.getConfiguration(); // setup and cache table from config - SplitsFile.setupFile(j, config); + SplitsCache.getInstance(config).setupJob(j); Set tableNames = IngestJob.setupAndCacheTables(config, false); config.setInt("splits.num.reduce", jobConfig.reducers); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/ShardReindexJob.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/ShardReindexJob.java index 55f202d41e7..05ebb9f8413 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/ShardReindexJob.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reindex/ShardReindexJob.java @@ -61,7 +61,7 @@ import datawave.ingest.mapreduce.job.IngestJob; import datawave.ingest.mapreduce.job.MultiRFileOutputFormatter; import datawave.ingest.mapreduce.job.RFileInputFormat; -import datawave.ingest.mapreduce.job.SplitsFile; +import datawave.ingest.mapreduce.job.SplitsCache; import datawave.ingest.mapreduce.job.reduce.BulkIngestKeyAggregatingReducer; import datawave.ingest.mapreduce.job.reduce.BulkIngestKeyDedupeCombiner; import datawave.ingest.mapreduce.job.util.AccumuloUtil; @@ -230,7 +230,7 @@ private Job setupJob() throws IOException, ParseException, AccumuloException, Ta // all changes to configuration must be before this line Job j = Job.getInstance(getConf()); Configuration config = j.getConfiguration(); - SplitsFile.setupFile(j, config); + SplitsCache.getInstance(config).setupJob(j); // check if using some form of accumulo in input if (jobConfig.inputFiles == null) { diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java index 8c48e537167..d0e75f4b068 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/BalancedShardPartitioner.java @@ -1,15 +1,10 @@ package datawave.ingest.mapreduce.partition; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import org.apache.accumulo.core.data.Value; -import org.apache.commons.lang.time.DateUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -20,8 +15,7 @@ import datawave.ingest.mapreduce.handler.shard.ShardIdFactory; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; import datawave.ingest.mapreduce.job.BulkIngestKey; -import datawave.ingest.mapreduce.job.SplitsFile; -import datawave.util.time.DateHelper; +import datawave.ingest.mapreduce.job.SplitsCache; /** * The BalancedShardPartitioner takes advantage of the way that shards are balanced. See ShardedTableTabletBalancer. * The partitioner is designed to have no @@ -34,19 +28,33 @@ */ public class BalancedShardPartitioner extends Partitioner implements Configurable, DelegatePartitioner { private static final Logger log = Logger.getLogger(BalancedShardPartitioner.class); - private static final long now = System.currentTimeMillis(); - private static final String today = formatDay(0); private Configuration conf; private Map> shardPartitionsByTable; private Map offsetsFactorByTable; + private SplitsCache splitsCache; + private String missingShardStrategy; int missingShardIdCount = 0; + public enum MissingShardStrategy { + hash, collapse; + + public static MissingShardStrategy getStrategy(String stratString) { + if (collapse.name().equals(stratString)) { + return collapse; + } else { + return hash; + } + } + } + + private MissingShardStrategy strategy; + public static final String MISSING_SHARD_STRATEGY_PROP = "datawave.ingest.mapreduce.partition.BalancedShardPartitioner.missing.shard.strategy"; private ShardIdFactory shardIdFactory = null; @Override - public synchronized int getPartition(BulkIngestKey key, Value value, int numReduceTasks) { + public int getPartition(BulkIngestKey key, Value value, int numReduceTasks) { try { // partition will be balanced for a given day, more so for recent days int partition = getAssignedPartition(key.getTableName().toString(), key.getKey().getRow()); @@ -64,18 +72,15 @@ public synchronized int getPartition(BulkIngestKey key, Value value, int numRedu } private int getAssignedPartition(String tableName, Text shardId) throws IOException { - Map assignments = lazilyCreateAssignments(tableName); - - Integer partitionId = assignments.get(shardId); - if (partitionId != null) { - return partitionId; - } // if the partitionId is not there, either shards were not created for the day // or not all shards were created for the day - String missingShardStrategy = conf.get(MISSING_SHARD_STRATEGY_PROP, "hash"); switch (missingShardStrategy) { case "hash": + int partition = splitsCache.getExactPartition(tableName, shardId); + if (partition >= 0) { + return partition; + } // only warn a few times per partitioner to avoid flooding the logs if (missingShardIdCount < 10) { log.warn("shardId didn't have a partition assigned to it: " + shardId); @@ -83,141 +88,12 @@ private int getAssignedPartition(String tableName, Text shardId) throws IOExcept } return (shardId.hashCode() & Integer.MAX_VALUE); case "collapse": - ArrayList keys = new ArrayList<>(assignments.keySet()); - Collections.sort(keys); - int closestAssignment = Collections.binarySearch(keys, shardId); - if (closestAssignment >= 0) { - // Should have found it earlier, but just in case go ahead and return it - log.warn("Something is screwy, found " + shardId + " on the second try"); - return assignments.get(shardId); - } - // (-(insertion point) - 1) // insertion point in the index of the key greater - Text shardString = keys.get(Math.abs(closestAssignment + 1)); - return assignments.get(shardString); + return splitsCache.getNearestPartition(tableName, shardId); default: throw new RuntimeException("Unsupported missing shard strategy " + MISSING_SHARD_STRATEGY_PROP + "=" + missingShardStrategy); } } - /** - * For a given tablename, provides the mapping from {@code shard id -> partition} - * - * @param tableName - * the name of the table - * @return a list of the shard mappings - * @throws IOException - * if there is an issue with read or write - */ - private Map lazilyCreateAssignments(String tableName) throws IOException { - if (this.shardPartitionsByTable == null) { - this.shardPartitionsByTable = new HashMap<>(); - } - if (null == this.shardPartitionsByTable.get(tableName)) { - this.shardPartitionsByTable.put(tableName, getPartitionsByShardId(tableName)); - } - return this.shardPartitionsByTable.get(tableName); - } - - /** - * Loads the splits file for the table name and uses it to assign partitions. - * - * @param tableName - * name of the table - * @return a map of the partitions - * @throws IOException - * if there is an issue with read or write - */ - private HashMap getPartitionsByShardId(String tableName) throws IOException { - if (log.isDebugEnabled()) - log.debug("Loading splits data for " + tableName); - - List sortedSplits = SplitsFile.getSplits(conf, tableName); - Map shardIdToLocation = SplitsFile.getSplitsAndLocations(conf, tableName); - - if (log.isDebugEnabled()) - log.debug("Assigning partitioners for each shard in " + tableName); - return assignPartitionsForEachShard(sortedSplits, shardIdToLocation); - } - - /** - * 1. sorts the the tablet assignments by shard id, starting with the most recent going backwards
- * 2. assigns partitions to each tservers, starting from the beginning, but skipping future dates
- * 3. assigns partitions to each shardid by looking up its tserver's assignments
- * 4. returns the {@code shard id -> partition} map - *

- * e.g.,
- * 1. sorted assignments
- * 2. tserver map
- * 3. shard map: {@code future->tserver7 *no change* future->2 shard4->tserver2 tserver2->0 shard4->0 - * shard3->tserver3 tserver3->1 shard3->1 shard2->tserver2 *no change* shard2->0 shard1->tserver7 tserver7->2 shard1->2} - * - * @param shardIdToLocations - * the map of shard ids and their location - * @return shardId to - */ - private HashMap assignPartitionsForEachShard(List sortedShardIds, Map shardIdToLocations) { - int totalNumUniqueTServers = calculateNumberOfUniqueTservers(shardIdToLocations); - - HashMap partitionsByTServer = getTServerAssignments(totalNumUniqueTServers, sortedShardIds, shardIdToLocations); - HashMap partitionsByShardId = getShardIdAssignments(shardIdToLocations, partitionsByTServer); - - if (log.isDebugEnabled()) - log.debug("Number of shardIds assigned: " + partitionsByShardId.size()); - - return partitionsByShardId; - } - - private int calculateNumberOfUniqueTservers(Map shardIdToLocations) { - - int totalNumUniqueTServers = new HashSet(shardIdToLocations.values()).size(); - if (log.isDebugEnabled()) - log.debug("Total TServers involved: " + totalNumUniqueTServers); - return totalNumUniqueTServers; - } - - private HashMap getTServerAssignments(int totalNumTServers, List sortedShardIds, Map shardIdsToTservers) { - HashMap partitionsByTServer = new HashMap<>(totalNumTServers); - int nextAvailableSlot = 0; - boolean alreadySkippedFutureShards = false; - for (Text shard : sortedShardIds) { - if (alreadySkippedFutureShards || !isFutureShard(shard)) { // short circuiting for performance - alreadySkippedFutureShards = true; - String location = shardIdsToTservers.get(shard); - Integer assignedPartition = partitionsByTServer.get(location); - if (null == assignedPartition) { - assignedPartition = nextAvailableSlot; - partitionsByTServer.put(location, assignedPartition); - nextAvailableSlot++; - } - if (partitionsByTServer.size() == totalNumTServers) { - // all the tservers have been assigned partitions, so we can stop - return partitionsByTServer; - } - } - } - return partitionsByTServer; - } - - private static boolean isFutureShard(Text shardId) { - String shardIdStr = shardId.toString().intern(); - if (shardIdStr.length() < 8) { - return true; - } - return shardIdStr.substring(0, 8).compareTo(today) > 0; - } - - private static String formatDay(int numDaysBack) { - return DateHelper.format(now - (DateUtils.MILLIS_PER_DAY * numDaysBack)); - } - - private HashMap getShardIdAssignments(Map shardIdsToTservers, HashMap partitionsByTServer) { - HashMap partitionsByShardId = new HashMap<>(); - for (Map.Entry entry : shardIdsToTservers.entrySet()) { - partitionsByShardId.put(entry.getKey(), partitionsByTServer.get(entry.getValue())); - } - return partitionsByShardId; - } - @Override public void configureWithPrefix(String prefix) {/* no op */} @@ -248,6 +124,10 @@ public Configuration getConf() { public void setConf(Configuration conf) { this.conf = conf; shardIdFactory = new ShardIdFactory(conf); + missingShardStrategy = conf.get(MISSING_SHARD_STRATEGY_PROP, "hash"); + MissingShardStrategy shardStrat = MissingShardStrategy.getStrategy(conf.get(MISSING_SHARD_STRATEGY_PROP)); + splitsCache = SplitsCache.getInstance(conf); + defineOffsetsForTables(conf); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java index 19d44c9e1f7..7d105ed8fcc 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitioner.java @@ -3,8 +3,6 @@ import java.io.IOException; import java.text.DecimalFormat; import java.util.Arrays; -import java.util.Collections; -import java.util.List; import org.apache.accumulo.core.data.Value; import org.apache.hadoop.conf.Configuration; @@ -16,7 +14,7 @@ import org.apache.log4j.Logger; import datawave.ingest.mapreduce.job.BulkIngestKey; -import datawave.ingest.mapreduce.job.SplitsFile; +import datawave.ingest.mapreduce.job.SplitsCache; /** * Range partitioner that uses a split file with the format: {@code tableNamesplitPointtabletLocation} @@ -36,14 +34,14 @@ public class MultiTableRangePartitioner extends Partitioner private DecimalFormat formatter = new DecimalFormat("000"); private Configuration conf; private PartitionLimiter partitionLimiter; - protected Object semaphore = new Object(); + private SplitsCache splitsCache; private void readCacheFilesIfNecessary() { if (cacheFilesRead) { return; } - synchronized (semaphore) { + synchronized (this) { if (cacheFilesRead) { return; } @@ -61,7 +59,7 @@ private void readCacheFilesIfNecessary() { } try { - if (SplitsFile.getSplits(conf).isEmpty()) { + if (!splitsCache.hasSplits()) { log.error("Non-sharded splits by table cannot be empty. If this is a development system, please create at least one split in one of the non-sharded tables (see bin/ingest/seed_index_splits.sh)."); throw new IOException("splits by table cannot be empty"); } @@ -77,21 +75,21 @@ private void readCacheFilesIfNecessary() { @Override public int getPartition(BulkIngestKey key, Value value, int numPartitions) { readCacheFilesIfNecessary(); + holder.clear(); String tableName = key.getTableName().toString(); + key.getKey().getRow(holder); - List cutPointArray = null; - try { - cutPointArray = SplitsFile.getSplits(conf, tableName); - } catch (IOException e) { - log.error("Failed to read splits in MultiTableRangePartitioner for " + tableName); - } - if (null == cutPointArray) { + int splitSize = splitsCache.getSplitsCount(tableName); + if (splitSize < 0) { return (tableName.hashCode() & Integer.MAX_VALUE) % numPartitions; } - key.getKey().getRow(holder); - int index = Collections.binarySearch(cutPointArray, holder); - index = calculateIndex(index, numPartitions, tableName, cutPointArray.size()); + + int index = splitsCache.getExactIndex(tableName, holder); + + // Note that cut-point length may be used by derived classes + // even though its not used below + index = calculateIndex(index, numPartitions, tableName, splitSize); index = partitionLimiter.limit(numPartitions, index); @@ -118,6 +116,7 @@ public static void setContext(TaskInputOutputContext context) { public void setConf(Configuration conf) { this.conf = conf; partitionLimiter = new PartitionLimiter(conf); + splitsCache = SplitsCache.getInstance(conf); if (partitionLimiter.getNumPartitions() == 0) { partitionLimiter.setMaxPartitions(Integer.MAX_VALUE); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitioner.java index 14560705b45..7d3cabda861 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitioner.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitioner.java @@ -1,7 +1,5 @@ package datawave.ingest.mapreduce.partition; -import java.io.IOException; -import java.util.HashMap; import java.util.Map; import org.apache.accumulo.core.data.Value; @@ -13,7 +11,7 @@ import org.apache.log4j.Logger; import datawave.ingest.mapreduce.job.BulkIngestKey; -import datawave.ingest.mapreduce.job.SplitsFile; +import datawave.ingest.mapreduce.job.SplitsCache; /** * The TabletLocationHashPartitioner will generate partitions for the shard table using the hashCode method on the tserver location string @@ -23,6 +21,7 @@ public class TabletLocationHashPartitioner extends Partitioner> shardHashes; + private SplitsCache splitsCache; /** * Given a map of shard IDs to tablet server locations, this method determines a partition for a given key's shard ID. The goal is that we want to ensure @@ -35,18 +34,14 @@ public class TabletLocationHashPartitioner extends Partitioner shardHash = getShardHashes(key.getTableName().toString()); - Integer hash = shardHash.get(shardId); - if (hash != null) { - return (hash & Integer.MAX_VALUE) % numReduceTasks; - } else { - return (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks; - } - } catch (IOException e) { - throw new RuntimeException(e); + public int getPartition(BulkIngestKey key, Value value, int numReduceTasks) { + Text shardId = key.getKey().getRow(); + String location = splitsCache.getExactLocation(key.getTableName().toString(), shardId, () -> null); + if (location != null) { + int hash = location.hashCode(); + return (hash & Integer.MAX_VALUE) % numReduceTasks; + } else { + return (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } @@ -58,35 +53,7 @@ public Configuration getConf() { @Override public void setConf(Configuration conf) { this.conf = conf; - } - - /** - * hashCode of the tserver name Read in the sequence file (that was created at job startup) that contains a list of shard IDs and the corresponding tablet - * server to which that shard is assigned. The hash is a simple hashCode of the location string. - * - * @param tableName - * the table name - * @throws IOException - * for issues with read or write - * @return a mapping of the shard hashes - */ - private Map getShardHashes(String tableName) throws IOException { - if (this.shardHashes == null) { - this.shardHashes = new HashMap<>(); - } - - if (null == this.shardHashes.get(tableName)) { - Map hashedForTable = new HashMap<>(); - - for (Map.Entry entry : SplitsFile.getSplitsAndLocations(conf, tableName).entrySet()) { - - hashedForTable.put(entry.getKey(), entry.getValue().toString().hashCode()); - } - - this.shardHashes.put(tableName, hashedForTable); - } - - return this.shardHashes.get(tableName); + this.splitsCache = SplitsCache.getInstance(conf); } @Override diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/TabletLocationNamePartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/TabletLocationNamePartitioner.java deleted file mode 100644 index c2a327bde61..00000000000 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/TabletLocationNamePartitioner.java +++ /dev/null @@ -1,139 +0,0 @@ -package datawave.ingest.mapreduce.partition; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.log4j.Logger; - -import datawave.ingest.mapreduce.job.BulkIngestKey; -import datawave.ingest.mapreduce.job.SplitsFile; - -/** - * The TabletLocationHashPartitioner will look up the shard tablet servers, sort those tablet servers and then partition based on the index into that sorted - * list. - */ -public class TabletLocationNamePartitioner extends Partitioner implements Configurable, DelegatePartitioner { - private static final Logger log = Logger.getLogger(TabletLocationNamePartitioner.class); - private Configuration conf; - private Map> shardLocations; - - @Override - public synchronized int getPartition(BulkIngestKey key, Value value, int numReduceTasks) { - - try { - return getLocationPartition(key.getKey().getRow(), getShardLocations(key.getTableName().toString()), numReduceTasks); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - // this is used by two schemes - /** - * Given a map of shard IDs to tablet server locations, this method determines a partition for a given key's shard ID. The goal is that we want to ensure - * that all shard IDs served by a given tablet server get sent to the same reducer. To do this, we look up where the shard ID is supposed to be stored and - * use a hash of that (modded by the number of reduces) to come up with the final allocation. This mapping needs to be computed at job startup and, so long - * as no migration goes on during a job, will produce a single map file per tablet server. Note that it is also possible that we receive data for a day that - * hasn't been loaded yet. In that case, we'll just hash the shard ID and send data to that reducer. This will spread out the data for a given day, but the - * map files produced for it will not belong to any given tablet server. So, in the worst case, we have other older data when is already assigned to a - * tablet server and new data which is not. In this case, we'd end up sending two map files to each tablet server. Of course, if tablets get moved around - * between when the job starts and the map files are loaded, then we may end up sending multiple map files to each tablet server. - * - * @param shardId - * the shard id - * @param shardHash - * the shard hash map - * @param numReduceTasks - * the number of reducer tasks - * @return the partition - */ - private int getLocationPartition(Text shardId, Map shardHash, int numReduceTasks) { - int partition = 0; - Integer hash = shardHash.get(shardId); - if (hash != null) { - partition = (hash & Integer.MAX_VALUE) % numReduceTasks; - } else { - partition = (shardId.hashCode() & Integer.MAX_VALUE) % numReduceTasks; - } - return partition; - } - - /** - * Sorts the tserver locations for all the shard ids. Each shardId is assigned a number which corresponds to where its tserver is found in that sorted list. - * sort by most recent shard id, tserver - * - * suggested change: sort by most recent day's locations first, not by location id - * - * Read in the sequence file (that was created at job startup) that contains a list of shard IDs and the corresponding tablet server to which that shard is - * assigned. The location is a number generated by adding the tuples of the location ip address and port. - * - * @param tableName - * the table name - * @return a map of the shard locations - * @throws IOException - * for read write issues - */ - private Map getShardLocations(String tableName) throws IOException { - if (this.shardLocations == null) { - this.shardLocations = new HashMap<>(); - } - - if (null == this.shardLocations.get(tableName)) { - Map shards = SplitsFile.getSplitsAndLocations(conf, tableName); - - // now sort the locations - SortedSet locations = new TreeSet<>(); - locations.addAll(shards.values()); - - ArrayList locList = new ArrayList<>(locations); - HashMap localShardLocations = new HashMap<>(); - for (Map.Entry entry : shards.entrySet()) { - localShardLocations.put(entry.getKey(), locList.indexOf(entry.getValue())); - } - - this.shardLocations.put(tableName, localShardLocations); - } - - return this.shardLocations.get(tableName); - } - - @Override - public void configureWithPrefix(String prefix) {/* no op */} - - @Override - public int getNumPartitions() { - return Integer.MAX_VALUE; - } - - @Override - public void initializeJob(Job job) {} - - @Override - public boolean needSplits() { - return true; - } - - @Override - public boolean needSplitLocations() { - return true; - } -} diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/util/GenerateSplitsFile.java b/warehouse/ingest-core/src/main/java/datawave/ingest/util/GenerateSplitsFile.java index 7abd33302e9..2662b273512 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/util/GenerateSplitsFile.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/util/GenerateSplitsFile.java @@ -8,6 +8,7 @@ import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; +import datawave.ingest.mapreduce.job.SplitsConstants; import datawave.ingest.mapreduce.job.TableSplitsCache; /** @@ -42,8 +43,8 @@ public static void main(String[] args) { System.exit(1); } if (cl.hasOption("sp")) { - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, cl.getOptionValue("sp")); - log.info("Set " + TableSplitsCache.SPLITS_CACHE_DIR + " to " + cl.getOptionValue("sp")); + conf.set(SplitsConstants.SPLITS_CACHE_DIR, cl.getOptionValue("sp")); + log.info("Set " + SplitsConstants.SPLITS_CACHE_DIR + " to " + cl.getOptionValue("sp")); } if (cl.hasOption("cs")) { configSuffix = cl.getOptionValue("cs"); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java index 2d8b236987f..f160953e434 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/MultiRFileOutputFormatterTest.java @@ -1,7 +1,6 @@ package datawave.ingest.mapreduce.job; import static datawave.ingest.mapreduce.job.BulkIngestMapFileLoader.BULK_IMPORT_MODE_CONFIG; -import static datawave.ingest.mapreduce.job.MultiRFileOutputFormatter.findContainingSplits; import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -159,8 +158,6 @@ public void testSetCompressionTypeWithBadType() { } } - private Map> tableLoadPlans = new HashMap<>(); - private ArrayList getSplits() { var arr = new ArrayList(); arr.add(new Text("20170601_0")); // 0 @@ -235,7 +232,7 @@ public void testPlanning() { expectedExtents.add(new LoadPlan.TableSplits(new Text("20170603_0b"), new Text("20170603_0c"))); List tableSplits = getSplits(); - Set extents = rfileRows.stream().map(row -> findContainingSplits(row, tableSplits)) + Set extents = rfileRows.stream().map(row -> SplitsFile.findContainingSplits(row, tableSplits)) .collect(Collectors.toCollection(HashSet::new)); assertEquals(expectedExtents, extents); @@ -459,14 +456,6 @@ protected int getSeqFileBlockSize() { return 1; } - @Override - protected Map getShardLocations(String tableName) throws IOException { - Map locations = new HashMap<>(); - locations.put(new Text("20100101_1"), "server1"); - locations.put(new Text("20100101_2"), "server2"); - return locations; - } - @Override protected SizeTrackingWriter openWriter(String filename, AccumuloConfiguration tableConf, String table) { filenames.add(filename); @@ -515,6 +504,9 @@ public void testTableSeparation() throws IOException, InterruptedException { @Test public void testTableSeparationWithFilePerShardLoc() throws IOException, InterruptedException { + conf.set("shard.fallback.name.20100101_1", "server1"); + conf.set("shard.fallback.name.20100101_2", "server2"); + MultiRFileOutputFormatter.setGenerateMapFilePerShardLocation(conf, true); RecordWriter writer = createWriter(formatter, conf); writeShardPairs(writer, 2); @@ -542,6 +534,9 @@ private void expectShardFiles(int num) { @Test public void testRFileFileSizeLimitWithFilePerShardLoc() throws IOException, InterruptedException { + conf.set("shard.fallback.name.20100101_1", "server1"); + conf.set("shard.fallback.name.20100101_2", "server2"); + MultiRFileOutputFormatter.setGenerateMapFilePerShardLocation(conf, true); // each key we write is 16 characters total, so a limit of 32 should allow two keys per file MultiRFileOutputFormatter.setRFileLimits(conf, 0, 32); @@ -568,6 +563,9 @@ public void testRFileFileSizeLimit() throws IOException, InterruptedException { @Test public void testRFileEntrySizeLimitWithFilePerShardLoc() throws IOException, InterruptedException { + conf.set("shard.fallback.name.20100101_1", "server1"); + conf.set("shard.fallback.name.20100101_2", "server2"); + MultiRFileOutputFormatter.setRFileLimits(conf, 1, 0); MultiRFileOutputFormatter.setGenerateMapFilePerShardLocation(conf, true); RecordWriter writer = createWriter(formatter, conf); @@ -613,4 +611,13 @@ private void writeShardEntry(RecordWriter writer, int shard private void assertNumFileNames(int expectedNumFiles) { assertEquals(filenames.toString(), expectedNumFiles, filenames.size()); } + + private void setConfiguration(Configuration conf, int count) { + conf.set("mapred.output.dir", "/tmp"); + conf.set(SplitsFile.CONFIGURED_SHARDED_TABLE_NAMES, TableName.SHARD); + + for (int i = 1; i <= count; i++) { + conf.set(String.format("shard.fallback.name.20100101_%d", i), "server" + i); + } + } } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/SplitsFileTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/SplitsFileTest.java index 5c019909f53..47276018ef6 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/SplitsFileTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/SplitsFileTest.java @@ -10,7 +10,6 @@ import java.io.PrintStream; import java.net.URI; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -38,20 +37,24 @@ public class SplitsFileTest { private static final String TABLE_NAME = "unitTestTable"; private static final int SHARDS_PER_DAY = 10; private static Configuration conf; + private SplitsFile uut; @TempDir - public static java.nio.file.Path folder; + private java.nio.file.Path tempDir; @BeforeAll - public static void defineShardLocationsFile() { + public static void defineShardLocationsFile() throws IOException { conf = new Configuration(); conf.setInt(ShardIdFactory.NUM_SHARDS, SHARDS_PER_DAY); conf.set(ShardedDataTypeHandler.SHARDED_TNAMES, TableName.SHARD); } @BeforeEach - public void clearCache() { - TableSplitsCache.getCurrentCache(conf).clear(); + public void setUp() throws IOException { + // Wipe the underlying cache that the SplitsFile uses + // state is saved that will impact multiple test iterations + TableSplitsCache.clear(); + uut = new SplitsFile(); } private void createSplitsFile(Map splits, Configuration conf, int expectedNumRows, String tableName) throws IOException { @@ -61,18 +64,16 @@ private void createSplitsFile(Map splits, Configuration conf, int e writeBaseSplitsFile(splits, conf, tableName); long actualCount = splits.size(); - Map SplitsFiles = new HashMap<>(); - // SplitsFile.addToConf(conf, SplitsFiles); assertEquals(expectedNumRows, actualCount, "IngestJob#writeSplitsFile failed to create the expected number of rows"); } private void writeBaseSplitsFile(Map locations, Configuration conf, String tableName) throws IOException { - File tmpBaseSplitDir = folder.toFile(); + File tmpBaseSplitDir = tempDir.toFile(); String splitsFile = TableSplitsCache.DEFAULT_SPLITS_CACHE_FILE; Path splitsPath = new Path(tmpBaseSplitDir.getAbsolutePath() + "/" + splitsFile); FileSystem fs = new Path(tmpBaseSplitDir.getAbsolutePath()).getFileSystem(conf); - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, tmpBaseSplitDir.getAbsolutePath()); + conf.set(SplitsConstants.SPLITS_CACHE_DIR, tmpBaseSplitDir.getAbsolutePath()); // constructor that takes a created list of locations try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)))) { @@ -80,11 +81,14 @@ private void writeBaseSplitsFile(Map locations, Configuration conf, TableSplitsCache.getCurrentCache(conf).writeLocations(out, tableName, Lists.newArrayList(locations.keySet()), locations); } assertTrue(fs.exists(splitsPath)); + + conf.set(SplitsConstants.SPLITS_CACHE_DIR, tmpBaseSplitDir.getAbsolutePath()); + } private FileSystem setWorkingDirectory(Configuration conf) throws IOException { FileSystem fs = FileSystem.getLocal(conf); - File tempWorkDir = folder.toFile(); + File tempWorkDir = tempDir.toFile(); fs.setWorkingDirectory(new Path(tempWorkDir.toString())); conf.set(SplitsFile.SPLIT_WORK_DIR, tempWorkDir.toString()); return fs; @@ -93,16 +97,17 @@ private FileSystem setWorkingDirectory(Configuration conf) throws IOException { @Test public void testGetAllSplitsFilesWithoutPath() throws Exception { Configuration conf = new Configuration(); - File tempWorkDir = folder.toFile(); + File tempWorkDir = tempDir.toFile(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, tempWorkDir.toURI().toString()); FileSystem fs = FileSystem.get(tempWorkDir.toURI(), conf); fs.setWorkingDirectory(new Path(tempWorkDir.toString())); Path workDir = fs.makeQualified(new Path("work")); conf.set(SplitsFile.SPLIT_WORK_DIR, workDir.toString()); - conf.set(ShardedDataTypeHandler.SHARDED_TNAMES, "shard_ingest_unit_test_table_1,shard_ingest_unit_test_table_2,shard_ingest_unit_test_table_3"); - assertThrows(IOException.class, () -> SplitsFile.setupFile(Job.getInstance(conf), conf)); + uut.init(conf); + + assertThrows(IOException.class, () -> uut.setupJob(Job.getInstance(conf))); } @Test @@ -110,11 +115,14 @@ public void testSingleDaySplitsCreated_AndValid() throws Exception { String tableName = "validSplits"; SortedMap splits = createDistributedLocations(tableName); createSplitsFile(splits, conf, splits.size(), tableName); - Map locations = SplitsFile.getSplitsAndLocations(conf, tableName); + + uut.init(conf); + + Map locations = uut.getSplitsAndLocations(conf, tableName); // three days of splits, all should be good, none of these should error - SplitsFile.validateShardIdLocations(conf, tableName, 0, locations); - SplitsFile.validateShardIdLocations(conf, tableName, 1, locations); - SplitsFile.validateShardIdLocations(conf, tableName, 2, locations); + uut.validateShardIdLocations(conf, tableName, 0, locations); + uut.validateShardIdLocations(conf, tableName, 1, locations); + uut.validateShardIdLocations(conf, tableName, 2, locations); } @Test @@ -122,9 +130,12 @@ public void testMissingAllOfTodaysSplits() throws Exception { String tableName = "missingTodaysSplits"; SortedMap splits = simulateMissingSplitsForDay(0, tableName); createSplitsFile(splits, conf, splits.size(), tableName); - Map locations = SplitsFile.getSplitsAndLocations(conf, tableName); + + uut.init(conf); + + Map locations = uut.getSplitsAndLocations(conf, tableName); // three days of splits, today should be invalid, which makes the rest bad too - assertThrows(IllegalStateException.class, () -> SplitsFile.validateShardIdLocations(conf, tableName, 0, locations)); + assertThrows(IllegalStateException.class, () -> uut.validateShardIdLocations(conf, tableName, 0, locations)); } @Test @@ -132,9 +143,12 @@ public void testUnbalancedTodaysSplits() throws Exception { String tableName = "unbalancedTodaysSplits"; SortedMap splits = simulateUnbalancedSplitsForDay(0, tableName); createSplitsFile(splits, conf, splits.size(), tableName); - Map locations = SplitsFile.getSplitsAndLocations(conf, tableName); + + uut.init(conf); + + Map locations = uut.getSplitsAndLocations(conf, tableName); // three days of splits, today should be invalid, which makes the rest bad too - assertThrows(IllegalStateException.class, () -> SplitsFile.validateShardIdLocations(conf, tableName, 0, locations)); + assertThrows(IllegalStateException.class, () -> uut.validateShardIdLocations(conf, tableName, 0, locations)); } @Test @@ -142,13 +156,16 @@ public void testMissingAllOfYesterdaysSplits() throws Exception { String tableName = "missingYesterdaysSplits"; SortedMap splits = simulateMissingSplitsForDay(1, tableName); createSplitsFile(splits, conf, splits.size(), tableName); - Map locations = SplitsFile.getSplitsAndLocations(conf, tableName); - assertEquals(locations.size(), splits.size()); + + uut.init(conf); + + Map locations = uut.getSplitsAndLocations(conf, tableName); + assertEquals(splits.size(), locations.size()); // three days of splits, today should be valid // yesterday and all other days invalid - SplitsFile.validateShardIdLocations(conf, tableName, 0, locations); + uut.validateShardIdLocations(conf, tableName, 0, locations); // this should cause the exception - assertThrows(IllegalStateException.class, () -> SplitsFile.validateShardIdLocations(conf, tableName, 1, locations)); + assertThrows(IllegalStateException.class, () -> uut.validateShardIdLocations(conf, tableName, 1, locations)); } @Test @@ -156,12 +173,15 @@ public void testUnbalancedYesterdaysSplits() throws Exception { String tableName = "unbalancedYesterdaysSplits"; SortedMap splits = simulateUnbalancedSplitsForDay(1, tableName); createSplitsFile(splits, conf, splits.size(), tableName); - Map locations = SplitsFile.getSplitsAndLocations(conf, tableName); + + uut.init(conf); + + Map locations = uut.getSplitsAndLocations(conf, tableName); // three days of splits, today should be valid // yesterday and all other days invalid - SplitsFile.validateShardIdLocations(conf, tableName, 0, locations); + uut.validateShardIdLocations(conf, tableName, 0, locations); // this should cause the exception - assertThrows(IllegalStateException.class, () -> SplitsFile.validateShardIdLocations(conf, tableName, 1, locations)); + assertThrows(IllegalStateException.class, () -> uut.validateShardIdLocations(conf, tableName, 1, locations)); } @Test @@ -169,11 +189,13 @@ public void testUnbalancedMaxMoreThanConfigured() throws Exception { String tableName = "unbalancedMoreSplitsThenMaxPer"; SortedMap splits = simulateMultipleShardsPerTServer(tableName, 3); conf.setInt(SplitsFile.MAX_SHARDS_PER_TSERVER, 2); - createSplitsFile(splits, conf, splits.size(), tableName); - Map locations = SplitsFile.getSplitsAndLocations(conf, tableName); + + uut.init(conf); + + Map locations = uut.getSplitsAndLocations(conf, tableName); // this should cause the exception - assertThrows(IllegalStateException.class, () -> SplitsFile.validateShardIdLocations(conf, tableName, 0, locations)); + assertThrows(IllegalStateException.class, () -> uut.validateShardIdLocations(conf, tableName, 0, locations)); } @Test @@ -181,14 +203,16 @@ public void testUnbalancedButNotMoreThanConfigured() throws Exception { String tableName = "unbalancedNotMoreSplitsThenMaxPer"; SortedMap splits = simulateMultipleShardsPerTServer(tableName, 3); conf.setInt(SplitsFile.MAX_SHARDS_PER_TSERVER, 3); - createSplitsFile(splits, conf, splits.size(), tableName); - Map locations = SplitsFile.getSplitsAndLocations(conf, tableName); + + uut.init(conf); + + Map locations = uut.getSplitsAndLocations(conf, tableName); // this should NOT cause an exception - SplitsFile.validateShardIdLocations(conf, tableName, 0, locations); + uut.validateShardIdLocations(conf, tableName, 0, locations); } - private SortedMap simulateUnbalancedSplitsForDay(int daysAgo, String tableName) { + private SortedMap simulateUnbalancedSplitsForDay(int daysAgo, String tableName) throws IOException { // start with a well distributed set of shards per day for 3 days SortedMap locations = createDistributedLocations(tableName); // for shards from "daysAgo", peg them to first shard @@ -201,7 +225,7 @@ private SortedMap simulateUnbalancedSplitsForDay(int daysAgo, Strin return locations; } - private SortedMap simulateMultipleShardsPerTServer(String tableName, int shardsPerTServer) { + private SortedMap simulateMultipleShardsPerTServer(String tableName, int shardsPerTServer) throws IOException { SortedMap locations = new TreeMap<>(); long now = System.currentTimeMillis(); int tserverId = 1; @@ -223,7 +247,7 @@ private SortedMap simulateMultipleShardsPerTServer(String tableName return locations; } - private SortedMap simulateMissingSplitsForDay(int daysAgo, String tableName) { + private SortedMap simulateMissingSplitsForDay(int daysAgo, String tableName) throws IOException { // start with a well distributed set of shards per day for 3 days SortedMap locations = createDistributedLocations(tableName); // for shards from "daysAgo", remove them diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java index d1cee905652..1c9a455d6e9 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/job/TableSplitsCacheTest.java @@ -167,11 +167,11 @@ public void setup() throws Exception { public void setSplitsCacheDir() { URL url = TableSplitsCacheTest.class.getResource("/datawave/ingest/mapreduce/job/all-splits.txt"); Assert.assertNotNull("TableSplitsCacheTest#setup failed to load test cache directory.", url); - mockConfiguration.put(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf(Path.SEPARATOR))); + mockConfiguration.put(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf(Path.SEPARATOR))); } public void setSplitsCacheDir(String splitsCacheDir) { - mockConfiguration.put(TableSplitsCache.SPLITS_CACHE_DIR, splitsCacheDir); + mockConfiguration.put(SplitsConstants.SPLITS_CACHE_DIR, splitsCacheDir); } @After diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/BalancedShardPartitionerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/BalancedShardPartitionerTest.java index f66578dfd82..021e8fcf2e0 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/BalancedShardPartitionerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/BalancedShardPartitionerTest.java @@ -1,9 +1,11 @@ package datawave.ingest.mapreduce.partition; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -18,17 +20,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import datawave.ingest.mapreduce.handler.shard.ShardIdFactory; import datawave.ingest.mapreduce.handler.shard.ShardedDataTypeHandler; import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.SplitsCacheFactory; import datawave.ingest.mapreduce.job.TableSplitsCache; import datawave.util.TableName; import datawave.util.time.DateHelper; @@ -40,36 +41,39 @@ public class BalancedShardPartitionerTest { private static final int NUM_REDUCE_TASKS = 270; private static Configuration conf; - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir + public Path temporaryFolder; private BalancedShardPartitioner partitioner = null; private ShardIdFactory shardIdFactory = new ShardIdFactory(conf); - @BeforeClass + @BeforeAll public static void defineShardLocationsFile() throws IOException { conf = new Configuration(); conf.setInt(ShardIdFactory.NUM_SHARDS, SHARDS_PER_DAY); } - @Before + @BeforeEach public void setUp() throws IOException { conf = new Configuration(); - TableSplitsCache.getCurrentCache(conf).clear(); + TableSplitsCache.clear(); + SplitsCacheFactory.clearInstance(); conf.setInt(ShardIdFactory.NUM_SHARDS, SHARDS_PER_DAY); partitioner = new BalancedShardPartitioner(); // gotta load this every test, or using different values bleeds into other tests - new TestShardGenerator(conf, temporaryFolder.newFolder(), NUM_DAYS, SHARDS_PER_DAY, TOTAL_TSERVERS, TableName.SHARD); + new TestShardGenerator(conf, Files.createDirectory(temporaryFolder.resolve("root")).toFile(), NUM_DAYS, SHARDS_PER_DAY, TOTAL_TSERVERS, + TableName.SHARD); conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); conf.set(ShardedDataTypeHandler.SHARDED_TNAMES, "shard"); + shardIdFactory = new ShardIdFactory(conf); partitioner.setConf(conf); } - @After + @AfterEach public void tearDown() { partitioner = null; conf.unset(BalancedShardPartitioner.MISSING_SHARD_STRATEGY_PROP); @@ -85,11 +89,17 @@ public void testNoCollisionsTodayAndBack2Days() throws Exception { @Test public void testTwoTablesAreOffsetted() throws Exception { // create another split files for this test that contains two tables. register the tables names for both shard and error shard - new TestShardGenerator(conf, temporaryFolder.newFolder(), NUM_DAYS, SHARDS_PER_DAY, TOTAL_TSERVERS, TableName.ERROR_SHARD, TableName.SHARD); + Path twoTablesDir = Files.createDirectory(temporaryFolder.resolve("test-two-tables")); + new TestShardGenerator(conf, twoTablesDir.toFile(), NUM_DAYS, SHARDS_PER_DAY, TOTAL_TSERVERS, TableName.SHARD, TableName.ERROR_SHARD); conf.set(ShardedDataTypeHandler.SHARDED_TNAMES, "errorShard,shard"); + conf.set("datawave.ingest.splits.cache.dir", twoTablesDir.toString()); + conf.set("datawave.ingest.splits.cache.file", "all-splits.txt"); - partitioner.setConf(conf); + TableSplitsCache.clear(); + SplitsCacheFactory.clearInstance(); + conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, true); + partitioner.setConf(conf); // For a shard from today, we can assume that they're well balanced. // If offsetting is working, they will not go to the same partitions String today = formatDay(0); @@ -103,8 +113,8 @@ public void testTwoTablesAreOffsetted() throws Exception { private void verifyOffsetGroup(int group, int partitionId, String date) { int numShards = shardIdFactory.getNumShards(date); - Assert.assertTrue("partitionId " + partitionId + " is not >= " + (numShards * group), partitionId >= numShards * group); - Assert.assertTrue("partitionId " + partitionId + " is not < " + (numShards * (group + 1)), partitionId < numShards * (group + 1)); + assertTrue(partitionId >= numShards * group, "partitionId " + partitionId + " is not >= " + (numShards * group)); + assertTrue(partitionId < numShards * (group + 1), "partitionId " + partitionId + " is not < " + (numShards * (group + 1))); } @Test @@ -184,7 +194,7 @@ public void testDifferentNumberShardsPerDayHash() throws IOException { // hashing is the default implementation, so null is passed in String tableName = "shard3"; - simulateDifferentNumberShardsPerDay(null, tableName); + simulateDifferentNumberShardsPerDay("hash", tableName); // 1 day ago should get SHARDS_PER_DAY partitions assertPartitionsForDay(partitioner, tableName, 1, SHARDS_PER_DAY); @@ -210,13 +220,9 @@ public void testDifferentNumberShardsPerDayCollapseButOutsideRange() throws IOEx } private void simulateDifferentNumberShardsPerDay(String missingShardStrategy, String tableName) throws IOException { - // This emulates today, yesterday and the day before have SHARDS_PER_DAY splits and - // 3 days ago and 4 days ago only have 2 splits, _0 and _1. - SortedMap locations = new TreeMap<>(); long now = System.currentTimeMillis(); int tserverId = 1; - Text prevEndRow = new Text(); for (int daysAgo = 0; daysAgo <= 2; daysAgo++) { String day = DateHelper.format(now - (daysAgo * DateUtils.MILLIS_PER_DAY)); for (int currShard = 0; currShard < SHARDS_PER_DAY; currShard++) { @@ -229,8 +235,19 @@ private void simulateDifferentNumberShardsPerDay(String missingShardStrategy, St locations.put(new Text(day + "_" + currShard), Integer.toString(tserverId++)); } } - new TestShardGenerator(conf, temporaryFolder.newFolder(), locations, tableName); + + Path simulatedDiffDir = Files.createDirectory(temporaryFolder.resolve("simulated-diff")); + new TestShardGenerator(conf, simulatedDiffDir.toFile(), locations, tableName); conf.set(ShardedDataTypeHandler.SHARDED_TNAMES, tableName); + conf.set("datawave.ingest.splits.cache.dir", simulatedDiffDir.toString()); + conf.set("datawave.ingest.splits.cache.file", "all-splits.txt"); + conf.set(BalancedShardPartitioner.MISSING_SHARD_STRATEGY_PROP, missingShardStrategy); + + // Clear stale cache and force a reload + TableSplitsCache.clear(); + SplitsCacheFactory.clearInstance(); + conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, true); + partitioner.setConf(conf); if (missingShardStrategy != null) { conf.set(BalancedShardPartitioner.MISSING_SHARD_STRATEGY_PROP, missingShardStrategy); @@ -271,7 +288,7 @@ public static void assertExpectedCollisions(Partitioner partitionerIn, int daysB partitionsUsed.add(partition); } // 9 is what we get by hashing the shardId - Assert.assertTrue("For " + daysBack + " days ago, we had a different number of collisions: " + collisions, expectedCollisions >= collisions); + assertTrue(expectedCollisions >= collisions, "For " + daysBack + " days ago, we had a different number of collisions: " + collisions); // this // has // more to diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRRRangePartitionerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRRRangePartitionerTest.java index d05e36ca2bc..b09f6d29b26 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRRRangePartitionerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRRRangePartitionerTest.java @@ -5,6 +5,8 @@ package datawave.ingest.mapreduce.partition; import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.net.URISyntaxException; @@ -22,11 +24,12 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.MapContextImpl; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.SplitsCacheFactory; +import datawave.ingest.mapreduce.job.SplitsConstants; import datawave.ingest.mapreduce.job.TableSplitsCache; import datawave.util.TableName; @@ -44,15 +47,15 @@ public class MultiTableRRRangePartitionerTest { Configuration configuration; Job mockJob; - @Before + @BeforeEach public void before() throws IOException { mockJob = Job.getInstance(); configuration = mockJob.getConfiguration(); configuration.set("job.output.table.names", TableName.SHARD); configuration.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); - TableSplitsCache.getCurrentCache(configuration).clear(); - + TableSplitsCache.clear(); + SplitsCacheFactory.clearInstance(); } @Test @@ -72,27 +75,27 @@ public void testCalculateIndex() { int resultFour = instance.calculateIndex(indexFour, numPartitions, tableName, cutPointArrayLength); assertEquals(result, resultTwo); assertEquals(result, expectedResult); - Assert.assertEquals(4, resultThree); - Assert.assertEquals(0, resultFour); + assertEquals(4, resultThree); + assertEquals(0, resultFour); } - @Test(expected = RuntimeException.class) + @Test public void testEmptySplitsThrowsException() throws IOException, URISyntaxException { String filename = "full_empty_splits.txt"; URL url = createUrl(filename); mockContextForLocalCacheFile(url); - configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); - configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, filename); - getPartition("23432"); + configuration.set(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); + configuration.set(SplitsConstants.SPLITS_CACHE_FILE, filename); + assertThrows(RuntimeException.class, () -> getPartition("23432")); } - @Test(expected = RuntimeException.class) + @Test public void testProblemGettingLocalCacheFiles() throws IOException, URISyntaxException { String filename = "full_splits.txt"; URL url = createUrl(filename); mockContextForLocalCacheFile(url); - configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); - configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, filename); + configuration.set(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); + configuration.set(SplitsConstants.SPLITS_CACHE_FILE, filename); MultiTableRangePartitioner.setContext(new MapContextImpl(configuration, new TaskAttemptID(), null, null, null, null, null) { @Override @@ -101,7 +104,7 @@ public org.apache.hadoop.fs.Path[] getLocalCacheFiles() throws IOException { } }); - getPartition("23432"); + assertThrows(RuntimeException.class, () -> getPartition("23432")); } private URL createUrl(String fileName) { @@ -128,8 +131,8 @@ public void testAllDataForOneSplitGoesToOnePartitioner() { String filename = "full_splits.txt"; URL url = createUrl(filename); mockContextForLocalCacheFile(url); - configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); - configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, filename); + configuration.set(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); + configuration.set(SplitsConstants.SPLITS_CACHE_FILE, filename); int numPartitions = 581; MultiTableRRRangePartitioner partitioner = new MultiTableRRRangePartitioner(); @@ -143,8 +146,8 @@ public void testAllDataForOneSplitGoesToOnePartitioner() { String rowStr = Character.toString((char) ("a".codePointAt(0) + i)); int resultRow = partitioner.getPartition(getBulkIngestKey(rowStr), new Value(), numPartitions); - Assert.assertEquals("These should have matched: resultRow: " + resultRow + " , resultForPrecedingRow: " + resultForPrecedingRow, resultRow, - resultForPrecedingRow); + assertTrue(resultRow == resultForPrecedingRow, + "These should have matched: resultRow: " + resultRow + " , resultForPrecedingRow: " + resultForPrecedingRow); } } @@ -156,8 +159,8 @@ public void testCalculateIndexRangeIsValid() { MultiTableRRRangePartitioner partitioner = new MultiTableRRRangePartitioner(); for (int i = -1 * cutPointArrayLength - 1; i < cutPointArrayLength; i++) { int result = partitioner.calculateIndex(i, numPartitions, "someTableName", cutPointArrayLength); - Assert.assertTrue("i: " + i + " result: " + result, 0 <= result); - Assert.assertTrue("i: " + i + " result: " + result, result < numPartitions); + assertTrue(0 <= result, "i: " + i + " result: " + result); + assertTrue(result < numPartitions, "i: " + i + " result: " + result); } } @@ -166,8 +169,8 @@ public void testPartitionerSpaceIsValid() { String filename = "full_splits.txt"; URL url = createUrl(filename); mockContextForLocalCacheFile(url); - configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); - configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, filename); + configuration.set(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); + configuration.set(SplitsConstants.SPLITS_CACHE_FILE, filename); int numPartitions = 581; MultiTableRRRangePartitioner partitioner = new MultiTableRRRangePartitioner(); @@ -178,15 +181,15 @@ public void testPartitionerSpaceIsValid() { for (int i = 0; i < numSplits; i++) { String rowStr = Character.toString((char) ("a".codePointAt(0) + i)); int result = partitioner.getPartition(getBulkIngestKey(rowStr), new Value(), numPartitions); - Assert.assertTrue("rowStr: " + rowStr + " partition: " + result, numPartitions - numSplits - 1 <= result); - Assert.assertTrue("rowStr: " + rowStr + " partition: " + result, result < numPartitions); + assertTrue(numPartitions - numSplits - 1 <= result, "rowStr: " + rowStr + " partition: " + result); + assertTrue(result < numPartitions, "rowStr: " + rowStr + " partition: " + result); } // test rows before and after each split for (int i = -1; i < numSplits + 1; i++) { int result = partitioner.getPartition(getBulkIngestKey(Character.toString((char) ("a".codePointAt(0) + i)) + "_"), new Value(), numPartitions); - Assert.assertTrue("i: " + i + " partition: " + result, numPartitions - numSplits - 1 <= result); - Assert.assertTrue("i: " + i + " partition: " + result, result < numPartitions); + assertTrue(numPartitions - numSplits - 1 <= result, "i: " + i + " partition: " + result); + assertTrue(result < numPartitions, "i: " + i + " partition: " + result); } } @@ -199,12 +202,11 @@ public void testEvenDistributionWithExtraReducers() { // first split is a, last is z countPartitions(numberTimesPartitionSeen, numPartitions, partitioner); - Assert.assertEquals( - "Should have seen a total of 27 different partitions. There is a split for each letter of the alphabet and the null split which is not in the file", - 27, numberTimesPartitionSeen.size()); + assertTrue(27 == numberTimesPartitionSeen.size(), + "Should have seen a total of 27 different partitions. There is a split for each letter of the alphabet and the null split which is not in the file"); for (Map.Entry partitionAndNumSeen : numberTimesPartitionSeen.entrySet()) { - Assert.assertEquals("We haven't used the partition space so they should all be even, but partition " + partitionAndNumSeen.getKey().intValue() - + " did not see 2.", 2, partitionAndNumSeen.getValue().intValue()); + assertTrue(2 == partitionAndNumSeen.getValue().intValue(), "We haven't used the partition space so they should all be even, but partition " + + partitionAndNumSeen.getKey().intValue() + " did not see 2."); } } @@ -217,15 +219,15 @@ public void testEvenDistributionWithFewerReducers() { // first split is a, last is z countPartitions(numberTimesPartitionSeen, numPartitions, partitioner); - Assert.assertEquals("Should have seen a total of 10 different partitions given the small reducer space", 10, numberTimesPartitionSeen.size()); + assertTrue(numberTimesPartitionSeen.size() == 10, "Should have seen a total of 10 different partitions given the small reducer space"); System.out.println(numberTimesPartitionSeen); // we partitioned 27 splits // over a space of 10 partitioners // so each partitioners should have 2 splits or 3 splits assigned to it // we partitioned two rows per split, so each partition should have seen 4 or 6 rows for (Map.Entry partitionAndNumSeen : numberTimesPartitionSeen.entrySet()) { - Assert.assertTrue(partitionAndNumSeen.toString(), 4 <= partitionAndNumSeen.getValue().intValue()); - Assert.assertTrue(partitionAndNumSeen.toString(), partitionAndNumSeen.getValue().intValue() <= 6); + assertTrue(4 <= partitionAndNumSeen.getValue().intValue(), partitionAndNumSeen.toString()); + assertTrue(partitionAndNumSeen.getValue().intValue() <= 6, partitionAndNumSeen.toString()); } } @@ -240,7 +242,7 @@ public void testPartitionsInNonContiguousWay() { String row = Character.toString((char) ("a".codePointAt(0) + i)); partitionsFound.add(partitioner.getPartition(getBulkIngestKey(row), new Value(), numPartitions)); } - Assert.assertEquals(10, partitionsFound.size()); + assertEquals(10, partitionsFound.size()); // k - t go to different partitions partitionsFound.clear(); @@ -248,7 +250,7 @@ public void testPartitionsInNonContiguousWay() { String row = Character.toString((char) ("a".codePointAt(0) + i)); partitionsFound.add(partitioner.getPartition(getBulkIngestKey(row), new Value(), numPartitions)); } - Assert.assertEquals(10, partitionsFound.size()); + assertEquals(10, partitionsFound.size()); } @Test @@ -263,7 +265,7 @@ public void testOverlapsAtLastPartitions() { int previousCount = 0; for (Map.Entry partitionAndNumSeen : numberTimesPartitionSeen.entrySet()) { int currentCount = partitionAndNumSeen.getValue().intValue(); - Assert.assertTrue(partitionAndNumSeen.toString(), previousCount <= currentCount); + assertTrue(previousCount <= currentCount, partitionAndNumSeen.toString()); previousCount = currentCount; } } @@ -272,8 +274,8 @@ private MultiTableRRRangePartitioner createPartitionerFromSplits() { String filename = "full_splits.txt"; URL url = createUrl(filename); mockContextForLocalCacheFile(url); - configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); - configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, filename); + configuration.set(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); + configuration.set(SplitsConstants.SPLITS_CACHE_FILE, filename); MultiTableRRRangePartitioner partitioner = new MultiTableRRRangePartitioner(); partitioner.setConf(configuration); return partitioner; diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitionerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitionerTest.java index d1187b1d65f..49bd8a87cc4 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitionerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/MultiTableRangePartitionerTest.java @@ -18,6 +18,8 @@ import org.junit.Test; import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.SplitsCacheFactory; +import datawave.ingest.mapreduce.job.SplitsConstants; import datawave.ingest.mapreduce.job.TableConfigurationUtil; import datawave.ingest.mapreduce.job.TableSplitsCache; import datawave.util.TableName; @@ -32,8 +34,9 @@ public void before() throws IOException { mockJob = Job.getInstance(); configuration = mockJob.getConfiguration(); configuration.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); - TableSplitsCache.getCurrentCache(configuration).clear(); configuration.set(TableConfigurationUtil.JOB_OUTPUT_TABLE_NAMES, TableName.SHARD); + TableSplitsCache.clear(); + SplitsCacheFactory.clearInstance(); } @Test @@ -41,8 +44,8 @@ public void testGoodSplitsFile() throws IOException, URISyntaxException { String filename = "trimmed_splits.txt"; URL url = createUrl(filename); mockContextForLocalCacheFile(url); - configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); - configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, filename); + configuration.set(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); + configuration.set(SplitsConstants.SPLITS_CACHE_FILE, filename); Assert.assertEquals(5, getPartition()); } @@ -51,8 +54,8 @@ public void testEmptySplitsThrowsException() throws IOException, URISyntaxExcept String filename = "trimmed_empty_splits.txt"; URL url = createUrl(filename); mockContextForLocalCacheFile(url); - configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); - configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, filename); + configuration.set(SplitsConstants.SPLITS_CACHE_DIR, url.getPath().substring(0, url.getPath().lastIndexOf('/'))); + configuration.set(SplitsConstants.SPLITS_CACHE_FILE, filename); getPartition(); } diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SplitBasedHashPartitionerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SplitBasedHashPartitionerTest.java index aa5226a3e43..f8814adbe2c 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SplitBasedHashPartitionerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SplitBasedHashPartitionerTest.java @@ -15,6 +15,7 @@ import org.junit.Test; import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.SplitsConstants; import datawave.ingest.mapreduce.job.TableSplitsCache; public class SplitBasedHashPartitionerTest { @@ -27,8 +28,8 @@ public void before() { final String testFilePath = SplitBasedHashPartitionerTest.class.getClassLoader().getResource(TEST_FILE_LOCATION).getPath(); conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, testFilePath.substring(0, testFilePath.lastIndexOf('/'))); - conf.set(TableSplitsCache.SPLITS_CACHE_FILE, "full_splits.txt"); + conf.set(SplitsConstants.SPLITS_CACHE_DIR, testFilePath.substring(0, testFilePath.lastIndexOf('/'))); + conf.set(SplitsConstants.SPLITS_CACHE_FILE, "full_splits.txt"); TableSplitsCache.getCurrentCache(conf).clear(); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitionerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitionerTest.java index 2271e04a998..c3b5a4a79b4 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitionerTest.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TabletLocationHashPartitionerTest.java @@ -13,6 +13,7 @@ import org.junit.rules.TemporaryFolder; import datawave.ingest.mapreduce.handler.shard.ShardIdFactory; +import datawave.ingest.mapreduce.job.SplitsCacheFactory; import datawave.ingest.mapreduce.job.TableSplitsCache; public class TabletLocationHashPartitionerTest { @@ -33,7 +34,8 @@ public void setUp() { conf = new Configuration(); conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); - TableSplitsCache.getCurrentCache(conf).clear(); + TableSplitsCache.clear(); + SplitsCacheFactory.clearInstance(); partitioner = new TabletLocationHashPartitioner(); partitioner.setConf(conf); @@ -50,6 +52,9 @@ public void tearDown() { public void testLocationHashPartitioner() throws Exception { conf.setInt(ShardIdFactory.NUM_SHARDS, SHARDS_PER_DAY); new TestShardGenerator(conf, temporaryFolder.newFolder(), NUM_DAYS, SHARDS_PER_DAY, TOTAL_TSERVERS, "shard"); + TableSplitsCache.clear(); + SplitsCacheFactory.clearInstance(); + TabletLocationHashPartitioner partitionerTwo = new TabletLocationHashPartitioner(); partitionerTwo.setConf(conf); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TabletLocationNamePartitionerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TabletLocationNamePartitionerTest.java deleted file mode 100644 index db4190bfb3f..00000000000 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TabletLocationNamePartitionerTest.java +++ /dev/null @@ -1,82 +0,0 @@ -package datawave.ingest.mapreduce.partition; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.URL; -import java.util.HashMap; -import java.util.Map; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import datawave.ingest.mapreduce.job.BulkIngestKey; -import datawave.ingest.mapreduce.job.TableSplitsCache; - -public class TabletLocationNamePartitionerTest { - Configuration conf = new Configuration(); - TabletLocationNamePartitioner partitioner = null; - - @Before - public void setUp() { - conf = new Configuration(); - conf.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); - partitioner = new TabletLocationNamePartitioner(); - partitioner.setConf(conf); - } - - @After - public void tearDown() { - conf.clear(); - conf = null; - partitioner = null; - } - - @Test - public void testSequentialLocationScheme() throws Exception { - Map shardedTableMapFiles = new HashMap<>(); - // setup the location partition sheme - URL file = getClass().getResource("/datawave/ingest/mapreduce/partition/_shards.lst"); - shardedTableMapFiles.put("shard", new Path(file.toURI().toString())); - - // now read in a list of shards and display the distribution - file = getClass().getResource("/datawave/ingest/mapreduce/partition/shards.list"); - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, file.getPath().substring(0, file.getPath().lastIndexOf('/'))); - conf.set(TableSplitsCache.SPLITS_CACHE_FILE, "shards_n_locs.list"); - BufferedReader reader = new BufferedReader(new InputStreamReader(file.openStream())); - String line = reader.readLine(); - int[] partitions = new int[912]; - while (line != null) { - String shardId = line.trim(); - Key key = new Key(shardId); - Value value = new Value(); - int partition = partitioner.getPartition(new BulkIngestKey(new Text("shard"), key), value, 912); - partitions[partition]++; - line = reader.readLine(); - } - boolean errored = false; - int[] distribution = new int[8]; - int[] expected = new int[] {77, 207, 266, 196, 112, 42, 9, 3}; - for (int i = 0; i < 912; i++) { - distribution[partitions[i]]++; - } - for (int i = 0; i < 8; i++) { - try { - Assert.assertEquals("Unexpected distribution: ", expected[i], distribution[i]); - } catch (Throwable e) { - System.err.println(e.getMessage()); - errored = true; - } - } - if (errored) { - Assert.fail("Failed to get expected distribution. See console for unexpected entries"); - } - - } -} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TestShardGenerator.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TestShardGenerator.java index b453f28e4dc..7b4ef27db38 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TestShardGenerator.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/TestShardGenerator.java @@ -19,6 +19,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import datawave.ingest.mapreduce.job.SplitsConstants; import datawave.ingest.mapreduce.job.TableSplitsCache; import datawave.util.time.DateHelper; @@ -45,7 +46,7 @@ public TestShardGenerator(Configuration conf, File tmpDir, int numDays, int shar registerSomeTServers(); Map locations = simulateTabletAssignments(tableNames); // adding sorting here since it now happens when we generate the splits file - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, tmpDir.getAbsolutePath()); + conf.set(SplitsConstants.SPLITS_CACHE_DIR, tmpDir.getAbsolutePath()); TableSplitsCache.getCurrentCache(conf).clear(); Map sortedLocations = TableSplitsCache.getCurrentCache(conf).reverseSortByShardIds(locations); String tmpDirectory = tmpDir + "/"; @@ -70,7 +71,7 @@ public TestShardGenerator(Configuration conf, File tmpDir, Map loca // constructor that takes a created list of locations String tmpDirectory = tmpDir + "/"; Path splitsPath = new Path(tmpDir.getAbsolutePath() + "/all-splits.txt"); - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, tmpDir.getAbsolutePath()); + conf.set(SplitsConstants.SPLITS_CACHE_DIR, tmpDir.getAbsolutePath()); Map sortedLocations = TableSplitsCache.getCurrentCache(conf).reverseSortByShardIds(locations); try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)))) { @@ -82,7 +83,7 @@ public TestShardGenerator(Configuration conf, File tmpDir, Map loca } } - conf.set(TableSplitsCache.SPLITS_CACHE_DIR, tmpDir.getAbsolutePath()); + conf.set(SplitsConstants.SPLITS_CACHE_DIR, tmpDir.getAbsolutePath()); } // create splits for all the shards from today back NUM_DAYS,