From e8ecfa672c8bd0072aba4b2fb82242ca77ea236b Mon Sep 17 00:00:00 2001 From: Moriarty <22225248+apmoriarty@users.noreply.github.com> Date: Tue, 19 May 2026 12:09:12 +0000 Subject: [PATCH 1/2] Integrate ScannerBuilder into index lookups Add ExecutionHintHelper to bridge the map of hints in the query logic to scanner-specific configs. ScanBuilder is now opinionated about setting null execution hints and consistency levels and will throw an IllegalArgumentException. AsyncIndexLookup now tracks ScanBuilder as a protected variable so unit tests can verify execution hints and consistency levels. AsyncIndexLookup tests now validate execution hints and consistency level. Fix bug where new index lookups were not setting the correct execution hints. --- .../query/scan/ExecutionHintHelper.java | 136 +++++++++++++ .../java/datawave/query/scan/ScanBuilder.java | 24 ++- .../query/scan/ExecutionHintHelperTest.java | 190 ++++++++++++++++++ .../query/jexl/lookups/AsyncIndexLookup.java | 2 + .../jexl/lookups/BoundedRangeIndexLookup.java | 27 ++- .../jexl/lookups/FieldedRegexIndexLookup.java | 29 ++- .../lookups/UnfieldedLiteralIndexLookup.java | 26 ++- .../lookups/UnfieldedRegexIndexLookup.java | 29 ++- .../lookups/BoundedRangeIndexLookupTest.java | 66 ++++++ .../lookups/FieldedRegexIndexLookupTest.java | 74 ++++++- .../UnfieldedLiteralIndexLookupTest.java | 73 ++++++- .../UnfieldedRegexIndexLookupTest.java | 73 ++++++- 12 files changed, 733 insertions(+), 16 deletions(-) create mode 100644 warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java create mode 100644 warehouse/core/src/test/java/datawave/query/scan/ExecutionHintHelperTest.java diff --git a/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java b/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java new file mode 100644 index 00000000000..1dba68fdf0a --- /dev/null +++ b/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java @@ -0,0 +1,136 @@ +package 
datawave.query.scan; + +import java.util.Map; + +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A helper class that bridges the execution hints and consistency levels configured in the ShardQueryLogic with a specific context. + *

+ * The shard query logic is configured with multiple execution hint maps that are keyed to a context. The context is often a table name; however, the context + * could also be based on a stage of query planning, or even an entire class of query logic. + *

+ * For example, scan resources may be reserved for a specific query logic. + *

+ * In addition to execution hints there is also support for selecting a consistency level from a configuration map. + */ +public class ExecutionHintHelper { + + private static final Logger log = LoggerFactory.getLogger(ExecutionHintHelper.class); + + public static String SCAN_TYPE = "scan_type"; + public static String PRIORITY = "priority"; + + private ExecutionHintHelper() { + // enforce static access + } + + /** + * Get the execution hint map based on a key. If no execution hints are configured for the primary key, the secondary key is used. + *

+ * The key is usually a table name, but in some cases may be tied to a particular query execution stage (e.g., index expansion). + * + * @param primary + * the primary key + * @param secondary + * the secondary key, used if the primary key is not found + * @param hintMap + * the map of execution hints, organized by key + * @return the map of execution hints, or null no hint matches the primary or secondary key + */ + public static Map getExecutionHints(String primary, String secondary, Map> hintMap) { + Map hints = getExecutionHints(primary, hintMap); + + if (hints == null) { + hints = getExecutionHints(secondary, hintMap); + } + + return hints; + } + + /** + * Get the execution hint map for the provided key. If no execution hints are configured for the specified key a warning is logged. + * + * @param key + * the key + * @param hintMap + * a map of execution hints + * @return a map of execution hints, or null if no hint matches the provided key + */ + public static Map getExecutionHints(String key, Map> hintMap) { + if (hintMap == null) { + log.warn("Execution hints is null"); + return null; + } + Map hints = hintMap.get(key); + if (hints == null) { + log.warn("No execution hints found for key {}", key); + } + return hints; + } + + /** + * Get the scan_type hint from the map of execution hints. A warning is logged if no scan type is found. + * + * @param executionHints + * the map of execution hints + * @return the scan type, or null if no scan type is configured + */ + public static String getScanType(Map executionHints) { + if (executionHints == null) { + log.warn("Execution hints is null"); + return null; + } + + String scanType = executionHints.get(SCAN_TYPE); + if (scanType == null) { + log.warn("No scan type found"); + } + return scanType; + } + + /** + * Get the scan priority from the map of execution hints. 
A warning is logged if no priority is found + * + * @param executionHints + * the map of execution hints + * @return the scan priority, or {@link Integer#MAX_VALUE} if no priority is configured + */ + public static int getPriority(Map executionHints) { + if (executionHints == null) { + log.warn("Execution hints is null"); + return Integer.MAX_VALUE; + } + + String value = executionHints.get(PRIORITY); + if (value == null) { + log.warn("No priority found"); + return Integer.MAX_VALUE; + } + return Integer.parseInt(value); + } + + /** + * Get the consistency level for the provided key + * + * @param key + * the key + * @param levels + * a map of consistency levels + * @return the consistency level, or null if no configured consistency level exists + */ + public static ConsistencyLevel getConsistencyLevel(String key, Map levels) { + if (levels == null) { + log.warn("ConsistencyLevels is null"); + return null; + } + + ConsistencyLevel consistencyLevel = levels.get(key); + if (consistencyLevel == null) { + log.warn("No consistency level found for key {}", key); + } + return consistencyLevel; + } +} diff --git a/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java b/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java index 2b67025c8f2..a6189d32639 100644 --- a/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java +++ b/warehouse/core/src/main/java/datawave/query/scan/ScanBuilder.java @@ -1,5 +1,7 @@ package datawave.query.scan; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; + import java.util.HashMap; import java.util.Map; @@ -8,6 +10,8 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.security.Authorizations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -27,6 +31,8 @@ */ public abstract class ScanBuilder { + private static final Logger log = 
LoggerFactory.getLogger(ScanBuilder.class); + // required variables protected final AccumuloClient client; protected String tableName; @@ -96,6 +102,10 @@ public B setAuthorizations(Authorizations authorizations) { * @return this builder */ public B setConsistencyLevel(ScannerBase.ConsistencyLevel consistencyLevel) { + if (consistencyLevel == null) { + log.warn("Null consistencyLevel"); + throw new IllegalArgumentException("consistencyLevel cannot be null"); + } this.consistencyLevel = consistencyLevel; return self(); } @@ -114,7 +124,11 @@ public B setConsistencyLevel(ScannerBase.ConsistencyLevel consistencyLevel) { * @return this builder */ public B setConsistencyLevel(String consistencyLevel) { - this.consistencyLevel = ScannerBase.ConsistencyLevel.valueOf(consistencyLevel); + if (consistencyLevel == null) { + log.warn("Null consistencyLevel"); + throw new IllegalArgumentException("consistencyLevel cannot be null"); + } + this.consistencyLevel = ConsistencyLevel.valueOf(consistencyLevel); return self(); } @@ -128,6 +142,10 @@ public B setConsistencyLevel(String consistencyLevel) { * @return this builder */ public B setScanType(String scanType) { + if (scanType == null) { + log.warn("Null scanType"); + throw new IllegalArgumentException("scanType cannot be null"); + } executionHints.put(SCAN_TYPE_KEY, scanType); return self(); } @@ -142,6 +160,10 @@ public B setScanType(String scanType) { * @return this builder */ public B setScanPriority(int scanPriority) { + if (scanPriority < 0) { + log.warn("Negative scanPriority"); + throw new IllegalArgumentException("scanPriority cannot be negative"); + } executionHints.put(PRIORITY_KEY, Integer.toString(scanPriority)); return self(); } diff --git a/warehouse/core/src/test/java/datawave/query/scan/ExecutionHintHelperTest.java b/warehouse/core/src/test/java/datawave/query/scan/ExecutionHintHelperTest.java new file mode 100644 index 00000000000..3a5904796a6 --- /dev/null +++ 
b/warehouse/core/src/test/java/datawave/query/scan/ExecutionHintHelperTest.java @@ -0,0 +1,190 @@ +package datawave.query.scan; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.junit.jupiter.api.Test; + +public class ExecutionHintHelperTest { + + private final Map tableBasedHints = createTableBasedHints(); + private final Map stageBasedHints = createStageBasedHints(); + private final Map defaultHints = createDefaultHints(); + private final Map> executionHints = createExecutionHints(); + + private final Map consistencyLevels = createConsistencyLevels(); + + private Map createTableBasedHints() { + Map hints = new HashMap<>(); + hints.put(ExecutionHintHelper.SCAN_TYPE, "executor-pool-a"); + hints.put(ExecutionHintHelper.PRIORITY, "1"); + return hints; + } + + private Map createStageBasedHints() { + Map hints = new HashMap<>(); + hints.put(ExecutionHintHelper.SCAN_TYPE, "executor-pool-b"); + hints.put(ExecutionHintHelper.PRIORITY, "2"); + return hints; + } + + private Map createDefaultHints() { + Map hints = new HashMap<>(); + hints.put(ExecutionHintHelper.SCAN_TYPE, "executor-pool-c"); + hints.put(ExecutionHintHelper.PRIORITY, "3"); + return hints; + } + + private Map> createExecutionHints() { + Map> hints = new HashMap<>(); + hints.put("tableBased", tableBasedHints); + hints.put("stageBased", stageBasedHints); + hints.put("default", defaultHints); + return hints; + } + + private Map createConsistencyLevels() { + Map levels = new HashMap<>(); + levels.put("tableBased", ConsistencyLevel.EVENTUAL); + levels.put("stageBased", ConsistencyLevel.IMMEDIATE); + levels.put("default", ConsistencyLevel.IMMEDIATE); + return levels; + } + + @Test + 
public void testGetExecutionHintsSingleKeyNullMap() { + Map hints = ExecutionHintHelper.getExecutionHints("tableBased", null); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsSingleKeyNullKey() { + Map hints = ExecutionHintHelper.getExecutionHints(null, executionHints); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsSingleKeyNoMatch() { + Map hints = ExecutionHintHelper.getExecutionHints("dateBased", executionHints); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsSingleKeyGoodMatch() { + Map hints = ExecutionHintHelper.getExecutionHints("tableBased", executionHints); + assertNotNull(hints); + assertScanType("executor-pool-a", hints); + assertScanPriority("1", hints); + } + + @Test + public void testGetExecutionHintsPrimaryKeySecondaryKeyNullMap() { + Map hints = ExecutionHintHelper.getExecutionHints("tableBased", "default", null); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsPrimaryNullKeySecondaryNullKey() { + Map hints = ExecutionHintHelper.getExecutionHints(null, null, executionHints); + assertNull(hints); + } + + @Test + public void testGetExecutionHintsPrimaryNullKeySecondaryKeyMatches() { + Map hints = ExecutionHintHelper.getExecutionHints(null, "default", executionHints); + assertEquals(defaultHints, hints); + } + + @Test + public void testGetExecutionHintsPrimaryKeyNotFoundSecondaryKeyMatches() { + Map hints = ExecutionHintHelper.getExecutionHints("timeBased", "stageBased", executionHints); + assertEquals(stageBasedHints, hints); + } + + @Test + public void testGetExecutionHintsPrimaryKeyMatchesSecondaryKeyNotNeeded() { + Map hints = ExecutionHintHelper.getExecutionHints("stageBased", "default", executionHints); + assertEquals(stageBasedHints, hints); + } + + @Test + public void testGetScanTypeNullMap() { + String scanType = ExecutionHintHelper.getScanType(null); + assertNull(scanType); + } + + @Test + public void testGetScanTypeNoMatch() { + Map hints = 
Map.of("priority", "7"); + String scanType = ExecutionHintHelper.getScanType(hints); + assertNull(scanType); + } + + @Test + public void testGetScanTypeMatches() { + Map hints = Map.of("scan_type", "executor-pool-a"); + String scanType = ExecutionHintHelper.getScanType(hints); + assertEquals("executor-pool-a", scanType); + } + + @Test + public void testGetPriorityNullMap() { + int priority = ExecutionHintHelper.getPriority(null); + assertEquals(Integer.MAX_VALUE, priority); + } + + @Test + public void testGetPriorityNoMatch() { + Map hints = Map.of("scan_type", "executor-pool-c"); + int priority = ExecutionHintHelper.getPriority(hints); + assertEquals(Integer.MAX_VALUE, priority); + } + + @Test + public void testGetPriorityMatches() { + Map hints = Map.of("priority", "2"); + int priority = ExecutionHintHelper.getPriority(hints); + assertEquals(2, priority); + } + + @Test + public void testGetConsistencyLevelNullMap() { + ConsistencyLevel level = ExecutionHintHelper.getConsistencyLevel("tableBased", null); + assertNull(level); + } + + @Test + public void testGetConsistencyLevelNullKey() { + ConsistencyLevel level = ExecutionHintHelper.getConsistencyLevel(null, consistencyLevels); + assertNull(level); + } + + @Test + public void testGetConsistencyLevelKeyNotFound() { + ConsistencyLevel level = ExecutionHintHelper.getConsistencyLevel("timeBased", consistencyLevels); + assertNull(level); + } + + @Test + public void testGetConsistencyLevel() { + assertEquals(ConsistencyLevel.EVENTUAL, ExecutionHintHelper.getConsistencyLevel("tableBased", consistencyLevels)); + assertEquals(ConsistencyLevel.IMMEDIATE, ExecutionHintHelper.getConsistencyLevel("stageBased", consistencyLevels)); + assertEquals(ConsistencyLevel.IMMEDIATE, ExecutionHintHelper.getConsistencyLevel("default", consistencyLevels)); + } + + private void assertScanType(String expected, Map hints) { + assertTrue(hints.containsKey(ExecutionHintHelper.SCAN_TYPE)); + assertEquals(expected, 
hints.get(ExecutionHintHelper.SCAN_TYPE)); + } + + private void assertScanPriority(String expected, Map hints) { + assertTrue(hints.containsKey(ExecutionHintHelper.PRIORITY)); + assertEquals(expected, hints.get(ExecutionHintHelper.PRIORITY)); + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java index 72d929ee9b2..98ca550d864 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java @@ -13,6 +13,7 @@ import datawave.core.common.logging.ThreadConfigurableLogger; import datawave.query.config.ShardQueryConfiguration; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; /** @@ -32,6 +33,7 @@ public abstract class AsyncIndexLookup extends IndexLookup { // flag for unfielded lookups protected final boolean unfieldedLookup; + protected ScannerBuilder builder = null; protected ScanMonitor monitor; public AsyncIndexLookup(ShardQueryConfiguration config, ScannerFactory scannerFactory, boolean unfieldedLookup, ExecutorService execService) { diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java index 6097eb06141..08d0e8d746d 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/BoundedRangeIndexLookup.java @@ -10,6 +10,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import 
org.apache.accumulo.core.data.Value; @@ -28,6 +29,8 @@ import datawave.query.config.ShardQueryConfiguration; import datawave.query.exceptions.IllegalRangeArgumentException; import datawave.query.jexl.LiteralRange; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; import datawave.webservice.query.exception.DatawaveErrorCode; @@ -112,9 +115,27 @@ public void submit() { */ protected Runnable createRunnable(String tableName, Authorizations auths) { return () -> { - try (Scanner scanner = config.getClient().createScanner(tableName, auths)) { - String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? EXPANSION_HINT_KEY : config.getIndexTableName(); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(auths); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { scanner.setRange(scanRange); scanner.fetchColumnFamily(literalRange.getFieldName()); diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java index d844c024c25..4bdd1bbb3cf 100644 
--- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/FieldedRegexIndexLookup.java @@ -1,10 +1,13 @@ package datawave.query.jexl.lookups; +import static datawave.query.jexl.lookups.ShardIndexQueryTableStaticMethods.EXPANSION_HINT_KEY; + import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -17,6 +20,8 @@ import datawave.core.iterators.FieldedRegexExpansionIterator; import datawave.query.config.ShardQueryConfiguration; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; @@ -71,9 +76,27 @@ public void submit() { protected Runnable createRunnable() { return () -> { String tableName = getTableName(); - try (Scanner scanner = config.getClient().createScanner(tableName, config.getAuthorizations().iterator().next())) { - String hintKey = getHintKey(tableName); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(config.getAuthorizations().iterator().next()); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), 
config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { IteratorSetting regexIterator = createRegexIterator(); scanner.addScanIterator(regexIterator); diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java index 0dfdd55ee97..8bdbdf2af24 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookup.java @@ -10,6 +10,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -25,6 +26,8 @@ import datawave.core.iterators.FieldExpansionIterator; import datawave.query.config.ShardQueryConfiguration; import datawave.query.exceptions.DatawaveFatalQueryException; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; @@ -78,9 +81,26 @@ public void submit() { */ protected Runnable createRunnable(String tableName, Authorizations auths) { return () -> { - try (Scanner scanner = config.getClient().createScanner(tableName, auths)) { - String hintKey = config.getTableHints().containsKey(EXPANSION_HINT_KEY) ? 
EXPANSION_HINT_KEY : config.getIndexTableName(); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(auths); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { scanner.setRange(range); for (String field : fields) { diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java index 6805dd04df0..7518c72d165 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookup.java @@ -1,5 +1,7 @@ package datawave.query.jexl.lookups; +import static datawave.query.jexl.lookups.ShardIndexQueryTableStaticMethods.EXPANSION_HINT_KEY; + import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -8,6 +10,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -20,6 +23,8 @@ 
import datawave.core.iterators.UnfieldedRegexExpansionIterator; import datawave.query.config.ShardQueryConfiguration; +import datawave.query.scan.ExecutionHintHelper; +import datawave.query.scan.ScannerBuilder; import datawave.query.tables.ScannerFactory; import datawave.util.time.DateHelper; @@ -70,9 +75,27 @@ public void submit() { protected Runnable createRunnable() { return () -> { String tableName = reverse ? config.getReverseIndexTableName() : getTableName(); - try (Scanner scanner = config.getClient().createScanner(tableName, config.getAuthorizations().iterator().next())) { - String hintKey = getHintKey(tableName); - scanner.setExecutionHints(Map.of(tableName, hintKey)); + + // @formatter:off + builder = ScannerBuilder.create(config.getClient()) + .setTableName(tableName) + .setAuthorizations(config.getAuthorizations().iterator().next()); + // @formatter:on + + // only set the consistency level if configured + ConsistencyLevel consistencyLevel = ExecutionHintHelper.getConsistencyLevel(tableName, config.getTableConsistencyLevels()); + if (consistencyLevel != null) { + builder.setConsistencyLevel(consistencyLevel); + } + + // only set execution hints if configured + Map executionHints = ExecutionHintHelper.getExecutionHints(EXPANSION_HINT_KEY, config.getIndexTableName(), config.getTableHints()); + if (executionHints != null) { + builder.setScanType(ExecutionHintHelper.getScanType(executionHints)); + builder.setScanPriority(ExecutionHintHelper.getPriority(executionHints)); + } + + try (Scanner scanner = builder.build()) { IteratorSetting regexIterator = createRegexIterator(); scanner.addScanIterator(regexIterator); diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java index fa648640e99..949d7486967 100644 --- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java +++ 
b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/BoundedRangeIndexLookupTest.java @@ -11,6 +11,7 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -23,6 +24,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; @@ -241,6 +243,70 @@ public void testWithNoBackingData() { test(lookup, "FIELD_A"); } + @Test + public void testExecutionHints_expansionPoolSelectedOverIndexTable() { + withDateRange("20240701", "20240701"); + withDatatypeFilter(Set.of("datatype-b")); + withExpected(Set.of("value-1")); + + Map expansionHints = new HashMap<>(); + expansionHints.put("scan_type", "expansion-pool-a"); + expansionHints.put("priority", "2"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("expansion", expansionHints); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + BoundedRangeIndexLookup lookup = createLookup("FIELD_A", "value-1", "value-1"); + test(lookup, "FIELD_A"); + + assertNotNull(lookup.builder); + assertEquals(expansionHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() { + withDateRange("20240701", "20240701"); + withDatatypeFilter(Set.of("datatype-b")); + withExpected(Set.of("value-1")); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = 
new HashMap<>(); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + BoundedRangeIndexLookup lookup = createLookup("FIELD_A", "value-1", "value-1"); + test(lookup, "FIELD_A"); + + assertNotNull(lookup.builder); + assertEquals(indexHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testConsistencyLevel() { + withDateRange("20240701", "20240701"); + withDatatypeFilter(Set.of("datatype-b")); + withExpected(Set.of("value-1")); + + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL); + config.setTableConsistencyLevels(consistencyLevels); + + BoundedRangeIndexLookup lookup = createLookup("FIELD_A", "value-1", "value-1"); + test(lookup, "FIELD_A"); + + assertNotNull(lookup.builder); + assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel()); + } + private void test(BoundedRangeIndexLookup lookup, String field) { lookup.submit(); diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java index 8405ba4866d..973f6f1012a 100644 --- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/FieldedRegexIndexLookupTest.java @@ -1,11 +1,17 @@ package datawave.query.jexl.lookups; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.data.Range; import 
org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.commons.jexl3.parser.JexlNode; @@ -24,6 +30,8 @@ public class FieldedRegexIndexLookupTest extends BaseIndexLookupTest { private static final Logger log = LoggerFactory.getLogger(FieldedRegexIndexLookupTest.class); + private AsyncIndexLookup lookup; + @Test public void testExpansionWithNodata() { // no data @@ -206,6 +214,70 @@ public void testReverseExpansionMultipleValues() { assertResultValues("FIELD_A", Set.of("tim", "tam")); } + @Test + public void testExecutionHints_expansionPoolSelectedOverIndexTable() { + write("bar", "FIELD_A"); + withQuery("FIELD_A =~ 'ba.*'"); + + Map expansionHints = new HashMap<>(); + expansionHints.put("scan_type", "expansion-pool-a"); + expansionHints.put("priority", "2"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("expansion", expansionHints); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + executeLookup(); + assertResultFields(Set.of("FIELD_A")); + assertResultValues("FIELD_A", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(expansionHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() { + write("bar", "FIELD_A"); + withQuery("FIELD_A =~ 'ba.*'"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + executeLookup(); + assertResultFields(Set.of("FIELD_A")); + assertResultValues("FIELD_A", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(indexHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testConsistencyLevel() { + write("bar", "FIELD_A"); + 
withQuery("FIELD_A =~ 'ba.*'"); + + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL); + config.setTableConsistencyLevels(consistencyLevels); + + executeLookup(); + assertResultFields(Set.of("FIELD_A")); + assertResultValues("FIELD_A", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel()); + } + @Override protected void executeLookup() { try { @@ -220,7 +292,7 @@ protected void executeLookup() { Range range = desc.range; boolean reverse = desc.isForReverseIndex; - AsyncIndexLookup lookup = createLookup(field, value, range, reverse); + lookup = createLookup(field, value, range, reverse); executeLookup(lookup); } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java index b3e78cf2f11..a5cea368e4d 100644 --- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedLiteralIndexLookupTest.java @@ -1,11 +1,16 @@ package datawave.query.jexl.lookups; import static datawave.core.iterators.TimeoutExceptionIterator.EXCEPTEDVALUE; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.commons.jexl3.parser.JexlNode; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -23,6 +28,8 @@ public class UnfieldedLiteralIndexLookupTest extends BaseIndexLookupTest { private static 
final Logger log = LoggerFactory.getLogger(UnfieldedLiteralIndexLookupTest.class); + private AsyncIndexLookup lookup; + @Test public void testValueDoesNotExpand() { // no data @@ -96,6 +103,70 @@ public void testExpansionHitsValueThreshold_singleValueMultiField() { } } + @Test + public void testExecutionHints_expansionPoolSelectedOverIndexTable() { + write("bar", "FIELD_A"); + withQuery("_ANYFIELD_ == 'bar'"); + + Map expansionHints = new HashMap<>(); + expansionHints.put("scan_type", "expansion-pool-a"); + expansionHints.put("priority", "2"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("expansion", expansionHints); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + executeLookup(); + assertResultFields(Set.of("FIELD_A")); + assertResultValues("FIELD_A", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(expansionHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() { + write("bar", "FIELD_A"); + withQuery("_ANYFIELD_ == 'bar'"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + executeLookup(); + assertResultFields(Set.of("FIELD_A")); + assertResultValues("FIELD_A", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(indexHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testConsistencyLevel() { + write("bar", "FIELD_A"); + withQuery("_ANYFIELD_ == 'bar'"); + + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL); + config.setTableConsistencyLevels(consistencyLevels); + + executeLookup(); + assertResultFields(Set.of("FIELD_A")); + 
assertResultValues("FIELD_A", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel()); + } + @Override protected void executeLookup() { try { @@ -107,7 +178,7 @@ protected void executeLookup() { Object literal = JexlASTHelper.getLiteralValueSafely(node); String value = String.valueOf(literal); - AsyncIndexLookup lookup = createLookup(value); + lookup = createLookup(value); executeLookup(lookup); } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java index 203580c88de..8cc4eb420a3 100644 --- a/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/jexl/lookups/UnfieldedRegexIndexLookupTest.java @@ -1,10 +1,15 @@ package datawave.query.jexl.lookups; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.commons.jexl3.parser.JexlNode; @@ -24,6 +29,8 @@ */ public class UnfieldedRegexIndexLookupTest extends BaseIndexLookupTest { + private AsyncIndexLookup lookup; + @Test public void testExpansionZeroHits() throws Exception { // no data @@ -226,6 +233,70 @@ public void testReverseExpandsIntoMultipleFieldsWithMultipleValues() throws Exce assertResultValues("FIELD_B", Set.of("tim", "tam")); } + @Test + public void 
testExecutionHints_expansionPoolSelectedOverIndexTable() throws Exception { + write("bar", "FIELD"); + withQuery("_ANYFIELD_ =~ 'ba.*'"); + + Map expansionHints = new HashMap<>(); + expansionHints.put("scan_type", "expansion-pool-a"); + expansionHints.put("priority", "2"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("expansion", expansionHints); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + executeLookup(); + assertResultFields(Set.of("FIELD")); + assertResultValues("FIELD", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(expansionHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testExecutionHints_indexTableNameSelectedWhenNoExpansionPoolExists() throws Exception { + write("bar", "FIELD"); + withQuery("_ANYFIELD_ =~ 'ba.*'"); + + Map indexHints = new HashMap<>(); + indexHints.put("scan_type", "index-a"); + indexHints.put("priority", "1"); + + Map> tableHints = new HashMap<>(); + tableHints.put("shardIndex", indexHints); + config.setTableHints(tableHints); + + executeLookup(); + assertResultFields(Set.of("FIELD")); + assertResultValues("FIELD", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(indexHints, lookup.builder.getExecutionHints()); + } + + @Test + public void testConsistencyLevel() throws Exception { + write("bar", "FIELD"); + withQuery("_ANYFIELD_ =~ 'ba.*'"); + + Map consistencyLevels = new HashMap<>(); + consistencyLevels.put("shardIndex", ConsistencyLevel.EVENTUAL); + config.setTableConsistencyLevels(consistencyLevels); + + executeLookup(); + assertResultFields(Set.of("FIELD")); + assertResultValues("FIELD", Set.of("bar")); + + assertNotNull(lookup.builder); + assertEquals(ConsistencyLevel.EVENTUAL, lookup.builder.getConsistencyLevel()); + } + /** * Build an index lookup from the query and store the results */ @@ -243,7 +314,7 @@ 
protected void executeLookup() throws Exception { Range range = desc.range; boolean reverse = desc.isForReverseIndex; - AsyncIndexLookup lookup = createLookup(value, range, reverse, null); + lookup = createLookup(value, range, reverse, null); executeLookup(lookup); } From 9c52c951eb055d6db51739509426d229bf44ba24 Mon Sep 17 00:00:00 2001 From: Moriarty <22225248+apmoriarty@users.noreply.github.com> Date: Wed, 20 May 2026 12:56:49 +0000 Subject: [PATCH 2/2] Make constants final --- .../main/java/datawave/query/scan/ExecutionHintHelper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java b/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java index 1dba68fdf0a..ca87e84f81a 100644 --- a/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java +++ b/warehouse/core/src/main/java/datawave/query/scan/ExecutionHintHelper.java @@ -20,8 +20,8 @@ public class ExecutionHintHelper { private static final Logger log = LoggerFactory.getLogger(ExecutionHintHelper.class); - public static String SCAN_TYPE = "scan_type"; - public static String PRIORITY = "priority"; + public static final String SCAN_TYPE = "scan_type"; + public static final String PRIORITY = "priority"; private ExecutionHintHelper() { // enforce static access