-
Notifications
You must be signed in to change notification settings - Fork 282
Replace ClientContext/TabletLocator/MetadataServicer with AccumuloTableInfoFetcher facade #3449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
SethSmucker
wants to merge
23
commits into
integration
Choose a base branch
from
task/clientcontext-facade-migration
base: integration
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
1c86b79
Replace ClientContext.getTableId with public API
SethSmucker 5f29ebf
Add AccumuloTableInfoFetcher facade to replace non-public Accumulo AP…
SethSmucker 0172f51
Address PR #3449 review feedback from jschmidt10
SethSmucker 2c33a78
Merge
SethSmucker a488b60
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker 0c04d33
Use Thrift RPC for queued compaction counts, inline public API calls …
SethSmucker 44e5aa0
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker be1a2e1
Merge remote-tracking branch 'origin/integration' into task/clientcon…
SethSmucker 31bb688
Merge branch 'task/clientcontext-facade-migration' of github.com:Nati…
SethSmucker 4ffad05
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker 619d710
Refactor AccumuloTableInfoFetcher to static utility class
SethSmucker 0e166e9
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker 516f3c6
Fix javadoc generation for AccumuloTableInfoFetcher.locateTablets
SethSmucker d2c0891
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker 40f003f
Merge remote-tracking branch 'origin/integration' into task/clientcon…
SethSmucker 19772a9
Merge branch 'integration' into task/clientcontext-facade-migration
lbschanno 60f25db
Rename AccumuloTableInfoFetcher to AccumuloTableUtils per lbschanno
SethSmucker 87c1cfe
Remove AccumuloSecurityException from throws clauses per lbschanno re…
SethSmucker 866f286
Add javadoc to clipRanges() per lbschanno review
SethSmucker 3089e5e
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker 1c463ca
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker 7428623
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker b243082
Merge branch 'integration' into task/clientcontext-facade-migration
SethSmucker File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
168 changes: 168 additions & 0 deletions
168
...nnection-pool/src/main/java/datawave/core/common/connection/AccumuloTableInfoFetcher.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| package datawave.core.common.connection; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.TreeMap; | ||
|
|
||
| import org.apache.accumulo.core.client.AccumuloClient; | ||
| import org.apache.accumulo.core.client.AccumuloException; | ||
| import org.apache.accumulo.core.client.AccumuloSecurityException; | ||
| import org.apache.accumulo.core.client.TableNotFoundException; | ||
| import org.apache.accumulo.core.client.admin.Locations; | ||
| import org.apache.accumulo.core.clientImpl.ClientContext; | ||
| import org.apache.accumulo.core.data.Range; | ||
| import org.apache.accumulo.core.data.TableId; | ||
| import org.apache.accumulo.core.data.TabletId; | ||
| import org.apache.accumulo.core.dataImpl.KeyExtent; | ||
| import org.apache.accumulo.core.manager.thrift.ManagerClientService; | ||
| import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; | ||
| import org.apache.accumulo.core.master.thrift.TableInfo; | ||
| import org.apache.accumulo.core.rpc.ThriftUtil; | ||
| import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; | ||
| import org.apache.hadoop.io.Text; | ||
|
|
||
| /** | ||
| * Utility class that centralizes Accumulo table metadata operations. | ||
| * <p> | ||
| * This class replaces direct usage of non-public Accumulo internals ({@code ClientContext}, {@code MetadataServicer}, {@code TabletLocator}, etc.) with public | ||
| * API equivalents where possible. Methods that still require non-public APIs are marked with {@code NON-PUBLIC API} comments. | ||
| * | ||
| * @see <a href="https://github.com/NationalSecurityAgency/datawave/issues/2443">Issue #2443</a> | ||
| */ | ||
| public final class AccumuloTableInfoFetcher { | ||
|
|
||
| private AccumuloTableInfoFetcher() { | ||
| // utility class | ||
|
SethSmucker marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| /** | ||
| * Get the {@link TableId} for a table by name. | ||
| * | ||
| * @param client | ||
| * the Accumulo client | ||
| * @param tableName | ||
| * the table name to look up | ||
| * @return the TableId, or {@code null} if the table does not exist | ||
| */ | ||
| public static TableId getTableId(AccumuloClient client, String tableName) { | ||
| String id = client.tableOperations().tableIdMap().get(tableName); | ||
| return id == null ? null : TableId.of(id); | ||
| } | ||
|
|
||
| /** | ||
| * Locate tablets for the given ranges and return them grouped by tablet server location, keyed by {@link KeyExtent}. | ||
| * <p> | ||
| * This replaces the {@code TabletLocator.binRanges()} pattern. The returned structure maps tablet server location strings to a map of {@link KeyExtent} to | ||
| * the ranges assigned to that extent, matching the structure expected by downstream consumers like {@code clipRanges()}. | ||
| * | ||
| * @param client | ||
| * the Accumulo client | ||
| * @param tableName | ||
| * the table to locate tablets for | ||
| * @param ranges | ||
| * the ranges to bin into tablet locations | ||
| * @return a map of {@code location -> (extent -> ranges)} | ||
| * @throws AccumuloException | ||
| * if a general Accumulo error occurs | ||
| * @throws AccumuloSecurityException | ||
| * if a security error occurs | ||
| * @throws TableNotFoundException | ||
| * if the table does not exist | ||
| */ | ||
| // NON-PUBLIC API: Return type uses KeyExtent (org.apache.accumulo.core.dataImpl) which is non-public. | ||
| // This is required for compatibility with downstream consumers (clipRanges, binOfflineTable). | ||
| // When callers are migrated to use TabletId (public API), this method can be updated. | ||
| // | ||
| // REVIEW: The original TabletLocator.binRanges() used a retry loop for partial binning failures. | ||
| // The public locate() API may handle this internally. Verify retry semantics are equivalent. | ||
| public static Map<String,Map<KeyExtent,List<Range>>> locateTablets(AccumuloClient client, String tableName, List<Range> ranges) | ||
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { | ||
| Locations locations = client.tableOperations().locate(tableName, ranges); | ||
| Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); | ||
| for (Map.Entry<TabletId,List<Range>> entry : locations.groupByTablet().entrySet()) { | ||
| TabletId tabletId = entry.getKey(); | ||
| String location = locations.getTabletLocation(tabletId); | ||
| if (location == null) { | ||
| location = ""; | ||
| } | ||
| KeyExtent extent = KeyExtent.fromTabletId(tabletId); | ||
| binnedRanges.computeIfAbsent(location, k -> new HashMap<>()).put(extent, entry.getValue()); | ||
| } | ||
| return binnedRanges; | ||
| } | ||
|
|
||
| /** | ||
| * Get the split points and their tablet server locations for a table. | ||
| * <p> | ||
| * Returns a sorted map of end-row to tablet server location for each tablet. Tablets with no end-row (the last tablet) are excluded. This replaces the | ||
| * {@code MetadataServicer.getTabletLocations()} pattern. | ||
| * | ||
| * @param client | ||
| * the Accumulo client | ||
| * @param tableName | ||
| * the table name | ||
| * @return a sorted map of split point (end-row) to tablet server location | ||
| * @throws AccumuloException | ||
| * if a general Accumulo error occurs | ||
| * @throws AccumuloSecurityException | ||
| * if a security error occurs | ||
| * @throws TableNotFoundException | ||
| * if the table does not exist | ||
| */ | ||
| public static Map<Text,String> getSplitsWithLocations(AccumuloClient client, String tableName) | ||
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { | ||
| Locations locations = client.tableOperations().locate(tableName, Collections.singletonList(new Range())); | ||
| Map<Text,String> result = new TreeMap<>(); | ||
| for (Map.Entry<TabletId,List<Range>> entry : locations.groupByTablet().entrySet()) { | ||
| TabletId tabletId = entry.getKey(); | ||
| Text endRow = tabletId.getEndRow(); | ||
| if (endRow != null) { | ||
| String location = locations.getTabletLocation(tabletId); | ||
| result.put(endRow, location == null ? "" : location); | ||
| } | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| // NON-PUBLIC API: Uses Thrift RPC to get queued + running compaction counts. | ||
| // No public Accumulo API exists for queued compactions as of 2.1.x. | ||
| // Tracking: https://github.com/apache/accumulo/issues/5965 | ||
| // | ||
| // To swap out: when a public API for compaction counts is available, | ||
| // replace this method body and remove the Thrift imports above. | ||
| /** | ||
| * Get the count of queued and running major compactions across all tablet servers. | ||
| * | ||
| * @param client | ||
| * the Accumulo client | ||
| * @return the number of queued and running major compactions | ||
| * @throws AccumuloException | ||
| * if a general Accumulo error occurs | ||
| */ | ||
| public static int getMajorCompactionCount(AccumuloClient client) throws AccumuloException { | ||
| int majC = 0; | ||
|
|
||
| ClientContext context = (ClientContext) client; | ||
| ManagerClientService.Client managerClient = null; | ||
| try { | ||
| managerClient = ThriftClientTypes.MANAGER.getConnection(context); | ||
| ManagerMonitorInfo mmi = managerClient.getManagerStats(null, context.rpcCreds()); | ||
| Map<String,TableInfo> tableStats = mmi.getTableMap(); | ||
|
|
||
| for (Map.Entry<String,TableInfo> e : tableStats.entrySet()) { | ||
| majC += e.getValue().getMajors().getQueued(); | ||
| majC += e.getValue().getMajors().getRunning(); | ||
| } | ||
| } catch (Exception e) { | ||
| throw new AccumuloException("Unable to retrieve major compaction stats", e); | ||
| } finally { | ||
| if (managerClient != null) { | ||
| ThriftUtil.close(managerClient, context); | ||
| } | ||
| } | ||
|
|
||
| return majC; | ||
| } | ||
| } | ||
75 changes: 75 additions & 0 deletions
75
...tion-pool/src/test/java/datawave/core/common/connection/AccumuloTableInfoFetcherTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| package datawave.core.common.connection; | ||
|
|
||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertNotNull; | ||
| import static org.junit.Assert.assertNull; | ||
| import static org.junit.Assert.assertTrue; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import org.apache.accumulo.core.client.AccumuloClient; | ||
| import org.apache.accumulo.core.client.AccumuloException; | ||
| import org.apache.accumulo.core.data.Range; | ||
| import org.apache.accumulo.core.data.TableId; | ||
| import org.apache.accumulo.core.dataImpl.KeyExtent; | ||
| import org.apache.hadoop.io.Text; | ||
| import org.junit.Before; | ||
| import org.junit.Test; | ||
|
|
||
| import datawave.accumulo.inmemory.InMemoryAccumuloClient; | ||
| import datawave.accumulo.inmemory.InMemoryInstance; | ||
|
|
||
| public class AccumuloTableInfoFetcherTest { | ||
|
|
||
| private AccumuloClient client; | ||
|
|
||
| @Before | ||
| public void setup() throws Exception { | ||
| InMemoryInstance instance = new InMemoryInstance(); | ||
| client = new InMemoryAccumuloClient("root", instance); | ||
| } | ||
|
|
||
| @Test(expected = AccumuloException.class) | ||
| public void testGetMajorCompactionCountThrowsWithoutCluster() throws Exception { | ||
| // The Thrift-based implementation requires a live Accumulo cluster. | ||
| // With InMemoryAccumuloClient, the underlying ClientContext cannot connect | ||
| // to ZooKeeper, so this should throw AccumuloException. | ||
| AccumuloTableInfoFetcher.getMajorCompactionCount(client); | ||
| } | ||
|
|
||
| @Test | ||
| public void testGetTableIdForExistingTable() { | ||
| // accumulo.metadata always exists | ||
| TableId id = AccumuloTableInfoFetcher.getTableId(client, "accumulo.metadata"); | ||
| assertNotNull("Table ID should not be null for existing table", id); | ||
| } | ||
|
|
||
| @Test | ||
| public void testGetTableIdForNonexistentTable() { | ||
| TableId id = AccumuloTableInfoFetcher.getTableId(client, "nonexistent_table"); | ||
| assertNull("Table ID should be null for nonexistent table", id); | ||
| } | ||
|
|
||
| @Test | ||
| public void testLocateTabletsForExistingTable() throws Exception { | ||
| // Create a test table | ||
| client.tableOperations().create("testTable"); | ||
| List<Range> ranges = Collections.singletonList(new Range()); | ||
| Map<String,Map<KeyExtent,List<Range>>> result = AccumuloTableInfoFetcher.locateTablets(client, "testTable", ranges); | ||
| assertNotNull("Result should not be null", result); | ||
| // With a new table there should be exactly one tablet (the default tablet) | ||
| int totalExtents = result.values().stream().mapToInt(Map::size).sum(); | ||
| assertEquals("New table should have exactly one tablet", 1, totalExtents); | ||
| } | ||
|
|
||
| @Test | ||
| public void testGetSplitsWithLocationsForExistingTable() throws Exception { | ||
| client.tableOperations().create("testSplitsTable"); | ||
| Map<Text,String> result = AccumuloTableInfoFetcher.getSplitsWithLocations(client, "testSplitsTable"); | ||
| assertNotNull("Result should not be null", result); | ||
| // A new table with no splits has one tablet with no end-row, so result should be empty | ||
| assertTrue("New table with no splits should return empty map", result.isEmpty()); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.