From 7329eca4d059d7c5ad348874f34b59ac831651e5 Mon Sep 17 00:00:00 2001 From: Seth Date: Wed, 8 Oct 2025 15:36:35 -0400 Subject: [PATCH 1/9] Remove as many clientImpl non-public Accumulo as a could. --- core/in-memory-accumulo/pom.xml | 24 +++++++++ .../inmemory/InMemoryAccumuloClient.java | 47 +++++++++-------- .../accumulo/inmemory/InMemoryClientInfo.java | 23 --------- .../accumulo/inmemory/InMemoryConnector.java | 51 ++++--------------- .../accumulo/inmemory/InMemoryInstance.java | 40 +-------------- .../datawave/mr/bulk/BulkInputFormat.java | 8 ++- 6 files changed, 69 insertions(+), 124 deletions(-) delete mode 100644 core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryClientInfo.java diff --git a/core/in-memory-accumulo/pom.xml b/core/in-memory-accumulo/pom.xml index 39731e50bcb..3704ff093ca 100644 --- a/core/in-memory-accumulo/pom.xml +++ b/core/in-memory-accumulo/pom.xml @@ -105,6 +105,30 @@ org.mockito mockito-core + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + + + org.apache.accumulo + accumulo-core + 2.1.4-5792fed3-bulkv2 + compile + diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java index bcb097e396e..4c64ee48087 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryAccumuloClient.java @@ -17,6 +17,7 @@ package datawave.accumulo.inmemory; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -30,35 +31,31 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; + import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NamespaceOperations; import org.apache.accumulo.core.client.admin.ReplicationOperations; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.SystemPermission; -import org.apache.accumulo.core.singletons.SingletonReservation; -public class InMemoryAccumuloClient extends ClientContext implements AccumuloClient { +// Remove `extends ClientContext` since it's deprecated. +// Need a workaround for the features that are no longer supported. Not sure if we can just throw them away atm. + +public class InMemoryAccumuloClient implements AccumuloClient { String username; private final InMemoryAccumulo acu; + private ConditionalWriterConfig conditionalWriterConfig; + private volatile boolean closed = false; + public InMemoryAccumuloClient(String username, InMemoryInstance instance) throws AccumuloSecurityException { - this(new Credentials(username, new PasswordToken(new byte[0])), instance.acu); - } - public InMemoryAccumuloClient(Credentials credentials, InMemoryAccumulo acu) throws AccumuloSecurityException { - super(SingletonReservation.noop(), new InMemoryClientInfo(credentials), DefaultConfiguration.getInstance(), null); - if (credentials.getToken().isDestroyed()) - throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED); - this.username = credentials.getPrincipal(); - this.acu = acu; + this.username = username; + this.acu = instance.acu; if (!acu.users.containsKey(username)) { InMemoryUser user = new InMemoryUser(username, new PasswordToken(new byte[0]), Authorizations.EMPTY); user.permissions.add(SystemPermission.SYSTEM); @@ -66,6 +63,22 @@ public InMemoryAccumuloClient(Credentials credentials, InMemoryAccumulo acu) thr } } + @Override + public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException { + // TODO add implementation + throw new UnsupportedOperationException(); + } + + public ConditionalWriter createConditionalWriter(String tableName) throws TableNotFoundException { + return this.createConditionalWriter(tableName, (ConditionalWriterConfig)null); + } + + private void ensureOpen() { + if (this.closed) { + throw new IllegalStateException("This client was closed."); + } + } + @Override public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) @@ -156,12 +169,6 @@ public NamespaceOperations namespaceOperations() { return new InMemoryNamespaceOperations(acu, username); } - @Override - public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) { - // TODO add implementation - throw new UnsupportedOperationException(); - } - @Override public ReplicationOperations replicationOperations() { // TODO add implementation diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryClientInfo.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryClientInfo.java deleted file mode 100644 index 4f172a55401..00000000000 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryClientInfo.java +++ /dev/null @@ -1,23 +0,0 @@ -package datawave.accumulo.inmemory; - -import java.util.Properties; - -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.ClientInfoImpl; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.conf.ClientProperty; - -public class InMemoryClientInfo extends ClientInfoImpl { - public InMemoryClientInfo(Credentials credentials) { - super(toProperties(credentials), credentials.getToken()); - } - - private static Properties toProperties(Credentials credentials) { - Properties props = new Properties(); - props.put(ClientProperty.INSTANCE_NAME.getKey(), new InMemoryInstance().instanceName); - props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), credentials.getPrincipal()); - props.put(ClientProperty.AUTH_TOKEN.getKey(), new String(((PasswordToken) (credentials.getToken())).getPassword())); - props.put(ClientProperty.AUTH_TYPE.getKey(), "password"); - return props; - } -} diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java index 5708d0cfa84..37164b070a6 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryConnector.java @@ -25,47 +25,39 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NamespaceOperations; -import org.apache.accumulo.core.client.admin.ReplicationOperations; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.SystemPermission; -public class InMemoryConnector extends Connector { +public class InMemoryConnector { String username; private final InMemoryAccumulo acu; - private final Instance instance; - InMemoryConnector(String username, InMemoryInstance instance) throws AccumuloSecurityException { - this(new Credentials(username, new PasswordToken(new byte[0])), new InMemoryAccumulo(InMemoryInstance.getDefaultFileSystem()), instance); - } + InMemoryConnector(String username, InMemoryInstance instance) throws AccumuloSecurityException { - InMemoryConnector(Credentials credentials, InMemoryAccumulo acu, InMemoryInstance instance) throws AccumuloSecurityException { - if (credentials.getToken().isDestroyed()) - throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.TOKEN_EXPIRED); - this.username = credentials.getPrincipal(); - this.acu = acu; - this.instance = instance; + this.username = username; + this.acu = instance.acu; + if (!acu.users.containsKey(username)) { + InMemoryUser user = new InMemoryUser(username, new PasswordToken(new byte[0]), Authorizations.EMPTY); + user.permissions.add(SystemPermission.SYSTEM); + acu.users.put(user.name, user); + } } - @Override public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) throw new TableNotFoundException(tableName, tableName, "no such table"); return acu.createBatchScanner(tableName, authorizations); } - @Override public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) @@ -73,36 +65,30 @@ public BatchDeleter createBatchDeleter(String tableName, Authorizations authoriz return new InMemoryBatchDeleter(acu, tableName, authorizations); } - @Override public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException { return createBatchDeleter(tableName, authorizations, numQueryThreads, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } - @Override public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { if (acu.tables.get(tableName) == null) throw new TableNotFoundException(tableName, tableName, "no such table"); return new InMemoryBatchWriter(acu, tableName); } - @Override public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException { return createBatchWriter(tableName, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } - @Override public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) { return new InMemoryMultiTableBatchWriter(acu); } - @Override public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { return createMultiTableBatchWriter(config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } - @Override public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { InMemoryTable table = acu.tables.get(tableName); if (table == null) @@ -110,46 +96,29 @@ public Scanner createScanner(String tableName, Authorizations authorizations) th return new InMemoryScanner(table, authorizations); } - @Override - public Instance getInstance() { - return instance; - } - - @Override public String whoami() { return username; } - @Override public TableOperations tableOperations() { return new InMemoryTableOperations(acu, username); } - @Override public SecurityOperations securityOperations() { return new InMemorySecurityOperations(acu); } - @Override public InstanceOperations instanceOperations() { return new InMemoryInstanceOperations(acu); } - @Override public NamespaceOperations namespaceOperations() { return new InMemoryNamespaceOperations(acu, username); } - @Override public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException { // TODO add implementation throw new UnsupportedOperationException(); } - @Override - public ReplicationOperations replicationOperations() { - // TODO add implementation - throw new UnsupportedOperationException(); - } - } diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java index 62589f01c54..e6599165ccd 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryInstance.java @@ -20,22 +20,17 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; + import java.util.List; import java.util.Map; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.clientImpl.Credentials; -import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.TextUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; /** * InMemory Accumulo provides an in memory implementation of the Accumulo client API. It is possible that the behavior of this implementation may differ subtly @@ -48,7 +43,7 @@ * Accumulo. * */ -public class InMemoryInstance implements Instance { +public class InMemoryInstance { static final String genericAddress = "localhost:1234"; static final Map instances = new HashMap<>(); @@ -85,61 +80,30 @@ public InMemoryInstance(String instanceName, FileSystem fs) { this.instanceName = instanceName; } - @Override public String getRootTabletLocation() { return genericAddress; } - @Override public List getMasterLocations() { return Collections.singletonList(genericAddress); } - @Override public String getInstanceID() { return "mock-instance-id"; } - @Override public String getInstanceName() { return instanceName; } - @Override public String getZooKeepers() { return "localhost"; } - @Override public int getZooKeepersSessionTimeOut() { return 30 * 1000; } - @Override - public Connector getConnector(String user, byte[] pass) throws AccumuloException, AccumuloSecurityException { - return getConnector(user, new PasswordToken(pass)); - } - - @Override - public Connector getConnector(String user, ByteBuffer pass) throws AccumuloException, AccumuloSecurityException { - return getConnector(user, ByteBufferUtil.toBytes(pass)); - } - - @Override - public Connector getConnector(String user, CharSequence pass) throws AccumuloException, AccumuloSecurityException { - return getConnector(user, TextUtil.getBytes(new Text(pass.toString()))); - } - - @Override - public Connector getConnector(String principal, AuthenticationToken token) throws AccumuloException, AccumuloSecurityException { - Connector conn = new InMemoryConnector(new Credentials(principal, token), acu, this); - if (!acu.users.containsKey(principal)) - conn.securityOperations().createLocalUser(principal, (PasswordToken) token); - else if (!acu.users.get(principal).token.equals(token)) - throw new AccumuloSecurityException(principal, SecurityErrorCode.BAD_CREDENTIALS); - return conn; - } - public static class CachedConfiguration { private static Configuration configuration = null; diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12acc1d138e..cdd5c675277 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1064,9 +1064,11 @@ protected static TabletLocator getTabletLocator(Configuration conf) throws Table String tableName = getTablename(conf); Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); + + // SETH NOTE : I can't find a way to remove the usages of ClientInfo, ClientConfConverter, and ClientContext here since TableLocator.getLocator is the only access to return a TableLocator and happens to require them. + // If we reallllllyyy want to gut it, we could remove this too but it's not on the dep list so I'm going to leave it as is. ClientInfo info = ClientInfo.from(props); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - Threads.UEH); + ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); return TabletLocator.getLocator(context, context.getTableId(tableName)); } @@ -1105,6 +1107,8 @@ public List getSplits(JobContext job) throws IOException { tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it tl.invalidateCache(); + + // SETH NOTE: Same thing here, TableLocator is the stopper from removing ClientInfo, ClientConfConverter and ClientContext ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); From c68d779b60e5ad8fb3301733eb7028eaf986d458 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 14 Oct 2025 11:42:01 -0400 Subject: [PATCH 2/9] Move bin ranges to InMemoryTableOperations --- .../accumulo/inmemory/InMemoryTableOperations.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index 7c879b2305d..a23d9635c75 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -49,12 +49,14 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsHelper; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -560,11 +562,15 @@ public SamplerConfiguration getSamplerConfiguration(String tableName) throws Tab @Override public Locations locate(String tableName, Collection ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { Map>> binnedRanges = new HashMap<>(); - InMemoryTabletLocator locator = new InMemoryTabletLocator(); - List ignore = locator.binRanges(null, new ArrayList<>(ranges), binnedRanges); + List ignore = binRanges(new ArrayList<>(ranges), binnedRanges); return new LocationsImpl(binnedRanges); } + private List binRanges(ArrayList ranges, Map>> binnedRanges) { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); + } + private static class LocationsImpl implements Locations { private Map> groupedByRanges; From f69071c6ff090f68637b8d2b173257c660f0e4a2 Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 20 Oct 2025 12:30:48 -0400 Subject: [PATCH 3/9] Remove unused imports --- .../datawave/accumulo/inmemory/InMemoryTableOperations.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index a23d9635c75..26fab5f2dcc 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -49,7 +49,6 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.sample.SamplerConfiguration; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TableOperationsHelper; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; @@ -79,8 +78,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; - public class InMemoryTableOperations extends TableOperationsHelper { private static final Logger log = LoggerFactory.getLogger(InMemoryTableOperations.class); private static final byte[] ZERO = {0}; From 82423b319497a774ba84d177173ff0befff12a0c Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 21 Oct 2025 11:31:49 -0400 Subject: [PATCH 4/9] wip? --- .../datawave/mr/bulk/BulkInputFormat.java | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 12acc1d138e..26e49264b34 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -9,6 +9,7 @@ import java.text.DateFormat; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -1047,29 +1048,36 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - /** - * Initializes an Accumulo {@link TabletLocator} based on the configuration. - * - * @param conf - * the Hadoop configuration object - * @return an accumulo tablet locator - * @throws TableNotFoundException - * if the table name set on the configuration doesn't exist - * @throws IOException - * if the input format is unable to read the password file from the FileSystem - */ - protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - if (conf.getBoolean(MOCK, false)) - return new InMemoryTabletLocator(); - String tableName = getTablename(conf); - Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - ClientInfo info = ClientInfo.from(props); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - Threads.UEH); - return TabletLocator.getLocator(context, context.getTableId(tableName)); +// /** +// * Initializes an Accumulo {@link TabletLocator} based on the configuration. +// * +// * @param conf +// * the Hadoop configuration object +// * @return an accumulo tablet locator +// * @throws TableNotFoundException +// * if the table name set on the configuration doesn't exist +// * @throws IOException +// * if the input format is unable to read the password file from the FileSystem +// */ +// protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { +// if (conf.getBoolean(MOCK, false)) +// return new InMemoryTabletLocator(); +// String tableName = getTablename(conf); +// Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) +// .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); +// ClientInfo info = ClientInfo.from(props); +// ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), +// Threads.UEH); +// return TabletLocator.getLocator(context, context.getTableId(tableName)); +// } + + public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); + return Collections.emptyList(); } + /** * Read the metadata table to get tablets and match up ranges to them. */ @@ -1090,7 +1098,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - TabletLocator tl; +// TabletLocator tl; try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1102,13 +1110,13 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - tl = getTabletLocator(job.getConfiguration()); +// tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - tl.invalidateCache(); +// tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { + while (!binRanges(context, ranges, binnedRanges).isEmpty()) { if (!(client instanceof InMemoryAccumuloClient)) { if (tableId == null) tableId = context.getTableId(tableName); @@ -1120,7 +1128,7 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - tl.invalidateCache(); +// tl.invalidateCache(); } clipRanges(binnedRanges); From 63b1c5fdd13a33140eb8ef488135e388eba095ad Mon Sep 17 00:00:00 2001 From: Seth Date: Thu, 23 Oct 2025 16:06:17 -0400 Subject: [PATCH 5/9] update --- .../datawave/mr/bulk/BulkInputFormat.java | 58 +++++++++---------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 26e49264b34..a4fec671af5 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -17,7 +17,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; @@ -41,7 +40,6 @@ import org.apache.accumulo.core.clientImpl.ClientConfConverter; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientInfo; -import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; @@ -84,7 +82,6 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; -import datawave.accumulo.inmemory.impl.InMemoryTabletLocator; import datawave.common.util.ArgumentChecker; import datawave.ingest.data.config.ingest.AccumuloHelper; import datawave.mr.bulk.split.DefaultLocationStrategy; @@ -1048,36 +1045,35 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } -// /** -// * Initializes an Accumulo {@link TabletLocator} based on the configuration. -// * -// * @param conf -// * the Hadoop configuration object -// * @return an accumulo tablet locator -// * @throws TableNotFoundException -// * if the table name set on the configuration doesn't exist -// * @throws IOException -// * if the input format is unable to read the password file from the FileSystem -// */ -// protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { -// if (conf.getBoolean(MOCK, false)) -// return new InMemoryTabletLocator(); -// String tableName = getTablename(conf); -// Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) -// .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); -// ClientInfo info = ClientInfo.from(props); -// ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), -// Threads.UEH); -// return TabletLocator.getLocator(context, context.getTableId(tableName)); -// } + // /** + // * Initializes an Accumulo {@link TabletLocator} based on the configuration. + // * + // * @param conf + // * the Hadoop configuration object + // * @return an accumulo tablet locator + // * @throws TableNotFoundException + // * if the table name set on the configuration doesn't exist + // * @throws IOException + // * if the input format is unable to read the password file from the FileSystem + // */ + // protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { + // if (conf.getBoolean(MOCK, false)) + // return new InMemoryTabletLocator(); + // String tableName = getTablename(conf); + // Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) + // .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); + // ClientInfo info = ClientInfo.from(props); + // ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), + // Threads.UEH); + // return TabletLocator.getLocator(context, context.getTableId(tableName)); + // } public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); return Collections.emptyList(); } - /** * Read the metadata table to get tablets and match up ranges to them. */ @@ -1098,7 +1094,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); -// TabletLocator tl; + // TabletLocator tl; try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1110,9 +1106,9 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; -// tl = getTabletLocator(job.getConfiguration()); + // tl = getTabletLocator(job.getConfiguration()); // its possible that the cache could contain complete, but old information about a tables tablets... so clear it -// tl.invalidateCache(); + // tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); @@ -1128,7 +1124,7 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); -// tl.invalidateCache(); + // tl.invalidateCache(); } clipRanges(binnedRanges); From 3f84d0c480290d238a1141505339b9fcccfbda5d Mon Sep 17 00:00:00 2001 From: Seth Date: Fri, 24 Oct 2025 14:02:06 -0400 Subject: [PATCH 6/9] wip --- .../datawave/mr/bulk/BulkInputFormat.java | 34 ++----------------- 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index a4fec671af5..4219c66c409 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1045,28 +1045,6 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - // /** - // * Initializes an Accumulo {@link TabletLocator} based on the configuration. - // * - // * @param conf - // * the Hadoop configuration object - // * @return an accumulo tablet locator - // * @throws TableNotFoundException - // * if the table name set on the configuration doesn't exist - // * @throws IOException - // * if the input format is unable to read the password file from the FileSystem - // */ - // protected static TabletLocator getTabletLocator(Configuration conf) throws TableNotFoundException, IOException { - // if (conf.getBoolean(MOCK, false)) - // return new InMemoryTabletLocator(); - // String tableName = getTablename(conf); - // Properties props = Accumulo.newClientProperties().to(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)) - // .as(getUsername(conf), new PasswordToken(getPassword(conf))).build(); - // ClientInfo info = ClientInfo.from(props); - // ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), - // Threads.UEH); - // return TabletLocator.getLocator(context, context.getTableId(tableName)); - // } public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { @@ -1094,7 +1072,7 @@ public List getSplits(JobContext job) throws IOException { // get the metadata information for these ranges Map>> binnedRanges = new HashMap<>(); - // TabletLocator tl; + try { if (isOfflineScan(job.getConfiguration())) { binnedRanges = binOfflineTable(job, tableName, ranges); @@ -1106,9 +1084,6 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - // tl = getTabletLocator(job.getConfiguration()); - // its possible that the cache could contain complete, but old information about a tables tablets... so clear it - // tl.invalidateCache(); ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); @@ -1124,7 +1099,6 @@ public List getSplits(JobContext job) throws IOException { binnedRanges.clear(); log.warn("Unable to locate bins for specified ranges. Retrying."); TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 200)); - // tl.invalidateCache(); } clipRanges(binnedRanges); @@ -1170,13 +1144,9 @@ public List getSplits(JobContext job) throws IOException { log.info("There are approximately {} values ", map.size()); for (RangeSplit split : map.keySet()) { - // Iterable> rangeIter = splitter.partition(map.get(split)); - // for (List rangeList : rangeIter) { - // RangeSplit newSplit = (RangeSplit) split.clone(); - // newSplit.addRanges(rangeList); + split.addRanges(map.get(split)); splits.add(split); - // } } From fd8e4d17c0c49119d44e2258d56f6393201395ce Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 27 Oct 2025 11:47:12 -0400 Subject: [PATCH 7/9] formatting?: --- .../core/src/main/java/datawave/mr/bulk/BulkInputFormat.java | 1 - 1 file changed, 1 deletion(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 4219c66c409..7b70c238a7a 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -1045,7 +1045,6 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); From 810154861035f8ebb96ba2afa5c3a7ba41e9cdba Mon Sep 17 00:00:00 2001 From: Seth Date: Mon, 27 Oct 2025 12:02:46 -0400 Subject: [PATCH 8/9] format --- .../datawave/accumulo/inmemory/InMemoryTableOperations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java index 26fab5f2dcc..628df0b460c 100644 --- a/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java +++ b/core/in-memory-accumulo/src/main/java/datawave/accumulo/inmemory/InMemoryTableOperations.java @@ -563,7 +563,7 @@ public Locations locate(String tableName, Collection ranges) throws Accum return new LocationsImpl(binnedRanges); } - private List binRanges(ArrayList ranges, Map>> binnedRanges) { + private List binRanges(ArrayList ranges, Map>> binnedRanges) { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); return Collections.emptyList(); } From f85deba5b611afd3c08b27bbc8988d7138152718 Mon Sep 17 00:00:00 2001 From: Seth Date: Tue, 2 Dec 2025 11:42:33 -0500 Subject: [PATCH 9/9] replace with tableOperations --- .../java/datawave/mr/bulk/BulkInputFormat.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java index 7b70c238a7a..1263c5df154 100644 --- a/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java +++ b/warehouse/core/src/main/java/datawave/mr/bulk/BulkInputFormat.java @@ -37,8 +37,8 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; + import org.apache.accumulo.core.clientImpl.ClientConfConverter; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientInfo; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; @@ -1045,7 +1045,7 @@ protected static LocationStrategy getLocationStrategy(Configuration conf) { return new DefaultLocationStrategy(); } - public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) + public List binRanges(List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { binnedRanges.put("", Collections.singletonMap(new KeyExtent(TableId.of(""), null, null), ranges)); return Collections.emptyList(); @@ -1083,16 +1083,14 @@ public List getSplits(JobContext job) throws IOException { } else { try (AccumuloClient client = getClient(job.getConfiguration())) { TableId tableId = null; - ClientInfo info = ClientInfo.from(cbHelper.newClientProperties()); - ClientContext context = new ClientContext(SingletonManager.getClientReservation(), info, - ClientConfConverter.toAccumuloConf(info.getProperties()), Threads.UEH); - while (!binRanges(context, ranges, binnedRanges).isEmpty()) { + + while (!binRanges(ranges, binnedRanges).isEmpty()) { if (!(client instanceof InMemoryAccumuloClient)) { if (tableId == null) - tableId = context.getTableId(tableName); - if (!context.tableNodeExists(tableId)) + tableId = TableId.of(client.tableOperations().tableIdMap().get(tableName));//TABLE ID todo:ensure this is correct + if (!client.tableOperations().tableIdMap().containsKey(tableName))// CHECK IF TABLE ID EXISTS //todo same throw new TableDeletedException(tableId.canonical()); - if (context.getTableState(tableId) == TableState.OFFLINE) + if (!client.tableOperations().isOnline(tableName)) //CHECK IF TABLE ID OFFLInE //todo same throw new TableOfflineException("Table (" + tableId.canonical() + ") is offline"); } binnedRanges.clear();