Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ public DiscoveredThing(String term, String field, String type, String date, Stri
this.countsByColumnVisibility = countsByColumnVisibility;
}

public DiscoveredThing(String term, String columnVisibility) {
this.term = term;
this.field = "";
this.type = "";
this.date = "";
this.columnVisibility = columnVisibility;
this.count = new VLongWritable(0L);
this.countsByColumnVisibility = new MapWritable();
}

public DiscoveredThing() {
count = new VLongWritable();
countsByColumnVisibility = new MapWritable();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package datawave.query.discovery;

import java.util.function.UnaryOperator;

public class DiscoveredThingValuesOnlyConditionalTransformer implements UnaryOperator<DiscoveredThing> {
Comment thread
hoper-38709 marked this conversation as resolved.

boolean valuesOnly;
Comment thread
hoper-38709 marked this conversation as resolved.
Outdated

DiscoveredThingValuesOnlyConditionalTransformer(boolean valuesOnly) {
this.valuesOnly = valuesOnly;
}

public DiscoveredThing apply(DiscoveredThing dt) {
if (valuesOnly) {
return new DiscoveredThing(dt.getTerm(), dt.getColumnVisibility());
}
return dt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand All @@ -19,6 +20,8 @@
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
Expand All @@ -41,11 +44,14 @@ public class DiscoveryIterator implements SortedKeyValueIterator<Key,Value> {

private Key key;
private Value value;
private Range lastRange;
private Collection<ByteSequence> columnFamilies;
private SortedKeyValueIterator<Key,Value> iterator;
private boolean separateCountsByColVis = false;
private boolean showReferenceCount = false;
private boolean reverseIndex = false;
private boolean sumCounts = false;
private boolean valuesOnly = false;

@Override
public DiscoveryIterator deepCopy(IteratorEnvironment env) {
Expand All @@ -54,14 +60,25 @@ public DiscoveryIterator deepCopy(IteratorEnvironment env) {
return copy;
}

@Override
public void next() throws IOException {
this.key = null;
this.value = null;
private void termsOnlyOperation() throws IOException {
while (Optional.ofNullable(iterator).isPresent() && iterator.hasTop()) {
// Get the entries to aggregate.
boolean firstTermOnly = true;
Set<TermInterface> terms = getTermsOnly(firstTermOnly);
if (terms.isEmpty()) {
log.trace("Couldn't aggregate index info; moving onto next date/field/term if data is available.");
} else {
List<DiscoveredThing> things = List.of(aggregate(terms));
setTop(things);
return;
}
}
}

while (iterator.hasTop() && key == null) {
private void standardOperation() throws IOException {
while (Optional.ofNullable(iterator).isPresent() && iterator.hasTop() && key == null) {
// Get the entries to aggregate.
Multimap<String,TermEntry> terms = getTermsByDatatype();
Multimap<String,TermInterface> terms = getTermsByDatatype();
if (terms.isEmpty()) {
log.trace("Couldn't aggregate index info; moving onto next date/field/term if data is available.");
} else {
Expand All @@ -74,21 +91,37 @@ public void next() throws IOException {
}
}
}
log.trace("No data found.");
}

@Override
public void next() throws IOException {
Comment thread
hoper-38709 marked this conversation as resolved.
Outdated
this.key = null;
this.value = null;

// Keep this "strategy" non-parameterized for now.
if (this.valuesOnly) {
termsOnlyOperation();
} else {
standardOperation();
}
}

/**
* Return a multimap containing mappings of datatypes to term entries that should be aggregated.
*/
private Multimap<String,TermEntry> getTermsByDatatype() throws IOException {
Multimap<String,TermEntry> terms = ArrayListMultimap.create();
private Multimap<String,TermInterface> getTermsByDatatype() throws IOException {
Multimap<String,TermInterface> terms = ArrayListMultimap.create();
Key start = new Key(iterator.getTopKey());
Key key;
// If we should sum up counts, we want to collect the term entries for each date seen for the current field and term of start. Otherwise, we only want
// to collect the term entries for the current field, term, and date of start.
// If we should sum up counts, we want to collect the term entries for each date seen for the current field and term of start.
// Otherwise, we only want to collect the term entries for the current field, term, and date of start.
BiFunction<Key,Key,Boolean> dateMatchingFunction = sumCounts ? (first, second) -> true : this::datesMatch;

// Find all matching entries and parse term entries from them.
while (iterator.hasTop() && start.equals((key = iterator.getTopKey()), PartialKey.ROW_COLFAM) && dateMatchingFunction.apply(start, key)) {
//@formatter:off
while (iterator.hasTop() &&
start.equals((key = iterator.getTopKey()), PartialKey.ROW_COLFAM) &&
Comment thread
hoper-38709 marked this conversation as resolved.
Outdated
dateMatchingFunction.apply(start, key)) {
TermEntry termEntry = new TermEntry(key, iterator.getTopValue());
if (termEntry.isValid())
terms.put(termEntry.getDatatype(), termEntry);
Expand All @@ -99,6 +132,37 @@ private Multimap<String,TermEntry> getTermsByDatatype() throws IOException {
}
iterator.next();
}
//@formatter:on
return terms;
}

private Set<TermInterface> getTermsOnly(boolean firstTermOnly) throws IOException {
Set<TermInterface> terms = new HashSet<>();
Key start = new Key(iterator.getTopKey());
Key key;

// Find all matching entries and parse term entries from them.
//@formatter:off
while (iterator.hasTop() &&
start.equals((key = iterator.getTopKey()), PartialKey.ROW)) {
TermOnlyEntry termEntry = new TermOnlyEntry(key, iterator.getTopValue());

if (termEntry.isValid()) {
terms.add(termEntry);
return terms;
// // Return the first term only.
// if (firstTermOnly){
// iterator.next();
// break;
// }
} else {
if (log.isTraceEnabled()) {
log.trace("Received invalid term entry from key: " + key);
}
}
iterator.next();
}
//@formatter:on
return terms;
}

Expand All @@ -119,20 +183,23 @@ private boolean datesMatch(Key left, Key right) {
/**
* Return the given term entries aggregated into a single {@link DiscoveredThing} if possible, or return null if any issues occurred.
*/
private DiscoveredThing aggregate(Collection<TermEntry> termEntries) {
private DiscoveredThing aggregate(Collection<TermInterface> termEntries) {
if (termEntries.isEmpty()) {
return null;
} else {
TermEntry first = termEntries.iterator().next();
TermInterface first = termEntries.iterator().next();
String term = reverseIndex ? new StringBuilder(first.getTerm()).reverse().toString() : first.getTerm();
String date = sumCounts ? "" : first.date;
String date = sumCounts ? "" : first.getDate();
// if (valuesOnly) {
// date = "";
// }

Set<ColumnVisibility> visibilities = new HashSet<>();
Map<String,Long> visibilityToCounts = new HashMap<>();
long count = 0L;

// Aggregate the counts and visibilities from each entry.
for (TermEntry termEntry : termEntries) {
for (TermInterface termEntry : termEntries) {
// Fetch the count to aggregate based of whether we should show the term count or the reference count.
long currentCount = this.showReferenceCount ? termEntry.getUidListSize() : termEntry.getUidCount();
try {
Expand Down Expand Up @@ -179,26 +246,43 @@ private DiscoveredThing aggregate(Collection<TermEntry> termEntries) {
/**
* Set the top {@link Key} and {@link Value} of this iterator, created from the given list of {@link DiscoveredThing} instances.
*/
private void setTop(List<DiscoveredThing> things) {
// We want the key to be the last possible key for this date. Return the key as it is in the index (reversed if necessary) to ensure the keys are
// consistent with the initial seek range.
private void setTop(List<DiscoveredThing> things) throws IOException {

// We want the key to be the last possible key for this date. Return the key as it is in the index (reversed if
// necessary) to ensure the keys are consistent with the initial seek range.
DiscoveredThing thing = things.get(0);
String row = (this.reverseIndex ? new StringBuilder().append(thing.getTerm()).reverse().toString() : thing.getTerm());
Key newKey = new Key(row, thing.getField(), thing.getDate() + "\uffff");

// Create a value from the list of things.
ArrayWritable thingArray = new ArrayWritable(DiscoveredThing.class, things.toArray(new DiscoveredThing[0]));
Value newValue = new Value(WritableUtils.toByteArray(thingArray));
if (valuesOnly){
Key skipKey = new Key(row, "\uffff");
if(!columnFamilies.isEmpty()){
skipKey = new Key(row, thing.getField(), "\uffff");
Comment thread
hoper-38709 marked this conversation as resolved.
}

if (lastRange.contains(skipKey)) {
this.key = skipKey;
Range nextRange = new Range(skipKey,false,lastRange.getEndKey(), lastRange.isEndKeyInclusive());
this.iterator.seek(nextRange, columnFamilies,false);
}
} else{
this.key = newKey;

}

this.key = newKey;
this.value = newValue;
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
this.lastRange = range;
this.columnFamilies = columnFamilies;
this.iterator.seek(range, columnFamilies, inclusive);
if (log.isTraceEnabled()) {
log.trace("My source " + (this.iterator.hasTop() ? "does" : "does not") + " have a top.");
log.trace("My source " + ((Optional.ofNullable(iterator).isPresent() && this.iterator.hasTop()) ? "does" : "does not") + " have a top.");
Comment thread
hoper-38709 marked this conversation as resolved.
Outdated
}
next();
}
Expand All @@ -210,13 +294,18 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
this.showReferenceCount = Boolean.parseBoolean(options.get(DiscoveryLogic.SHOW_REFERENCE_COUNT));
this.reverseIndex = Boolean.parseBoolean(options.get(DiscoveryLogic.REVERSE_INDEX));
this.sumCounts = Boolean.parseBoolean(options.get(DiscoveryLogic.SUM_COUNTS));
this.valuesOnly = Boolean.parseBoolean(options.get(DiscoveryLogic.VALUES_ONLY));
Comment thread
hoper-38709 marked this conversation as resolved.
if(valuesOnly){
sumCounts=false;
}

if (log.isTraceEnabled()) {
log.trace("Source: " + source.getClass().getName());
log.trace("Separate counts by column visibility: " + this.separateCountsByColVis);
log.trace("Show reference counts only: " + this.showReferenceCount);
log.trace("Reverse index: " + this.reverseIndex);
log.trace("Sum counts: " + this.sumCounts);
log.trace("Values only: " + this.valuesOnly);
}
}

Expand All @@ -238,7 +327,7 @@ public Value getTopValue() {
/**
* Represents term information parsed from a {@link Key}, {@link Value} pair.
*/
private static class TermEntry {
private static class TermEntry implements TermInterface {

private final String term;
private final String field;
Expand Down Expand Up @@ -325,5 +414,35 @@ public long getUidListSize() {
public boolean isValid() {
return valid;
}

@Override
public boolean equals(Object o) {
if (o instanceof TermEntry) {
TermEntry other = (TermEntry) o;
// @formatter:off
return new EqualsBuilder().append(getTerm(), other.getTerm())
.append(getField(), other.getField())
.append(getVisibility(), other.getVisibility())
.append(getDate(), other.getDate())
.append(getDatatype(), other.getDatatype())
.append(getUidCount(), other.getUidCount())
.append(getUidListSize(), other.getUidListSize()).isEquals();
// @formatter:on
}
return false;
}

@Override
public int hashCode() {
// @formatter:off
return new HashCodeBuilder().append(getTerm())
.append(getField())
.append(getVisibility())
.append(getDate())
.append(getDatatype())
.append(getUidCount())
.append(getUidListSize()).toHashCode();
// @formatter:on
}
Comment thread
hoper-38709 marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public class DiscoveryLogic extends ShardIndexQueryTable {
*/
public static final String SUM_COUNTS = "sum.counts";

/**
* Used to specify a unique list of values not associated with a field.
*/
public static final String VALUES_ONLY = "values.only";

/**
* Used to specify whether to search against the reversed index.
*/
Expand Down Expand Up @@ -151,6 +156,9 @@ public GenericQueryConfiguration initialize(AccumuloClient client, Query setting
// Check if counts should be summed.
setSumCounts(getOrDefaultBoolean(settings, SUM_COUNTS, getSumCounts()));

// Specify values only. Treat associated field, data type, and the like as "don't care."
setValuesOnly(getOrDefaultBoolean(settings, VALUES_ONLY, false));

// Check if any datatype filters were specified.
getConfig().setDatatypeFilter(getOrDefaultSet(settings, QueryParameters.DATATYPE_FILTER_SET, getConfig().getDatatypeFilter()));

Expand Down Expand Up @@ -580,6 +588,7 @@ private IteratorSetting configureDiscoveryIterator(DiscoveryQueryConfiguration c
setting.addOption(SEPARATE_COUNTS_BY_COLVIS, Boolean.toString(config.getSeparateCountsByColVis()));
setting.addOption(SHOW_REFERENCE_COUNT, Boolean.toString(config.getShowReferenceCount()));
setting.addOption(SUM_COUNTS, Boolean.toString(config.getSumCounts()));
setting.addOption(VALUES_ONLY, Boolean.toString(config.getValuesOnly()));
return setting;
}

Expand Down Expand Up @@ -689,4 +698,12 @@ public boolean getSumCounts() {
public void setSumCounts(boolean sumCounts) {
getConfig().setSumCounts(sumCounts);
}

public void setValuesOnly(boolean valuesOnly) {
getConfig().setValuesOnly(valuesOnly);
}

public boolean getValuesOnly() {
return getConfig().getValuesOnly();
}
}
Loading
Loading