diff --git a/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java b/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java new file mode 100644 index 00000000000..ca87e84f81a --- /dev/null +++ b/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java @@ -0,0 +1,136 @@ +package datawave.query.scan; + +import java.util.Map; + +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A helper class that bridges the execution hints and consistency levels configured in the ShardQueryLogic with a specific context. + *

+ * The shard query logic is configured with multiple execution hint maps that are keyed to a context. The context is often a table name; however, the context + * could also be based on a stage of query planning, or even an entire class of query logic. + *

+ * For example, scan resources may be reserved for a specific query logic. + *

+ * In addition to execution hints there is also support for selecting a consistency level from a configuration map. + */ +public class ExecutionHintHelper { + + private static final Logger log = LoggerFactory.getLogger(ExecutionHintHelper.class); + + public static final String SCAN_TYPE = "scan_type"; + public static final String PRIORITY = "priority"; + + private ExecutionHintHelper() { + // enforce static access + } + + /** + * Get the execution hint map based on a key. If no execution hints are configured for the primary key, the secondary key is used. + *

+ * The key is usually a table name, but in some cases may be tied to a particular query execution stage (e.g., index expansion). + * + * @param primary + * the primary key + * @param secondary + * the secondary key, used if the primary key is not found + * @param hintMap + * the map of execution hints, organized by key + * @return the map of execution hints, or null no hint matches the primary or secondary key + */ + public static Map getExecutionHints(String primary, String secondary, Map> hintMap) { + Map hints = getExecutionHints(primary, hintMap); + + if (hints == null) { + hints = getExecutionHints(secondary, hintMap); + } + + return hints; + } + + /** + * Get the execution hint map for the provided key. If no execution hints are configured for the specified key a warning is logged. + * + * @param key + * the key + * @param hintMap + * a map of execution hints + * @return a map of execution hints, or null if no hint matches the provided key + */ + public static Map getExecutionHints(String key, Map> hintMap) { + if (hintMap == null) { + log.warn("Execution hints is null"); + return null; + } + Map hints = hintMap.get(key); + if (hints == null) { + log.warn("No execution hints found for key {}", key); + } + return hints; + } + + /** + * Get the scan_type hint from the map of execution hints. A warning is logged if no scan type is found. + * + * @param executionHints + * the map of execution hints + * @return the scan type, or null if no scan type is configured + */ + public static String getScanType(Map executionHints) { + if (executionHints == null) { + log.warn("Execution hints is null"); + return null; + } + + String scanType = executionHints.get(SCAN_TYPE); + if (scanType == null) { + log.warn("No scan type found"); + } + return scanType; + } + + /** + * Get the scan priority from the map of execution hints. 
A warning is logged if no priority is found + * + * @param executionHints + * the map of execution hints + * @return the scan priority, or {@link Integer#MAX_VALUE} if no priority is configured + */ + public static int getPriority(Map executionHints) { + if (executionHints == null) { + log.warn("Execution hints is null"); + return Integer.MAX_VALUE; + } + + String value = executionHints.get(PRIORITY); + if (value == null) { + log.warn("No priority found"); + return Integer.MAX_VALUE; + } + return Integer.parseInt(value); + } + + /** + * Get the consistency level for the provided key + * + * @param key + * the key + * @param levels + * a map of consistency levels + * @return the consistency level, or null if no configured consistency level exists + */ + public static ConsistencyLevel getConsistencyLevel(String key, Map levels) { + if (levels == null) { + log.warn("ConsistencyLevels is null"); + return null; + } + + ConsistencyLevel consistencyLevel = levels.get(key); + if (consistencyLevel == null) { + log.warn("No consistency level found for key {}", key); + } + return consistencyLevel; + } +} diff --git a/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java b/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java index 2b67025c8f2..a6189d32639 100644 --- a/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java +++ b/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java @@ -1,5 +1,7 @@ package datawave.query.scan; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; + import java.util.HashMap; import java.util.Map; @@ -8,6 +10,8 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.security.Authorizations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -27,6 +31,8 @@ */ public abstract class ScanBuilder { + private static final Logger log = 
LoggerFactory.getLogger(ScanBuilder.class); + // required variables protected final AccumuloClient client; protected String tableName; @@ -96,6 +102,10 @@ public B setAuthorizations(Authorizations authorizations) { * @return this builder */ public B setConsistencyLevel(ScannerBase.ConsistencyLevel consistencyLevel) { + if (consistencyLevel == null) { + log.warn("Null consistencyLevel"); + throw new IllegalArgumentException("consistencyLevel cannot be null"); + } this.consistencyLevel = consistencyLevel; return self(); } @@ -114,7 +124,11 @@ public B setConsistencyLevel(ScannerBase.ConsistencyLevel consistencyLevel) { * @return this builder */ public B setConsistencyLevel(String consistencyLevel) { - this.consistencyLevel = ScannerBase.ConsistencyLevel.valueOf(consistencyLevel); + if (consistencyLevel == null) { + log.warn("Null consistencyLevel"); + throw new IllegalArgumentException("consistencyLevel cannot be null"); + } + this.consistencyLevel = ConsistencyLevel.valueOf(consistencyLevel); return self(); } @@ -128,6 +142,10 @@ public B setConsistencyLevel(String consistencyLevel) { * @return this builder */ public B setScanType(String scanType) { + if (scanType == null) { + log.warn("Null scanType"); + throw new IllegalArgumentException("scanType cannot be null"); + } executionHints.put(SCAN_TYPE_KEY, scanType); return self(); } @@ -142,6 +160,10 @@ public B setScanType(String scanType) { * @return this builder */ public B setScanPriority(int scanPriority) { + if (scanPriority < 0) { + log.warn("Negative scanPriority"); + throw new IllegalArgumentException("scanPriority cannot be negative"); + } executionHints.put(PRIORITY_KEY, Integer.toString(scanPriority)); return self(); } diff --git a/warehouse/core/src/test/java/datawave/query/scan/ExecutionHintHelperTest.java b/warehouse/core/src/test/java/datawave/query/scan/ExecutionHintHelperTest.java new file mode 100644 index 00000000000..3a5904796a6 --- /dev/null +++ 
b/warehouse/core/src/test/java/datawave/query/scan/ExecutionHintHelperTest.java @@ -0,0 +1,190 @@ +package datawave.query.scan; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.junit.jupiter.api.Test; + +public class ExecutionHintHelperTest { + + private final Map tableBasedHints = createTableBasedHints(); + private final Map stageBasedHints = createStageBasedHints(); + private final Map defaultHints = createDefaultHints(); + private final Map> executionHints = createExecutionHints(); + + private final Map consistencyLevels = createConsistencyLevels(); + + private Map createTableBasedHints() { + Map hints = new HashMap<>(); + hints.put(ExecutionHintHelper.SCAN_TYPE, "executor-pool-a"); + hints.put(ExecutionHintHelper.PRIORITY, "1"); + return hints; + } + + private Map createStageBasedHints() { + Map hints = new HashMap<>(); + hints.put(ExecutionHintHelper.SCAN_TYPE, "executor-pool-b"); + hints.put(ExecutionHintHelper.PRIORITY, "2"); + return hints; + } + + private Map createDefaultHints() { + Map hints = new HashMap<>(); + hints.put(ExecutionHintHelper.SCAN_TYPE, "executor-pool-c"); + hints.put(ExecutionHintHelper.PRIORITY, "3"); + return hints; + } + + private Map> createExecutionHints() { + Map> hints = new HashMap<>(); + hints.put("tableBased", tableBasedHints); + hints.put("stageBased", stageBasedHints); + hints.put("default", defaultHints); + return hints; + } + + private Map createConsistencyLevels() { + Map levels = new HashMap<>(); + levels.put("tableBased", ConsistencyLevel.EVENTUAL); + levels.put("stageBased", ConsistencyLevel.IMMEDIATE); + levels.put("default", ConsistencyLevel.IMMEDIATE); + return levels; + } + + @Test + 
public void testGetExecutionHintsSingleKeyNullMap() { + Map hints = ExecutionHintHelper.getExecutionHints("tableBased", null); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsSingleKeyNullKey() { + Map hints = ExecutionHintHelper.getExecutionHints(null, executionHints); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsSingleKeyNoMatch() { + Map hints = ExecutionHintHelper.getExecutionHints("dateBased", executionHints); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsSingleKeyGoodMatch() { + Map hints = ExecutionHintHelper.getExecutionHints("tableBased", executionHints); + assertNotNull(hints); + assertScanType("executor-pool-a", hints); + assertScanPriority("1", hints); + } + + @Test + public void testGetExecutionHintsPrimaryKeySecondaryKeyNullMap() { + Map hints = ExecutionHintHelper.getExecutionHints("tableBased", "default", null); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsPrimaryNullKeySecondaryNullKey() { + Map hints = ExecutionHintHelper.getExecutionHints(null, null, executionHints); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsPrimaryNullKeySecondaryKeyMatches() { + Map hints = ExecutionHintHelper.getExecutionHints(null, "default", executionHints); + assertEquals(defaultHints, hints); + } + + @Test + public void testGetExecutionHintsPrimaryKeyNotFoundSecondaryKeyMatches() { + Map hints = ExecutionHintHelper.getExecutionHints("timeBased", "stageBased", executionHints); + assertEquals(stageBasedHints, hints); + } + + @Test + public void testGetExecutionHintsPrimaryKeyMatchesSecondaryKeyNotNeeded() { + Map hints = ExecutionHintHelper.getExecutionHints("stageBased", "default", executionHints); + assertEquals(stageBasedHints, hints); + } + + @Test + public void testGetScanTypeNullMap() { + String scanType = ExecutionHintHelper.getScanType(null); + assertNull(scanType); + } + + @Test + public void testGetScanTypeNoMatch() { + Map hints = 
Map.of("priority", "7"); + String scanType = ExecutionHintHelper.getScanType(hints); + assertNull(scanType); + } + + @Test + public void testGetScanTypeMatches() { + Map hints = Map.of("scan_type", "executor-pool-a"); + String scanType = ExecutionHintHelper.getScanType(hints); + assertEquals("executor-pool-a", scanType); + } + + @Test + public void testGetPriorityNullMap() { + int priority = ExecutionHintHelper.getPriority(null); + assertEquals(Integer.MAX_VALUE, priority); + } + + @Test + public void testGetPriorityNoMatch() { + Map hints = Map.of("scan_type", "executor-pool-c"); + int priority = ExecutionHintHelper.getPriority(hints); + assertEquals(Integer.MAX_VALUE, priority); + } + + @Test + public void testGetPriorityMatches() { + Map hints = Map.of("priority", "2"); + int priority = ExecutionHintHelper.getPriority(hints); + assertEquals(2, priority); + } + + @Test + public void testGetConsistencyLevelNullMap() { + ConsistencyLevel level = ExecutionHintHelper.getConsistencyLevel("tableBased", null); + assertNull(level); + } + + @Test + public void testGetConsistencyLevelNullKey() { + ConsistencyLevel level = ExecutionHintHelper.getConsistencyLevel(null, consistencyLevels); + assertNull(level); + } + + @Test + public void testGetConsistencyLevelKeyNotFound() { + ConsistencyLevel level = ExecutionHintHelper.getConsistencyLevel("timeBased", consistencyLevels); + assertNull(level); + } + + @Test + public void testGetConsistencyLevel() { + assertEquals(ConsistencyLevel.EVENTUAL, ExecutionHintHelper.getConsistencyLevel("tableBased", consistencyLevels)); + assertEquals(ConsistencyLevel.IMMEDIATE, ExecutionHintHelper.getConsistencyLevel("stageBased", consistencyLevels)); + assertEquals(ConsistencyLevel.IMMEDIATE, ExecutionHintHelper.getConsistencyLevel("default", consistencyLevels)); + } + + private void assertScanType(String expected, Map hints) { + assertTrue(hints.containsKey(ExecutionHintHelper.SCAN_TYPE)); + assertEquals(expected, 
hints.get(ExecutionHintHelper.SCAN_TYPE)); + } + + private void assertScanPriority(String expected, Map hints) { + assertTrue(hints.containsKey(ExecutionHintHelper.PRIORITY)); + assertEquals(expected, hints.get(ExecutionHintHelper.PRIORITY)); + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java index 72d929ee9b2..98ca550d864 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java @@ -13,6 +13,7 @@ import datawave.core.common.logging.ThreadConfigurableLogger; import datawave.query.config.ShardQueryConfiguration; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; /** @@ -32,6 +33,7 @@ public abstract class AsyncIndexLookup extends IndexLookup { // flag for unfielded lookups protected final boolean unfieldedLookup; + protected ScannerBuilder builder = null; protected ScanMonitor monitor; public AsyncIndexLookup(ShardQueryConfiguration config, ScannerFactory scannerFactory, boolean unfieldedLookup, ExecutorService execService) { diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java index 6097eb06141..08d0e8d746d 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java @@ -10,6 +10,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import 
org.apache.accumulo.core.data.Value; @@ -28,6 +29,8 @@ import datawave.query.config.ShardQueryConfiguration; import datawave.query.exceptions.IllegalRangeArgumentException; import datawave.query.jexl.LiteralRange; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; import datawave.webservice.query.exception.DatawaveErrorCode; @@ -112,9 +115,27 @@ public void submit() { */ protected Runnable createRunnable(String tableName, Authorizations auths) { return () -> { - try (Scanner scanner = config.getClient().createScanner(tableName, auths)) { - String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? EXPANSION_HINT_KEY : config.getIndexTableName(); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(auths); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { scanner.setRange(scanRange); scanner.fetchColumnFamily(literalRange.getFieldName()); diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java index d844c024c25..4bdd1bbb3cf 100644 
--- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java @@ -1,10 +1,13 @@ package datawave.query.jexl.lookups; +import static datawave.query.jexl.lookups.ShardIndexQueryTableStaticMethods.EXPANSION_HINT_KEY; + import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -17,6 +20,8 @@ import datawave.core.iterators.FieldedRegexExpansionIterator; import datawave.query.config.ShardQueryConfiguration; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; @@ -71,9 +76,27 @@ public void submit() { protected Runnable createRunnable() { return () -> { String tableName = getTableName(); - try (Scanner scanner = config.getClient().createScanner(tableName, config.getAuthorizations().iterator().next())) { - String hintKey = getHintKey(tableName); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(config.getAuthorizations().iterator().next()); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), 
config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { IteratorSetting regexIterator = createRegexIterator(); scanner.addScanIterator(regexIterator); diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java index 0dfdd55ee97..8bdbdf2af24 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java @@ -10,6 +10,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -25,6 +26,8 @@ import datawave.core.iterators.FieldExpansionIterator; import datawave.query.config.ShardQueryConfiguration; import datawave.query.exceptions.DatawaveFatalQueryException; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; @@ -78,9 +81,26 @@ public void submit() { */ protected Runnable createRunnable(String tableName, Authorizations auths) { return () -> { - try (Scanner scanner = config.getClient().createScanner(tableName, auths)) { - String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? 
EXPANSION_HINT_KEY : config.getIndexTableName(); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(auths); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { scanner.setRange(range); for (String field : fields) { diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java index 6805dd04df0..7518c72d165 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java @@ -1,5 +1,7 @@ package datawave.query.jexl.lookups; +import static datawave.query.jexl.lookups.ShardIndexQueryTableStaticMethods.EXPANSION_HINT_KEY; + import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -8,6 +10,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -20,6 +23,8 @@ 
import datawave.core.iterators.UnfieldedRegexExpansionIterator; import datawave.query.config.ShardQueryConfiguration; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; @@ -70,9 +75,27 @@ public void submit() { protected Runnable createRunnable() { return () -> { String tableName = reverse ? config.getReverseIndexTableName() : getTableName(); - try (Scanner scanner = config.getClient().createScanner(tableName, config.getAuthorizations().iterator().next())) { - String hintKey = getHintKey(tableName); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(config.getAuthorizations().iterator().next()); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { IteratorSetting regexIterator = createRegexIterator(); scanner.addScanIterator(regexIterator); diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java index fa648640e99..949d7486967 100644 --- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java +++ 
b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java @@ -11,6 +11,7 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -23,6 +24,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; @@ -241,6 +243,70 @@ public void testWithNoBackingData() { test(lookup, "FIELD_A"); } + @Test + public void testExecutionHints_expansionPoolSelectedOverIndexTable() { + withDateRange("20240701", "20240701"); + withDatatypeFilter(Set.of("datatype-b")); + withExpected(Set.of("value-1")); + + Map expansionHints = new HashMap<>(); + expansionHints.put("scan_type", "expansion-pool-a"); + expansionHints.put("priority", "2"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("expansion", expansionHints); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + BoundedRangeIndexLookup lookup = createLookup("FIELD_A", "value-1", "value-1"); + test(lookup, "FIELD_A"); + + assertNotNull(lookup.builder); + assertEquals(expansionHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() { + withDateRange("20240701", "20240701"); + withDatatypeFilter(Set.of("datatype-b")); + withExpected(Set.of("value-1")); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = 
new HashMap<>(); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + BoundedRangeIndexLookup lookup = createLookup("FIELD_A", "value-1", "value-1"); + test(lookup, "FIELD_A"); + + assertNotNull(lookup.builder); + assertEquals(indexHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testConsistencyLevel() { + withDateRange("20240701", "20240701"); + withDatatypeFilter(Set.of("datatype-b")); + withExpected(Set.of("value-1")); + + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL); + config.setTableConsistencyLevels(consistencyLevels); + + BoundedRangeIndexLookup lookup = createLookup("FIELD_A", "value-1", "value-1"); + test(lookup, "FIELD_A"); + + assertNotNull(lookup.builder); + assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel()); + } + private void test(BoundedRangeIndexLookup lookup, String field) { lookup.submit(); diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java index 8405ba4866d..973f6f1012a 100644 --- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java @@ -1,11 +1,17 @@ package datawave.query.jexl.lookups; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.data.Range; import 
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
 import org.apache.commons.jexl3.parser.JexlNode;
@@ -24,6 +30,8 @@ public class FieldedRegexIndexLookupTest extends BaseIndexLookupTest {

     private static final Logger log = LoggerFactory.getLogger(FieldedRegexIndexLookupTest.class);

+    private AsyncIndexLookup lookup; // held as a field so tests can inspect lookup.builder after executeLookup()
+
     @Test
     public void testExpansionWithNodata() {
         // no data
@@ -206,6 +214,70 @@ public void testReverseExpansionMultipleValues() {
         assertResultValues("FIELD_A", Set.of("tim", "tam"));
     }

+    @Test
+    public void testExecutionHints_expansionPoolSelectedOverIndexTable() {
+        write("bar", "FIELD_A");
+        withQuery("FIELD_A =~ 'ba.*'");
+
+        Map<String,String> expansionHints = new HashMap<>();
+        expansionHints.put("scan_type", "expansion-pool-a");
+        expansionHints.put("priority", "2");
+
+        Map<String,String> indexHints = new HashMap<>();
+        indexHints.put("scan_type", "index-a");
+        indexHints.put("priority", "1");
+
+        Map<String,Map<String,String>> tableHints = new HashMap<>();
+        tableHints.put("expansion", expansionHints); // both keys are configured; the assertion below expects this entry to be applied
+        tableHints.put("shardIndex", indexHints);
+        config.setTableHints(tableHints);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD_A"));
+        assertResultValues("FIELD_A", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(expansionHints, lookup.builder.getExecutionHints());
+    }
+
+    @Test
+    public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() {
+        write("bar", "FIELD_A");
+        withQuery("FIELD_A =~ 'ba.*'");
+
+        Map<String,String> indexHints = new HashMap<>();
+        indexHints.put("scan_type", "index-a");
+        indexHints.put("priority", "1");
+
+        Map<String,Map<String,String>> tableHints = new HashMap<>();
+        tableHints.put("shardIndex", indexHints); // no "expansion" entry here; the index table hints are the expected result
+        config.setTableHints(tableHints);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD_A"));
+        assertResultValues("FIELD_A", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(indexHints, lookup.builder.getExecutionHints());
+    }
+
+    @Test
+    public void testConsistencyLevel() {
+        write("bar", "FIELD_A");
+        withQuery("FIELD_A =~ 'ba.*'");
+
+        Map<String,ConsistencyLevel> consistencyLevels = new HashMap<>();
+        consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL);
+        config.setTableConsistencyLevels(consistencyLevels);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD_A"));
+        assertResultValues("FIELD_A", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel());
+    }
+
     @Override
     protected void executeLookup() {
         try {
@@ -220,7 +292,7 @@ protected void executeLookup() {
             Range range = desc.range;
             boolean reverse = desc.isForReverseIndex;

-            AsyncIndexLookup lookup = createLookup(field, value, range, reverse);
+            lookup = createLookup(field, value, range, reverse); // assign the field instead of a local so tests can read it
             executeLookup(lookup);
         } catch (Exception e) {
             log.error(e.getMessage(), e);
diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java
index b3e78cf2f11..a5cea368e4d 100644
--- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java
+++ b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java
@@ -1,11 +1,16 @@
 package datawave.query.jexl.lookups;

 import static datawave.core.iterators.TimeoutExceptionIterator.EXCEPTEDVALUE;
+import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;

+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;

+import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.commons.jexl3.parser.JexlNode;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -23,6 +28,8 @@ public class UnfieldedLiteralIndexLookupTest extends BaseIndexLookupTest {

     private static final Logger log = LoggerFactory.getLogger(UnfieldedLiteralIndexLookupTest.class);

+    private AsyncIndexLookup lookup; // held as a field so tests can inspect lookup.builder after executeLookup()
+
     @Test
     public void testValueDoesNotExpand() {
         // no data
@@ -96,6 +103,70 @@ public void testExpansionHitsValueThreshold_singleValueMultiField() {
         }
     }

+    @Test
+    public void testExecutionHints_expansionPoolSelectedOverIndexTable() {
+        write("bar", "FIELD_A");
+        withQuery("_ANYFIELD_ == 'bar'");
+
+        Map<String,String> expansionHints = new HashMap<>();
+        expansionHints.put("scan_type", "expansion-pool-a");
+        expansionHints.put("priority", "2");
+
+        Map<String,String> indexHints = new HashMap<>();
+        indexHints.put("scan_type", "index-a");
+        indexHints.put("priority", "1");
+
+        Map<String,Map<String,String>> tableHints = new HashMap<>();
+        tableHints.put("expansion", expansionHints); // both keys are configured; the assertion below expects this entry to be applied
+        tableHints.put("shardIndex", indexHints);
+        config.setTableHints(tableHints);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD_A"));
+        assertResultValues("FIELD_A", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(expansionHints, lookup.builder.getExecutionHints());
+    }
+
+    @Test
+    public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() {
+        write("bar", "FIELD_A");
+        withQuery("_ANYFIELD_ == 'bar'");
+
+        Map<String,String> indexHints = new HashMap<>();
+        indexHints.put("scan_type", "index-a");
+        indexHints.put("priority", "1");
+
+        Map<String,Map<String,String>> tableHints = new HashMap<>();
+        tableHints.put("shardIndex", indexHints); // no "expansion" entry here; the index table hints are the expected result
+        config.setTableHints(tableHints);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD_A"));
+        assertResultValues("FIELD_A", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(indexHints, lookup.builder.getExecutionHints());
+    }
+
+    @Test
+    public void testConsistencyLevel() {
+        write("bar", "FIELD_A");
+        withQuery("_ANYFIELD_ == 'bar'");
+
+        Map<String,ScannerBase.ConsistencyLevel> consistencyLevels = new HashMap<>();
+        consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL);
+        config.setTableConsistencyLevels(consistencyLevels);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD_A"));
+        assertResultValues("FIELD_A", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel());
+    }
+
     @Override
     protected void executeLookup() {
         try {
@@ -107,7 +178,7 @@ protected void executeLookup() {
             Object literal = JexlASTHelper.getLiteralValueSafely(node);
             String value = String.valueOf(literal);

-            AsyncIndexLookup lookup = createLookup(value);
+            lookup = createLookup(value); // assign the field instead of a local so tests can read it
             executeLookup(lookup);
         } catch (Exception e) {
             log.error(e.getMessage(), e);
diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java
index 203580c88de..8cc4eb420a3 100644
--- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java
+++ b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java
@@ -1,10 +1,15 @@
 package datawave.query.jexl.lookups;

+import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;

 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;

+import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException;
 import org.apache.commons.jexl3.parser.JexlNode;
@@ -24,6 +29,8 @@
  */
 public class UnfieldedRegexIndexLookupTest extends BaseIndexLookupTest {

+    private AsyncIndexLookup lookup; // held as a field so tests can inspect lookup.builder after executeLookup()
+
     @Test
     public void testExpansionZeroHits() throws Exception {
         // no data
@@ -226,6 +233,70 @@ public void testReverseExpandsIntoMultipleFieldsWithMultipleValues() throws Exce
         assertResultValues("FIELD_B", Set.of("tim", "tam"));
     }

+    @Test
+    public void testExecutionHints_expansionPoolSelectedOverIndexTable() throws Exception {
+        write("bar", "FIELD");
+        withQuery("_ANYFIELD_ =~ 'ba.*'");
+
+        Map<String,String> expansionHints = new HashMap<>();
+        expansionHints.put("scan_type", "expansion-pool-a");
+        expansionHints.put("priority", "2");
+
+        Map<String,String> indexHints = new HashMap<>();
+        indexHints.put("scan_type", "index-a");
+        indexHints.put("priority", "1");
+
+        Map<String,Map<String,String>> tableHints = new HashMap<>();
+        tableHints.put("expansion", expansionHints); // both keys are configured; the assertion below expects this entry to be applied
+        tableHints.put("shardIndex", indexHints);
+        config.setTableHints(tableHints);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD"));
+        assertResultValues("FIELD", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(expansionHints, lookup.builder.getExecutionHints());
+    }
+
+    @Test
+    public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() throws Exception {
+        write("bar", "FIELD");
+        withQuery("_ANYFIELD_ =~ 'ba.*'");
+
+        Map<String,String> indexHints = new HashMap<>();
+        indexHints.put("scan_type", "index-a");
+        indexHints.put("priority", "1");
+
+        Map<String,Map<String,String>> tableHints = new HashMap<>();
+        tableHints.put("shardIndex", indexHints); // no "expansion" entry here; the index table hints are the expected result
+        config.setTableHints(tableHints);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD"));
+        assertResultValues("FIELD", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(indexHints, lookup.builder.getExecutionHints());
+    }
+
+    @Test
+    public void testConsistencyLevel() throws Exception {
+        write("bar", "FIELD");
+        withQuery("_ANYFIELD_ =~ 'ba.*'");
+
+        Map<String,ScannerBase.ConsistencyLevel> consistencyLevels = new HashMap<>();
+        consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL);
+        config.setTableConsistencyLevels(consistencyLevels);
+
+        executeLookup();
+        assertResultFields(Set.of("FIELD"));
+        assertResultValues("FIELD", Set.of("bar"));
+
+        assertNotNull(lookup.builder);
+        assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel());
+    }
+
     /**
      * Build an index lookup from the query and store the results
      */
@@ -243,7 +314,7 @@ protected void executeLookup() throws Exception {
         Range range = desc.range;
         boolean reverse = desc.isForReverseIndex;

-        AsyncIndexLookup lookup = createLookup(value, range, reverse, null);
+        lookup = createLookup(value, range, reverse, null); // assign the field instead of a local so tests can read it
         executeLookup(lookup);
     }