From 1c86b79b4f70735d9652dcdbe4e4de3b60caa22c Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 5 Jan 2026 13:16:05 -0500 Subject: [PATCH 1/9] Replace ClientContext.getTableId with public API - Remove getClientContext utility from AccumuloConnectionFactory - Update PushdownScheduler to use tableOperations().tableIdMap() Fixes #3339 Part of #2443 --- .../connection/AccumuloConnectionFactory.java | 19 ------------------- .../query/scheduler/PushdownScheduler.java | 5 +---- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java index eb6f9ef2f99..093950374cd 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloConnectionFactory.java @@ -5,10 +5,8 @@ import java.util.Map; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.clientImpl.ClientContext; import datawave.core.common.result.ConnectionPool; -import datawave.webservice.common.connection.WrappedAccumuloClient; public interface AccumuloConnectionFactory extends AutoCloseable { @@ -110,21 +108,4 @@ AccumuloClient getClient(String userDN, Collection proxyServers, String * @return A map representation */ Map getTrackingMap(StackTraceElement[] stackTrace); - - /** - * Utility method to unwrap the ClientContext instance within {@link WrappedAccumuloClient} as needed - * - * @param accumuloClient - * {@link AccumuloClient} instance - * @return {@link WrappedAccumuloClient#getReal()}, if applicable; accumuloClient itself, if it implements {@link ClientContext}; otherwise returns null - */ - static ClientContext getClientContext(AccumuloClient accumuloClient) { - ClientContext cc = null; - if (accumuloClient instanceof WrappedAccumuloClient) { - cc = 
(ClientContext) ((WrappedAccumuloClient) accumuloClient).getReal(); - } else if (accumuloClient instanceof ClientContext) { - cc = (ClientContext) accumuloClient; - } - return cc; - } } diff --git a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java index c6a393cfeff..897f66f575a 100644 --- a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java +++ b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java @@ -14,7 +14,6 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; @@ -28,7 +27,6 @@ import com.google.common.collect.Lists; import datawave.accumulo.inmemory.InMemoryAccumuloClient; -import datawave.core.common.connection.AccumuloConnectionFactory; import datawave.core.common.logging.ThreadConfigurableLogger; import datawave.core.query.configuration.QueryData; import datawave.core.query.configuration.Result; @@ -156,8 +154,7 @@ protected Iterator concatIterators() throws AccumuloException, AccumuloS if (client instanceof InMemoryAccumuloClient) { tableId = TableId.of(config.getTableName()); } else { - ClientContext ctx = AccumuloConnectionFactory.getClientContext(client); - tableId = ctx.getTableId(tableName); + tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); } Iterator> chunkIter = Iterators.transform(getQueryDataIterator(), getPushdownFunction()); From 5f29ebfb705d68ba7bc89f437a6579080a817dba Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 2 Mar 2026 15:21:19 -0500 Subject: [PATCH 2/9] Add AccumuloTableInfoFetcher facade to replace non-public Accumulo APIs 
(#2443) Create AccumuloTableInfoFetcher in core/connection-pool that centralizes Accumulo table metadata operations behind public APIs, replacing direct usage of ClientContext, ThriftClientTypes, TabletLocator, and MetadataServicer. Migrated callers: - BulkIngestMapFileLoader: replace Thrift RPC with getActiveCompactions() - TableSplitsCache: replace MetadataServicer with locate() API - BulkInputFormat: replace TabletLocator/ClientContext online path with locate() API; offline path deferred (uses KeyExtent, separate task) Also removes TabletLocator from import-control-accumulo.xml allowlist. --- .../connection/AccumuloTableInfoFetcher.java | 117 ++++++++++++++++++ .../AccumuloTableInfoFetcherTest.java | 81 ++++++++++++ import-control-accumulo.xml | 3 +- warehouse/core/pom.xml | 5 + .../datawave/mr/bulk/BulkInputFormat.java | 82 +++++------- .../job/BulkIngestMapFileLoader.java | 30 +---- .../mapreduce/job/TableSplitsCache.java | 27 ++-- 7 files changed, 252 insertions(+), 93 deletions(-) create mode 100644 core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java create mode 100644 core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java new file mode 100644 index 00000000000..5a29c703851 --- /dev/null +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java @@ -0,0 +1,117 @@ +package datawave.core.common.connection; + +import java.util.Collection; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import 
org.apache.accumulo.core.client.admin.ActiveCompaction; +import org.apache.accumulo.core.client.admin.Locations; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; + +/** + * Facade that centralizes Accumulo table metadata operations behind public APIs. + *
<p>
+ * This class replaces direct usage of non-public Accumulo internals ({@code ClientContext}, {@code ThriftClientTypes}, {@code TabletLocator}, + * {@code MetadataServicer}, etc.) with their public API equivalents. All methods delegate to {@link org.apache.accumulo.core.client.admin.TableOperations} or + * {@link org.apache.accumulo.core.client.admin.InstanceOperations}. + * + * @see Issue #2443 + */ +public class AccumuloTableInfoFetcher { + + private final AccumuloClient client; + + public AccumuloTableInfoFetcher(AccumuloClient client) { + this.client = client; + } + + /** + * Get the TableId for a table name using the public {@code tableIdMap()} API. + * + * @param tableName + * the table name + * @return the TableId + * @throws TableNotFoundException + * if the table does not exist + */ + public TableId getTableId(String tableName) throws TableNotFoundException { + String id = client.tableOperations().tableIdMap().get(tableName); + if (id == null) { + throw new TableNotFoundException(null, tableName, "Table not found in tableIdMap"); + } + return TableId.of(id); + } + + /** + * Check if a table exists using the public {@code exists()} API. + * + * @param tableName + * the table name + * @return true if the table exists + */ + public boolean tableExists(String tableName) { + return client.tableOperations().exists(tableName); + } + + /** + * Check if a table is online using the public {@code isOnline()} API. + * + * @param tableName + * the table name + * @return true if the table is online + * @throws TableNotFoundException + * if the table does not exist + * @throws AccumuloException + * if a general Accumulo error occurs + */ + public boolean isTableOnline(String tableName) throws TableNotFoundException, AccumuloException { + return client.tableOperations().isOnline(tableName); + } + + /** + * Get tablet locations for the given ranges using the public {@code locate()} API. 
+ * + * @param tableName + * the table name + * @param ranges + * the ranges to locate + * @return the Locations result + * @throws TableNotFoundException + * if the table does not exist + * @throws AccumuloException + * if a general Accumulo error occurs + * @throws AccumuloSecurityException + * if a security error occurs + */ + public Locations getTabletLocations(String tableName, Collection ranges) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + return client.tableOperations().locate(tableName, ranges); + } + + /** + * Get the count of running major compactions across all tablet servers using the public {@code getActiveCompactions()} API. + *
<p>
+ * Note: This counts only running compactions (not queued), which differs slightly from the original Thrift-based implementation that also counted queued + * compactions. This is acceptable because the MAJC_THRESHOLD default is 3000 (a high safety margin) and this is polled on each bulk load cycle. + * + * @return the number of active major compactions + * @throws AccumuloException + * if a general Accumulo error occurs + * @throws AccumuloSecurityException + * if a security error occurs + */ + public int getMajorCompactionCount() throws AccumuloException, AccumuloSecurityException { + int count = 0; + List compactions = client.instanceOperations().getActiveCompactions(); + for (ActiveCompaction compaction : compactions) { + if (compaction.getType() == ActiveCompaction.CompactionType.MAJOR || compaction.getType() == ActiveCompaction.CompactionType.FULL) { + count++; + } + } + return count; + } +} diff --git a/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java new file mode 100644 index 00000000000..c8a9bc001ed --- /dev/null +++ b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java @@ -0,0 +1,81 @@ +package datawave.core.common.connection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.Locations; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; +import org.junit.Before; +import org.junit.Test; + 
+import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; + +public class AccumuloTableInfoFetcherTest { + + private static final String TEST_TABLE = "testTable"; + private AccumuloClient client; + private AccumuloTableInfoFetcher fetcher; + + @Before + public void setup() throws Exception { + InMemoryInstance instance = new InMemoryInstance(); + client = new InMemoryAccumuloClient("root", instance); + client.tableOperations().create(TEST_TABLE); + fetcher = new AccumuloTableInfoFetcher(client); + } + + @Test + public void testGetTableId() throws Exception { + TableId tableId = fetcher.getTableId(TEST_TABLE); + assertNotNull(tableId); + // Verify it matches the tableIdMap entry + String expectedId = client.tableOperations().tableIdMap().get(TEST_TABLE); + assertEquals(expectedId, tableId.canonical()); + } + + @Test(expected = TableNotFoundException.class) + public void testGetTableIdNonExistent() throws Exception { + fetcher.getTableId("nonExistentTable"); + } + + @Test + public void testTableExists() { + assertTrue(fetcher.tableExists(TEST_TABLE)); + assertFalse(fetcher.tableExists("nonExistentTable")); + } + + @Test + public void testIsTableOnline() throws Exception { + // InMemoryAccumuloClient.isOnline() returns false, but the API call should succeed + boolean online = fetcher.isTableOnline(TEST_TABLE); + // InMemory always returns false; just verify no exception is thrown + assertFalse(online); + } + + @Test + public void testGetTabletLocations() throws Exception { + Locations locations = fetcher.getTabletLocations(TEST_TABLE, Collections.singletonList(new Range())); + assertNotNull(locations); + Map> byTablet = locations.groupByTablet(); + assertNotNull(byTablet); + assertFalse(byTablet.isEmpty()); + } + + @Test + public void testGetMajorCompactionCount() throws Exception { + // InMemoryAccumuloClient returns empty list for getActiveCompactions + int count = fetcher.getMajorCompactionCount(); + 
assertEquals(0, count); + } +} diff --git a/import-control-accumulo.xml b/import-control-accumulo.xml index dd6e8eb2ca9..cb4d313dd60 100644 --- a/import-control-accumulo.xml +++ b/import-control-accumulo.xml @@ -19,9 +19,8 @@ + team separates parts of that into public api --> - diff --git a/warehouse/core/pom.xml b/warehouse/core/pom.xml index 04e1cca2ddb..bf53c59264f 100644 --- a/warehouse/core/pom.xml +++ b/warehouse/core/pom.xml @@ -60,6 +60,11 @@ gov.nsa.datawave.core common-utils + + gov.nsa.datawave.core + datawave-core-connection-pool + ${project.version} + gov.nsa.datawave.core metadata-utils diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12acc1d138e..d10cc011017 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -16,7 +16,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; @@ -36,29 +35,24 @@ import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientConfConverter; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.ClientInfo; -import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import 
org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.RegExFilter; import org.apache.accumulo.core.iterators.user.VersioningIterator; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.util.format.DateFormatSupplier; -import org.apache.accumulo.core.util.threads.Threads; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; @@ -83,8 +77,8 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; import datawave.common.util.ArgumentChecker; +import datawave.core.common.connection.AccumuloTableInfoFetcher; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; import datawave.mr.bulk.split.DefaultSplitStrategy; @@ -1047,29 +1041,6 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. 
- * - * @param conf - * the Hadoop configuration object - * @return an accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist - * @throws IOException - * if the input format is unable to read the password file from the FileSystem - */ - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - if (conf.getBoolean(MOCK, false)) - return new InMemoryTabletLocator(); - String tableName = getTablename(conf); - Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - ClientInfo info = ClientInfo.from(props); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - Threads.UEH); - return TabletLocator.getLocator(context, context.getTableId(tableName)); - } - /** * Read the metadata table to get tablets and match up ranges to them. */ @@ -1090,7 +1061,6 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - TabletLocator tl; try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1101,28 +1071,18 @@ public List getSplits(JobContext job) throws IOException { } } else { try (AccumuloClient client = getClient(job.getConfiguration())) { - TableId tableId = null; - tl = getTabletLocator(job.getConfiguration()); - // its possible that the cache could contain complete, but old information about a tables tablets... 
so clear it - tl.invalidateCache(); - ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, - ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { - if (!(client instanceof InMemoryAccumuloClient)) { - if (tableId == null) - tableId = context.getTableId(tableName); - if (!context.tableNodeExists(tableId)) - throw new TableDeletedException(tableId.canonical()); - if (context.getTableState(tableId) == TableState.OFFLINE) - throw new TableOfflineException("Table (" + tableId.canonical() + ") is offline"); - } - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. Retrying."); - TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - tl.invalidateCache(); + AccumuloTableInfoFetcher fetcher = new AccumuloTableInfoFetcher(client); + // Validate table state + if (!(client instanceof InMemoryAccumuloClient)) { + if (!fetcher.tableExists(tableName)) + throw new TableDeletedException(tableName); + if (!fetcher.isTableOnline(tableName)) + throw new TableOfflineException("Table " + tableName + " is offline"); } - + // Locate tablets for ranges using public API + Locations locations = fetcher.getTabletLocations(tableName, ranges); + // Convert to binnedRanges structure + binnedRanges = buildBinnedRanges(locations); clipRanges(binnedRanges); } } @@ -1201,6 +1161,20 @@ private void clipRanges(Map>> binnedRanges) { } + private Map>> buildBinnedRanges(Locations locations) { + Map>> binnedRanges = new HashMap<>(); + for (Entry> entry : locations.groupByTablet().entrySet()) { + TabletId tabletId = entry.getKey(); + String location = locations.getTabletLocation(tabletId); + if (location == null) + location = ""; + // Convert TabletId to KeyExtent (still needed for binOfflineTable compatibility and clipRanges) + KeyExtent extent = new 
KeyExtent(tabletId.getTable(), tabletId.getEndRow(), tabletId.getPrevEndRow()); + binnedRanges.computeIfAbsent(location, k -> new HashMap<>()).put(extent, entry.getValue()); + } + return binnedRanges; + } + /** * The Class IteratorSetting. Encapsulates specifics for an Accumulo iterator's name & priority. */ diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index a1ac89f1f08..6326b19f786 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -33,13 +33,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.LoadPlan; -import org.apache.accumulo.core.manager.thrift.ManagerClientService; -import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; -import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.rpc.ThriftUtil; -import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -67,6 +61,7 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; +import datawave.core.common.connection.AccumuloTableInfoFetcher; import datawave.ingest.data.TypeRegistry; import datawave.ingest.mapreduce.StandaloneStatusReporter; import datawave.util.cli.PasswordConverter; @@ -715,30 +710,13 @@ public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) { } private int getMajorCompactionCount() { - int majC = 0; - - 
ManagerClientService.Client client = null; - ClientContext context = (ClientContext) accumuloClient; try { - client = ThriftClientTypes.MANAGER.getConnection(context); - ManagerMonitorInfo mmi = client.getManagerStats(null, context.rpcCreds()); - Map tableStats = mmi.getTableMap(); - - for (java.util.Map.Entry e : tableStats.entrySet()) { - majC += e.getValue().getMajors().getQueued(); - majC += e.getValue().getMajors().getRunning(); - } + AccumuloTableInfoFetcher fetcher = new AccumuloTableInfoFetcher(accumuloClient); + return fetcher.getMajorCompactionCount(); } catch (Exception e) { - // Accumulo API changed, catch exception for now until we redeploy - // accumulo on lightning. log.error("Unable to retrieve major compaction stats: " + e.getMessage()); - } finally { - if (client != null) { - ThriftUtil.close(client, context); - } + return 0; } - - return majC; } /** diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java index 192702e1075..081c4816b89 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java @@ -14,18 +14,16 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedMap; import java.util.TreeMap; -import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.client.admin.Locations; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; -import 
org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.metadata.MetadataServicer; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -38,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import datawave.core.common.connection.AccumuloTableInfoFetcher; import datawave.ingest.config.BaseHdfsFileCacheUtil; import datawave.ingest.mapreduce.partition.BalancedShardPartitioner; import datawave.ingest.mapreduce.partition.DelegatePartitioner; @@ -82,13 +81,19 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil { private PartitionerCache partitionerCache; private Map getSplitsWithLocation(String table) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - SortedMap tabletLocations = new TreeMap<>(); - AccumuloClient client = accumuloHelper.newClient(); - MetadataServicer.forTableName((ClientContext) client, table).getTabletLocations(tabletLocations); - - return tabletLocations.entrySet().stream().filter(k -> k.getKey().endRow() != null).collect( - Collectors.toMap(e -> e.getKey().endRow(), e -> e.getValue() == null ? NO_LOCATION : e.getValue(), (o1, o2) -> o1, TreeMap::new)); + AccumuloTableInfoFetcher fetcher = new AccumuloTableInfoFetcher(client); + Locations locations = fetcher.getTabletLocations(table, Collections.singletonList(new Range())); + Map result = new TreeMap<>(); + for (Map.Entry> entry : locations.groupByTablet().entrySet()) { + TabletId tabletId = entry.getKey(); + Text endRow = tabletId.getEndRow(); + if (endRow != null) { + String location = locations.getTabletLocation(tabletId); + result.put(endRow, location == null ? 
NO_LOCATION : location); + } + } + return result; } /** From 0172f514af1c62520e10c2eed20bd6bb6310c4e0 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 23 Mar 2026 12:12:29 -0400 Subject: [PATCH 3/9] Address PR #3449 review feedback from jschmidt10 - Remove implementation details from AccumuloTableInfoFetcher javadocs - Throw TableNotFoundException instead of TableDeletedException in BulkInputFormat - Add datawave-core-connection-pool to root pom dependencyManagement --- .../common/connection/AccumuloTableInfoFetcher.java | 10 +++++----- pom.xml | 5 +++++ warehouse/core/pom.xml | 1 - .../main/java/datawave/mr/bulk/BulkInputFormat.java | 3 +-- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java index 5a29c703851..f4a07c66d95 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java @@ -30,7 +30,7 @@ public AccumuloTableInfoFetcher(AccumuloClient client) { } /** - * Get the TableId for a table name using the public {@code tableIdMap()} API. + * Get the TableId for a table name. * * @param tableName * the table name @@ -47,7 +47,7 @@ public TableId getTableId(String tableName) throws TableNotFoundException { } /** - * Check if a table exists using the public {@code exists()} API. + * Check if a table exists. * * @param tableName * the table name @@ -58,7 +58,7 @@ public boolean tableExists(String tableName) { } /** - * Check if a table is online using the public {@code isOnline()} API. + * Check if a table is online. 
* * @param tableName * the table name @@ -73,7 +73,7 @@ public boolean isTableOnline(String tableName) throws TableNotFoundException, Ac } /** - * Get tablet locations for the given ranges using the public {@code locate()} API. + * Get tablet locations for the given ranges. * * @param tableName * the table name @@ -93,7 +93,7 @@ public Locations getTabletLocations(String tableName, Collection ranges) } /** - * Get the count of running major compactions across all tablet servers using the public {@code getActiveCompactions()} API. + * Get the count of running major compactions across all tablet servers. *
<p>
* Note: This counts only running compactions (not queued), which differs slightly from the original Thrift-based implementation that also counted queued * compactions. This is acceptable because the MAJC_THRESHOLD default is 3000 (a high safety margin) and this is polled on each bulk load cycle. diff --git a/pom.xml b/pom.xml index e330359b720..76cecff1589 100644 --- a/pom.xml +++ b/pom.xml @@ -412,6 +412,11 @@ common-utils ${project.version} + + gov.nsa.datawave.core + datawave-core-connection-pool + ${project.version} + gov.nsa.datawave.core metadata-utils diff --git a/warehouse/core/pom.xml b/warehouse/core/pom.xml index bf53c59264f..aafb75a9c31 100644 --- a/warehouse/core/pom.xml +++ b/warehouse/core/pom.xml @@ -63,7 +63,6 @@ gov.nsa.datawave.core datawave-core-connection-pool - ${project.version} gov.nsa.datawave.core diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index d10cc011017..807b573d9e8 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -32,7 +32,6 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.Locations; @@ -1075,7 +1074,7 @@ public List getSplits(JobContext job) throws IOException { // Validate table state if (!(client instanceof InMemoryAccumuloClient)) { if (!fetcher.tableExists(tableName)) - throw new TableDeletedException(tableName); + throw new TableNotFoundException(null, tableName, "Table does not exist"); if (!fetcher.isTableOnline(tableName)) throw new TableOfflineException("Table " + tableName + " 
is offline"); } From 0c04d339e2f27b82d6939516365c2bd67254cd54 Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 27 Mar 2026 11:42:51 -0400 Subject: [PATCH 4/9] Use Thrift RPC for queued compaction counts, inline public API calls into callers Restore the original Thrift-based getMajorCompactionCount() in AccumuloTableInfoFetcher since there is no public Accumulo API for queued compactions (apache/accumulo#5965). Remove public API wrapper methods (tableExists, isTableOnline, getTabletLocations, getTableId) from the facade and inline them directly into BulkInputFormat and TableSplitsCache. The facade now only contains methods that require non-public APIs. --- .../connection/AccumuloTableInfoFetcher.java | 155 ++++++++---------- .../AccumuloTableInfoFetcherTest.java | 68 +------- .../datawave/mr/bulk/BulkInputFormat.java | 10 +- .../mapreduce/job/TableSplitsCache.java | 4 +- 4 files changed, 83 insertions(+), 154 deletions(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java index f4a07c66d95..c1013ea2fd2 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java @@ -1,23 +1,54 @@ package datawave.core.common.connection; -import java.util.Collection; -import java.util.List; +import java.util.Map; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.ActiveCompaction; -import org.apache.accumulo.core.client.admin.Locations; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TableId; +// @formatter:off 
+// ============================================================================================ +// NON-PUBLIC ACCUMULO API FACADE (Issue #2443) +// ============================================================================================ +// The following imports use non-public Accumulo APIs. They are intentionally isolated in this +// facade class so that callers throughout Datawave never reference non-public APIs directly. +// +// WHY THIS EXISTS: +// There is no public Accumulo API for queued compaction counts (only running compactions +// via getActiveCompactions()). The Thrift RPC below is the only way to get queued + running +// counts. See: https://github.com/apache/accumulo/issues/5965 +// +// HOW TO SWAP OUT INCREMENTALLY: +// Each method in this class that uses non-public APIs is marked with a +// "NON-PUBLIC API" comment block. When a public API alternative becomes available +// (e.g., in Accumulo 4.0): +// 1. Replace the method body with the public API equivalent +// 2. Remove the non-public imports that are no longer used +// 3. Update the import-control-accumulo.xml if the non-public class was allowed there +// +// HOW TO KNOW WHEN THIS FACADE CAN BE REMOVED: +// When ALL methods in this class use only public APIs (no "NON-PUBLIC API" markers remain): +// 1. Remove these non-public imports +// 2. Remove this comment block +// 3. Inline the method(s) back into callers — the facade is no longer load-bearing +// 4. 
Verify by running: grep -r 'NON-PUBLIC API' AccumuloTableInfoFetcher.java +// — if no results, the migration is complete +// ============================================================================================ +// @formatter:on +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.manager.thrift.ManagerClientService; +import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; /** - * Facade that centralizes Accumulo table metadata operations behind public APIs. + * Facade that isolates non-public Accumulo API usage behind a clean interface. *

- * This class replaces direct usage of non-public Accumulo internals ({@code ClientContext}, {@code ThriftClientTypes}, {@code TabletLocator}, - * {@code MetadataServicer}, etc.) with their public API equivalents. All methods delegate to {@link org.apache.accumulo.core.client.admin.TableOperations} or - * {@link org.apache.accumulo.core.client.admin.InstanceOperations}. + * This class exists solely to contain Accumulo internals that have no public API equivalent. Operations that can use public APIs (e.g., table existence, online + * status, tablet locations) should call {@link org.apache.accumulo.core.client.admin.TableOperations} or + * {@link org.apache.accumulo.core.client.admin.InstanceOperations} directly — they do not belong here. + *

+ * See the import block above for the incremental migration plan. * * @see Issue #2443 */ @@ -29,89 +60,43 @@ public AccumuloTableInfoFetcher(AccumuloClient client) { this.client = client; } + // NON-PUBLIC API: Uses Thrift RPC to get queued + running compaction counts. + // No public Accumulo API exists for queued compactions as of 2.1.x. + // Tracking: https://github.com/apache/accumulo/issues/5965 + // + // To swap out: when a public API for compaction counts is available, + // replace this method body and remove the Thrift imports above. /** - * Get the TableId for a table name. - * - * @param tableName - * the table name - * @return the TableId - * @throws TableNotFoundException - * if the table does not exist - */ - public TableId getTableId(String tableName) throws TableNotFoundException { - String id = client.tableOperations().tableIdMap().get(tableName); - if (id == null) { - throw new TableNotFoundException(null, tableName, "Table not found in tableIdMap"); - } - return TableId.of(id); - } - - /** - * Check if a table exists. - * - * @param tableName - * the table name - * @return true if the table exists - */ - public boolean tableExists(String tableName) { - return client.tableOperations().exists(tableName); - } - - /** - * Check if a table is online. - * - * @param tableName - * the table name - * @return true if the table is online - * @throws TableNotFoundException - * if the table does not exist - * @throws AccumuloException - * if a general Accumulo error occurs - */ - public boolean isTableOnline(String tableName) throws TableNotFoundException, AccumuloException { - return client.tableOperations().isOnline(tableName); - } - - /** - * Get tablet locations for the given ranges. 
- * - * @param tableName - * the table name - * @param ranges - * the ranges to locate - * @return the Locations result - * @throws TableNotFoundException - * if the table does not exist - * @throws AccumuloException - * if a general Accumulo error occurs - * @throws AccumuloSecurityException - * if a security error occurs - */ - public Locations getTabletLocations(String tableName, Collection ranges) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - return client.tableOperations().locate(tableName, ranges); - } - - /** - * Get the count of running major compactions across all tablet servers. - *

- * Note: This counts only running compactions (not queued), which differs slightly from the original Thrift-based implementation that also counted queued - * compactions. This is acceptable because the MAJC_THRESHOLD default is 3000 (a high safety margin) and this is polled on each bulk load cycle. + * Get the count of queued and running major compactions across all tablet servers. * - * @return the number of active major compactions + * @return the number of queued and running major compactions * @throws AccumuloException * if a general Accumulo error occurs * @throws AccumuloSecurityException * if a security error occurs */ public int getMajorCompactionCount() throws AccumuloException, AccumuloSecurityException { - int count = 0; - List compactions = client.instanceOperations().getActiveCompactions(); - for (ActiveCompaction compaction : compactions) { - if (compaction.getType() == ActiveCompaction.CompactionType.MAJOR || compaction.getType() == ActiveCompaction.CompactionType.FULL) { - count++; + int majC = 0; + + ClientContext context = (ClientContext) client; + ManagerClientService.Client managerClient = null; + try { + managerClient = ThriftClientTypes.MANAGER.getConnection(context); + ManagerMonitorInfo mmi = managerClient.getManagerStats(null, context.rpcCreds()); + Map tableStats = mmi.getTableMap(); + + for (Map.Entry e : tableStats.entrySet()) { + majC += e.getValue().getMajors().getQueued(); + majC += e.getValue().getMajors().getRunning(); + } + } catch (Exception e) { + throw new AccumuloException("Unable to retrieve major compaction stats", e); + } finally { + if (managerClient != null) { + ThriftUtil.close(managerClient, context); } } - return count; + + return majC; } } diff --git a/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java index c8a9bc001ed..54582f920a9 100644 --- 
a/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java +++ b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java @@ -1,20 +1,7 @@ package datawave.core.common.connection; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.Locations; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.client.AccumuloException; import org.junit.Before; import org.junit.Test; @@ -23,59 +10,20 @@ public class AccumuloTableInfoFetcherTest { - private static final String TEST_TABLE = "testTable"; - private AccumuloClient client; private AccumuloTableInfoFetcher fetcher; @Before public void setup() throws Exception { InMemoryInstance instance = new InMemoryInstance(); - client = new InMemoryAccumuloClient("root", instance); - client.tableOperations().create(TEST_TABLE); + AccumuloClient client = new InMemoryAccumuloClient("root", instance); fetcher = new AccumuloTableInfoFetcher(client); } - @Test - public void testGetTableId() throws Exception { - TableId tableId = fetcher.getTableId(TEST_TABLE); - assertNotNull(tableId); - // Verify it matches the tableIdMap entry - String expectedId = client.tableOperations().tableIdMap().get(TEST_TABLE); - assertEquals(expectedId, tableId.canonical()); - } - - @Test(expected = TableNotFoundException.class) - public void testGetTableIdNonExistent() throws Exception { - fetcher.getTableId("nonExistentTable"); - } - - @Test - public void testTableExists() { - 
assertTrue(fetcher.tableExists(TEST_TABLE)); - assertFalse(fetcher.tableExists("nonExistentTable")); - } - - @Test - public void testIsTableOnline() throws Exception { - // InMemoryAccumuloClient.isOnline() returns false, but the API call should succeed - boolean online = fetcher.isTableOnline(TEST_TABLE); - // InMemory always returns false; just verify no exception is thrown - assertFalse(online); - } - - @Test - public void testGetTabletLocations() throws Exception { - Locations locations = fetcher.getTabletLocations(TEST_TABLE, Collections.singletonList(new Range())); - assertNotNull(locations); - Map> byTablet = locations.groupByTablet(); - assertNotNull(byTablet); - assertFalse(byTablet.isEmpty()); - } - - @Test - public void testGetMajorCompactionCount() throws Exception { - // InMemoryAccumuloClient returns empty list for getActiveCompactions - int count = fetcher.getMajorCompactionCount(); - assertEquals(0, count); + @Test(expected = AccumuloException.class) + public void testGetMajorCompactionCountThrowsWithoutCluster() throws Exception { + // The Thrift-based implementation requires a live Accumulo cluster. + // With InMemoryAccumuloClient, the underlying ClientContext cannot connect + // to ZooKeeper, so this should throw AccumuloException. 
+ fetcher.getMajorCompactionCount(); } } diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12b796c0112..806e4fd7ca2 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -74,7 +74,6 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; import datawave.common.util.ArgumentChecker; -import datawave.core.common.connection.AccumuloTableInfoFetcher; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; import datawave.mr.bulk.split.DefaultSplitStrategy; @@ -1064,16 +1063,15 @@ public List getSplits(JobContext job) throws IOException { } } else { try (AccumuloClient client = getClient(job.getConfiguration())) { - AccumuloTableInfoFetcher fetcher = new AccumuloTableInfoFetcher(client); - // Validate table state + // Validate table state using public API if (!(client instanceof InMemoryAccumuloClient)) { - if (!fetcher.tableExists(tableName)) + if (!client.tableOperations().exists(tableName)) throw new TableNotFoundException(null, tableName, "Table does not exist"); - if (!fetcher.isTableOnline(tableName)) + if (!client.tableOperations().isOnline(tableName)) throw new TableOfflineException("Table " + tableName + " is offline"); } // Locate tablets for ranges using public API - Locations locations = fetcher.getTabletLocations(tableName, ranges); + Locations locations = client.tableOperations().locate(tableName, ranges); // Convert to binnedRanges structure binnedRanges = buildBinnedRanges(locations); clipRanges(binnedRanges); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java index 081c4816b89..a3ec12eb285 100644 --- 
a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java @@ -36,7 +36,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; -import datawave.core.common.connection.AccumuloTableInfoFetcher; import datawave.ingest.config.BaseHdfsFileCacheUtil; import datawave.ingest.mapreduce.partition.BalancedShardPartitioner; import datawave.ingest.mapreduce.partition.DelegatePartitioner; @@ -82,8 +81,7 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil { private Map getSplitsWithLocation(String table) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { AccumuloClient client = accumuloHelper.newClient(); - AccumuloTableInfoFetcher fetcher = new AccumuloTableInfoFetcher(client); - Locations locations = fetcher.getTabletLocations(table, Collections.singletonList(new Range())); + Locations locations = client.tableOperations().locate(table, Collections.singletonList(new Range())); Map result = new TreeMap<>(); for (Map.Entry> entry : locations.groupByTablet().entrySet()) { TabletId tabletId = entry.getKey(); From 619d710712210572a83391acaaabbef2349ce3f3 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 6 Apr 2026 19:37:32 -0400 Subject: [PATCH 5/9] Refactor AccumuloTableInfoFetcher to static utility class Address PR #3449 review feedback from lbschanno and jschmidt10. 
--- .../connection/AccumuloTableInfoFetcher.java | 150 +++++++++++++----- .../AccumuloTableInfoFetcherTest.java | 54 ++++++- .../datawave/mr/bulk/BulkInputFormat.java | 29 +--- .../job/BulkIngestMapFileLoader.java | 3 +- .../mapreduce/job/TableSplitsCache.java | 19 +-- .../query/scheduler/PushdownScheduler.java | 6 +- 6 files changed, 176 insertions(+), 85 deletions(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java index c1013ea2fd2..962674c6fbe 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java @@ -1,63 +1,129 @@ package datawave.core.common.connection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -// @formatter:off -// ============================================================================================ -// NON-PUBLIC ACCUMULO API FACADE (Issue #2443) -// ============================================================================================ -// The following imports use non-public Accumulo APIs. They are intentionally isolated in this -// facade class so that callers throughout Datawave never reference non-public APIs directly. -// -// WHY THIS EXISTS: -// There is no public Accumulo API for queued compaction counts (only running compactions -// via getActiveCompactions()). The Thrift RPC below is the only way to get queued + running -// counts. 
See: https://github.com/apache/accumulo/issues/5965 -// -// HOW TO SWAP OUT INCREMENTALLY: -// Each method in this class that uses non-public APIs is marked with a -// "NON-PUBLIC API" comment block. When a public API alternative becomes available -// (e.g., in Accumulo 4.0): -// 1. Replace the method body with the public API equivalent -// 2. Remove the non-public imports that are no longer used -// 3. Update the import-control-accumulo.xml if the non-public class was allowed there -// -// HOW TO KNOW WHEN THIS FACADE CAN BE REMOVED: -// When ALL methods in this class use only public APIs (no "NON-PUBLIC API" markers remain): -// 1. Remove these non-public imports -// 2. Remove this comment block -// 3. Inline the method(s) back into callers — the facade is no longer load-bearing -// 4. Verify by running: grep -r 'NON-PUBLIC API' AccumuloTableInfoFetcher.java -// — if no results, the migration is complete -// ============================================================================================ -// @formatter:on +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.master.thrift.TableInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.hadoop.io.Text; /** - * Facade that isolates non-public Accumulo API usage behind a clean interface. + * Utility class that centralizes Accumulo table metadata operations. *

- * This class exists solely to contain Accumulo internals that have no public API equivalent. Operations that can use public APIs (e.g., table existence, online - * status, tablet locations) should call {@link org.apache.accumulo.core.client.admin.TableOperations} or - * {@link org.apache.accumulo.core.client.admin.InstanceOperations} directly — they do not belong here. - *

- * See the import block above for the incremental migration plan. + * This class replaces direct usage of non-public Accumulo internals ({@code ClientContext}, {@code MetadataServicer}, {@code TabletLocator}, etc.) with public + * API equivalents where possible. Methods that still require non-public APIs are marked with {@code NON-PUBLIC API} comments. * * @see Issue #2443 */ -public class AccumuloTableInfoFetcher { +public final class AccumuloTableInfoFetcher { + + private AccumuloTableInfoFetcher() { + // utility class + } + + /** + * Get the {@link TableId} for a table by name. + * + * @param client + * the Accumulo client + * @param tableName + * the table name to look up + * @return the TableId, or {@code null} if the table does not exist + */ + public static TableId getTableId(AccumuloClient client, String tableName) { + String id = client.tableOperations().tableIdMap().get(tableName); + return id == null ? null : TableId.of(id); + } - private final AccumuloClient client; + /** + * Locate tablets for the given ranges and return them grouped by tablet server location, keyed by {@link KeyExtent}. + *

+ * This replaces the {@code TabletLocator.binRanges()} pattern. The returned structure maps tablet server location strings to a map of {@link KeyExtent} to + * the ranges assigned to that extent, matching the structure expected by downstream consumers like {@code clipRanges()}. + * + * @param client + * the Accumulo client + * @param tableName + * the table to locate tablets for + * @param ranges + * the ranges to bin into tablet locations + * @return a map of location -> (extent -> ranges) + * @throws AccumuloException + * if a general Accumulo error occurs + * @throws AccumuloSecurityException + * if a security error occurs + * @throws TableNotFoundException + * if the table does not exist + */ + // NON-PUBLIC API: Return type uses KeyExtent (org.apache.accumulo.core.dataImpl) which is non-public. + // This is required for compatibility with downstream consumers (clipRanges, binOfflineTable). + // When callers are migrated to use TabletId (public API), this method can be updated. + // + // REVIEW: The original TabletLocator.binRanges() used a retry loop for partial binning failures. + // The public locate() API may handle this internally. Verify retry semantics are equivalent. 
+ public static Map>> locateTablets(AccumuloClient client, String tableName, List ranges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + Locations locations = client.tableOperations().locate(tableName, ranges); + Map>> binnedRanges = new HashMap<>(); + for (Map.Entry> entry : locations.groupByTablet().entrySet()) { + TabletId tabletId = entry.getKey(); + String location = locations.getTabletLocation(tabletId); + if (location == null) { + location = ""; + } + KeyExtent extent = KeyExtent.fromTabletId(tabletId); + binnedRanges.computeIfAbsent(location, k -> new HashMap<>()).put(extent, entry.getValue()); + } + return binnedRanges; + } - public AccumuloTableInfoFetcher(AccumuloClient client) { - this.client = client; + /** + * Get the split points and their tablet server locations for a table. + *

+ * Returns a sorted map of end-row to tablet server location for each tablet. Tablets with no end-row (the last tablet) are excluded. This replaces the + * {@code MetadataServicer.getTabletLocations()} pattern. + * + * @param client + * the Accumulo client + * @param tableName + * the table name + * @return a sorted map of split point (end-row) to tablet server location + * @throws AccumuloException + * if a general Accumulo error occurs + * @throws AccumuloSecurityException + * if a security error occurs + * @throws TableNotFoundException + * if the table does not exist + */ + public static Map getSplitsWithLocations(AccumuloClient client, String tableName) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + Locations locations = client.tableOperations().locate(tableName, Collections.singletonList(new Range())); + Map result = new TreeMap<>(); + for (Map.Entry> entry : locations.groupByTablet().entrySet()) { + TabletId tabletId = entry.getKey(); + Text endRow = tabletId.getEndRow(); + if (endRow != null) { + String location = locations.getTabletLocation(tabletId); + result.put(endRow, location == null ? "" : location); + } + } + return result; } // NON-PUBLIC API: Uses Thrift RPC to get queued + running compaction counts. @@ -69,13 +135,13 @@ public AccumuloTableInfoFetcher(AccumuloClient client) { /** * Get the count of queued and running major compactions across all tablet servers. 
* + * @param client + * the Accumulo client * @return the number of queued and running major compactions * @throws AccumuloException * if a general Accumulo error occurs - * @throws AccumuloSecurityException - * if a security error occurs */ - public int getMajorCompactionCount() throws AccumuloException, AccumuloSecurityException { + public static int getMajorCompactionCount(AccumuloClient client) throws AccumuloException { int majC = 0; ClientContext context = (ClientContext) client; diff --git a/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java index 54582f920a9..3581e89e5ef 100644 --- a/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java +++ b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java @@ -1,7 +1,20 @@ package datawave.core.common.connection; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.io.Text; import org.junit.Before; import org.junit.Test; @@ -10,13 +23,12 @@ public class AccumuloTableInfoFetcherTest { - private AccumuloTableInfoFetcher fetcher; + private AccumuloClient client; @Before public void setup() throws Exception { InMemoryInstance instance = new InMemoryInstance(); - AccumuloClient client = new InMemoryAccumuloClient("root", instance); - fetcher = new AccumuloTableInfoFetcher(client); + client = new 
InMemoryAccumuloClient("root", instance); } @Test(expected = AccumuloException.class) @@ -24,6 +36,40 @@ public void testGetMajorCompactionCountThrowsWithoutCluster() throws Exception { // The Thrift-based implementation requires a live Accumulo cluster. // With InMemoryAccumuloClient, the underlying ClientContext cannot connect // to ZooKeeper, so this should throw AccumuloException. - fetcher.getMajorCompactionCount(); + AccumuloTableInfoFetcher.getMajorCompactionCount(client); + } + + @Test + public void testGetTableIdForExistingTable() { + // accumulo.metadata always exists + TableId id = AccumuloTableInfoFetcher.getTableId(client, "accumulo.metadata"); + assertNotNull("Table ID should not be null for existing table", id); + } + + @Test + public void testGetTableIdForNonexistentTable() { + TableId id = AccumuloTableInfoFetcher.getTableId(client, "nonexistent_table"); + assertNull("Table ID should be null for nonexistent table", id); + } + + @Test + public void testLocateTabletsForExistingTable() throws Exception { + // Create a test table + client.tableOperations().create("testTable"); + List ranges = Collections.singletonList(new Range()); + Map>> result = AccumuloTableInfoFetcher.locateTablets(client, "testTable", ranges); + assertNotNull("Result should not be null", result); + // With a new table there should be exactly one tablet (the default tablet) + int totalExtents = result.values().stream().mapToInt(Map::size).sum(); + assertEquals("New table should have exactly one tablet", 1, totalExtents); + } + + @Test + public void testGetSplitsWithLocationsForExistingTable() throws Exception { + client.tableOperations().create("testSplitsTable"); + Map result = AccumuloTableInfoFetcher.getSplitsWithLocations(client, "testSplitsTable"); + assertNotNull("Result should not be null", result); + // A new table with no splits has one tablet with no end-row, so result should be empty + assertTrue("New table with no splits should return empty map", result.isEmpty()); } } 
diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 806e4fd7ca2..bf1f9446b59 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -32,13 +32,11 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; -import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -74,6 +72,7 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; import datawave.common.util.ArgumentChecker; +import datawave.core.common.connection.AccumuloTableInfoFetcher; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; import datawave.mr.bulk.split.DefaultSplitStrategy; @@ -1065,15 +1064,15 @@ public List getSplits(JobContext job) throws IOException { try (AccumuloClient client = getClient(job.getConfiguration())) { // Validate table state using public API if (!(client instanceof InMemoryAccumuloClient)) { - if (!client.tableOperations().exists(tableName)) + if (!client.tableOperations().exists(tableName)) { throw new TableNotFoundException(null, tableName, "Table does not exist"); - if (!client.tableOperations().isOnline(tableName)) + } + if (!client.tableOperations().isOnline(tableName)) { throw new 
TableOfflineException("Table " + tableName + " is offline"); + } } - // Locate tablets for ranges using public API - Locations locations = client.tableOperations().locate(tableName, ranges); - // Convert to binnedRanges structure - binnedRanges = buildBinnedRanges(locations); + // REVIEW: retry semantics differ from original TabletLocator.binRanges() loop + binnedRanges = AccumuloTableInfoFetcher.locateTablets(client, tableName, ranges); clipRanges(binnedRanges); } } @@ -1152,20 +1151,6 @@ private void clipRanges(Map>> binnedRanges) { } - private Map>> buildBinnedRanges(Locations locations) { - Map>> binnedRanges = new HashMap<>(); - for (Entry> entry : locations.groupByTablet().entrySet()) { - TabletId tabletId = entry.getKey(); - String location = locations.getTabletLocation(tabletId); - if (location == null) - location = ""; - // Convert TabletId to KeyExtent (still needed for binOfflineTable compatibility and clipRanges) - KeyExtent extent = new KeyExtent(tabletId.getTable(), tabletId.getEndRow(), tabletId.getPrevEndRow()); - binnedRanges.computeIfAbsent(location, k -> new HashMap<>()).put(extent, entry.getValue()); - } - return binnedRanges; - } - /** * The Class IteratorSetting. Encapsulates specifics for an Accumulo iterator's name & priority. 
*/ diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index 6326b19f786..def7fa90d90 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -711,8 +711,7 @@ public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) { private int getMajorCompactionCount() { try { - AccumuloTableInfoFetcher fetcher = new AccumuloTableInfoFetcher(accumuloClient); - return fetcher.getMajorCompactionCount(); + return AccumuloTableInfoFetcher.getMajorCompactionCount(accumuloClient); } catch (Exception e) { log.error("Unable to retrieve major compaction stats: " + e.getMessage()); return 0; diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java index a3ec12eb285..2dedde3aae4 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java @@ -20,9 +20,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.Locations; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; @@ -36,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import datawave.core.common.connection.AccumuloTableInfoFetcher; import 
datawave.ingest.config.BaseHdfsFileCacheUtil; import datawave.ingest.mapreduce.partition.BalancedShardPartitioner; import datawave.ingest.mapreduce.partition.DelegatePartitioner; @@ -81,17 +79,10 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil { private Map getSplitsWithLocation(String table) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { AccumuloClient client = accumuloHelper.newClient(); - Locations locations = client.tableOperations().locate(table, Collections.singletonList(new Range())); - Map result = new TreeMap<>(); - for (Map.Entry> entry : locations.groupByTablet().entrySet()) { - TabletId tabletId = entry.getKey(); - Text endRow = tabletId.getEndRow(); - if (endRow != null) { - String location = locations.getTabletLocation(tabletId); - result.put(endRow, location == null ? NO_LOCATION : location); - } - } - return result; + Map locations = AccumuloTableInfoFetcher.getSplitsWithLocations(client, table); + // Replace empty-string locations with NO_LOCATION sentinel + locations.replaceAll((k, v) -> v.isEmpty() ? 
NO_LOCATION : v); + return locations; } /** diff --git a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java index 897f66f575a..f231cc71561 100644 --- a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java +++ b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.core.common.connection.AccumuloTableInfoFetcher; import datawave.core.common.logging.ThreadConfigurableLogger; import datawave.core.query.configuration.QueryData; import datawave.core.query.configuration.Result; @@ -154,7 +155,10 @@ protected Iterator concatIterators() throws AccumuloException, AccumuloS if (client instanceof InMemoryAccumuloClient) { tableId = TableId.of(config.getTableName()); } else { - tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName)); + tableId = AccumuloTableInfoFetcher.getTableId(client, tableName); + if (tableId == null) { + throw new TableNotFoundException(null, tableName, "Table does not exist"); + } } Iterator> chunkIter = Iterators.transform(getQueryDataIterator(), getPushdownFunction()); From 516f3c69a43727b3daf13e99a8583e62a01edffb Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 10 Apr 2026 13:30:38 -0400 Subject: [PATCH 6/9] Fix javadoc generation for AccumuloTableInfoFetcher.locateTablets --- .../core/common/connection/AccumuloTableInfoFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java index 962674c6fbe..41fe4558a59 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java 
+++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java @@ -63,7 +63,7 @@ public static TableId getTableId(AccumuloClient client, String tableName) { * the table to locate tablets for * @param ranges * the ranges to bin into tablet locations - * @return a map of location -> (extent -> ranges) + * @return a map of {@code location -> (extent -> ranges)} * @throws AccumuloException * if a general Accumulo error occurs * @throws AccumuloSecurityException From 60f25dbc5f23842ac8db93b97906cf24650a8444 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 28 Apr 2026 11:05:44 -0400 Subject: [PATCH 7/9] Rename AccumuloTableInfoFetcher to AccumuloTableUtils per lbschanno Also add UnsupportedOperationException in private constructor. Part of #2443 --- ...TableInfoFetcher.java => AccumuloTableUtils.java} | 6 +++--- ...oFetcherTest.java => AccumuloTableUtilsTest.java} | 12 ++++++------ .../main/java/datawave/mr/bulk/BulkInputFormat.java | 4 ++-- .../mapreduce/job/BulkIngestMapFileLoader.java | 4 ++-- .../ingest/mapreduce/job/TableSplitsCache.java | 4 ++-- .../datawave/query/scheduler/PushdownScheduler.java | 4 ++-- 6 files changed, 17 insertions(+), 17 deletions(-) rename core/connection-pool/src/main/java/datawave/core/common/connection/{AccumuloTableInfoFetcher.java => AccumuloTableUtils.java} (98%) rename core/connection-pool/src/test/java/datawave/core/common/connection/{AccumuloTableInfoFetcherTest.java => AccumuloTableUtilsTest.java} (85%) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableUtils.java similarity index 98% rename from core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java rename to core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableUtils.java index 41fe4558a59..4511802c1bd 100644 --- 
a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableUtils.java @@ -31,10 +31,10 @@ * * @see Issue #2443 */ -public final class AccumuloTableInfoFetcher { +public final class AccumuloTableUtils { - private AccumuloTableInfoFetcher() { - // utility class + private AccumuloTableUtils() { + throw new UnsupportedOperationException("Utility class"); } /** diff --git a/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableUtilsTest.java similarity index 85% rename from core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java rename to core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableUtilsTest.java index 3581e89e5ef..40c559bf298 100644 --- a/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java +++ b/core/connection-pool/src/test/java/datawave/core/common/connection/AccumuloTableUtilsTest.java @@ -21,7 +21,7 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; -public class AccumuloTableInfoFetcherTest { +public class AccumuloTableUtilsTest { private AccumuloClient client; @@ -36,19 +36,19 @@ public void testGetMajorCompactionCountThrowsWithoutCluster() throws Exception { // The Thrift-based implementation requires a live Accumulo cluster. // With InMemoryAccumuloClient, the underlying ClientContext cannot connect // to ZooKeeper, so this should throw AccumuloException. 
- AccumuloTableInfoFetcher.getMajorCompactionCount(client); + AccumuloTableUtils.getMajorCompactionCount(client); } @Test public void testGetTableIdForExistingTable() { // accumulo.metadata always exists - TableId id = AccumuloTableInfoFetcher.getTableId(client, "accumulo.metadata"); + TableId id = AccumuloTableUtils.getTableId(client, "accumulo.metadata"); assertNotNull("Table ID should not be null for existing table", id); } @Test public void testGetTableIdForNonexistentTable() { - TableId id = AccumuloTableInfoFetcher.getTableId(client, "nonexistent_table"); + TableId id = AccumuloTableUtils.getTableId(client, "nonexistent_table"); assertNull("Table ID should be null for nonexistent table", id); } @@ -57,7 +57,7 @@ public void testLocateTabletsForExistingTable() throws Exception { // Create a test table client.tableOperations().create("testTable"); List ranges = Collections.singletonList(new Range()); - Map>> result = AccumuloTableInfoFetcher.locateTablets(client, "testTable", ranges); + Map>> result = AccumuloTableUtils.locateTablets(client, "testTable", ranges); assertNotNull("Result should not be null", result); // With a new table there should be exactly one tablet (the default tablet) int totalExtents = result.values().stream().mapToInt(Map::size).sum(); @@ -67,7 +67,7 @@ public void testLocateTabletsForExistingTable() throws Exception { @Test public void testGetSplitsWithLocationsForExistingTable() throws Exception { client.tableOperations().create("testSplitsTable"); - Map result = AccumuloTableInfoFetcher.getSplitsWithLocations(client, "testSplitsTable"); + Map result = AccumuloTableUtils.getSplitsWithLocations(client, "testSplitsTable"); assertNotNull("Result should not be null", result); // A new table with no splits has one tablet with no end-row, so result should be empty assertTrue("New table with no splits should return empty map", result.isEmpty()); diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java 
b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index bf1f9446b59..117f3f5e092 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -72,7 +72,7 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; import datawave.common.util.ArgumentChecker; -import datawave.core.common.connection.AccumuloTableInfoFetcher; +import datawave.core.common.connection.AccumuloTableUtils; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; import datawave.mr.bulk.split.DefaultSplitStrategy; @@ -1072,7 +1072,7 @@ public List getSplits(JobContext job) throws IOException { } } // REVIEW: retry semantics differ from original TabletLocator.binRanges() loop - binnedRanges = AccumuloTableInfoFetcher.locateTablets(client, tableName, ranges); + binnedRanges = AccumuloTableUtils.locateTablets(client, tableName, ranges); clipRanges(binnedRanges); } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index def7fa90d90..dd7c852c162 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -61,7 +61,7 @@ import com.google.common.base.Objects; import com.google.common.collect.Lists; -import datawave.core.common.connection.AccumuloTableInfoFetcher; +import datawave.core.common.connection.AccumuloTableUtils; import datawave.ingest.data.TypeRegistry; import datawave.ingest.mapreduce.StandaloneStatusReporter; import datawave.util.cli.PasswordConverter; @@ -711,7 +711,7 @@ public boolean canBringMapFilesOnline(long lastOnlineTime, boolean logInfo) { private int 
getMajorCompactionCount() { try { - return AccumuloTableInfoFetcher.getMajorCompactionCount(accumuloClient); + return AccumuloTableUtils.getMajorCompactionCount(accumuloClient); } catch (Exception e) { log.error("Unable to retrieve major compaction stats: " + e.getMessage()); return 0; diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java index 2dedde3aae4..ae821c9f031 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java @@ -33,7 +33,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; -import datawave.core.common.connection.AccumuloTableInfoFetcher; +import datawave.core.common.connection.AccumuloTableUtils; import datawave.ingest.config.BaseHdfsFileCacheUtil; import datawave.ingest.mapreduce.partition.BalancedShardPartitioner; import datawave.ingest.mapreduce.partition.DelegatePartitioner; @@ -79,7 +79,7 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil { private Map getSplitsWithLocation(String table) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { AccumuloClient client = accumuloHelper.newClient(); - Map locations = AccumuloTableInfoFetcher.getSplitsWithLocations(client, table); + Map locations = AccumuloTableUtils.getSplitsWithLocations(client, table); // Replace empty-string locations with NO_LOCATION sentinel locations.replaceAll((k, v) -> v.isEmpty() ? 
NO_LOCATION : v); return locations; diff --git a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java index f231cc71561..24eb116eadb 100644 --- a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java +++ b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownScheduler.java @@ -27,7 +27,7 @@ import com.google.common.collect.Lists; import datawave.accumulo.inmemory.InMemoryAccumuloClient; -import datawave.core.common.connection.AccumuloTableInfoFetcher; +import datawave.core.common.connection.AccumuloTableUtils; import datawave.core.common.logging.ThreadConfigurableLogger; import datawave.core.query.configuration.QueryData; import datawave.core.query.configuration.Result; @@ -155,7 +155,7 @@ protected Iterator concatIterators() throws AccumuloException, AccumuloS if (client instanceof InMemoryAccumuloClient) { tableId = TableId.of(config.getTableName()); } else { - tableId = AccumuloTableInfoFetcher.getTableId(client, tableName); + tableId = AccumuloTableUtils.getTableId(client, tableName); if (tableId == null) { throw new TableNotFoundException(null, tableName, "Table does not exist"); } From 87c1cfe18d8ccf7bbfd67598ebf7b1502343d029 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 5 May 2026 15:19:35 -0400 Subject: [PATCH 8/9] Remove AccumuloSecurityException from throws clauses per lbschanno review Catch and wrap AccumuloSecurityException as AccumuloException in locateTablets() and getSplitsWithLocations() since it is never actually thrown. Propagate removal to TableSplitsCache caller. Add explicit braces to for-loop in clipRanges(). 
--- .../common/connection/AccumuloTableUtils.java | 23 +++++++++++-------- .../datawave/mr/bulk/BulkInputFormat.java | 3 ++- .../mapreduce/job/TableSplitsCache.java | 5 ++-- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableUtils.java b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableUtils.java index 4511802c1bd..bdd87d55042 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableUtils.java +++ b/core/connection-pool/src/main/java/datawave/core/common/connection/AccumuloTableUtils.java @@ -66,8 +66,6 @@ public static TableId getTableId(AccumuloClient client, String tableName) { * @return a map of {@code location -> (extent -> ranges)} * @throws AccumuloException * if a general Accumulo error occurs - * @throws AccumuloSecurityException - * if a security error occurs * @throws TableNotFoundException * if the table does not exist */ @@ -78,8 +76,13 @@ public static TableId getTableId(AccumuloClient client, String tableName) { // REVIEW: The original TabletLocator.binRanges() used a retry loop for partial binning failures. // The public locate() API may handle this internally. Verify retry semantics are equivalent. 
public static Map>> locateTablets(AccumuloClient client, String tableName, List ranges) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Locations locations = client.tableOperations().locate(tableName, ranges); + throws AccumuloException, TableNotFoundException { + Locations locations; + try { + locations = client.tableOperations().locate(tableName, ranges); + } catch (AccumuloSecurityException e) { + throw new AccumuloException(e); + } Map>> binnedRanges = new HashMap<>(); for (Map.Entry> entry : locations.groupByTablet().entrySet()) { TabletId tabletId = entry.getKey(); @@ -106,14 +109,16 @@ public static Map>> locateTablets(AccumuloClien * @return a sorted map of split point (end-row) to tablet server location * @throws AccumuloException * if a general Accumulo error occurs - * @throws AccumuloSecurityException - * if a security error occurs * @throws TableNotFoundException * if the table does not exist */ - public static Map getSplitsWithLocations(AccumuloClient client, String tableName) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - Locations locations = client.tableOperations().locate(tableName, Collections.singletonList(new Range())); + public static Map getSplitsWithLocations(AccumuloClient client, String tableName) throws AccumuloException, TableNotFoundException { + Locations locations; + try { + locations = client.tableOperations().locate(tableName, Collections.singletonList(new Range())); + } catch (AccumuloSecurityException e) { + throw new AccumuloException(e); + } Map result = new TreeMap<>(); for (Map.Entry> entry : locations.groupByTablet().entrySet()) { TabletId tabletId = entry.getKey(); diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 117f3f5e092..805dbd85090 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ 
b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1141,8 +1141,9 @@ private void clipRanges(Map>> binnedRanges) { Range tabletRange = tabletRanges.getKey().toDataRange(); List clippedRanges = new ArrayList<>(); tabletMap.put(tabletRanges.getKey(), clippedRanges); - for (Range range : tabletRanges.getValue()) + for (Range range : tabletRanges.getValue()) { clippedRanges.add(tabletRange.clip(range)); + } } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java index ae821c9f031..8aa65b9a289 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/TableSplitsCache.java @@ -18,7 +18,6 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Value; import org.apache.commons.codec.binary.Base64; @@ -77,7 +76,7 @@ public class TableSplitsCache extends BaseHdfsFileCacheUtil { private PartitionerCache partitionerCache; - private Map getSplitsWithLocation(String table) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + private Map getSplitsWithLocation(String table) throws AccumuloException, TableNotFoundException { AccumuloClient client = accumuloHelper.newClient(); Map locations = AccumuloTableUtils.getSplitsWithLocations(client, table); // Replace empty-string locations with NO_LOCATION sentinel @@ -250,7 +249,7 @@ public void writeCacheFile(FileSystem fs, Path tmpSplitsFile) throws IOException // if the file exists and the new file would exceed the deviation threshold, don't replace it throw new IOException("Splits file will not be replaced"); } - } 
catch (IOException | AccumuloSecurityException | AccumuloException | TableNotFoundException ex) { + } catch (IOException | AccumuloException | TableNotFoundException ex) { log.error("Unable to write new splits file", ex); throw new IOException(ex); } From 866f286b6edc9c9970f1649c54d64f6073f99baa Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 5 May 2026 16:34:41 -0400 Subject: [PATCH 9/9] Add javadoc to clipRanges() per lbschanno review --- .../src/main/java/datawave/mr/bulk/BulkInputFormat.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 805dbd85090..3a74503ef2b 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1130,9 +1130,14 @@ public List getSplits(JobContext job) throws IOException { return splits; } + /** + * Truncates/clips the ranges in binnedRanges to fit within their assigned tablet boundaries. This makes it easier to identify what work needs to be redone + * when failures occur and tablets have merged or split. The method modifies binnedRanges in place. + * + * @param binnedRanges + * the map of {@code location -> (extent -> ranges)} to clip + */ private void clipRanges(Map>> binnedRanges) { - // truncate the ranges to within the tablets... this makes it easier to know what work - // needs to be redone when failures occurs and tablets have merged or split Map>> binnedRanges2 = new HashMap<>(); for (Entry>> entry : binnedRanges.entrySet()) { Map> tabletMap = new HashMap<>();