diff --git a/pom.xml b/pom.xml
index aa849568694..3e2486f6112 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
1.0.0.Final3.20.21.11.4
+ 4.3.01.14.113.1.01.9.4
@@ -117,6 +118,7 @@
0.11.2202310131.19.0
+ 2.3.04.13.25.12.01.12.0
@@ -149,6 +151,8 @@
2.3.5.Final17.0.1.Final
+
+ 2.9.95.4.03.1.42.12.2
@@ -1137,6 +1141,11 @@
+
+ org.awaitility
+ awaitility
+ ${version.awaitutility}
+ org.eclipse.emforg.eclipse.emf.common
@@ -1469,6 +1478,12 @@
${version.weld-test}test
+
+ org.junit-pioneer
+ junit-pioneer
+ ${version.junit-pioneer}
+ test
+ org.junit.jupiterjunit-jupiter
diff --git a/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java b/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java
index f416641273e..39a2bcaa377 100644
--- a/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java
+++ b/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java
@@ -240,6 +240,7 @@ public class CachedResultsBean {
private AccumuloConnectionRequestBean accumuloConnectionRequestBean;
@Inject
+ @SpringBean(name = "queryLimiter")
private QueryLimiter queryLimiter;
protected static final String COMMA = ",";
diff --git a/web-services/deploy/application/pom.xml b/web-services/deploy/application/pom.xml
index 1ca5fd78d00..86ff5c9ebe9 100644
--- a/web-services/deploy/application/pom.xml
+++ b/web-services/deploy/application/pom.xml
@@ -49,6 +49,12 @@
gov.nsa.datawavedatawave-metrics-core${project.version}
+
+
+ org.yaml
+ snakeyaml
+
+ gov.nsa.datawave
@@ -63,6 +69,10 @@
gov.nsa.datawave.coredatawave-core-connection-pool
+
+ org.yaml
+ snakeyaml
+
diff --git a/web-services/deploy/configuration/src/main/resources/datawave/query/QueryLimiterFactory.xml b/web-services/deploy/configuration/src/main/resources/datawave/query/QueryLimiterFactory.xml
index 4ecb5b1cf38..278a95345b9 100644
--- a/web-services/deploy/configuration/src/main/resources/datawave/query/QueryLimiterFactory.xml
+++ b/web-services/deploy/configuration/src/main/resources/datawave/query/QueryLimiterFactory.xml
@@ -146,10 +146,42 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/web-services/query/pom.xml b/web-services/query/pom.xml
index 9abcbc0d2b2..1d47b4d981f 100644
--- a/web-services/query/pom.xml
+++ b/web-services/query/pom.xml
@@ -9,7 +9,20 @@
datawave-ws-queryejb${project.artifactId}
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-xml
+
+ ${version.wildfly.jackson}
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-yaml
+
+ ${version.wildfly.jackson}
+ com.google.code.gsongson
@@ -99,6 +112,10 @@
3.0jar
+
+ org.awaitility
+ awaitility
+ org.easymockeasymock
@@ -249,6 +266,11 @@
javassisttest
+
+ org.junit-pioneer
+ junit-pioneer
+ test
+ org.mockitomockito-inline
diff --git a/web-services/query/src/main/java/datawave/webservice/query/cache/QueryExpirationBean.java b/web-services/query/src/main/java/datawave/webservice/query/cache/QueryExpirationBean.java
index 8db89c2ff18..330dfa69d52 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/cache/QueryExpirationBean.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/cache/QueryExpirationBean.java
@@ -43,45 +43,27 @@ public class QueryExpirationBean {
private static final Logger log = Logger.getLogger(QueryExpirationBean.class);
- private QueryCache queryCache;
- private QueryExpirationProperties config;
- private AccumuloConnectionFactory connectionFactory;
- private CreatedQueryLogicCacheBean queryLogicCacheBean;
- private QueryMetricsBean metricsBean;
- private QueryLimiter queryLimiter;
-
- private boolean clearAll = false;
-
@Inject
- public void setQueryCache(QueryCache cache) {
- this.queryCache = cache;
- }
+ private QueryCache queryCache;
@Inject
@SpringBean(refreshable = true)
- public void setQueryExpirationProperties(QueryExpirationProperties properties) {
- this.config = properties;
- }
+ private QueryExpirationProperties config;
@Inject
- public void setConnectionFactory(AccumuloConnectionFactory connectionFactory) {
- this.connectionFactory = connectionFactory;
- }
+ private AccumuloConnectionFactory connectionFactory;
@Inject
- public void setCreatedQueryLogicCacheBean(CreatedQueryLogicCacheBean cacheBean) {
- this.queryLogicCacheBean = cacheBean;
- }
+ private CreatedQueryLogicCacheBean queryLogicCacheBean;
@Inject
- public void setQueryMetricsBean(QueryMetricsBean metrics) {
- this.metricsBean = metrics;
- }
+ private QueryMetricsBean metricsBean;
@Inject
- public void setQueryLimiter(QueryLimiter queryLimiter) {
- this.queryLimiter = queryLimiter;
- }
+ @SpringBean(name = "queryLimiter")
+ private QueryLimiter queryLimiter;
+
+ private boolean clearAll = false;
@PostConstruct
public void init() {
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/ActiveQueryTracker.java b/web-services/query/src/main/java/datawave/webservice/query/limit/ActiveQueryTracker.java
index 669b0daf260..7902841469e 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/ActiveQueryTracker.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/ActiveQueryTracker.java
@@ -1,15 +1,11 @@
package datawave.webservice.query.limit;
-import java.io.File;
-import java.net.URI;
+import static datawave.webservice.zookeeper.ZkUtils.EMPTY_DATA;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -17,16 +13,18 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.retry.RetryNTimes;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+
+import datawave.webservice.zookeeper.LockedZkClientDispatcher;
+import datawave.webservice.zookeeper.ZkUtils;
/**
- * This class provides methods for leveraging Zookeeper to track queries and their active status.
+ * This class provides methods for leveraging Zookeeper to track queries and their active status. It is expected that only one instance of an
+ * {@link ActiveQueryTracker} will exist at a time within a singleton {@link QueryLimiter}, and the Zookeeper logic herein adheres to that assumption.
*/
public class ActiveQueryTracker implements AutoCloseable {
@@ -34,58 +32,34 @@ public class ActiveQueryTracker implements AutoCloseable {
private static final Logger log = Logger.getLogger(ActiveQueryTracker.class);
- private static final byte[] EMPTY_DATA = new byte[0];
-
private static final String DISTINCT_QUERY_LOGICS_CONTAINER_PATH = "/distinctQueryLogics";
private static final String SYSTEMS_CONTAINER_PATH = "/systems";
private static final String USERS_CONTAINER_PATH = "/users";
- private final String zookeeperConfig;
- private final long cleanUpClientInterval;
- private final Lock clientLock = new ReentrantLock();
-
- private CuratorFramework client;
- private long lastClientAccess;
- private Timer clientCleanupTimer;
+ private final CuratorFrameworkFactory.Builder clientFactory;
+ private LockedZkClientDispatcher clientDispatcher;
/**
- * Create and return a new {@link ActiveQueryTracker} instance
+ * Create and return a new {@link ActiveQueryTracker} instance.
*
* @param zookeeperConfig
* the zookeeper config
* @param clientCleanupInterval
* the interval in milliseconds after which the zookeeper client should be cleaned up since its last access
- * @throws QuorumPeerConfig.ConfigException
+ * @throws ConfigException
* if an error occurs when verifying the zookeeper configuration
*/
- public ActiveQueryTracker(String zookeeperConfig, long clientCleanupInterval) throws QuorumPeerConfig.ConfigException {
- this.zookeeperConfig = getQuorumPeerConfig(zookeeperConfig);
- this.cleanUpClientInterval = clientCleanupInterval;
- }
-
- private static String getQuorumPeerConfig(String zookeeperConfig) throws QuorumPeerConfig.ConfigException {
- URI zookeeperConfigFile;
- try {
- zookeeperConfigFile = new Path(zookeeperConfig).toUri();
- if (new File(zookeeperConfigFile).exists()) {
- QuorumPeerConfig zooConfig = new QuorumPeerConfig();
- zooConfig.parse(zookeeperConfigFile.getPath());
- StringBuilder sb = new StringBuilder();
- for (QuorumPeer.QuorumServer server : zooConfig.getServers().values()) {
- if (sb.length() > 0) {
- sb.append(',');
- }
- sb.append(server.addr.getReachableOrOne().getHostName()).append(':').append(zooConfig.getClientPortAddress().getPort());
- }
- if (sb.length() == 0) {
- sb.append(zooConfig.getClientPortAddress().getHostName()).append(':').append(zooConfig.getClientPortAddress().getPort());
- }
- return sb.toString();
- }
- } catch (IllegalArgumentException e) {
- // Try the zookeeper config as is.
- }
- return zookeeperConfig;
+ public ActiveQueryTracker(String zookeeperConfig, long clientCleanupInterval) throws ConfigException {
+ zookeeperConfig = ZkUtils.getQuorumPeerConfig(zookeeperConfig);
+ // @formatter:off
+ clientFactory = CuratorFrameworkFactory.builder()
+ .namespace(ZOOKEEPER_NAMESPACE)
+ .connectString(zookeeperConfig)
+ .sessionTimeoutMs(60000)
+ .connectionTimeoutMs(60000)
+ .retryPolicy(new RetryNTimes(10, 1000));
+ // @formatter:on
+ clientDispatcher = new LockedZkClientDispatcher(clientFactory, clientCleanupInterval, clientCleanupInterval, TimeUnit.MILLISECONDS);
}
/**
@@ -135,11 +109,10 @@ public QueryHeartbeat trackQuery(String queryId, String userDn, String system, S
log.trace("Tracking query: queryId=" + queryId + ", user='" + userDn + "', system='" + system + "', queryLogic='" + queryLogic + "'");
}
- clientLock.lock();
- try {
- // Initialize the client if needed.
- initClient();
+ try (LockedZkClientDispatcher.LockedClient lockedClient = clientDispatcher.getLockedClient()) {
try {
+ CuratorFramework client = lockedClient.getClient();
+
// Verify we are not already tracking the query.
String systemQueryIdPath = getSystemQueryIdPath(system, queryLogic, queryId);
Stat stat = client.checkExists().forPath(systemQueryIdPath);
@@ -167,12 +140,13 @@ public QueryHeartbeat trackQuery(String queryId, String userDn, String system, S
}
// Create ephemeral nodes for the query ID. These nodes will not persist beyond the lifetime of the client created here.
- CuratorFramework client = createClient();
+ CuratorFramework heartbeatClient = clientFactory.build();
+ heartbeatClient.start();
List nodes = new ArrayList<>();
- nodes.add(new PersistentNode(client, CreateMode.EPHEMERAL, false, systemQueryIdPath, EMPTY_DATA, false));
+ nodes.add(new PersistentNode(heartbeatClient, CreateMode.EPHEMERAL, false, systemQueryIdPath, EMPTY_DATA, false));
if (systemCountsAgainstUserLimit) {
String userQueryIdPath = getUserQueryIdPath(userDn, queryLogic, queryId);
- nodes.add(new PersistentNode(client, CreateMode.EPHEMERAL, false, userQueryIdPath, EMPTY_DATA, false));
+ nodes.add(new PersistentNode(heartbeatClient, CreateMode.EPHEMERAL, false, userQueryIdPath, EMPTY_DATA, false));
}
// Persist each node to Zookeeper.
@@ -188,11 +162,7 @@ public QueryHeartbeat trackQuery(String queryId, String userDn, String system, S
log.error("Failed to track query " + queryId, e);
throw e;
}
-
- } finally {
- clientLock.unlock();
}
-
return heartbeat;
}
@@ -216,7 +186,10 @@ public int getTotalUserQueriesForQueryLogic(String userDn, String queryLogic) th
log.trace("Fetching total queries for user='" + userDn + "', queryLogic='" + queryLogic + "'");
}
- return getTotalChildrenWithLock(getUserQueryLogicPath(userDn, queryLogic));
+ try (LockedZkClientDispatcher.LockedClient lockedClient = clientDispatcher.getLockedClient()) {
+ CuratorFramework client = lockedClient.getClient();
+ return getTotalChildren(client, getUserQueryLogicPath(userDn, queryLogic));
+ }
}
/**
@@ -238,30 +211,9 @@ public int getTotalSystemQueriesForQueryLogic(String system, String queryLogic)
log.trace("Fetching total queries for system='" + system + "', queryLogic='" + queryLogic + "'");
}
- return getTotalChildrenWithLock(getSystemQueryLogicPath(system, queryLogic));
- }
-
- /**
- * Obtain a lock for the client and return the total children for the given path. If the path does not exist, 0 will be returned.
- *
- * @param path
- * the node path
- * @return the total children
- * @throws Exception
- * if an error occurs while scanning nodes
- */
- private int getTotalChildrenWithLock(String path) throws Exception {
- clientLock.lock();
- try {
- // Initialize the client if needed.
- initClient();
-
- return getTotalChildren(path);
- } catch (Exception e) {
- log.error("Failed to get total children for path " + path, e);
- throw e;
- } finally {
- clientLock.unlock();
+ try (LockedZkClientDispatcher.LockedClient lockedClient = clientDispatcher.getLockedClient()) {
+ CuratorFramework client = lockedClient.getClient();
+ return getTotalChildren(client, getSystemQueryLogicPath(system, queryLogic));
}
}
@@ -289,7 +241,10 @@ public boolean totalUserQueriesMeetsLimit(String userDn, int queryLimit, Set quer
/**
* Return the total number of children for the path. If the path does not exist, 0 will be returned.
*
+ * @param client
+ * the client
* @param path
* the path
* @return the total number of children
* @throws Exception
* if an error occurs while scanning nodes
*/
- private int getTotalChildren(String path) throws Exception {
+ private int getTotalChildren(CuratorFramework client, String path) throws Exception {
try {
Stat stat = client.checkExists().forPath(path);
if (stat == null) {
@@ -407,10 +360,9 @@ public List getDistinctQueryLogics() {
if (log.isTraceEnabled()) {
log.trace("Fetching distinct query logics");
}
- clientLock.lock();
- try {
- // Initialize the client if needed.
- initClient();
+
+ try (LockedZkClientDispatcher.LockedClient lockedClient = clientDispatcher.getLockedClient()) {
+ CuratorFramework client = lockedClient.getClient();
// If any query logics were tracked, return them.
Stat stat = client.checkExists().forPath(DISTINCT_QUERY_LOGICS_CONTAINER_PATH);
if (stat != null) {
@@ -422,8 +374,6 @@ public List getDistinctQueryLogics() {
} catch (Exception e) {
log.error("Failed to fetch distinct query logics", e);
throw new ActiveQueryException(e);
- } finally {
- clientLock.unlock();
}
}
@@ -516,101 +466,15 @@ private String getSystemQueryIdPath(String system, String queryLogic, String que
return getSystemQueryLogicPath(system, queryLogic) + "/" + queryId;
}
- /**
- * Initialize the zookeeper client and cleanup timer if not already initialize. Calling this method will update the last time the client was accessed to the
- * current time.
- */
- private void initClient() {
- if (client == null) {
- clientLock.lock();
- try {
- // @formatter:off
- client = createClient();
- if (cleanUpClientInterval > 0) {
- createCleanupTimer();
- }
- } finally {
- clientLock.unlock();
- }
- }
- // Update the last time the client was accessed.
- lastClientAccess = System.currentTimeMillis();
- }
-
- /**
- * Return a new zookeeper client targeting the namespace {@value #ZOOKEEPER_NAMESPACE}.
- * @return the client
- */
- private CuratorFramework createClient() {
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .namespace(ZOOKEEPER_NAMESPACE)
- .connectString(zookeeperConfig)
- .sessionTimeoutMs(60000)
- .connectionTimeoutMs(60000)
- .retryPolicy(new RetryNTimes(10, 1000))
- .build();
-
- // @formatter:on
- client.start();
- return client;
- }
-
- /**
- * Create the cleanup timer.
- */
- private void createCleanupTimer() {
- if (clientCleanupTimer == null) {
- clientCleanupTimer = new Timer("Zookeeper Client Cleanup");
- }
-
- clientCleanupTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- if (lastClientAccess + cleanUpClientInterval <= System.currentTimeMillis()) {
- cancel();
- } else if (client == null) {
- cancel();
- }
- }
- }, cleanUpClientInterval, cleanUpClientInterval);
- }
-
- /**
- * Clean up the underlying resources used by this {@link ActiveQueryTracker}.
- */
- public void cleanup() {
- closeClientAndTimer();
- }
-
- /**
- * Close the client and clean up timer, and nullify them.
- */
- private void closeClientAndTimer() {
- if (client != null) {
- clientLock.lock();
+ @Override
+ public void close() throws Exception {
+ if (clientDispatcher != null) {
try {
- if (clientCleanupTimer != null) {
- clientCleanupTimer.cancel();
- clientCleanupTimer = null;
- }
- if (client != null) {
- try {
- client.close();
- } finally {
- client = null;
- }
- }
- } finally {
- clientLock.unlock();
+ clientDispatcher.close();
+ } catch (Exception e) {
+ log.error("Failed to close client dispatcher", e);
}
+ clientDispatcher = null;
}
}
-
- /**
- * Close this {@link ActiveQueryTracker} and call {@link #cleanup()}.
- */
- @Override
- public void close() {
- cleanup();
- }
}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/GroupLimitCache.java b/web-services/query/src/main/java/datawave/webservice/query/limit/GroupLimitCache.java
index 34d088a826a..3408c7492b7 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/GroupLimitCache.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/GroupLimitCache.java
@@ -18,10 +18,10 @@
public class GroupLimitCache {
// The set of group limit overrides, sorted in best-match order.
- private final SortedSet groupLimits;
+ private SortedSet groupLimits;
// Internal cache for improved lookup efficiency.
- private final Cache> cache;
+ private Cache> cache;
public static GroupLimitCache of(SortedSet groupLimits, long maxCacheSize) {
if (groupLimits != null && !groupLimits.isEmpty()) {
@@ -91,6 +91,20 @@ public boolean isEmpty() {
return this.groupLimits == null;
}
+ /**
+ * Cleans up this {@link GroupLimitCache} and releases its underlying resources.
+ */
+ public void cleanUp() {
+ if (groupLimits != null) {
+ groupLimits.clear();
+ groupLimits = null;
+ }
+ if (cache != null) {
+ cache.cleanUp();
+ cache = null;
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
@@ -109,4 +123,5 @@ public int hashCode() {
public String toString() {
return new StringJoiner(", ", GroupLimitCache.class.getSimpleName() + "[", "]").add("groupLimits=" + groupLimits).add("cache=" + cache).toString();
}
+
}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableQueryLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableQueryLimitConfiguration.java
new file mode 100644
index 00000000000..91b472c44a5
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableQueryLimitConfiguration.java
@@ -0,0 +1,98 @@
+package datawave.webservice.query.limit;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable implementation of {@link QueryLimitConfiguration} that prevents modifications and uses immutable internal members.
+ */
+public final class ImmutableQueryLimitConfiguration extends QueryLimitConfiguration {
+
+ /**
+ * Return an immutable copy of the given {@link QueryLimitConfiguration}.
+ *
+ * @param config
+ * the config
+ */
+ public ImmutableQueryLimitConfiguration(QueryLimitConfiguration config) {
+ super.setDefaultUserQueryLimit(config.getDefaultUserQueryLimit());
+ super.setDefaultSystemQueryLimit(config.getDefaultSystemQueryLimit());
+ super.setInternalCacheMaxSize(config.getInternalCacheMaxSize());
+ super.setUserConfigs(copyList(config.getUserConfigs(), ImmutableUserLimitConfiguration::new));
+ super.setSystemConfigs(copyList(config.getSystemConfigs(), ImmutableSystemLimitConfiguration::new));
+ super.setQueryLogicGroupConfigs(copyList(config.getQueryLogicGroupConfigs(), ImmutableQueryLogicGroupLimitConfiguration::new));
+ }
+
+ /**
+ * Return an immutable version of the given list and its elements.
+ *
+ * @param list
+ * the list to copy
+ * @param immutableConstructor
+ * the constructor that will provide an immutable copy of each element
+ * @return the immutable list
+ * @param
+ * the element type
+ */
+ private List copyList(List list, Function immutableConstructor) {
+ if (list == null) {
+ return List.of();
+ } else {
+ return list.stream().map(immutableConstructor).collect(Collectors.toUnmodifiableList());
+ }
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setDefaultUserQueryLimit(int defaultUserQueryLimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setDefaultSystemQueryLimit(int defaultSystemQueryLimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setInternalCacheMaxSize(long internalCacheMaxSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setUserConfigs(List userConfigs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setSystemConfigs(List systemConfigs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setQueryLogicGroupConfigs(List queryLogicGroupConfigs) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ return toString(ImmutableQueryLimitConfiguration.class);
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableQueryLogicGroupLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableQueryLogicGroupLimitConfiguration.java
new file mode 100644
index 00000000000..265523c1f73
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableQueryLogicGroupLimitConfiguration.java
@@ -0,0 +1,47 @@
+package datawave.webservice.query.limit;
+
+/**
+ * An immutable implementation of {@link QueryLogicGroupLimitConfiguration} that prevents modifications and uses immutable internal members.
+ */
+public final class ImmutableQueryLogicGroupLimitConfiguration extends QueryLogicGroupLimitConfiguration {
+
+ /**
+ * Return an immutable copy of the given {@link QueryLogicGroupLimitConfiguration}.
+ *
+ * @param config
+ * the config
+ */
+ public ImmutableQueryLogicGroupLimitConfiguration(QueryLogicGroupLimitConfiguration config) {
+ super.setGroupName(config.getGroupName());
+ super.setQueryLogicPattern(config.getQueryLogicPattern());
+ super.setQueryLimit(config.getQueryLimit());
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setGroupName(String groupName) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setQueryLogicPattern(String queryLogicPattern) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setQueryLimit(int queryLimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ public String toString() {
+ return toString(ImmutableQueryLogicGroupLimitConfiguration.class);
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableSystemLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableSystemLimitConfiguration.java
new file mode 100644
index 00000000000..6709344a7ed
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableSystemLimitConfiguration.java
@@ -0,0 +1,59 @@
+package datawave.webservice.query.limit;
+
+import java.util.Map;
+
+/**
+ * An immutable implementation of {@link SystemLimitConfiguration} that prevents modifications and uses immutable internal members.
+ */
+public final class ImmutableSystemLimitConfiguration extends SystemLimitConfiguration {
+
+ /**
+ * Return an immutable copy of the given {@link SystemLimitConfiguration}.
+ *
+ * @param config
+ * the config
+ */
+ public ImmutableSystemLimitConfiguration(SystemLimitConfiguration config) {
+ super.setSystemPattern(config.getSystemPattern());
+ super.setCountsAgainstUserLimit(config.getCountsAgainstUserLimit());
+ super.setQueryLimit(config.getQueryLimit());
+ super.setQueryLogicGroupLimits(config.getQueryLogicGroupLimits() == null ? null : Map.copyOf(config.getQueryLogicGroupLimits()));
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setSystemPattern(String systemPattern) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setCountsAgainstUserLimit(Boolean countsAgainstUserLimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setQueryLimit(Integer queryLimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setQueryLogicGroupLimits(Map queryLogicGroupLimits) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ return toString(ImmutableSystemLimitConfiguration.class);
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableUserLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableUserLimitConfiguration.java
new file mode 100644
index 00000000000..01081d8ea97
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/ImmutableUserLimitConfiguration.java
@@ -0,0 +1,51 @@
+package datawave.webservice.query.limit;
+
+import java.util.Map;
+
+/**
+ * An immutable implementation of {@link UserLimitConfiguration} that prevents modifications and uses immutable internal members.
+ */
+public final class ImmutableUserLimitConfiguration extends UserLimitConfiguration {
+
+ /**
+ * Return an immutable copy of the given {@link UserLimitConfiguration}.
+ *
+ * @param config
+ * the config
+ */
+ public ImmutableUserLimitConfiguration(UserLimitConfiguration config) {
+ super.setUserDn(config.getUserDn());
+ super.setQueryLimit(config.getQueryLimit());
+ super.setQueryLogicGroupLimits(config.getQueryLogicGroupLimits() == null ? Map.of() : Map.copyOf(config.getQueryLogicGroupLimits()));
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setUserDn(String userDn) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setQueryLimit(Integer queryLimit) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Throws {@link UnsupportedOperationException}.
+ */
+ @Override
+ public void setQueryLogicGroupLimits(Map queryLogicGroupLimits) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString(ImmutableUserLimitConfiguration.class);
+ }
+
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfiguration.java
index 4024fb3ac6c..134b876e9dc 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfiguration.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfiguration.java
@@ -4,6 +4,8 @@
import java.util.Objects;
import java.util.StringJoiner;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* Configuration for query limits.
*/
@@ -12,32 +14,38 @@ public class QueryLimitConfiguration {
/**
* The default maximum number of active concurrent queries a user may have across all systems.
*/
+ @JsonProperty
private int defaultUserQueryLimit;
/**
* The default maximum number of active concurrent queries that may be running on a system.
*/
+ @JsonProperty
private int defaultSystemQueryLimit;
/**
* The maximum size to use for internal caches in {@link GroupLimitCache} and {@link PatternMatcher}. This value should be large enough to hold the number
* of distinct query logics.
*/
+ @JsonProperty
private long internalCacheMaxSize = 200;
/**
* The custom user limit configurations.
*/
+ @JsonProperty
private List userConfigs;
/**
* The custom system limit configurations.
*/
+ @JsonProperty
private List systemConfigs;
/**
* The custom query logic group configurations.
*/
+ @JsonProperty
private List queryLogicGroupConfigs;
public int getDefaultUserQueryLimit() {
@@ -88,11 +96,24 @@ public void setQueryLogicGroupConfigs(List qu
this.queryLogicGroupConfigs = queryLogicGroupConfigs;
}
+ /**
+ * Return whether this {@link QueryLimitConfiguration} is considered equal to the given object. This {@code equals(Object)} implementation allows this
+ * instance to be equal to an object that is a subclass of {@link QueryLimitConfiguration}, such as {@link ImmutableQueryLimitConfiguration}.
+ *
+ * @param o
+ * the object to compare
+ * @return true if the object is equal to this {@link QueryLimitConfiguration}, or false otherwise
+ */
@Override
public boolean equals(Object o) {
- if (o == null || getClass() != o.getClass()) {
+ if (o == this) {
+ return true;
+ }
+ // Allow this instance to be considered equal to subclasses.
+ if (!(o instanceof QueryLimitConfiguration)) {
return false;
}
+
QueryLimitConfiguration that = (QueryLimitConfiguration) o;
return defaultUserQueryLimit == that.defaultUserQueryLimit && defaultSystemQueryLimit == that.defaultSystemQueryLimit
&& internalCacheMaxSize == that.internalCacheMaxSize && Objects.equals(userConfigs, that.userConfigs)
@@ -106,7 +127,19 @@ public int hashCode() {
@Override
public String toString() {
- return new StringJoiner(", ", QueryLimitConfiguration.class.getSimpleName() + "[", "]").add("defaultUserQueryLimit=" + defaultUserQueryLimit)
+ return toString(QueryLimitConfiguration.class);
+ }
+
+ /**
+ * Return a String representation of this {@link QueryLimitConfiguration} referencing the given class as the instance of this
+ * {@link QueryLimitConfiguration}.
+ *
+ * @param clazz
+ * the class
+ * @return the string representation
+ */
+ protected String toString(Class extends QueryLimitConfiguration> clazz) {
+ return new StringJoiner(", ", clazz.getSimpleName() + "[", "]").add("defaultUserQueryLimit=" + defaultUserQueryLimit)
.add("defaultSystemQueryLimit=" + defaultSystemQueryLimit).add("internalCacheMaxSize=" + internalCacheMaxSize)
.add("userConfigs=" + userConfigs).add("systemConfigs=" + systemConfigs).add("queryLogicGroupConfigs=" + queryLogicGroupConfigs)
.toString();
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfigurationValidationUtils.java b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfigurationValidationUtils.java
new file mode 100644
index 00000000000..d373fe4c7de
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfigurationValidationUtils.java
@@ -0,0 +1,230 @@
+package datawave.webservice.query.limit;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.commons.lang3.StringUtils;
+
+public final class QueryLimitConfigurationValidationUtils {
+
+ /**
+ * Validate the given configuration
+ *
+ * @param config
+ * the configuration to validate
+ */
+ public static void validate(QueryLimitConfiguration config) {
+ if (config != null) {
+ if (config.getDefaultUserQueryLimit() < 1) {
+ throw new IllegalArgumentException("Default user query limit must be greater than 0");
+ }
+ if (config.getInternalCacheMaxSize() < 1) {
+ throw new IllegalArgumentException("Internal cache max size must be greater than 0");
+ }
+
+ List queryLogicGroupConfigs = config.getQueryLogicGroupConfigs();
+ if (queryLogicGroupConfigs != null && !queryLogicGroupConfigs.isEmpty()) {
+ validateQueryLogicGroupConfigs(config.getQueryLogicGroupConfigs());
+ }
+
+ List userLimitConfigs = config.getUserConfigs();
+ if (userLimitConfigs != null && !userLimitConfigs.isEmpty()) {
+ validateUserLimitConfigs(userLimitConfigs);
+ }
+
+ List systemLimitConfigs = config.getSystemConfigs();
+ if (systemLimitConfigs != null && !systemLimitConfigs.isEmpty()) {
+ validateSystemLimitConfigs(systemLimitConfigs, config.getInternalCacheMaxSize());
+ }
+ }
+ }
+
+ /**
+ * Validate the given query logic group limit configurations.
+ *
+ * @param configs
+ * the configurations to validate
+ */
+ public static void validateQueryLogicGroupConfigs(Collection configs) {
+ Set groupNames = new HashSet<>();
+ for (QueryLogicGroupLimitConfiguration config : configs) {
+
+ // Verify that a group name was given.
+ String groupName = config.getGroupName();
+ if (StringUtils.isBlank(groupName)) {
+ throw new IllegalArgumentException("Query logic group limit configuration given with blank group name");
+ }
+
+ // Verify that we have not seen a configuration with the group name before.
+ if (groupNames.contains(groupName)) {
+ throw new IllegalArgumentException("Multiple query logic group configurations given with group name '" + groupName + "'");
+ } else {
+ groupNames.add(groupName);
+ }
+
+ // Verify that the query limit is not negative.
+ if (config.getQueryLimit() < 0) {
+ throw new IllegalArgumentException("Negative limit given for query logic group '" + groupName + "'");
+ }
+
+ // Verify that a query logic pattern was given.
+ String queryLogicPattern = config.getQueryLogicPattern();
+ if (StringUtils.isBlank(queryLogicPattern)) {
+ throw new IllegalArgumentException("Blank query logic pattern given for query logic group '" + groupName + "'");
+ }
+
+ // Verify that the pattern compiles if it is not simply a * as is occasionally used as a wildcard in configurations.
+ try {
+ if (!queryLogicPattern.equals(QueryLimitConstants.ASTERISK)) {
+ Pattern.compile(queryLogicPattern);
+ }
+ } catch (PatternSyntaxException e) {
+ throw new IllegalArgumentException("Invalid regex in query logic pattern '" + queryLogicPattern + "' for query logic group '" + groupName + "'",
+ e);
+ }
+ }
+ }
+
+ /**
+ * Validate the given user limit configurations.
+ *
+ * @param configs
+ * the configurations to validate
+ */
+ public static void validateUserLimitConfigs(Collection configs) {
+ Set userDns = new HashSet<>();
+ for (UserLimitConfiguration config : configs) {
+ // Verify that a user dn was given.
+ String userDn = config.getUserDn();
+ if (StringUtils.isBlank(userDn)) {
+ throw new IllegalArgumentException("User query limit configuration given with blank user DN");
+ }
+
+ // Verify we have not seen a configuration with the user dn before.
+ if (userDns.contains(userDn)) {
+ throw new IllegalArgumentException("Multiple query limit configurations specified for user '" + userDn + "'");
+ } else {
+ userDns.add(userDn);
+ }
+
+ // Verify that if the user query limit was overridden, it is not negative.
+ if (config.getQueryLimit() != null && config.getQueryLimit() < 0) {
+ throw new IllegalArgumentException("Negative user query limit given for user '" + userDn + "'");
+ }
+
+ // Verify that no invalid group name patterns were provided.
+ Map groupLimits = config.getQueryLogicGroupLimits();
+ if (groupLimits != null) {
+ for (Map.Entry entry : groupLimits.entrySet()) {
+ String groupPattern = entry.getKey();
+ if (StringUtils.isBlank(groupPattern)) {
+ throw new IllegalArgumentException("User group query limit configuration given with blank group pattern for user '" + userDn + "'");
+ }
+ if (!groupPattern.equals(QueryLimitConstants.ASTERISK)) {
+ try {
+ Pattern.compile(groupPattern);
+ } catch (PatternSyntaxException e) {
+ throw new IllegalArgumentException("Invalid query logic group name pattern: " + groupPattern + " given for user " + userDn, e);
+ }
+ }
+ Integer limit = entry.getValue();
+ if (limit < 0) {
+ throw new IllegalArgumentException("Negative query logic group limit given for user '" + userDn + "': " + limit);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Validate the given system limit configurations.
+ *
+ * @param configs
+ * the configurations to validate
+ */
+ public static void validateSystemLimitConfigs(Collection configs, long maxCacheSize) {
+ Set systemPatterns = new HashSet<>();
+ Map matcherPatterns = new HashMap<>();
+ for (SystemLimitConfiguration config : configs) {
+ // Verify that a system pattern was given.
+ String systemPattern = config.getSystemPattern();
+ if (StringUtils.isBlank(systemPattern)) {
+ throw new IllegalArgumentException("System query limit configuration specified with blank system pattern");
+ }
+
+ // Verify that the pattern compiles if it is not simply a * as is occasionally used as a wildcard in configurations.
+ try {
+ if (!systemPattern.equals(QueryLimitConstants.ASTERISK)) {
+ Pattern.compile(systemPattern);
+ }
+ } catch (PatternSyntaxException e) {
+ throw new IllegalArgumentException("Invalid regex in system pattern '" + systemPattern + "'", e);
+ }
+
+ // Verify that we have not seen a configuration with the system pattern before.
+ if (systemPatterns.contains(systemPattern)) {
+ throw new IllegalArgumentException("Multiple query limit configurations specified with system pattern '" + systemPattern + "'");
+ } else {
+ systemPatterns.add(systemPattern);
+ }
+
+ // Fetch the matcher that would be used for the system pattern.
+ Matcher matcher = Matcher.getMatcher(systemPattern, maxCacheSize);
+
+ // Verify that we do not have an exact-matching pattern that is equivalent to a previously seen exact-matching pattern, such as 'SYSTEM-01' vs.
+ // 'SYSTEM\\-01'.
+ if (matcher instanceof StringMatcher) {
+ String matcherPattern = ((StringMatcher) matcher).getValue();
+ String equivalentSystemPattern = matcherPatterns.get(matcherPattern);
+ if (equivalentSystemPattern != null) {
+ throw new IllegalArgumentException(
+ "System pattern '" + systemPattern + "' will resolve to an exact match that is equivalent to system pattern '"
+ + equivalentSystemPattern + "' from another system configuration.");
+ } else {
+ matcherPatterns.put(matcherPattern, systemPattern);
+ }
+ }
+
+ // Safeguard against allowing a configuration to potentially set whether queries on a system counts against user limits to false for all
+ // systems. Only allow this to be done for exact system names, or non-wildcard-only patterns.
+ if (QueryLimitConstants.wildcardOnlyPattern.matcher(systemPattern).matches() && !config.getCountsAgainstUserLimit()) {
+ throw new IllegalArgumentException("System pattern '" + systemPattern
+ + "' is wildcard-only and may not be used to override whether queries count against user limits to false");
+ }
+
+ // Verify that no invalid group name patterns were provided.
+ Map groupLimits = config.getQueryLogicGroupLimits();
+ if (groupLimits != null) {
+ for (Map.Entry entry : groupLimits.entrySet()) {
+ String groupPattern = entry.getKey();
+ if (StringUtils.isBlank(groupPattern)) {
+ throw new IllegalArgumentException(
+ "User group query limit configuration given with blank group pattern for system pattern '" + systemPattern + "'");
+ }
+ if (!groupPattern.equals(QueryLimitConstants.ASTERISK)) {
+ try {
+ Pattern.compile(groupPattern);
+ } catch (PatternSyntaxException e) {
+ throw new IllegalArgumentException(
+ "Invalid query logic group name pattern: " + groupPattern + " given for system pattern " + systemPattern, e);
+ }
+ }
+ Integer limit = entry.getValue();
+ if (limit < 0) {
+ throw new IllegalArgumentException("Negative query logic group limit given for system pattern '" + systemPattern + "': " + limit);
+ }
+ }
+ }
+ }
+ }
+
+ private QueryLimitConfigurationValidationUtils() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfigurationValidator.java b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfigurationValidator.java
new file mode 100644
index 00000000000..f714103f56d
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimitConfigurationValidator.java
@@ -0,0 +1,14 @@
+package datawave.webservice.query.limit;
+
+import com.google.common.base.Preconditions;
+
+import datawave.webservice.zookeeper.ObjectValidator;
+
+public class QueryLimitConfigurationValidator implements ObjectValidator {
+
+ @Override
+ public void validate(Object object) {
+ Preconditions.checkArgument((object instanceof QueryLimitConfiguration), "Object must be an instance of " + QueryLimitConfiguration.class.getName());
+ QueryLimitConfigurationValidationUtils.validate((QueryLimitConfiguration) object);
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimiter.java b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimiter.java
index 32c320029bd..a64e132ddcb 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimiter.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLimiter.java
@@ -5,23 +5,37 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import com.google.common.base.Preconditions;
+
+import datawave.webservice.zookeeper.ZkObjectPublishResult;
+import datawave.webservice.zookeeper.ZkObjectPublishStatus;
+import datawave.webservice.zookeeper.ZkObjectPublisher;
+
/**
* This class is responsible for determining if any concurrent query limits are going to be exceeded for a user, system, or query logic when a new query is
- * submitted.
+ * submitted. It is expected that only a singleton instance of {@link QueryLimiter} will be created via CDI.
*/
public class QueryLimiter {
private static final Logger log = Logger.getLogger(QueryLimiter.class);
+ // The default string to use as a system name when no system is provided with a query.
+ public static final String EMPTY_SYSTEM_FROM = "EMPTY_SYSTEM_FROM";
+
+ // A lock that will guard access to the query limit configuration and the limit providers.
+ private final Lock configLock = new ReentrantLock();
+
// The string to use to connect to zookeeper.
private String zookeeperConfig;
// The configuration to initialize the limit providers with.
- private QueryLimitConfiguration configuration;
+ private ImmutableQueryLimitConfiguration configuration;
// A cache to store heartbeats of active queries within.
private QueryHeartbeatCache heartbeatCache;
@@ -38,7 +52,11 @@ public class QueryLimiter {
// The tracker responsible for interfacing with Zookeeper.
private ActiveQueryTracker activeQueryTracker;
- public static final String EMPTY_SYSTEM_FROM = "EMPTY_SYSTEM_FROM";
+ // The publisher responsible for notifying the query limiter when there are updates to the configuration.
+ private ZkObjectPublisher configPublisher;
+
+ // Whether the limiter is currently in a state where it can provide limits
+ private boolean canProvideLimits = false;
/**
* Return the zookeeper connection string.
@@ -60,24 +78,141 @@ public void setZookeeperConfig(String zookeeperConfig) {
}
/**
- * Set the configuration to use to set up this {@link QueryLimiter}
+ * Set the config publisher that will notify this {@link QueryLimiter} of configuration updates.
+ *
+ * @param configPublisher
+ * the configuration publisher
+ */
+ public void setConfigPublisher(ZkObjectPublisher configPublisher) {
+ this.configPublisher = configPublisher;
+ }
+
+ /**
+ * Update the configuration for this {@link QueryLimiter}. The configuration will be validated if indicated, and the internal configuration and limit
+ * providers will be recreated to reflect the new configuration.
+ *
+ * @param configuration
+ * the configuration to set
+ * @param validationRequired
+ * whether the configuration should be validated before updating the internal providers
+ */
+ private void updateConfiguration(QueryLimitConfiguration configuration, boolean validationRequired) {
+ Preconditions.checkNotNull(configuration, "configuration must not be null");
+
+ configLock.lock();
+ try {
+ // If validation is required, do so.
+ if (validationRequired) {
+ QueryLimitConfigurationValidationUtils.validate(configuration);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Updating configuration to " + configuration);
+ }
+
+ try {
+ // Update the configuration.
+ this.configuration = new ImmutableQueryLimitConfiguration(configuration);
+
+ // Recreate the query logic group provider.
+ if (this.queryLogicGroupLimitProvider != null) {
+ try {
+ this.queryLogicGroupLimitProvider.cleanUp();
+ } catch (Exception e) {
+ log.warn("Failed to clean up query logic group limit provider", e);
+ }
+ // Make this null so that if recreating the provider fails for some reason, canProvideLimits() will return false.
+ this.queryLogicGroupLimitProvider = null;
+ }
+ this.queryLogicGroupLimitProvider = new QueryLogicGroupLimitProvider(configuration.getInternalCacheMaxSize(),
+ configuration.getQueryLogicGroupConfigs());
+
+ // Recreate the user limit provider.
+ if (this.userLimitProvider != null) {
+ try {
+ this.userLimitProvider.cleanUp();
+ } catch (Exception e) {
+ log.warn("Failed to clean up user limit provider", e);
+ }
+ // Make this null so that if recreating the provider fails for some reason, canProvideLimits() will return false.
+ this.userLimitProvider = null;
+ }
+ this.userLimitProvider = new UserLimitProvider(configuration.getDefaultUserQueryLimit(), configuration.getInternalCacheMaxSize(),
+ configuration.getUserConfigs(), queryLogicGroupLimitProvider);
+
+ // Recreate the system limit provider.
+ if (this.systemLimitProvider != null) {
+ try {
+ this.systemLimitProvider.cleanUp();
+ } catch (Exception e) {
+ log.warn("Failed to clean up system limit provider", e);
+ }
+ // Make this null so that if recreating the provider fails for some reason, canProvideLimits() will return false.
+ this.systemLimitProvider = null;
+ }
+ this.systemLimitProvider = new SystemLimitProvider(configuration.getDefaultSystemQueryLimit(), configuration.getInternalCacheMaxSize(),
+ configuration.getSystemConfigs(), queryLogicGroupLimitProvider);
+
+ log.debug("Configuration updated and internal limit providers recreated");
+ } catch (Exception e) {
+ log.error("Failed to update configuration", e);
+ }
+
+ // Update whether this limiter can provide limits.
+ this.canProvideLimits = this.configuration != null && this.queryLogicGroupLimitProvider != null && this.userLimitProvider != null
+ && this.systemLimitProvider != null;
+ } finally {
+ configLock.unlock();
+ }
+ }
+
+ /**
+ * Set the configuration for the {@link QueryLimiter} if and only if the configuration for the limiter is currently null. Throws an
+ * {@link IllegalStateException} otherwise. This method exists primarily to support initial CDI injection during startup, and it is expected that
+ * {@link #setup()} will be called to create the internal limit providers.
*
* @param queryLimitConfiguration
* the config
+ *
+ * @throws NullPointerException
+ * if the new configuration is null
+ * @throws IllegalStateException
+ * if the internal {@link QueryLimitConfiguration} is not null.
*/
public void setConfiguration(QueryLimitConfiguration queryLimitConfiguration) {
- this.configuration = queryLimitConfiguration;
+ Preconditions.checkNotNull(queryLimitConfiguration, "configuration must not be null");
+ configLock.lock();
+ try {
+ if (this.configuration == null) {
+ this.configuration = new ImmutableQueryLimitConfiguration(queryLimitConfiguration);
+ } else {
+ throw new IllegalStateException("QueryLimitConfiguration is already set, use updateConfiguration(QueryLimitConfiguration) instead");
+ }
+ } finally {
+ configLock.unlock();
+ }
}
/**
- * Return the configuration used to set up this {@link QueryLimiter}
+ * Return the configuration currently configured for this {@link QueryLimiter}. This will be an instance of {@link ImmutableQueryLimitConfiguration}.
*
* @return the config
*/
public QueryLimitConfiguration getConfiguration() {
- return configuration;
+ configLock.lock();
+ try {
+ return configuration;
+ } finally {
+ configLock.unlock();
+ }
}
+ /**
+ * Set the {@link QueryHeartbeatCache}.
+ *
+ * @param heartbeatCache
+ * the heartbeat cache
+ */
public void setHeartbeatCache(QueryHeartbeatCache heartbeatCache) {
this.heartbeatCache = heartbeatCache;
}
@@ -88,28 +223,45 @@ public void setHeartbeatCache(QueryHeartbeatCache heartbeatCache) {
*/
public void setup() {
if (log.isDebugEnabled()) {
- log.debug("Initializing with zookeeperConfig: '" + zookeeperConfig + "' and query limit config: " + configuration);
+ log.debug("Initializing with zookeeperConfig: '" + zookeeperConfig + "', and query limit config: " + configuration);
}
- if (this.configuration != null) {
- if (this.configuration.getDefaultUserQueryLimit() < 1) {
- throw new IllegalArgumentException("Default user query limit must be greater than 0");
+ configLock.lock();
+ try {
+ // Require the heartbeat cache to be set.
+ if (heartbeatCache == null) {
+ throw new IllegalStateException("No heartbeat cache set");
}
- if (this.configuration.getInternalCacheMaxSize() < 1) {
- throw new IllegalArgumentException("Internal cache max size must be greater than 0");
+ // If no configuration was supplied from a configured bean, attempt to load a configuration from Zookeeper.
+ if (this.configuration == null) {
+ if (this.configPublisher != null) {
+ ZkObjectPublishResult result = configPublisher.getObjectFromZk();
+ if (result.getStatus() == ZkObjectPublishStatus.SUCCESS) {
+ // Update the configuration and create the providers. The configuration returned by the reloader will already be validated.
+ updateConfiguration((QueryLimitConfiguration) result.getUpdatedObject(), false);
+ } else {
+ log.error("Failed to load configuration from zookeeper: " + result);
+ }
+ }
+ if (this.configuration == null) {
+ throw new IllegalStateException("No configuration supplied for Query Limiter either via injection or Zookeeper.");
+ }
+ } else {
+ // Update the configuration and create the providers.
+ updateConfiguration(this.configuration, true);
}
- this.queryLogicGroupLimitProvider = new QueryLogicGroupLimitProvider(configuration.getInternalCacheMaxSize(),
- configuration.getQueryLogicGroupConfigs());
- this.userLimitProvider = new UserLimitProvider(configuration.getDefaultUserQueryLimit(), configuration.getInternalCacheMaxSize(),
- configuration.getUserConfigs(), queryLogicGroupLimitProvider);
- this.systemLimitProvider = new SystemLimitProvider(configuration.getDefaultSystemQueryLimit(), configuration.getInternalCacheMaxSize(),
- configuration.getSystemConfigs(), queryLogicGroupLimitProvider);
- } else {
- this.queryLogicGroupLimitProvider = null;
- this.userLimitProvider = null;
- this.systemLimitProvider = null;
+ // If the configuration reloader is not null, add a listener so that this limiter will be provided with new configurations. Any configs provided by
+ // the reloader will already be validated.
+ if (configPublisher != null) {
+ configPublisher.subscribeToUpdates(((config) -> updateConfiguration((QueryLimitConfiguration) config, false)));
+ log.debug("QueryLimiter now listening for configuration updates from config publisher");
+ } else {
+ log.warn("No config publisher set for QueryLimiter, limiter will not be notified of configuration updates");
+ }
+ } finally {
+ configLock.unlock();
}
}
@@ -117,18 +269,33 @@ public void setup() {
* Releases internal resources and cleans up connections and scheduled tasks.
*/
public void shutdown() {
+ log.debug("Shutting down");
+
if (this.heartbeatCache != null) {
try {
this.heartbeatCache.shutdown();
} catch (Exception e) {
- log.error("Error closing heartbeat cache", e);
+ log.warn("Error closing heartbeat cache", e);
+ } finally {
+ this.heartbeatCache = null;
}
}
if (this.activeQueryTracker != null) {
try {
this.activeQueryTracker.close();
} catch (Exception e) {
- log.error("Error closing active query tracker", e);
+ log.warn("Error closing active query tracker", e);
+ } finally {
+ this.activeQueryTracker = null;
+ }
+ }
+ if (this.configPublisher != null) {
+ try {
+ this.configPublisher.close();
+ } catch (Exception e) {
+ log.warn("Error closing config publisher", e);
+ } finally {
+ this.configPublisher = null;
}
}
}
@@ -147,29 +314,37 @@ public void shutdown() {
* if an exception occurs
*/
public QueryLimiterResponse checkForLimits(String userDn, String system, String queryLogic) throws Exception {
- // Cast the user DN to lowercase to ensure a consistent format.
- userDn = userDn.trim().toLowerCase();
+ configLock.lock();
+ try {
+ Preconditions.checkState(canProvideLimits, "Cannot check for limits, configuration or providers are not initialized");
- // Do not cast the system or query logic to lowercase, they will be getting matched against regex patterns.
- queryLogic = queryLogic.trim();
+ // Cast the user DN to lowercase to ensure a consistent format.
+ userDn = userDn.trim().toLowerCase();
- // Ensure the system is non-null if empty
- if (system == null || system.isBlank()) {
- system = EMPTY_SYSTEM_FROM;
- }
+ // Do not cast the system or query logic to lowercase, they will be getting matched against regex patterns.
+ queryLogic = queryLogic.trim();
- if (log.isDebugEnabled()) {
- log.debug("Checking limits - userDn: " + userDn + ", system: " + system + ", queryLogic: " + queryLogic);
- }
+ // Ensure the system is non-null if empty
+ if (system == null || system.isBlank()) {
+ system = EMPTY_SYSTEM_FROM;
+ }
- // Check if the snapshot reveals that any limits have been met.
- LimitChecker checker = new LimitChecker(userDn, system, queryLogic);
- checker.checkLimits();
- if (checker.metLimit) {
- return QueryLimiterResponse.metLimit(checker.message);
- } else {
- return QueryLimiterResponse.hasNotMetLimit();
+ if (log.isDebugEnabled()) {
+ log.debug("Checking limits - userDn: " + userDn + ", system: " + system + ", queryLogic: " + queryLogic);
+ }
+
+ // Check if the snapshot reveals that any limits have been met.
+ LimitChecker checker = new LimitChecker(userDn, system, queryLogic);
+ checker.checkLimits();
+ if (checker.metLimit) {
+ return QueryLimiterResponse.metLimit(checker.message);
+ } else {
+ return QueryLimiterResponse.hasNotMetLimit();
+ }
+ } finally {
+ configLock.unlock();
}
+
}
/**
@@ -187,22 +362,30 @@ public QueryLimiterResponse checkForLimits(String userDn, String system, String
* if an error occurs
*/
public void countQueryTowardsLimits(String queryId, String userDn, String system, String queryLogic) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Start counting query " + queryId + " towards limits");
- }
+ configLock.lock();
+ try {
+ Preconditions.checkState(canProvideLimits, "Cannot check for limits, configuration or providers are not initialized");
- userDn = userDn.trim().toLowerCase();
- queryLogic = queryLogic.trim();
- // Ensure the system is non-null if empty
- if (system == null || system.isBlank()) {
- system = EMPTY_SYSTEM_FROM;
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Start counting query " + queryId + " towards limits");
+ }
+
+ userDn = userDn.trim().toLowerCase();
+ queryLogic = queryLogic.trim();
+ // Ensure the system is non-null if empty
+ if (system == null || system.isBlank()) {
+ system = EMPTY_SYSTEM_FROM;
+ }
- boolean systemCountsTowardsUserLimits = systemLimitProvider.countsAgainstUserLimit(system);
+ boolean systemCountsTowardsUserLimits = systemLimitProvider.countsAgainstUserLimit(system);
- QueryHeartbeat heartbeat = getActiveQueryTracker().trackQuery(queryId, userDn, system, queryLogic, systemCountsTowardsUserLimits);
- // Store the heartbeat into the cache. This acts as a means to keep the connection to Zookeeper alive for the ephemeral nodes stored in the heartbeat.
- heartbeatCache.put(heartbeat);
+ QueryHeartbeat heartbeat = getActiveQueryTracker().trackQuery(queryId, userDn, system, queryLogic, systemCountsTowardsUserLimits);
+ // Store the heartbeat into the cache. This acts as a means to keep the connection to Zookeeper alive for the ephemeral nodes stored in the
+ // heartbeat.
+ heartbeatCache.put(heartbeat);
+ } finally {
+ configLock.unlock();
+ }
}
/**
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitConfiguration.java
index 5615181fe39..0be06b19ea5 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitConfiguration.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitConfiguration.java
@@ -3,19 +3,24 @@
import java.util.Objects;
import java.util.StringJoiner;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* Represents a custom query limit configuration that can be configured for query logics.
*/
public class QueryLogicGroupLimitConfiguration {
// The name of the query logic group.
+ @JsonProperty
private String groupName;
// The query logic regex pattern.
+ @JsonProperty
private String queryLogicPattern;
// The default concurrency limit for users. This applies to the total concurrent queries a user may run that originate from a query logic in the group
// across all systems.
+ @JsonProperty
private int queryLimit;
public QueryLogicGroupLimitConfiguration() {}
@@ -50,9 +55,22 @@ public void setQueryLimit(int queryLimit) {
this.queryLimit = queryLimit;
}
+ /**
+ * Return whether this {@link QueryLogicGroupLimitConfiguration} is considered equal to the given object. This {@code equals(Object)} implementation allows
+ * this instance to be equal to an object that is a subclass of {@link QueryLogicGroupLimitConfiguration}, such as
+ * {@link ImmutableQueryLogicGroupLimitConfiguration}.
+ *
+ * @param o
+ * the object to compare
+ * @return true if the object is equal to this {@link QueryLogicGroupLimitConfiguration}, or false otherwise
+ */
@Override
public boolean equals(Object o) {
- if (o == null || getClass() != o.getClass()) {
+ if (o == this) {
+ return true;
+ }
+ // Allow this instance to be considered equal to subclasses.
+ if (!(o instanceof QueryLogicGroupLimitConfiguration)) {
return false;
}
QueryLogicGroupLimitConfiguration that = (QueryLogicGroupLimitConfiguration) o;
@@ -66,7 +84,19 @@ public int hashCode() {
@Override
public String toString() {
- return new StringJoiner(", ", QueryLogicGroupLimitConfiguration.class.getSimpleName() + "[", "]").add("groupName='" + groupName + "'")
+ return toString(QueryLogicGroupLimitConfiguration.class);
+ }
+
+ /**
+ * Return a String representation of this {@link QueryLogicGroupLimitConfiguration} referencing the given class as the instance of this
+ * {@link QueryLogicGroupLimitConfiguration}.
+ *
+ * @param clazz
+ * the class
+ * @return the string representation
+ */
+ protected String toString(Class extends QueryLogicGroupLimitConfiguration> clazz) {
+ return new StringJoiner(", ", clazz.getSimpleName() + "[", "]").add("groupName='" + groupName + "'")
.add("queryLogicPattern='" + queryLogicPattern + "'").add("queryLimit=" + queryLimit).toString();
}
}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitProvider.java b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitProvider.java
index 5c892be89d7..1c8829d55f7 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitProvider.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/QueryLogicGroupLimitProvider.java
@@ -3,23 +3,22 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
/**
* This class is responsible for identifying and providing limits that should be enforced for query logic groups.
*/
public class QueryLogicGroupLimitProvider {
+ private static final Logger log = Logger.getLogger(QueryLogicGroupLimitProvider.class);
+
private final long maxCacheSize;
private Map groupsToLimits = Map.of();
@@ -29,7 +28,6 @@ public class QueryLogicGroupLimitProvider {
public QueryLogicGroupLimitProvider(long maxCacheSize, Collection configs) {
this.maxCacheSize = maxCacheSize;
if (configs != null && !configs.isEmpty()) {
- validateConfigs(configs);
populateLimits(configs);
} else {
this.groupLimitCache = GroupLimitCache.emptyInstance();
@@ -37,52 +35,6 @@ public QueryLogicGroupLimitProvider(long maxCacheSize, Collection configs) {
- Set groupNames = new HashSet<>();
- for (QueryLogicGroupLimitConfiguration config : configs) {
-
- // Verify that a group name was given.
- String groupName = config.getGroupName();
- if (StringUtils.isBlank(groupName)) {
- throw new IllegalArgumentException("Query logic group limit configuration given with blank group name");
- }
-
- // Verify that we have not seen a configuration with the group name before.
- if (groupNames.contains(groupName)) {
- throw new IllegalArgumentException("Multiple query logic group configurations given with group name '" + groupName + "'");
- } else {
- groupNames.add(groupName);
- }
-
- // Verify that the query limit is not negative.
- if (config.getQueryLimit() < 0) {
- throw new IllegalArgumentException("Negative limit given for query logic group '" + groupName + "'");
- }
-
- // Verify that a query logic pattern was given.
- String queryLogicPattern = config.getQueryLogicPattern();
- if (StringUtils.isBlank(queryLogicPattern)) {
- throw new IllegalArgumentException("Blank query logic pattern given for query logic group '" + groupName + "'");
- }
-
- // Verify that the pattern compiles if it is not simply a * as is occasionally used as a wildcard in configurations.
- try {
- if (!queryLogicPattern.equals(QueryLimitConstants.ASTERISK)) {
- Pattern.compile(queryLogicPattern);
- }
- } catch (PatternSyntaxException e) {
- throw new IllegalArgumentException("Invalid regex in query logic pattern '" + queryLogicPattern + "' for query logic group '" + groupName + "'",
- e);
- }
- }
- }
-
/**
* Populate the limits to enforce for query logic groups.
*
@@ -161,6 +113,22 @@ public Map getGroupMatchers(Set groups) {
return map;
}
+ /**
+ * Clean up this {@link QueryLogicGroupLimitProvider} and release its underlying resources.
+ */
+ public void cleanUp() {
+ groupsToLimits = null;
+ if (groupLimitCache != null) {
+ try {
+ groupLimitCache.cleanUp();
+ } catch (Exception e) {
+ log.warn("Failed to clear groupLimitCache", e);
+ } finally {
+ groupLimitCache = null;
+ }
+ }
+ }
+
/**
* This class represents a sortable matchable group limit override.
*/
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/README.md b/web-services/query/src/main/java/datawave/webservice/query/limit/README.md
index 5d7aa353d65..ce0b350214f 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/README.md
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/README.md
@@ -47,6 +47,10 @@ When using regex patterns in the configurations above, there is the possibility
2. Partial regex (non-wildcard-only): If we cannot find an exact match, then we attempt to find all partial matches, and see if any of their limits are met, checking against the lowest limits first.
3. Wildcard-only regex: In the case of no exact or partial matches, we use the wildcard match with the lowest limit.
+## Dynamic Configuration Updates
+
+The configuration for the `QueryLimiter` may be updated dynamically through Zookeeper. When the `QueryLimiter` is configured with a [ZkObjectPublisher](../../zookeeper/ZkObjectPublisher.java), it will subscribe to updates publisher. When the publisher receives a triggering event, it will attempt to load a new `QueryLimitConfiguration` from the configured file. See the [ZkObjectPublisher README](../../zookeeper/README.md) for more details.
+
## Implementation
Checking limits and marking as active/inactive is done through the [QueryLimiter](QueryLimiter.java) class. The three main methods for interacting with the query limit feature are:
@@ -70,7 +74,7 @@ When a query is marked as active via `QueryLimiter.countQueryTowardsLimits()`, i
`ActiveQueryTracker.trackQuery()` will return a [QueryHeartbeat](QueryHeartbeat.java) instance that contain a list of `PersistentNode` (provided by the Apache Curator library) wrappers around the ephemeral nodes listed above. The `QueryHeartbeat` will maintain the connection to Zookeeper and attempt to keep the ephemeral nodes present in Zookeeper until `QueryHeartbeat.stop()` is called. If `QueryHeartbeat.stop()` is called, or the webserver crashes, the ephemeral nodes will automatically be deleted by Zookeeper.
-The following HTTP status codes have been added for responses from the webserver:
+The following HTTP status codes are available for responses from the webserver:
```
412-20 - Concurrent query limit exceeded
500-164 - Error checking concurrent query limits
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitConfiguration.java
index b0fe82180e7..6054886f22e 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitConfiguration.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitConfiguration.java
@@ -4,22 +4,28 @@
import java.util.Objects;
import java.util.StringJoiner;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* Represents a custom query limit configuration that can be configured for matching systems.
*/
public class SystemLimitConfiguration {
// The system name regex pattern.
+ @JsonProperty
private String systemPattern;
// Whether queries submitted on matching systems should count against a user's query limit.
+ @JsonProperty
private Boolean countsAgainstUserLimit;
// The maximum number of queries that can run concurrently on matching systems.
+ @JsonProperty
private Integer queryLimit;
// Map of query logic group names to the maximum number of queries that can run concurrently on the system when originating from query logics that fall
// within the query logic group. The names may be regex patterns.
+ @JsonProperty
private Map queryLogicGroupLimits;
public SystemLimitConfiguration() {
@@ -65,11 +71,24 @@ public void setQueryLogicGroupLimits(Map queryLogicGroupLimits)
this.queryLogicGroupLimits = queryLogicGroupLimits == null ? Map.of() : queryLogicGroupLimits;
}
+ /**
+ * Return whether this {@link SystemLimitConfiguration} is considered equal to the given object. This {@code equals(Object)} implementation allows this
+ * instance to be equal to an object that is a subclass of {@link SystemLimitConfiguration}, such as {@link ImmutableSystemLimitConfiguration}.
+ *
+ * @param o
+ * the object to compare
+ * @return true if the object is equal to this {@link SystemLimitConfiguration}, or false otherwise
+ */
@Override
public boolean equals(Object o) {
- if (o == null || getClass() != o.getClass()) {
+ if (o == this) {
+ return true;
+ }
+ // Allow this instance to be considered equal to subclasses.
+ if (!(o instanceof SystemLimitConfiguration)) {
return false;
}
+
SystemLimitConfiguration that = (SystemLimitConfiguration) o;
return Objects.equals(systemPattern, that.systemPattern) && Objects.equals(countsAgainstUserLimit, that.countsAgainstUserLimit)
&& Objects.equals(queryLimit, that.queryLimit) && Objects.equals(queryLogicGroupLimits, that.queryLogicGroupLimits);
@@ -82,7 +101,19 @@ public int hashCode() {
@Override
public String toString() {
- return new StringJoiner(", ", SystemLimitConfiguration.class.getSimpleName() + "[", "]").add("systemPattern='" + systemPattern + "'")
+ return toString(SystemLimitConfiguration.class);
+ }
+
+ /**
+ * Return a String representation of this {@link SystemLimitConfiguration} referencing the given class as the instance of this
+ * {@link SystemLimitConfiguration}.
+ *
+ * @param clazz
+ * the class
+ * @return the string representation
+ */
+ protected String toString(Class extends SystemLimitConfiguration> clazz) {
+ return new StringJoiner(", ", clazz.getSimpleName() + "[", "]").add("systemPattern='" + systemPattern + "'")
.add("countsAgainstsUserLimit=" + countsAgainstUserLimit).add("queryLimit=" + queryLimit)
.add("queryLogicGroupLimits=" + queryLogicGroupLimits).toString();
}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitProvider.java b/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitProvider.java
index 30ef8c70574..6697d0529ee 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitProvider.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/SystemLimitProvider.java
@@ -2,17 +2,11 @@
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import com.github.benmanes.caffeine.cache.Cache;
@@ -25,12 +19,12 @@ public class SystemLimitProvider {
private static final Logger log = Logger.getLogger(SystemLimitProvider.class);
- private final Cache> systemLimitCache;
-
private final int defaultSystemQueryLimit;
private final long maxCacheSize;
+ private Cache> systemLimitCache;
+
private SortedSet sortedSystemLimits;
SystemLimitProvider(int defaultSystemQueryLimit, long maxCacheSize, Collection configs,
@@ -44,7 +38,6 @@ public class SystemLimitProvider {
this.maxCacheSize = maxCacheSize;
if (configs != null && !configs.isEmpty()) {
- validateConfigs(configs);
populateLimits(configs, groupLimitProvider);
this.systemLimitCache = Caffeine.newBuilder().maximumSize(100).build();
} else {
@@ -53,88 +46,6 @@ public class SystemLimitProvider {
}
}
- /**
- * Validate the given configurations.
- *
- * @param configs
- * the configurations to validate
- */
- private void validateConfigs(Collection configs) {
- Set systemPatterns = new HashSet<>();
- Map matcherPatterns = new HashMap<>();
- for (SystemLimitConfiguration config : configs) {
- // Verify that a system pattern was given.
- String systemPattern = config.getSystemPattern();
- if (StringUtils.isBlank(systemPattern)) {
- throw new IllegalArgumentException("System query limit configuration specified with blank system pattern");
- }
-
- // Verify that the pattern compiles if it is not simply a * as is occasionally used as a wildcard in configurations.
- try {
- if (!systemPattern.equals(QueryLimitConstants.ASTERISK)) {
- Pattern.compile(systemPattern);
- }
- } catch (PatternSyntaxException e) {
- throw new IllegalArgumentException("Invalid regex in system pattern '" + systemPattern + "'", e);
- }
-
- // Verify that we have not seen a configuration with the system pattern before.
- if (systemPatterns.contains(systemPattern)) {
- throw new IllegalArgumentException("Multiple query limit configurations specified with system pattern '" + systemPattern + "'");
- } else {
- systemPatterns.add(systemPattern);
- }
-
- // Fetch the matcher that would be used for the system pattern.
- Matcher matcher = Matcher.getMatcher(systemPattern, maxCacheSize);
-
- // Verify that we do not have an exact-matching pattern that is equivalent to a previously seen exact-matching pattern, such as 'SYSTEM-01' vs.
- // 'SYSTEM\\-01'.
- if (matcher instanceof StringMatcher) {
- String matcherPattern = ((StringMatcher) matcher).getValue();
- String equivalentSystemPattern = matcherPatterns.get(matcherPattern);
- if (equivalentSystemPattern != null) {
- throw new IllegalArgumentException(
- "System pattern '" + systemPattern + "' will resolve to an exact match that is equivalent to system pattern '"
- + equivalentSystemPattern + "' from another system configuration.");
- } else {
- matcherPatterns.put(matcherPattern, systemPattern);
- }
- }
-
- // Safeguard against allowing a configuration to potentially set whether queries on a system counts against user limits to false for all
- // systems. Only allow this to be done for exact system names, or non-wildcard-only patterns.
- if (QueryLimitConstants.wildcardOnlyPattern.matcher(systemPattern).matches() && !config.getCountsAgainstUserLimit()) {
- throw new IllegalArgumentException("System pattern '" + systemPattern
- + "' is wildcard-only and may not be used to override whether queries count against user limits to false");
- }
-
- // Verify that no invalid group name patterns were provided.
- Map groupLimits = config.getQueryLogicGroupLimits();
- if (groupLimits != null) {
- for (Map.Entry entry : groupLimits.entrySet()) {
- String groupPattern = entry.getKey();
- if (StringUtils.isBlank(groupPattern)) {
- throw new IllegalArgumentException(
- "User group query limit configuration given with blank group pattern for system pattern '" + systemPattern + "'");
- }
- if (!groupPattern.equals(QueryLimitConstants.ASTERISK)) {
- try {
- Pattern.compile(groupPattern);
- } catch (PatternSyntaxException e) {
- throw new IllegalArgumentException(
- "Invalid query logic group name pattern: " + groupPattern + " given for system pattern " + systemPattern, e);
- }
- }
- Integer limit = entry.getValue();
- if (limit < 0) {
- throw new IllegalArgumentException("Negative query logic group limit given for system pattern '" + systemPattern + "': " + limit);
- }
- }
- }
- }
- }
-
/**
* Populate the limits to enforce for systems.
*
@@ -238,6 +149,22 @@ public boolean countsAgainstUserLimit(String system) {
return customLimit.map(SystemLimits::countsAgainstUserLimit).orElse(true);
}
+ /**
+ * Clean up this {@link SystemLimitProvider} and release its underlying resources.
+ */
+ public void cleanUp() {
+ if (systemLimitCache != null) {
+ try {
+ systemLimitCache.invalidateAll();
+ } catch (Exception e) {
+ log.error("Failed to clear systemLimitCache", e);
+ } finally {
+ systemLimitCache = null;
+ }
+ }
+ sortedSystemLimits = null;
+ }
+
/**
* This class represents a sortable system pattern and its limit configuration.
*/
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitConfiguration.java b/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitConfiguration.java
index 5f624ed3c07..edd22adf5fc 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitConfiguration.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitConfiguration.java
@@ -4,18 +4,23 @@
import java.util.Objects;
import java.util.StringJoiner;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* Represents a custom query limit configuration that can be configured for a single user.
*/
public class UserLimitConfiguration {
// The user DN.
+ @JsonProperty
private String userDn;
// The user's concurrent query limit. This applies to the total number of queries the user may run across all systems.
+ @JsonProperty
private Integer queryLimit;
// Map of query logic group names to the user's concurrent query limit for the group. The names may be regex patterns.
+ @JsonProperty
private Map queryLogicGroupLimits;
public UserLimitConfiguration() {
@@ -52,9 +57,21 @@ public void setQueryLogicGroupLimits(Map queryLogicGroupLimits)
this.queryLogicGroupLimits = queryLogicGroupLimits == null ? Map.of() : queryLogicGroupLimits;
}
+ /**
+ * Return whether this {@link UserLimitConfiguration} is considered equal to the given object. This {@code equals(Object)} implementation allows this
+ * instance to be equal to an object that is a subclass of {@link UserLimitConfiguration}, such as {@link ImmutableUserLimitConfiguration}.
+ *
+ * @param o
+ * the object to compare
+ * @return true if the object is equal to this {@link UserLimitConfiguration}, or false otherwise
+ */
@Override
public boolean equals(Object o) {
- if (o == null || getClass() != o.getClass()) {
+ if (o == this) {
+ return true;
+ }
+ // Allow this instance to be considered equal to subclasses.
+ if (!(o instanceof UserLimitConfiguration)) {
return false;
}
UserLimitConfiguration that = (UserLimitConfiguration) o;
@@ -69,7 +86,18 @@ public int hashCode() {
@Override
public String toString() {
- return new StringJoiner(", ", UserLimitConfiguration.class.getSimpleName() + "[", "]").add("userDn='" + userDn + "'").add("queryLimit=" + queryLimit)
+ return toString(UserLimitConfiguration.class);
+ }
+
+ /**
+ * Return a String representation of this {@link UserLimitConfiguration} referencing the given class as the instance of this {@link UserLimitConfiguration}.
+ *
+ * @param clazz
+ * the class
+ * @return the string representation
+ */
+ protected String toString(Class extends UserLimitConfiguration> clazz) {
+ return new StringJoiner(", ", clazz.getSimpleName() + "[", "]").add("userDn='" + userDn + "'").add("queryLimit=" + queryLimit)
.add("queryLogicGroupLimits=" + queryLogicGroupLimits).toString();
}
}
diff --git a/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitProvider.java b/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitProvider.java
index da6d8e40219..b87df4e332e 100644
--- a/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitProvider.java
+++ b/web-services/query/src/main/java/datawave/webservice/query/limit/UserLimitProvider.java
@@ -2,14 +2,9 @@
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.SortedSet;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
/**
@@ -30,64 +25,12 @@ public class UserLimitProvider {
this.defaultUserQueryLimit = defaultUserQueryLimit;
this.maxCacheSize = maxCacheSize;
if (configs != null && !configs.isEmpty()) {
- validateConfigs(configs);
populateLimits(configs, groupLimitProvider);
} else {
this.customLimits = Map.of();
}
}
- /**
- * Validate the given configurations.
- *
- * @param configs
- * the configurations to validate
- */
- private void validateConfigs(Collection configs) {
- Set userDns = new HashSet<>();
- for (UserLimitConfiguration config : configs) {
- // Verify that a user dn was given.
- String userDn = config.getUserDn();
- if (StringUtils.isBlank(userDn)) {
- throw new IllegalArgumentException("User query limit configuration given with blank user DN");
- }
-
- // Verify we have not seen a configuration with the user dn before.
- if (userDns.contains(userDn)) {
- throw new IllegalArgumentException("Multiple query limit configurations specified for user '" + userDn + "'");
- } else {
- userDns.add(userDn);
- }
-
- // Verify that if the user query limit was overridden, it is not negative.
- if (config.getQueryLimit() != null && config.getQueryLimit() < 0) {
- throw new IllegalArgumentException("Negative user query limit given for user '" + userDn + "'");
- }
-
- // Verify that no invalid group name patterns were provided.
- Map groupLimits = config.getQueryLogicGroupLimits();
- if (groupLimits != null) {
- for (Map.Entry entry : groupLimits.entrySet()) {
- String groupPattern = entry.getKey();
- if (StringUtils.isBlank(groupPattern)) {
- throw new IllegalArgumentException("User group query limit configuration given with blank group pattern for user '" + userDn + "'");
- }
- if (!groupPattern.equals(QueryLimitConstants.ASTERISK)) {
- try {
- Pattern.compile(groupPattern);
- } catch (PatternSyntaxException e) {
- throw new IllegalArgumentException("Invalid query logic group name pattern: " + groupPattern + " given for user " + userDn, e);
- }
- }
- Integer limit = entry.getValue();
- if (limit < 0) {
- throw new IllegalArgumentException("Negative query logic group limit given for user '" + userDn + "': " + limit);
- }
- }
- }
- }
- }
-
/**
* Populate the limits to enforce for users.
*
@@ -139,4 +82,11 @@ public boolean hasCustomLimits(String userDn) {
public UserLimits getCustomLimits(String userDn) {
return customLimits.get(userDn);
}
+
+ /**
+ * Clean up this {@link UserLimitProvider} and release its underlying resources.
+ */
+ public void cleanUp() {
+ customLimits = null;
+ }
}
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/LockedZkClientDispatcher.java b/web-services/query/src/main/java/datawave/webservice/zookeeper/LockedZkClientDispatcher.java
new file mode 100644
index 00000000000..16686b5a0d4
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/LockedZkClientDispatcher.java
@@ -0,0 +1,235 @@
+package datawave.webservice.zookeeper;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A class that provides the ability to access a maintained {@link CuratorFramework} client that is guarded by a lock from access by multiple threads at the
+ * same time. Access to the underlying client is provided via {@link LockedZkClientDispatcher#getLockedClient()}, and is intended to be used within the context
+ * of a try-with-resources statement. For example:
+ *
+ *
+ * LockedZkClientDispatcher dispatcher = new LockedZkClientDispatcher(clientFactory, cleanupTaskInterval, maxElapsedAccessTime, timeUnit);
+ * // Obtain guarded access to the client.
+ * try (LockedZkClientDispatcher.LockedClient lockedClient = dispatcher.getLockedClient()) {
+ * CuratorFramework client = lockedClient.getClient();
+ * // Do things with the client.
+ * }
+ * // The lock will automatically be released after the try statement. Note: you should never call {@link CuratorFramework#close()} on the provided client.
+ * Clean up of the client will be handled by the dispatcher.
+ *
+ *
+ * If {@link LockedZkClientDispatcher#getLockedClient()} is not used with a try-with-resources statement, care must be taken to ensure
+ * {@link LockedClient#close()} is called when you are finished with the client to release the lock. The underlying client will be cleaned up and resources
+ * released if the {@link LockedZkClientDispatcher} is created with a non-zero, positive maxElapsedAccessTime, and the time since
+ * {@link LockedZkClientDispatcher#getLockedClient()} meets or exceeds the max elapsed time.
+ */
+public class LockedZkClientDispatcher implements AutoCloseable {
+
+ private static final Logger log = Logger.getLogger(LockedZkClientDispatcher.class);
+
+ /**
+ * The {@link CuratorFramework} factory.
+ */
+ protected final CuratorFrameworkFactory.Builder clientFactory;
+
+ /**
+ * The interval in milliseconds between checks for client access timeouts.
+ */
+ protected final long cleanupTaskInterval;
+
+ /**
+ * The max time in milliseconds that can elapse since the last client access before cleanup will trigger on the next cleanup task.
+ */
+ protected final long maxElapsedAccessTime;
+
+ /**
+ * The lock that guards access to the client.
+ */
+ protected final ReentrantLock clientLock = new ReentrantLock();
+
+ /**
+ * The underlying client.
+ */
+ protected CuratorFramework client;
+
+ /**
+ * The thread pool that will check if the client should be cleaned up.
+ */
+ protected ScheduledThreadPoolExecutor executor;
+
+ /**
+ * The system time in milliseconds that {@link #initClient()} was last called.
+ */
+ protected long lastClientAccess = 0L;
+
+ public LockedZkClientDispatcher(CuratorFrameworkFactory.Builder clientFactory, long cleanupTaskInterval, long maxElapsedAccessTime, TimeUnit timeUnit) {
+ Preconditions.checkNotNull(clientFactory, "clientFactory must not be null");
+ Preconditions.checkNotNull(timeUnit, "timeUnit must not be null");
+
+ this.clientFactory = clientFactory;
+ this.maxElapsedAccessTime = timeUnit.toMillis(maxElapsedAccessTime);
+ this.cleanupTaskInterval = timeUnit.toMillis(cleanupTaskInterval);
+ }
+
+ /**
+ * Returns a {@link LockedClient} that has locked access to the underlying {@link CuratorFramework} of this {@link LockedZkClientDispatcher}. The underlying
+ * client will be non-null and started. This method is intended to be used with a try-with-resources statement. If not used in that manner, you MUST call
+ * {@link LockedClient#close()} to release the client lock once you are done with it.
+ *
+ * @return the new locked client
+ */
+ public LockedClient getLockedClient() {
+ // Lock access to the client.
+ clientLock.lock();
+ // Ensure the client is initialized.
+ initClient();
+ // Return a locked client.
+ return new LockedClient(client, clientLock);
+ }
+
+ /**
+ * Initialize the underlying client if necessary, and set {@link #lastClientAccess} to the current system time.
+ */
+ private void initClient() {
+ if (client == null) {
+ clientLock.lock();
+ try {
+ // Create the client and start it.
+ client = clientFactory.build();
+ client.start();
+
+ // If we have a finite timeout, create the cleanup task.
+ if (maxElapsedAccessTime > 0) {
+ createCleanupTask();
+ }
+ } finally {
+ clientLock.unlock();
+ }
+ }
+ // Update the last-accessed time.
+ lastClientAccess = System.currentTimeMillis();
+ }
+
+ /**
+ * Initialize the executor service if necessary, and add tasks that will check if we've reached the client timeout at the specified cleanup task intervals.
+ */
+ private void createCleanupTask() {
+ if (executor == null) {
+ executor = new ScheduledThreadPoolExecutor(1);
+ }
+
+ Runnable task = new Runnable() {
+ @Override
+ public void run() {
+ // If the max elapsed timeout has been reached, clean up the client.
+ if (System.currentTimeMillis() - lastClientAccess >= maxElapsedAccessTime) {
+ cleanupClient();
+ } else {
+ // Otherwise, schedule another task to check again after the designated interval.
+ executor.schedule(this, cleanupTaskInterval, TimeUnit.MILLISECONDS);
+ }
+ }
+ };
+
+ // Schedule the task.
+ executor.schedule(task, cleanupTaskInterval, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Clean up the client.
+ */
+ private void cleanupClient() {
+ if (client != null) {
+ clientLock.lock();
+ try {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ log.warn("Failed to close client", e);
+ } finally {
+ client = null;
+ }
+ }
+ } finally {
+ clientLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Clean up the client and the executor service.
+ */
+ private void cleanupClientAndExecutor() {
+ clientLock.lock();
+ try {
+ if (executor != null) {
+ try {
+ executor.shutdown();
+ } catch (Exception e) {
+ log.warn("Failed to shutdown executor", e);
+ }
+ executor = null;
+ }
+ cleanupClient();
+ } finally {
+ clientLock.unlock();
+ }
+ }
+
+ /**
+ * Release the underlying resources held by this {@link LockedZkClientDispatcher}. The underlying {@link CuratorFramework} and executor service will be
+ * stopped and nullified.
+ *
+ * @throws Exception
+ * if an error occurs during cleanup.
+ */
+ @Override
+ public void close() throws Exception {
+ cleanupClientAndExecutor();
+ }
+
+ /**
+ * An {@link AutoCloseable} that holds a reference to the client provided by an instance of {@link LockedZkClientDispatcher}, and the client's associated
+ * lock. The lock will be unlocked when {@link #close()} is called.
+ */
+ public static class LockedClient implements AutoCloseable {
+
+ private final CuratorFramework client;
+ private final Lock lock;
+
+ private LockedClient(CuratorFramework client, Lock lock) {
+ this.client = client;
+ this.lock = lock;
+ }
+
+ /**
+ * Return the guarded {@link CuratorFramework} client.
+ *
+ * @return the client
+ */
+ public CuratorFramework getClient() {
+ return client;
+ }
+
+ /**
+ * Release the client lock.
+ *
+ * @throws Exception
+ * if an error occurs
+ */
+ @Override
+ public void close() throws Exception {
+ lock.unlock();
+ }
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/ObjectValidator.java b/web-services/query/src/main/java/datawave/webservice/zookeeper/ObjectValidator.java
new file mode 100644
index 00000000000..5d6a0058a5b
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/ObjectValidator.java
@@ -0,0 +1,16 @@
+package datawave.webservice.zookeeper;
+
+/**
+ * An interface for defining a validator that can be provided to a {@link ZkObjectPublisher} for pre-validating any updated objects before publishing them to
+ * subscribers.
+ */
+public interface ObjectValidator {
+
+ /**
+ * Validate the given object. Any implementations should throw an exception if the provided object is not considered valid.
+ *
+ * @param object
+ * the object to validate
+ */
+ void validate(Object object) throws Exception;
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/README.md b/web-services/query/src/main/java/datawave/webservice/zookeeper/README.md
new file mode 100644
index 00000000000..129ced13078
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/README.md
@@ -0,0 +1,36 @@
+# ZkObjectPublisher
+
+The class [ZkObjectPublisher](ZkObjectPublisher.java) provides the ability to trigger and publish updates of a configured class instance to any subscribers using Zookeeper to listen for updates and triggering events. A publisher instance can be configured with the following:
+* `namespace`: The unique namespace for the ZkObjectPublisher. **It is critical that this namespace is unique to any configured ZkObjectPublisher instances** on the same server in order to prevent multiple publishers from writing to the same `//attempts/` node in Zookeeper.
+* `zookeeperConfig`: The zookeeper connect string, or a filepath of a zookeeper configuration file.
+* `hdfsConfigUrls`: A comma-delimited list of hadoop configuration files.
+* `objectClass`: The class of the object type the publisher will deserialize and publish.
+* `objectValidators`: All validators that a successfully deserialized instance of `objectClass` will be supplied to before being supplied to all subscribers.
+
+A ZkObjectPublisher will attempt to reload and publish a new instance of its configured class when one of the following happens:
+
+* The node `//path` is created or modified with non-empty data.
+* The node `//trigger` is created, modified, or deleted.
+
+Upon receiving a trigger event, the publisher will attempt to read and deserialize an instance of the configured class from the filepath stored in the data of the node `//path`. The filepath is expected to point to an XML, JSON, or YAML file, and must conform to one of the following URI schemes:
+* A URL: `http://path/to/file` or `https://path/to/file`
+* An HDFS file: `hdfs://path/to/file`
+* A local file: `file://path/to/file` or `/path/to/file`
+
+If an instance of the class is successfully deserialized from the file, it will be validated against any configured object validators. Afterward it will be provided to all subscribers that have subscribed to the publisher via `ZkObjectPublisher.subscribeToUpdates(Consumer)`. The status of any triggered attempt will be recorded under the node `//attempts/`. Upon a success, the children of that node will follow the structure:
+
+```text
+/status # The data will be SUCCESS
+/cause # The data will be one of the values of the enum ZkObjectPublishCause
+/time # The data will be an ISO-8601 string representing the time of the publish attempt
+```
+If an error occurs, either when loading an instance of the class from the file, or when providing the new instance to subscribers, the children will follow the structure:
+```text
+/status # The data will be RELOAD_ERROR or SUBSCRIBER_ERROR
+/cause # The data will be one of the values of the enum ZkObjectPublishCause
+/time # The data will be an ISO-8601 string representing the time of the publish attempt
+/errors # A node containing error_N nodes where N is a number ranging from 0 to one less than the total errors
+/errors/error_N/message # A short description of the error
+/errors/error_N/stacktrace # The stack trace of the error's exception, if any. If no exception was caught, this node will not exist.
+```
+The nodes under `//attempts/` will always reflect the latest reload attempt.
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishCause.java b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishCause.java
new file mode 100644
index 00000000000..5ccd792ff92
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishCause.java
@@ -0,0 +1,24 @@
+package datawave.webservice.zookeeper;
+
+public enum ZkObjectPublishCause {
+ /**
+ * Indicates the triggering event was the creation of the node {@value ZkObjectPublisher#NODE_PATH} with non-empty data.
+ */
+ PATH_NODE_CREATED,
+ /**
+ * Indicates the triggering event was the modification of the node {@value ZkObjectPublisher#NODE_PATH} with non-empty data.
+ */
+ PATH_NODE_MODIFIED,
+ /**
+ * Indicates the triggering event was the creation of the node {@value ZkObjectPublisher#NODE_TRIGGER}.
+ */
+ TRIGGER_NODE_CREATED,
+ /**
+ * Indicates the triggering event was the modification of the node {@value ZkObjectPublisher#NODE_TRIGGER}.
+ */
+ TRIGGER_NODE_MODIFIED,
+ /**
+ * Indicates the triggering event was the deletion of the node {@value ZkObjectPublisher#NODE_TRIGGER}.
+ */
+ TRIGGER_NODE_DELETED
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishError.java b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishError.java
new file mode 100644
index 00000000000..f16077a35f9
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishError.java
@@ -0,0 +1,34 @@
+package datawave.webservice.zookeeper;
+
+/**
+ * Represents an error that occurred when attempting to load a new updated object via a {@link ZkObjectPublisher}.
+ */
+public class ZkObjectPublishError {
+
+ /**
+ * A short description of the error.
+ */
+ private final String message;
+
+ /**
+ * The associated exception for the error, if any.
+ */
+ private final Exception exception;
+
+ public ZkObjectPublishError(String message, Exception exception) {
+ this.message = message;
+ this.exception = exception;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public boolean hasException() {
+ return exception != null;
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishResult.java b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishResult.java
new file mode 100644
index 00000000000..2a675cf9ff3
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishResult.java
@@ -0,0 +1,79 @@
+package datawave.webservice.zookeeper;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a result from {@link ZkObjectPublisher#getObjectFromZk()}.
+ */
+public class ZkObjectPublishResult {
+
+ /**
+ * The updated object. This will be null if no object update could be successfully loaded.
+ */
+ private final Object updatedObject;
+
+ /**
+ * The status of loading the object.
+ */
+ private final ZkObjectPublishStatus status;
+
+ /**
+ * A list of any errors that occurred while trying to load the results.
+ */
+ private final List errors;
+
+ /**
+ * The time that loading the object was attempted.
+ */
+ private final Instant time;
+
+ public static ZkObjectPublishResult success(Instant time, Object pojo) {
+ return new ZkObjectPublishResult(pojo, ZkObjectPublishStatus.SUCCESS, null, time);
+ }
+
+ public static ZkObjectPublishResult error(Instant time, String message) {
+ return new ZkObjectPublishResult(null, ZkObjectPublishStatus.RELOAD_ERROR, List.of(new ZkObjectPublishError(message, null)), time);
+ }
+
+ public static ZkObjectPublishResult error(Instant time, String message, Exception exception) {
+ return new ZkObjectPublishResult(null, ZkObjectPublishStatus.RELOAD_ERROR, List.of(new ZkObjectPublishError(message, exception)), time);
+ }
+
+ public static ZkObjectPublishResult subscriberErrors(Instant time, Object pojo, List exceptions) {
+ List errors = exceptions.stream().map((e) -> new ZkObjectPublishError("Exception thrown by listener: " + e.getMessage(), e))
+ .collect(Collectors.toList());
+ return new ZkObjectPublishResult(pojo, ZkObjectPublishStatus.SUBSCRIBER_ERROR, errors, time);
+ }
+
+ public ZkObjectPublishResult(Object updatedObject, ZkObjectPublishStatus status, List errors, Instant time) {
+ this.updatedObject = updatedObject;
+ this.status = status;
+ this.errors = errors != null ? List.copyOf(errors) : List.of();
+ this.time = time;
+ }
+
+ public Object getUpdatedObject() {
+ return updatedObject;
+ }
+
+ public ZkObjectPublishStatus getStatus() {
+ return status;
+ }
+
+ public List getErrors() {
+ return errors;
+ }
+
+ public Instant getTime() {
+ return time;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", ZkObjectPublishResult.class.getSimpleName() + "[", "]").add("updatedObject=" + updatedObject).add("status=" + status)
+ .add("errors=" + errors).add("time=" + time).toString();
+ }
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishStatus.java b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishStatus.java
new file mode 100644
index 00000000000..c3f727117f5
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublishStatus.java
@@ -0,0 +1,18 @@
+package datawave.webservice.zookeeper;
+
+public enum ZkObjectPublishStatus {
+ /**
+ * Indicates an object update was successfully loaded from Zookeeper and, if triggered by a trigger event, successfully published to all subscribers.
+ */
+ SUCCESS,
+
+ /**
+ * Indicates an error occurred when trying to load an object update from Zookeeper.
+ */
+ RELOAD_ERROR,
+
+ /**
+ * Indicates an object update was successfully loaded from Zookeeper, but one or more subscribers threw an error when provided the updated object.
+ */
+ SUBSCRIBER_ERROR
+}
diff --git a/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublisher.java b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublisher.java
new file mode 100644
index 00000000000..65f65ebdee4
--- /dev/null
+++ b/web-services/query/src/main/java/datawave/webservice/zookeeper/ZkObjectPublisher.java
@@ -0,0 +1,791 @@
+package datawave.webservice.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+
+import com.ctc.wstx.stax.WstxInputFactory;
+import com.ctc.wstx.stax.WstxOutputFactory;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.format.DataFormatDetector;
+import com.fasterxml.jackson.core.format.DataFormatMatcher;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
+import com.google.common.base.Preconditions;
+
+import datawave.util.StringUtils;
+
+/**
+ * A publisher that can be triggered to deserialize and publish updates of a configured class to subscribers. The publisher leverages Zookeeper and is triggered
+ * by changes to Zookeeper nodes. The publisher will be triggered to reload an instance of the configured object when:
+ *
+ *
The node {@code //path} is created or modified with non-empty data.
+ *
The node {@code //trigger} is created, modified, or deleted.
+ *
+ * Upon receiving a trigger event, the publisher will attempt to read and deserialize an instance of the configured class from the filepath stored in the data
+ * of the node {@code //path}. The filepath is expected to be XML, JSON, or YAML, and must conform to one of the following URI schemes:
+ *
+ *
A URL: {@code http://path/to/file} or {@code https://path/to/file}.
+ *
An HDFS file: {@code hdfs://path/to/file}.
+ *
A local file: {@code file://path/to/file} or {@code /path/to/file}.
+ *
+ * If an instance of the class is successfully deserialized from the file, it will be validated against any configured object validators. Afterward it will be
+ * provided to all subscribers that have subscribed to the publisher via {@link ZkObjectPublisher#subscribeToUpdates(Consumer)}. The status of any triggered
+ * attempt will be recorded under the node {@code //attempts/}. Upon a success, the children nodes will follow the structure:
+ *
+ *
+ * /status # The data will be {@link ZkObjectPublishStatus#SUCCESS}
+ * /cause # The data will be one of {@link ZkObjectPublishCause}
+ * /time # The data will be an ISO-8601 string representing the time of the publish attempt
+ *
+ *
+ * If an error occurs, either when loading an instance of the class from the file, or when providing the new instance to subscribers, the children will follow
+ * the structure:
+ *
+ *
+ * /status # The data will be {@link ZkObjectPublishStatus#RELOAD_ERROR} or {@link ZkObjectPublishStatus#SUBSCRIBER_ERROR}
+ * /cause # The data will be one of {@link ZkObjectPublishCause}
+ * /time # The data will be an ISO-8601 string representing the time of the publish attempt
+ * /errors # A node containing error_N nodes where N is a number ranging from 0 to one less than the total errors
+ * /errors/error_N/message # A short description of the error
+ * /errors/error_N/stacktrace # The stack trace of the error's exception, if any. If no exception was caught, this node will not exist.
+ *
+ *
+ * The nodes under {@code //attempts/} will always reflect the latest reload attempt.
+ *
+ * NOTE: It is crucial that separate {@link ZkObjectPublisher} instances on the same server are created with unique namespaces in order to
+ * prevent the same {@code //attempts/} node and its children from being modified by multiple publishers.
+ */
+public class ZkObjectPublisher {
+
+ private static final Logger log = Logger.getLogger(ZkObjectPublisher.class);
+
+ public static final String NODE_PATH = "/path";
+ public static final String NODE_TRIGGER = "/trigger";
+ public static final String NODE_ATTEMPTS = "/attempts";
+ public static final String NODE_CAUSE = "/cause";
+ public static final String NODE_STATUS = "/status";
+ public static final String NODE_ERRORS = "/errors";
+ public static final String NODE_ERROR_BASE = "/error_";
+ public static final String NODE_MESSAGE = "/message";
+ public static final String NODE_STACKTRACE = "/stacktrace";
+ public static final String NODE_TIME = "/time";
+
+ /**
+ * Mapper for JSON files.
+ */
+ private static final JsonMapper jsonMapper = new JsonMapper();
+
+ /**
+ * Mapper for XML files.
+ */
+ // Ensure the mapper is created with factories that support the StAX2 API.
+ private static final XmlMapper xmlMapper = new XmlMapper(new WstxInputFactory(), new WstxOutputFactory());
+
+ /**
+ * Mapper for YAML files.
+ */
+ private static final YAMLMapper yamlMapper = new YAMLMapper();
+
+ /**
+ * Map of format names to the mappers.
+ */
+ // @formatter:off
+ private static final Map formatToMapper = Map.of(
+ jsonMapper.getFactory().getFormatName(), jsonMapper,
+ xmlMapper.getFactory().getFormatName(), xmlMapper,
+ yamlMapper.getFactory().getFormatName(), yamlMapper
+ );
+ // @formatter:on
+
+ /**
+ * Helper class to detect the format of a file.
+ */
+ private static final DataFormatDetector formatDetector = new DataFormatDetector(jsonMapper.getFactory(), xmlMapper.getFactory(), yamlMapper.getFactory());
+
+ /**
+ * The finalized path for the node {@code /attempts/}.
+ */
+ private final String baseAttemptNode;
+
+ /**
+ * The finalized path for the node {@code /attempts//cause}.
+ */
+ private final String attemptCauseNode;
+
+ /**
+ * The finalized path for the node {@code /attempts//status}.
+ */
+ private final String attemptStatusNode;
+
+ /**
+ * The finalized path for the node {@code /attempts//errors}.
+ */
+ private final String attemptErrorsNode;
+
+ /**
+ * The finalized path for the node {@code /attempts//time}.
+ */
+ private final String attemptTimeNode;
+
+ private final String namespace;
+ private final Configuration hadoopConfig;
+ private final Class> objectClass;
+ private final List objectValidators;
+
+ /**
+ * The list of subscribers that should be supplied with new objects after successful reloads.
+ */
+ private List> subscribers = new CopyOnWriteArrayList<>();
+
+ /**
+ * A {@link CuratorCache} that will listen for creates and modifications of the node {@code /path}.
+ */
+ private CuratorCache pathCache;
+
+ /**
+ * A {@link CuratorCache} that will listen for creates, modifications, and deletions of the node {@code /trigger}
+ */
+ private CuratorCache triggerCache;
+
+ /**
+ * A boolean that will be set to true when {@link #pathCache} is initialized.
+ */
+ private final AtomicBoolean pathCacheInitialized = new AtomicBoolean(false);
+
+ /**
+ * A boolean that will be set to true when {@link #triggerCache} is initialized.
+ */
+ private final AtomicBoolean triggerCacheInitialized = new AtomicBoolean(false);
+
+ /**
+ * The lock that must be obtained by any task calling {@link #triggerReload(ZkObjectPublishCause)} in order to perform a reload.
+ */
+ private final Lock reloadLock = new ReentrantLock();
+
+ /**
+ * An executor that runs 1 task, and keeps at most 1 in the queue. If a 3rd task arrives, the one in the queue is discarded for the new one. If a bunch of
+ * reloads occur, we are only interested in supplying listeners with the latest reload attempt.
+ */
+ // @formatter:off
+ private ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ 1, // Use a core pool size of 1.
+ 1, // The maximum pool size is 1.
+ 0L, TimeUnit.MILLISECONDS, // Keep alive time of 1 ms for idle threads.
+ new ArrayBlockingQueue<>(1), // Only allow 1 task to be queued at a time.
+ new ThreadPoolExecutor.DiscardOldestPolicy()); // If a new task is submitted, discard any task present in the queue.
+ // @formatter:on
+
+ /**
+ * The client dispatcher.
+ */
+ private LockedZkClientDispatcher clientDispatcher;
+
+ public ZkObjectPublisher(String namespace, String zookeeperConfig, String hdfsConfigUrls, Class> objectClass, List objectValidators) {
+ Preconditions.checkArgument((namespace != null && !namespace.isBlank()), "namespace must not be null or blank");
+ Preconditions.checkArgument((zookeeperConfig != null && !zookeeperConfig.isBlank()), "zookeeperConfig must not be null or blank");
+ Preconditions.checkNotNull(objectClass, "objectClass must not be null");
+
+ if (log.isDebugEnabled()) {
+ log.debug(addLogPrefix("Initializing with namespace=" + namespace + ", zookeeperConfig=" + zookeeperConfig + ", hdfsConfigUrls=" + hdfsConfigUrls
+ + ", " + "objectClass=" + objectClass.getName() + ", pojoValidators=" + objectValidators));
+ }
+
+ this.namespace = namespace.trim();
+ this.objectClass = objectClass;
+ this.objectValidators = objectValidators == null ? List.of() : List.copyOf(objectValidators);
+
+ String zkConnectString;
+ try {
+ // If the zookeeper config points to a file, extract the hosts from it.
+ zkConnectString = ZkUtils.getQuorumPeerConfig(zookeeperConfig);
+ } catch (Exception e) {
+ throw new RuntimeException(addLogPrefix("Failed to extract zookeeper connect string from config " + zookeeperConfig), e);
+ }
+
+ try {
+ // Load any provided hadoop configurations.
+ this.hadoopConfig = new Configuration();
+ if (hdfsConfigUrls != null && !hdfsConfigUrls.isBlank()) {
+ for (String url : StringUtils.split(hdfsConfigUrls, ",")) {
+ hadoopConfig.addResource(new URL(url));
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(addLogPrefix("Failed to load hadoop configuration from URLs " + hdfsConfigUrls), e);
+ }
+
+ // Construct the finalized attempt node paths to be relative to the server IP address.
+ String serverIpAddress;
+ try {
+ serverIpAddress = InetAddress.getLocalHost().getHostAddress();
+ } catch (Exception e) {
+ throw new RuntimeException(addLogPrefix("Failed to get local host address"), e);
+ }
+ baseAttemptNode = NODE_ATTEMPTS + "/" + serverIpAddress;
+ attemptCauseNode = baseAttemptNode + NODE_CAUSE;
+ attemptStatusNode = baseAttemptNode + NODE_STATUS;
+ attemptErrorsNode = baseAttemptNode + NODE_ERRORS;
+ attemptTimeNode = baseAttemptNode + NODE_TIME;
+
+ // @formatter:off
+ CuratorFrameworkFactory.Builder clientFactory = CuratorFrameworkFactory.builder()
+ .namespace(this.namespace)
+ .connectString(zkConnectString)
+ .sessionTimeoutMs(60000)
+ .connectionTimeoutMs(60000)
+ .retryPolicy(new RetryNTimes(10, 1000));
+ // @formatter:on
+
+ clientDispatcher = new LockedZkClientDispatcher(clientFactory, 120000, 120000, TimeUnit.MILLISECONDS);
+ this.pathCache = createCache(NODE_PATH, clientFactory, () -> createPathCacheListener(pathCacheInitialized));
+ this.triggerCache = createCache(NODE_TRIGGER, clientFactory, () -> createTriggerCacheListener(triggerCacheInitialized));
+ }
+
+ /**
+ * Create and return a new {@link CuratorCache} that will watch for events concerning the given node, and supply them to the listener returned by the
+ * listener supplier.
+ *
+ * @param node
+ * the node
+ * @param listenerSupplier
+ * the listener supplier
+ * @return the new cache
+ */
+ private CuratorCache createCache(String node, CuratorFrameworkFactory.Builder clientFactory, Supplier listenerSupplier) {
+ try {
+ CuratorFramework client = clientFactory.build();
+ client.start();
+ CuratorCache cache = CuratorCache.build(client, node, CuratorCache.Options.SINGLE_NODE_CACHE);
+ // Add the desired listeners to the cache.
+ CuratorCacheListener cacheListener = listenerSupplier.get();
+ cache.listenable().addListener(cacheListener);
+ // Start the cache.
+ cache.start();
+ return cache;
+ } catch (Exception e) {
+ log.error(addLogPrefix("Failed to create curator cache for path node " + node), e);
+ throw new RuntimeException(addLogPrefix("Failed to create curator cache for path " + node), e);
+ }
+ }
+
+ /**
+ * Create and return a {@link CuratorCacheListener} that will listen for creations and modifications of the node {@code /path}, and trigger a
+ * configuration reload if the updated {@code /path} node has non-empty data. The listener will also set the given boolean to true when its
+ * wrapping {@link CuratorCache} is initialized.
+ *
+ * @param initFlag
+ * a flag to set to true when an initialized event is received by the listener
+ * @return the cache listener
+ */
+ private CuratorCacheListener createPathCacheListener(AtomicBoolean initFlag) {
+ // @formatter:off
+ return CuratorCacheListener.builder()
+ .afterInitialized() // Ignore any events that occurred before the cache was initialized.
+ .forInitialized(() -> initFlag.set(true)) // Indicate when the cache is initialized.
+ .forCreates((node) -> {
+ byte[] data = node.getData();
+ // Only trigger a reload attempt if the data is not empty.
+ if (data != null && data.length > 0) {
+ executor.submit(()-> triggerReload(ZkObjectPublishCause.PATH_NODE_CREATED));
+ }
+ })
+ .forChanges((oldNode, newNode) -> {
+ byte[] newData = newNode.getData();
+ // Only trigger a reload attempt if the data is not empty.
+ if(newData != null && newData.length > 0) {
+ executor.submit(()-> triggerReload(ZkObjectPublishCause.PATH_NODE_MODIFIED));
+ }
+
+ }).build();
+ // @formatter:on
+ }
+
+ /**
+ * Create and return a {@link CuratorCacheListener} that will listen for creations, modifications, and deletions of the node {@code /trigger}, and trigger a
+ * configuration reload. The listener will also set the given boolean to true when its wrapping {@link CuratorCache} is initialized.
+ *
+ * @param initFlag
+ * a flag to set to true when an initialized event is received by the listener
+ */
+ private CuratorCacheListener createTriggerCacheListener(AtomicBoolean initFlag) {
+ // @formatter:off
+ return CuratorCacheListener.builder()
+ .afterInitialized() // Ignore any events that occurred before the cache was initialized.
+ .forInitialized(() -> initFlag.set(true)) // Indicate when the cache is initialized.
+ .forCreates((node) -> executor.submit(()-> triggerReload(ZkObjectPublishCause.TRIGGER_NODE_CREATED)))
+ .forChanges((oldNode, newNode) -> executor.submit(() -> triggerReload(ZkObjectPublishCause.TRIGGER_NODE_MODIFIED)))
+ .forDeletes((node) -> executor.submit(() -> triggerReload(ZkObjectPublishCause.TRIGGER_NODE_DELETED)))
+ .build();
+ // @formatter:on
+ }
+
+ /**
+ * Wait until the underlying node caches are initialized for at most the given timeout.
+ *
+ * @param timeout
+ * the timeout
+ * @param unit
+ * the time unit
+ * @throws ConditionTimeoutException
+ * if the caches are not initialized before the timeout
+ */
+ public void awaitCacheInitialization(long timeout, TimeUnit unit) throws ConditionTimeoutException {
+ Awaitility.await().atMost(timeout, unit).until(this::areCachesInitialized);
+ }
+
+ /**
+ * Return whether the underlying node caches are initialized and ready to listen for events.
+ *
+ * @return true if all underlying caches are initialized, or false otherwise
+ */
+ public boolean areCachesInitialized() {
+ return pathCacheInitialized.get() && triggerCacheInitialized.get();
+ }
+
+ /**
+ * Trigger a POJO reload. If a POJO is reloaded, it will be provided to any listeners configured for this {@link ZkObjectPublisher}.
+ *
+ * @param cause
+ * the triggering cause
+ */
+ private void triggerReload(ZkObjectPublishCause cause) {
+ if (log.isDebugEnabled()) {
+ log.debug(addLogPrefix("Reload triggered by " + cause));
+ }
+
+ // Obtain the reload lock.
+ reloadLock.lock();
+ try {
+ Instant attemptTime = Instant.now();
+ // Attempt to load a new POJO instance.
+ ZkObjectPublishResult result = getObjectFromZk();
+
+ // If we successfully loaded a valid subscriber, pass it to any subscribers registered with this updater.
+ if (result.getStatus() == ZkObjectPublishStatus.SUCCESS) {
+ if (!subscribers.isEmpty()) {
+ List subscriberExceptions = new ArrayList<>();
+ for (Consumer