From f5e9877c967613b04cf70174d7c4e477a205a030 Mon Sep 17 00:00:00 2001 From: hlgp Date: Tue, 19 May 2026 23:47:06 +0000 Subject: [PATCH] add sorted tablet location partitioner --- .../SortedTabletLocationPartitioner.java | 113 +++++++++++ .../SortedTabletLocationPartitionerTest.java | 192 ++++++++++++++++++ .../job/SPLITSANDLOCATIONSsplits.txt | 32 +++ 3 files changed, 337 insertions(+) create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitioner.java create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitionerTest.java create mode 100644 warehouse/ingest-core/src/test/resources/datawave/ingest/mapreduce/job/SPLITSANDLOCATIONSsplits.txt diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitioner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitioner.java new file mode 100644 index 00000000000..aba98f348b7 --- /dev/null +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitioner.java @@ -0,0 +1,113 @@ +package datawave.ingest.mapreduce.partition; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import com.google.common.collect.TreeMultimap; + +import datawave.ingest.mapreduce.job.SplitsFile; + +public class SortedTabletLocationPartitioner extends MultiTableRangePartitioner { + + private static final Logger log = Logger.getLogger(SortedTabletLocationPartitioner.class); + private final Map> SPLIT_TO_REDUCER_MAP = new HashMap<>(); + + @Override + protected int calculateIndex(int index, int numPartitions, String tableName, int cutPointArrayLength) { + if (isAssignedPartition(tableName, index)) { + return SPLIT_TO_REDUCER_MAP.get(tableName).get(index); + } + try { + assignPartitions(numPartitions, tableName, cutPointArrayLength); + } catch (IOException e) { + log.error("Unable to assign partitions for " + tableName + ". Defaulting to parent assignment."); + super.calculateIndex(index, numPartitions, tableName, cutPointArrayLength); + + } + return isAssignedPartition(tableName, index) ? SPLIT_TO_REDUCER_MAP.get(tableName).get(index) : 0; + + } + + private void assignPartitions(int numPartitions, String tableName, int cutPointArrayLength) throws IOException { + + List splitsByTable = SplitsFile.getSplits(getConf(), tableName); + + Map currentTableSplitToLocation = SplitsFile.getSplitsAndLocations(getConf(), tableName); + Map tempSplitReducerMap = new HashMap<>(); + Text[] cutPointArray = splitsByTable.toArray(new Text[0]); + + if (cutPointArrayLength > numPartitions) { + mapPartitions(numPartitions, cutPointArrayLength, currentTableSplitToLocation, tempSplitReducerMap, cutPointArray); + } else { + for (int i = 0; i < cutPointArrayLength; i++) { + tempSplitReducerMap.put(i, i); + tempSplitReducerMap.put(-i - 1, i); + } + } + + SPLIT_TO_REDUCER_MAP.put(tableName, tempSplitReducerMap); + } + + private void mapPartitions(int numPartitions, int cutPointArrayLength, Map currentTableSplitToLocation, + Map tempSplitReducerMap, Text[] cutPointArray) { + + int locationsAssigned = 0; + int assignedReducer = 0; + + Map reducerToSplitCount = new HashMap<>(); + + TreeMultimap locationToSplits = TreeMultimap.create(); + + for (int k = 0; k < cutPointArrayLength; k++) { + locationToSplits.put(currentTableSplitToLocation.get(cutPointArray[k]), k); + } + + Iterator locationIterator = locationToSplits.keySet().iterator(); + while (locationIterator.hasNext()) { + Set splitsForCurrentLocation = locationToSplits.get(locationIterator.next()); + + for (Integer splitIndex : splitsForCurrentLocation) { + tempSplitReducerMap.put(splitIndex, assignedReducer); + tempSplitReducerMap.put(-splitIndex - 1, assignedReducer); + } + + locationsAssigned++; + // simple round robin for now until we've assigned something to each partition + int sum = null == reducerToSplitCount.get(assignedReducer) ? 0 : reducerToSplitCount.get(assignedReducer); + reducerToSplitCount.put(assignedReducer, sum + splitsForCurrentLocation.size()); + + if (reducerToSplitCount.size() < numPartitions) { + assignedReducer = locationsAssigned % numPartitions; + } else { + // Once all partitions have at least one assignment, look for the one with the smallest number of splits + int leastSplits = Integer.MAX_VALUE; + int leastReducer = 0; + for (Map.Entry reducer : reducerToSplitCount.entrySet()) { + if (reducer.getValue() < leastSplits) { + leastReducer = reducer.getKey(); + leastSplits = reducer.getValue(); + } + } + assignedReducer = leastReducer; + } + + } + + } + + private boolean isAssignedPartition(String tableName, int index) { + return SPLIT_TO_REDUCER_MAP.containsKey(tableName) && SPLIT_TO_REDUCER_MAP.get(tableName).containsKey(index); + } + + @Override + public boolean needSplitLocations() { + return true; + } +} diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitionerTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitionerTest.java new file mode 100644 index 00000000000..0576e428def --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/partition/SortedTabletLocationPartitionerTest.java @@ -0,0 +1,192 @@ +package datawave.ingest.mapreduce.partition; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.net.URL; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import datawave.ingest.mapreduce.job.BulkIngestKey; +import datawave.ingest.mapreduce.job.TableSplitsCache; + +public class SortedTabletLocationPartitionerTest { + + String splitsFileName = "SPLITSANDLOCATIONS" + "splits.txt"; + + private static final String TABLE_NAME = "abc"; + Configuration configuration; + Job mockJob; + + @BeforeEach + public void before() throws IOException { + mockJob = new Job(); + configuration = mockJob.getConfiguration(); + configuration.set("job.table.names", TABLE_NAME); + configuration.setBoolean(TableSplitsCache.REFRESH_SPLITS, false); + configuration.set(TableSplitsCache.SPLITS_CACHE_DIR, + createUrl(splitsFileName).getPath().substring(0, createUrl(splitsFileName).getPath().lastIndexOf('/'))); + configuration.set(TableSplitsCache.SPLITS_CACHE_FILE, splitsFileName); + + TableSplitsCache.getCurrentCache(configuration); + + } + + private URL createUrl(String fileName) { + + return SortedTabletLocationPartitionerTest.class.getResource("/datawave/ingest/mapreduce/job/" + fileName); + } + + private BulkIngestKey getBulkIngestKey(String rowStr) { + return new BulkIngestKey(new Text(TABLE_NAME), new Key(new Text(rowStr))); + } + + private void mockContextForLocalCacheFile(final URL url) { + + SortedTabletLocationPartitioner + .setContext(new MapContextImpl(configuration, new TaskAttemptID(), null, null, null, null, null) { + @Override + public Path[] getLocalCacheFiles() throws IOException { + return new Path[] {new Path(url.getPath())}; + } + + }); + } + + @Test + public void testAllDataForOneSplitGoesToOnePartitioner() { + mockContextForLocalCacheFile(createUrl(splitsFileName)); + int numPartitions = 581; + + SortedTabletLocationPartitioner partitioner = new SortedTabletLocationPartitioner(); + partitioner.setConf(new Configuration()); + + // first split is a, last is z + for (int i = 0; i < 26; i++) { + String precedingRow = Character.toString((char) ("a".codePointAt(0) + i - 1)) + "_"; + int resultForPrecedingRow = partitioner.getPartition(getBulkIngestKey(precedingRow), new Value(), numPartitions); + + String rowStr = Character.toString((char) ("a".codePointAt(0) + i)); + int resultRow = partitioner.getPartition(getBulkIngestKey(rowStr), new Value(), numPartitions); + + assertEquals("These should have matched: resultRow: " + resultRow + " , resultForPrecedingRow: " + resultForPrecedingRow, resultRow, + resultForPrecedingRow); + } + } + + @Test + public void testOneToOne() { + mockContextForLocalCacheFile(createUrl(splitsFileName)); + int numPartitions = 8; + + SortedTabletLocationPartitioner partitioner = new SortedTabletLocationPartitioner(); + partitioner.setConf(new Configuration()); + + Map numberTimesPartitionSeen = new TreeMap<>(); + + countPartitions(numberTimesPartitionSeen, numPartitions, partitioner); + assertEquals(numPartitions, numberTimesPartitionSeen.size()); + + int resultRow1 = partitioner.getPartition(getBulkIngestKey("a"), new Value(), numPartitions); + int resultRow2 = partitioner.getPartition(getBulkIngestKey("p"), new Value(), numPartitions); + assertEquals(resultRow1, resultRow2); + + int resultRow3 = partitioner.getPartition(getBulkIngestKey("b"), new Value(), numPartitions); + int resultRow4 = partitioner.getPartition(getBulkIngestKey("w"), new Value(), numPartitions); + assertEquals(resultRow3, resultRow4); + + assertNotEquals(resultRow1, resultRow3); + + } + + @Test + public void testFewerPartitions() { + mockContextForLocalCacheFile(createUrl(splitsFileName)); + int numPartitions = 4; + + SortedTabletLocationPartitioner partitioner = new SortedTabletLocationPartitioner(); + partitioner.setConf(new Configuration()); + + Map numberTimesPartitionSeen = new TreeMap<>(); + + countPartitions(numberTimesPartitionSeen, numPartitions, partitioner); + assertEquals(numPartitions, numberTimesPartitionSeen.size()); + + assertEquals(7, numberTimesPartitionSeen.get(0).intValue()); + assertEquals(6, numberTimesPartitionSeen.get(1).intValue()); + assertEquals(7, numberTimesPartitionSeen.get(2).intValue()); + assertEquals(7, numberTimesPartitionSeen.get(3).intValue()); + + int resultRow1 = partitioner.getPartition(getBulkIngestKey("a"), new Value(), numPartitions); + int resultRow2 = partitioner.getPartition(getBulkIngestKey("p"), new Value(), numPartitions); + assertEquals(resultRow1, resultRow2); + + int resultRow3 = partitioner.getPartition(getBulkIngestKey("j"), new Value(), numPartitions); + int resultRow4 = partitioner.getPartition(getBulkIngestKey("z"), new Value(), numPartitions); + assertEquals(resultRow3, resultRow4); + + assertNotEquals(resultRow1, resultRow4); + + int resultRow5 = partitioner.getPartition(getBulkIngestKey("z1"), new Value(), numPartitions); + assertEquals(0, resultRow5); + + } + + @Test + public void testMorePartitions() { + mockContextForLocalCacheFile(createUrl(splitsFileName)); + int numPartitions = 10; + + SortedTabletLocationPartitioner partitioner = new SortedTabletLocationPartitioner(); + partitioner.setConf(new Configuration()); + + Map numberTimesPartitionSeen = new TreeMap<>(); + + // number of locations = 9 + countPartitions(numberTimesPartitionSeen, numPartitions, partitioner); + assertEquals(9, numberTimesPartitionSeen.size()); + + int resultRow1 = partitioner.getPartition(getBulkIngestKey("a"), new Value(), numPartitions); + int resultRow2 = partitioner.getPartition(getBulkIngestKey("p"), new Value(), numPartitions); + assertEquals(resultRow1, resultRow2); + + int resultRow3 = partitioner.getPartition(getBulkIngestKey("b"), new Value(), numPartitions); + int resultRow4 = partitioner.getPartition(getBulkIngestKey("w"), new Value(), numPartitions); + assertEquals(resultRow3, resultRow4); + + assertNotEquals(resultRow1, resultRow3); + + } + + public void countPartitions(Map timesSeenOrderedByPartition, int numPartitions, MultiTableRangePartitioner partitioner) { + // first split is a, last is z1 + for (int i = 0; i < 27; i++) { + String rowStr = Character.toString((char) ("a".codePointAt(0) + i)); + int resultRow = partitioner.getPartition(getBulkIngestKey(rowStr), new Value(), numPartitions); + updateCounter(timesSeenOrderedByPartition, resultRow); + } + + } + + private static void updateCounter(Map numberTimesPartitionSeen, int partition) { + Integer timesSeen = numberTimesPartitionSeen.get(partition); + if (null == timesSeen) { + timesSeen = 0; + } + numberTimesPartitionSeen.put(partition, timesSeen + 1); + } + +} diff --git a/warehouse/ingest-core/src/test/resources/datawave/ingest/mapreduce/job/SPLITSANDLOCATIONSsplits.txt b/warehouse/ingest-core/src/test/resources/datawave/ingest/mapreduce/job/SPLITSANDLOCATIONSsplits.txt new file mode 100644 index 00000000000..39c598d5dd4 --- /dev/null +++ b/warehouse/ingest-core/src/test/resources/datawave/ingest/mapreduce/job/SPLITSANDLOCATIONSsplits.txt @@ -0,0 +1,32 @@ +tableIds:abc=0,someTableName=1, +0 YQ== r2n2:9998 +0 Yg== r1n2:9997 +0 Yw== r1n1:9998 +0 ZA== r1n2:9997 +0 ZQ== r2n2:9997 +0 Zg== r1n1:9998 +0 Zw== r1n1:9998 +0 aA== r1n2:9997 +0 aQ== r1n2:9998 +0 ag== r1n1:9997 +0 aw== r1n1:9997 +0 bA== r2n2:9997 +0 bQ== r2n1:9997 +0 bg== r2n1:9998 +0 bw== r2n1:9998 +0 cA== r2n2:9998 +0 cQ== r2n1:9997 +0 cg== r2n1:9997 +0 cw== r2n2:9998 +0 dA== r2n2:9997 +0 dQ== r1n2:9998 +0 dg== r1n2:9998 +0 dw== r1n2:9997 +0 eA== r2n1:9998 +0 eQ== r1n1:9998 +0 eg== r1n1:9997 +0 ejE= noloc +1 [B@57c8b24d r1n1:9997 +1 [B@39320a41 r2n1:9998 +1 [B@43f4ebd r1n1:9998 +1 [B@79fcf790 r2n1:9997