Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

import datawave.query.attributes.Attribute;
import datawave.query.attributes.Attributes;
Expand All @@ -28,17 +30,34 @@ public static String chooseBestIdentifier(List<String> identifiers) {

/**
* Choose the best language from a non-null, non-empty list of languages, otherwise return null.
* <p>
* If preferredLanguages is provided, find the intersection of available and preferred languages
* <p>
* If YAKE cannot provide a language, take the first available language
*
* @param languages
* a list to choose from
* @param preferredLanguages
* an optional set of preferred languages, in upper case
* @return the best language or null.
*/
public static String chooseBestLanguage(List<String> languages) {
public static String chooseBestLanguage(List<String> languages, Set<String> preferredLanguages) {
if (languages == null || languages.isEmpty()) {
return null;
}

for (String language : languages) {
List<String> normalizedLanguages = languages.stream().map(String::toUpperCase).collect(Collectors.toList());
List<String> availablePreferredLanguages = normalizedLanguages;
if (preferredLanguages != null && !preferredLanguages.isEmpty()) {
availablePreferredLanguages = normalizedLanguages.stream().filter(preferredLanguages::contains).collect(Collectors.toList());

if (availablePreferredLanguages.isEmpty()) {
// no overlap so revert back to available languages
availablePreferredLanguages = normalizedLanguages;
}
}

for (String language : availablePreferredLanguages) {
// if the language can't be found in the language registry, the language
// registry will return English. So, if the language name returned by the
// registry and the input language name match - it confirms we have
Expand All @@ -52,7 +71,7 @@ public static String chooseBestLanguage(List<String> languages) {

// if we get here, we couldn't find an ideal language, just return the first value, yake will default
// to processing the data as if it were English.
return languages.get(0);
return availablePreferredLanguages.get(0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloClient;
Expand Down Expand Up @@ -42,8 +44,10 @@ protected Query buildLatterQuery(Query initialQuery, Iterator<Entry<Key,Value>>
public Iterator<Entry<Key,Value>> runChainedQuery(AccumuloClient client, Query initialQuery, Set<Authorizations> auths,
Iterator<Entry<Key,Value>> initialQueryResults, QueryLogic<Entry<Key,Value>> latterQueryLogic) {

final List<TagCloudInputExtractor> activeExtractors = getActiveExtractors(initialQuery);
final boolean buildKeywordCloud = isKeywordCloudRequested(initialQuery);
String[] categories = getCategories(initialQuery);
final List<TagCloudInputExtractor> activeExtractors = getActiveExtractors(categories);
final boolean buildKeywordCloud = isKeywordCloudRequested(categories);
Set<String> requiredLanguages = getRequiredLanguages(categories);

Iterator<Entry<Key,Value>> wrapped = new Iterator<>() {
private Iterator<Entry<Key,Value>> batchIterator;
Expand All @@ -53,7 +57,7 @@ public boolean hasNext() {
while (batchIterator == null || (!batchIterator.hasNext() && initialQueryResults.hasNext())) {
try {
StatefulKeywordUUIDChainStrategy statefulChainStrategy = new StatefulKeywordUUIDChainStrategy(initialQuery, latterQueryLogic,
activeExtractors, buildKeywordCloud);
activeExtractors, buildKeywordCloud, requiredLanguages);
statefulChainStrategy.setBatchSize(batchSize);
batchIterator = statefulChainStrategy.runChainedQuery(client, initialQuery, auths, initialQueryResults, latterQueryLogic);
} catch (Exception e) {
Expand Down Expand Up @@ -95,33 +99,62 @@ private String[] getCategories(Query settings) {
/**
* Check if the keyword cloud should be constructed
*
* @param settings
* @param categories
* @return
*/
private boolean isKeywordCloudRequested(Query settings) {
String[] categories = getCategories(settings);
private boolean isKeywordCloudRequested(String[] categories) {
for (String category : categories) {
if (category.equals(KEYWORD_CATEGORY)) {
if (category.equals(KEYWORD_CATEGORY) || category.startsWith(KEYWORD_CATEGORY + ".")) {
return true;
}
}

return false;
}

/**
 * Collects the language subtypes attached to the keyword category.
 * <p>
 * Only categories of the form {@code KEYWORD_CATEGORY + "." + language} with a non-empty language part contribute; the bare keyword category and a
 * trailing-dot entry contribute nothing.
 *
 * @param categories
 *            the non-null parsed categories
 * @return set of languages to restrict keyword clouds to, in upper case; empty when no language subtype was given
 */
private Set<String> getRequiredLanguages(String[] categories) {
    final String languagePrefix = KEYWORD_CATEGORY + ".";
    final Set<String> languages = new HashSet<>();

    for (int i = 0; i < categories.length; i++) {
        final String candidate = categories[i];

        // skip anything that is not "<keyword category>.<language>" with at least one character of language
        if (!candidate.startsWith(languagePrefix) || candidate.length() <= languagePrefix.length()) {
            continue;
        }

        // NOTE(review): toUpperCase() uses the JVM default locale; whatever compares against this set should normalize the same way
        languages.add(candidate.substring(languagePrefix.length()).toUpperCase());
    }

    return languages;
}

/**
* pull parameters to determine which type of tag cloud we are generating. All extraction is triggered beyond this point for the given ids
*
* @param settings
* @param categories
* @return
*/
private List<TagCloudInputExtractor> getActiveExtractors(Query settings) {
private List<TagCloudInputExtractor> getActiveExtractors(String[] categories) {
List<TagCloudInputExtractor> activeExtractors = new ArrayList<>();

for (String name : getCategories(settings)) {
for (String name : categories) {
boolean found = false;
String subType = null;
if (name.indexOf(".") > 0) {
// split into category and subtype
String[] splits = name.split("\\.");
if (splits.length > 2) {
throw new IllegalArgumentException(
name + " is malformed. When specifying a subType with a category separate the category and subType with a single .");
} else if (splits.length == 2) {
name = splits[0];
subType = splits[1];
}
}
for (TagCloudInputExtractor extractor : extractors) {
if (extractor.getName().equals(name) && !activeExtractors.contains(extractor)) {
if (extractor.getName().equals(name) && (subType == null || Objects.equals(extractor.getSubType(), subType))
&& !activeExtractors.contains(extractor)) {
activeExtractors.add(extractor);
found = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ public class StatefulKeywordUUIDChainStrategy extends FullChainStrategy<Entry<Ke
private final List<TagCloudInputExtractor> extractors;
// will be true when a keyword query should be run, false otherwise
private final boolean runKeywordQuery;
// used to filter keyword results that don't match the given language. Stored in uppercase
private final Set<String> requiredLanguages;
private boolean addedExtractedData = false;

public StatefulKeywordUUIDChainStrategy(Query settings, QueryLogic<Entry<Key,Value>> nextLogic, List<TagCloudInputExtractor> extractors,
boolean runKeywordQuery) {
boolean runKeywordQuery, Set<String> requiredLanguages) {
this.deserializer = DocumentSerialization.getDocumentDeserializer(settings);
this.nextLogic = nextLogic;
this.extractors = extractors;
this.runKeywordQuery = runKeywordQuery;
this.requiredLanguages = requiredLanguages;
}

public int getBatchSize() {
Expand Down Expand Up @@ -137,12 +140,12 @@ public String captureResultsAndBuildQuery(Iterator<Entry<Key,Value>> initialQuer
}
}

if (runKeywordQuery) {
if (runKeywordQuery && hasRequiredLanguage(documentData)) {
// run query term extraction for next logic if needed
queryTerms.add(extractKeywordQueryTerm(docId, documentData));
}

count++;
count++;
}
}

if (nextLogic instanceof KeywordQueryLogic) {
Expand Down Expand Up @@ -173,6 +176,30 @@ public String captureResultsAndBuildQuery(Iterator<Entry<Key,Value>> initialQuer
return queryTerms.isEmpty() ? null : StringUtils.join(queryTerms, " ");
}

/**
 * Checks a document against the required-language filter.
 * <p>
 * The document's languages are read from its {@code LANGUAGE} attribute and upper-cased before being compared to {@code requiredLanguages}.
 *
 * @param documentData
 *            the document's field data, keyed by field name
 * @return true if there are no required languages (null or empty set), or the document matches at least one required language; false if languages are
 *         required and the document has no {@code LANGUAGE} attribute or none of its values match
 */
private boolean hasRequiredLanguage(Map<String,Attribute<? extends Comparable<?>>> documentData) {
    // the constructor stores the set as given, so treat null like empty: no language restriction was requested
    if (requiredLanguages == null || requiredLanguages.isEmpty()) {
        return true;
    }

    Attribute<?> languageAttribute = documentData.get("LANGUAGE");
    if (languageAttribute == null) {
        // languages are required but this document declares none
        return false;
    }

    List<String> languages = KeywordQueryUtil.getStringValuesFromAttribute(languageAttribute);
    for (String language : languages) {
        // requiredLanguages is stored in upper case, so normalize before comparing
        if (requiredLanguages.contains(language.toUpperCase())) {
            return true;
        }
    }

    return false;
}

/**
* Generates queries for the KeywordQueryLogic. Minimally they will include things like:
*
Expand Down Expand Up @@ -214,7 +241,7 @@ private String extractKeywordQueryTerm(String docId, Map<String,Attribute<? exte
log.trace("No identifier found for query " + queryTerm);
}

if (((language = KeywordQueryUtil.chooseBestLanguage(languages)) != null)) {
if (((language = KeywordQueryUtil.chooseBestLanguage(languages, requiredLanguages)) != null)) {
if (log.isTraceEnabled()) {
log.trace("Chose best language '" + languages + "' from '" + languages + "' for query " + queryTerm);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public double getMinScore() {
return minScore;
}

/**
 * @return the value of this instance's {@code subType} field
 */
@Override
public String getSubType() {
    return this.subType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,42 @@
import datawave.query.tables.keyword.transform.TagCloudInputTransformer;
import datawave.util.keyword.TagCloudPartition;

/**
* Interface for extracting tag cloud input data from Key/Value pairs during keyword query processing.
* <p>
* Tag cloud extractors are responsible for transforming document field data into {@link TagCloudPartition} objects. Implementations extract relevant fields
* from document data, apply scoring logic, and accumulate the results into partitions for tag clouds.
* <p>
* The extraction lifecycle follows this pattern:
* <ol>
* <li>Optional initialization via {@link #initialize(Query)} with query settings</li>
* <li>Repeated calls to {@link #extract(Key, Map)} for each document</li>
* <li>Retrieval of accumulated results via {@link #get()}</li>
* <li>Reset state via {@link #clear()} when moving to the next partition</li>
* </ol>
*
* @see TagCloudPartition
* @see TagCloudInputTransformer
* @see FieldedTagCloudInputExtractor
* @see ParameterFieldedTagCloudInputExtractor
*/
public interface TagCloudInputExtractor {
/**
* Extracts a document identifier from an event Key in the shard table
* <p>
* The document ID is constructed from the shard row key and column family, which follows the format:
* <ul>
* <li>row: yyyyMMdd_X (e.g. 20260303_0)</li>
* <li>cf: dataType\0uid</li>
* </ul>
* The resulting document id format is: {@code shardId/dataType/uid}
*
* @param source
* the shard event Key
* @return document identifier in the format "shardId/dataType/uid"
* @throws IllegalArgumentException
* if the cf does not contain the null byte separator
*/
default String getDocId(Key source) {
String row = source.getRow().toString();
String cf = source.getColumnFamily().toString();
Expand All @@ -24,15 +59,81 @@ default String getDocId(Key source) {
return row + "/" + dataType + "/" + uid;
}

/**
* Initialize the extractor with query-specific settings
* <p>
* This optional hook allows implementations to configure themselves based on the query parameters before extraction begins. By default, does nothing
*
* @param settings
* the query whose settings may be used to configure this extractor
*/
default void initialize(Query settings) {}

/**
* Returns the name or category identifier of this extractor
* <p>
* The name is used to identify the partition category and group related tag cloud results
*
* @return the extractor name or category identifier
*/
String getName();

/**
* Returns the name or category subtype identifier
* <p>
* This name is used to create subgroups of data within a given name or category when creating tag cloud results
* <p>
* This is an optional parameter
*
* @return the optional subType name or category if set, otherwise null
*/
String getSubType();
Comment thread
FineAndDandy marked this conversation as resolved.

/**
* Extracts tag cloud input data from a single document's fielded data
* <p>
* This method processes the document's fields, applies scoring logic, and accumulates the results into an internal partition. The accumulated data can be
* retrieved via {@link #get()}
*
* @param source
* the Key for the document
* @param documentData
* the document's field data
* @throws TagCloudInputExtractorException
* if extraction fails due to malformed data or configuration issues
* @see #get()
* @see #clear()
*/
void extract(Key source, Map<String,Attribute<? extends Comparable<?>>> documentData) throws TagCloudInputExtractorException;

/**
* Retrieves the accumulated tag cloud partition containing all extracted data
* <p>
* This method returns the partition built up through repeated calls to {@link #extract(Key, Map)}. The partition contains aggregated term frequencies and
* scores across all processed documents
*
* @return the accumulated tag cloud partition, or null if no data has been extracted
* @see #extract(Key, Map)
* @see #clear()
*/
TagCloudPartition get();

/**
* Clears the internal state and resets the accumulated partition
* <p>
* This method should be called after retrieving results via {@link #get()} to prepare the extractor for processing the next partition
*
* @see #get()
*/
void clear();

/**
* Returns the transformer used to convert the partition into the final output format
* <p>
* The transformer is responsible for converting the accumulated {@link TagCloudPartition} into the appropriate response format
*
* @return the input transformer for this extractor's partition type
* @see TagCloudInputTransformer
* @see TagCloudPartition
*/
TagCloudInputTransformer<TagCloudPartition> getInputTransformer();
}
Loading
Loading