From 442bbcdc5f715ee51c75b0840cea9e1396e02619 Mon Sep 17 00:00:00 2001 From: psasikaladevi Date: Mon, 18 Jul 2022 19:59:30 +0530 Subject: [PATCH 1/6] Added support for Rainbow API --- .../core/QueryServiceAbstractStatement.java | 25 ++- .../core/QueryServiceConnection.java | 8 + .../core/RainbowQueryResultSet.java | 171 ++++++++++++++++++ .../cdp/queryservice/util/ArrowUtil.java | 86 ++++++++- .../cdp/queryservice/util/Constants.java | 3 + .../util/QueryRainbowExecutor.java | 46 +++++ .../queryservice/util/RainbowDataStream.java | 47 +++++ 7 files changed, 382 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java create mode 100644 src/main/java/com/salesforce/cdp/queryservice/util/QueryRainbowExecutor.java create mode 100644 src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java index b26acf5..b1a6877 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceAbstractStatement.java @@ -18,6 +18,7 @@ import com.google.protobuf.Struct; import com.google.protobuf.Value; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse; import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse; import com.salesforce.cdp.queryservice.model.QueryServiceResponse; import com.salesforce.cdp.queryservice.model.Type; @@ -27,6 +28,7 @@ import com.salesforce.cdp.queryservice.util.QueryExecutor; import com.salesforce.cdp.queryservice.util.HttpHelper; import com.salesforce.cdp.queryservice.util.QueryGrpcExecutor; +import com.salesforce.cdp.queryservice.util.QueryRainbowExecutor; import lombok.extern.slf4j.Slf4j; import okhttp3.Response; @@ -55,6 +57,8 @@ public abstract class QueryServiceAbstractStatement { private QueryGrpcExecutor queryGrpcExecutor; + private QueryRainbowExecutor queryRainbowExecutor; + private static final String KEY_TYPE = "type"; private static final String KEY_TYPE_CODE = "typeCode"; private static final String KEY_PLACE_IN_ORDER = "placeInOrder"; @@ -67,6 +71,7 @@ public QueryServiceAbstractStatement(QueryServiceConnection queryServiceConnecti this.resultSetConcurrency = resultSetConcurrency; this.queryExecutor = createQueryExecutor(); this.queryGrpcExecutor = createQueryGrpcExecutor(); + this.queryRainbowExecutor = createQueryRainbowExecutor(); } public ResultSet executeQuery(String sql) throws SQLException { @@ -79,8 +84,11 @@ public ResultSet executeQuery(String sql) throws SQLException { boolean requireManagedPagination = isTableauQuery() && !isCursorBasedPaginationReq; Optional limit = requireManagedPagination ? Optional.of(Constants.MAX_LIMIT) : Optional.empty(); Optional orderby = requireManagedPagination ? Optional.of("1 ASC") : Optional.empty(); - - if(isEnableStreamFlow) { + if(connection.isRainbowConnection()){ + Iterator response = queryRainbowExecutor.executeQuery(sql); + return createResultSetFromRainbowResponse(response); + } + else if(isEnableStreamFlow) { Iterator response = queryGrpcExecutor.executeQueryWithRetry(sql); return createResultSetFromResponse(response); } else { @@ -118,7 +126,14 @@ private boolean isTableauQuery() throws SQLException { String userAgent = connection.getClientInfo(Constants.USER_AGENT); return Constants.TABLEAU_USER_AGENT_VALUE.equals(userAgent); } - + private ResultSet createResultSetFromRainbowResponse(Iterator response) throws SQLException { + try{ + return new RainbowQueryResultSet(response,this); + } + catch (Exception e){ + throw new SQLException(QUERY_EXCEPTION); + } + } private ResultSet createResultSetFromResponse(QueryServiceResponse queryServiceResponse, boolean isCursorBasedPaginationReq) throws SQLException { ArrowUtil arrowUtil = new ArrowUtil(); paginationRequired = !queryServiceResponse.isDone(); @@ -227,4 +242,8 @@ protected QueryExecutor createQueryExecutor() { protected QueryGrpcExecutor createQueryGrpcExecutor() { return new QueryGrpcExecutor(connection); } + + protected QueryRainbowExecutor createQueryRainbowExecutor(){ + return new QueryRainbowExecutor(connection); + } } diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java index 9472aad..0e452a8 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java @@ -41,6 +41,7 @@ public class QueryServiceConnection implements Connection { private boolean isCursorBasedPaginationReq = true; private final boolean isSocksProxyDisabled; private boolean enableStreamFlow = false; + private boolean rainbowConnection = false; private String tenantUrl; public QueryServiceConnection(String url, Properties properties) throws SQLException { @@ -52,6 +53,9 @@ public QueryServiceConnection(String url, Properties properties) throws SQLExcep // default `enableArrowStream` is false enableArrowStream = Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_ARROW_STREAM)); + //Rainbow connection by default false + rainbowConnection = Boolean.parseBoolean(properties.getProperty(Constants.RAINBOW_CLIENT,Constants.FALSE_STR)); + // default `isCursorBasedPaginationReq` is true isCursorBasedPaginationReq = Boolean.parseBoolean(this.properties.getProperty(Constants.CURSOR_BASED_PAGINATION, Constants.TRUE_STR)); @@ -456,4 +460,8 @@ public String getTenantUrl() { public void setTenantUrl(String tenantUrl) { this.tenantUrl = tenantUrl; } + + public boolean isRainbowConnection() { + return rainbowConnection; + } } \ No newline at end of file diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java new file mode 100644 index 0000000..cab0234 --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java @@ -0,0 +1,171 @@ +package com.salesforce.cdp.queryservice.core; + +import com.google.protobuf.ListValue; +import com.google.protobuf.Value; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse; +import com.salesforce.cdp.queryservice.util.ArrowUtil; +import com.salesforce.cdp.queryservice.util.RainbowDataStream; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; + +@Slf4j +public class RainbowQueryResultSet extends QueryServiceResultSet{ + List data ; + private RainbowDataStream dataStream; + private Iterator streamIterator; + private ArrowUtil arrowUtil; + + public RainbowQueryResultSet(Iterator response, QueryServiceAbstractStatement statement) { + streamIterator = response; + arrowUtil = new ArrowUtil(); + dataStream = new RainbowDataStream(response); + initialiseArrowReader(); + this.statement = statement; + } + + private void initialiseArrowReader() { + arrowUtil.createArrowReaderForStream(dataStream); + } + @Override + public boolean next() throws SQLException { + try { + errorOutIfClosed(); + + if (currentRow == -1 && isNextChunkPresent()) { + getNextChunk(); + } else { + currentRow++; + } + + if (currentRow < data.size()) { + return true; + } + + if (isNextChunkPresent()) { + getNextChunk(); + if (data != null && data.size() > 0) { + return true; + } + } + arrowUtil.closeReader(); + closeDataStream(); + // Closing as this is move forward only cursor. + log.info("Resultset {} does not have any more rows. Total {} pages retrieved", this, currentPageNum); + return false; + } + catch (SQLException e){ + + closeDataStream(); + throw e; + } + } + + private void closeDataStream() { + if(dataStream != null) { + try { + dataStream.close(); + } + catch (IOException ex) { + ex.printStackTrace(); + } + dataStream = null; + } + } + + @Override + public Object getObject(int columnIndex) throws SQLException { + try{ + errorOutIfClosed(); + Object value = getValue(data.get(currentRow), columnIndex); + wasNull.set(value == null); + return value;} + catch(SQLException e) { + closeDataStream(); + throw e; + } + } + + @Override + public Object getObject(String columnLabel) throws SQLException { + throw new SQLException("Not Implemented"); + } + + @Override + protected Object getValue(Object row, String columnLabel) throws SQLException { + throw new SQLException("Not Implemented"); + } + + private Object getValue(Object row, int columnIndex) throws SQLException { + return valueToObject(((ListValue) row).getValues(columnIndex)); + } + + private static Object valueToObject(Value value) { + switch (value.getKindCase()) { + case NULL_VALUE: + return null; + case NUMBER_VALUE: + return value.getNumberValue(); + case STRING_VALUE: + return value.getStringValue(); + case BOOL_VALUE: + return value.getBoolValue(); + default: + throw new IllegalArgumentException(String.format("Unsupported protobuf value %s", value)); + } + } + + private int getColumnIndexByName(String columnName) throws SQLException { + return ((QueryServiceResultSetMetaData)resultSetMetaData).getColumnNameToPosition().get(columnName); + } + + private void getNextChunk() throws SQLException { + log.trace("Fetching page with number {} for resultset {}", ++currentPageNum, this); + + try { + List rows = arrowUtil.getRowsFromRainbowResponse(); + if(rows != null && rows.size()>0){ + this.data = rows; + currentRow=0; + } + } catch (Exception e) { + log.error("Error while getting the data chunk {}", this, e); + closeDataStream(); + throw new SQLException(e.getMessage()); + } + } + + @Override + protected ResultSet getNextPageData() throws SQLException { + throw new SQLException("This method is not implemented"); + } + + @Override + protected void updateState(ResultSet resultSet) throws SQLException { + throw new SQLException("This method is not implemented"); + } + + @Override + public boolean isAfterLast() throws SQLException { + return !this.isNextChunkPresent() && this.currentRow >= this.data.size(); + } + + @Override + public boolean isLast() throws SQLException { + return !this.isNextChunkPresent() && this.currentRow == this.data.size() - 1; + } + + private boolean isNextChunkPresent() throws SQLException { + try { + return streamIterator.hasNext(); + } catch (Exception e) { + throw new SQLException(e.getMessage()); + } + } +} diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java index 558189e..9697ce7 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java @@ -4,26 +4,36 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; import org.apache.arrow.vector.DecimalVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.util.Text; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channel; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.sql.SQLException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -31,6 +41,10 @@ * This class contains the utilities for processing the arrow stream. */ public class ArrowUtil { + + private ArrowStreamReader arrowStreamReader; + private RootAllocator streamRootAllocator; + /** * Converts the arrow stream in to List> so that it can then be converted into result set format. * @param queryServiceResponse Response received from query service @@ -97,7 +111,47 @@ public List getResultSetDataFromArrowStream(QueryServiceResponse querySe } return data; } - + public void createArrowReaderForStream(InputStream inputStream){ + if(arrowStreamReader == null){ + streamRootAllocator = new RootAllocator(Long.MAX_VALUE); + arrowStreamReader =new ArrowStreamReader(Channels.newChannel(inputStream), streamRootAllocator ); + } + } + public List getRowsFromRainbowResponse() throws SQLException { + if(arrowStreamReader == null){ + throw new SQLException("Arrow Reader not created for RainbowStream "); + } + List fieldVectors = null; + List data = new ArrayList<>(); + try{ + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + fieldVectors = root.getFieldVectors(); + while (arrowStreamReader.loadNextBatch()) { + int rowCount = fieldVectors.get(0).getValueCount(); + for(int i=0;i row = new ArrayList<>(); + for(FieldVector fieldVector : fieldVectors) { + Object fieldValue = this.getFieldValue(fieldVector,i); + row.add(fieldValue); + } + data.add(row); + } + }}catch(Exception e){ + if(fieldVectors != null){ + for(FieldVector fieldVector : fieldVectors) { + if(fieldVector != null) { + fieldVector.close(); + } + } + fieldVectors = null; + } + if(streamRootAllocator != null) { + streamRootAllocator.close(); + streamRootAllocator = null; + } + } + return data; + } private Object getFieldValue(FieldVector fieldVector, int index) throws SQLException { Types.MinorType type = Types.getMinorTypeForArrowType(fieldVector.getField().getType()); @@ -125,7 +179,37 @@ private Object getFieldValue(FieldVector fieldVector, int index) throws SQLExcep return ((BigIntVector) fieldVector).get(index); } else if (type == Types.MinorType.BIT) { return (int) ((BitVector) fieldVector).get(index) == 1; + } else if(type ==Types.MinorType.DATEDAY){ + return ((DateDayVector)fieldVector).getObject(index); + }else if(type ==Types.MinorType.TIMENANO){ + return ((TimeNanoVector)fieldVector).getObject(index); + }else if(type == Types.MinorType.TIMESTAMPNANOTZ){ + return ((TimeStampNanoTZVector)fieldVector).getObject(index); + }else if(type == Types.MinorType.TIMESTAMPNANO){ + return ((TimeStampNanoVector)fieldVector).getObject(index); } throw new SQLException(MessageFormat.format("Unknown arrow type {0}", type.name())); } + + public void closeReader() { + try{ + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + List vectors = root.getFieldVectors(); + if(vectors != null){ + for(FieldVector fieldVector : vectors) { + if(fieldVector != null) { + fieldVector.close(); + } + } + vectors = null; + } + if(streamRootAllocator != null) { + streamRootAllocator.close(); + streamRootAllocator = null; + } + + } catch (IOException e) { + e.printStackTrace(); + } + } } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java b/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java index 9efa39b..3331cf2 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/Constants.java @@ -130,4 +130,7 @@ public class Constants { public static final String BEGIN_PRIVATE_KEY = "-----BEGIN PRIVATE KEY-----"; public static final String END_PRIVATE_KEY = "-----END PRIVATE KEY-----"; + public static final String RAINBOW_CLIENT = "Rainbow-Client" ; + public static final int END_OF_STREAM = -1; + public static final int START_OF_STREAM = 0; } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryRainbowExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryRainbowExecutor.java new file mode 100644 index 0000000..0e12407 --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryRainbowExecutor.java @@ -0,0 +1,46 @@ +package com.salesforce.cdp.queryservice.util; + +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryRequest; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamRequest; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse; +import com.salesforce.a360.queryservice.grpc.v1.QueryServiceGrpc; +import com.salesforce.cdp.queryservice.core.QueryServiceConnection; +import com.salesforce.cdp.queryservice.interceptors.GrpcInterceptor; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class QueryRainbowExecutor extends QueryTokenExecutor{ + + private static final int port = 443; + private static final int timeoutInMin = 5; + + private final ManagedChannel channel; + + public QueryRainbowExecutor(QueryServiceConnection connection) { + super(connection); + channel = ManagedChannelBuilder.forAddress(connection.getTenantUrl(), port).build(); + } + + public QueryRainbowExecutor(QueryServiceConnection connection, OkHttpClient client) { + super(connection, client); + channel = ManagedChannelBuilder.forAddress(connection.getTenantUrl(), port).build(); + } + public Iterator executeQuery(String sql) throws IOException, SQLException { + log.info("Preparing to execute query rainbow query {}", sql); + Map tokenUrl = getTokenWithTenantUrl(); + QueryServiceGrpc.QueryServiceBlockingStub stub = QueryServiceGrpc.newBlockingStub(channel); + Properties properties = connection.getClientInfo(); + return stub.withDeadlineAfter(timeoutInMin, TimeUnit.MINUTES).withInterceptors(new GrpcInterceptor(tokenUrl, properties)).aniSqlExtractQuery(AnsiSqlExtractQueryRequest.newBuilder().setQuery(sql).build()); + } +} diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java b/src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java new file mode 100644 index 0000000..eee2f87 --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java @@ -0,0 +1,47 @@ +package com.salesforce.cdp.queryservice.util; + +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse; +import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; + +public class RainbowDataStream extends InputStream { + Iterator streamIterator; + byte[] arrowMessage; + + int curIndex; + + public RainbowDataStream(Iterator responseIterator){ + this.streamIterator = responseIterator; + curIndex =0; + } + @Override + public int read() throws IOException { + if(arrowMessage == null || arrowMessage.length <= curIndex ) { + if(streamIterator.hasNext()){ + curIndex = getNextChunk(); + } + if(curIndex == Constants.END_OF_STREAM){ + return Constants.END_OF_STREAM; + } + } + return arrowMessage[curIndex++]; + } + + private int getNextChunk() { + AnsiSqlExtractQueryResponse response = streamIterator.next(); + if(response !=null && response.getData() !=null ){ + arrowMessage = response.getData().toByteArray(); + } + if(arrowMessage.length != 0){ + return Constants.START_OF_STREAM; + } + else { + arrowMessage = null; + return Constants.END_OF_STREAM; + } + } + +} From 139a182c5a6adf3c28ab4ddb57edd50cb226424b Mon Sep 17 00:00:00 2001 From: psasikaladevi Date: Tue, 26 Jul 2022 14:02:13 +0530 Subject: [PATCH 2/6] Fix for null pointer when the response is empty --- .../cdp/queryservice/core/RainbowQueryResultSet.java | 4 ++-- .../salesforce/cdp/queryservice/util/QueryTokenExecutor.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java index cab0234..c7664d3 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java @@ -40,11 +40,11 @@ public boolean next() throws SQLException { if (currentRow == -1 && isNextChunkPresent()) { getNextChunk(); - } else { + } else if(data !=null) { currentRow++; } - if (currentRow < data.size()) { + if (data!=null && currentRow < data.size()) { return true; } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java index bce9f70..20f9175 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryTokenExecutor.java @@ -71,7 +71,7 @@ public QueryTokenExecutor(QueryServiceConnection connection, OkHttpClient client this.client = updateClientWithSocketFactory(client, connection.isSocksProxyDisabled()); // set TenantUrl in connection. This is mandatory in gRPC flow. - if(connection.isEnableStreamFlow()) { + if(connection.isEnableStreamFlow() || connection.isRainbowConnection()) { try { Map token = getTokenWithTenantUrl(); connection.setTenantUrl(token.get(Constants.TENANT_URL)); From d8be07177b72faf597995689e916b6880fc80b5a Mon Sep 17 00:00:00 2001 From: psasikaladevi Date: Wed, 27 Jul 2022 17:54:22 +0530 Subject: [PATCH 3/6] Update RainbowDataStream.java --- .../cdp/queryservice/util/RainbowDataStream.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java b/src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java index eee2f87..ecb7322 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/RainbowDataStream.java @@ -1,7 +1,6 @@ package com.salesforce.cdp.queryservice.util; import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse; -import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse; import java.io.IOException; import java.io.InputStream; @@ -17,6 +16,7 @@ public RainbowDataStream(Iterator responseIterator) this.streamIterator = responseIterator; curIndex =0; } + @Override public int read() throws IOException { if(arrowMessage == null || arrowMessage.length <= curIndex ) { @@ -27,13 +27,14 @@ public int read() throws IOException { return Constants.END_OF_STREAM; } } - return arrowMessage[curIndex++]; + return ((int) arrowMessage[curIndex++]) & 0xFF; } private int getNextChunk() { AnsiSqlExtractQueryResponse response = streamIterator.next(); if(response !=null && response.getData() !=null ){ - arrowMessage = response.getData().toByteArray(); + arrowMessage=new byte[response.getData().size()]; + response.getData().copyTo(arrowMessage,0); } if(arrowMessage.length != 0){ return Constants.START_OF_STREAM; From 771defafc4f446e9b537b7427df943cb3728a8fd Mon Sep 17 00:00:00 2001 From: psasikaladevi Date: Wed, 27 Jul 2022 19:29:16 +0530 Subject: [PATCH 4/6] Added metadata support --- .../core/RainbowQueryResultSet.java | 10 +++ .../queryservice/util/ArrowTypeHelper.java | 68 +++++++++++++++++++ .../cdp/queryservice/util/ArrowUtil.java | 30 +++++++- 3 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/salesforce/cdp/queryservice/util/ArrowTypeHelper.java diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java index c7664d3..15f345d 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java @@ -10,6 +10,7 @@ import java.io.IOException; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.Iterator; import java.util.List; @@ -141,6 +142,15 @@ private void getNextChunk() throws SQLException { } } + @Override + public ResultSetMetaData getMetaData() throws SQLException { + errorOutIfClosed(); + ResultSetMetaData metaData = arrowUtil.getMetadata(); + if(this.resultSetMetaData ==null) + this.resultSetMetaData = arrowUtil.getMetadata(); + return this.resultSetMetaData; + } + @Override protected ResultSet getNextPageData() throws SQLException { throw new SQLException("This method is not implemented"); diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowTypeHelper.java b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowTypeHelper.java new file mode 100644 index 0000000..ce146b8 --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowTypeHelper.java @@ -0,0 +1,68 @@ +package com.salesforce.cdp.queryservice.util; + +import lombok.val; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; + +import java.sql.SQLException; + +public class ArrowTypeHelper { + + enum JdbcType { + INTEGER("INTEGER"), + BIGINT("BIGINT"), + BIT("BOOLEAN"), + VARCHAR("VARCHAR"), + DATE_DAY("DATE"), + DECIMAL("DECIMAL"), + FLOAT_8("DOUBLE"), + INT("INTEGER"), + FLOAT_4("REAL"), + SMALL_INT("SMALLINT"), + TIME_NANO("TIME"), + TIMESTAMP_NANO_TZ("TIMESTAMP WITH TIME ZONE"), + TIMESTAMP_NANO("TIMESTAMP"), + TINY_INT("TINYINT"); + + private final String type; + + JdbcType(String value){ + this.type = value; + } + public String toString(){ + return type; + } + } + public static String getJdbcType(Types.MinorType minorType) throws SQLException { + switch (minorType){ + case BIGINT: + return JdbcType.BIGINT.toString(); + case BIT: + return JdbcType.BIT.toString(); + case VARCHAR: + return JdbcType.VARCHAR.toString(); + case DATEDAY: + return JdbcType.DATE_DAY.toString(); + case DECIMAL: + return JdbcType.DECIMAL.toString(); + case FLOAT8: + return JdbcType.FLOAT_8.toString(); + case INT: + return JdbcType.INTEGER.toString(); + case FLOAT4: + return JdbcType.FLOAT_4.toString(); + case SMALLINT: + return JdbcType.SMALL_INT.toString(); + case TIMENANO: + return JdbcType.TIME_NANO.toString(); + case TIMESTAMPNANOTZ: + return JdbcType.TIMESTAMP_NANO_TZ.toString(); + case TIMESTAMPNANO: + return JdbcType.TIMESTAMP_NANO.toString(); + case TINYINT: + return JdbcType.TINY_INT.toString(); + default: + throw new SQLException("Invalid type received "+ minorType); + } + } +} diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java index 9697ce7..b9a993d 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java @@ -1,5 +1,6 @@ package com.salesforce.cdp.queryservice.util; +import com.salesforce.cdp.queryservice.core.QueryServiceResultSetMetaData; import com.salesforce.cdp.queryservice.model.QueryServiceResponse; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; @@ -20,6 +21,9 @@ import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.Text; import java.io.ByteArrayInputStream; @@ -28,6 +32,7 @@ import java.nio.channels.Channel; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.text.MessageFormat; import java.util.ArrayList; @@ -42,6 +47,7 @@ */ public class ArrowUtil { + private ArrowStreamReader arrowStreamReader; private RootAllocator streamRootAllocator; @@ -114,9 +120,31 @@ public List getResultSetDataFromArrowStream(QueryServiceResponse querySe public void createArrowReaderForStream(InputStream inputStream){ if(arrowStreamReader == null){ streamRootAllocator = new RootAllocator(Long.MAX_VALUE); - arrowStreamReader =new ArrowStreamReader(Channels.newChannel(inputStream), streamRootAllocator ); + arrowStreamReader =new ArrowStreamReader(inputStream, streamRootAllocator ); } } + + public ResultSetMetaData getMetadata() throws SQLException { + if(this.arrowStreamReader == null) + return null; + try { + VectorSchemaRoot schemaRoot = arrowStreamReader.getVectorSchemaRoot(); + List columnNames =new ArrayList<>(); + List columnTypes=new ArrayList<>(); + Map columnNameToPosition= new HashMap<>(); + int i=1; + for(FieldVector field:schemaRoot.getFieldVectors()){ + columnNames.add(field.getName()); + columnTypes.add(ArrowTypeHelper.getJdbcType(field.getMinorType())); + columnNameToPosition.put(field.getName(), i++); + } + ResultSetMetaData metaData = new QueryServiceResultSetMetaData(columnNames,columnTypes,null,columnNameToPosition); + return metaData; + } catch (IOException | SQLException e) { + throw new SQLException("Error while getting metadata"); + } + } + public List getRowsFromRainbowResponse() throws SQLException { if(arrowStreamReader == null){ throw new SQLException("Arrow Reader not created for RainbowStream "); From c717af37aac4146393b2401e8acf9523f26b2f3d Mon Sep 17 00:00:00 2001 From: psasikaladevi Date: Fri, 5 Aug 2022 18:27:02 +0530 Subject: [PATCH 5/6] Added Timestamp support and formatting --- .../core/QueryServiceConnection.java | 2 +- .../core/RainbowQueryResultSet.java | 37 ++-- .../cdp/queryservice/util/ArrowUtil.java | 93 +------- .../queryservice/util/ExtractArrowUtil.java | 204 ++++++++++++++++++ 4 files changed, 222 insertions(+), 114 deletions(-) create mode 100644 src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java index 0e452a8..d3333a2 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java @@ -31,7 +31,7 @@ @Slf4j public class QueryServiceConnection implements Connection { - private static final String TEST_CONNECT_QUERY = "select 1"; + private static final String TEST_CONNECT_QUERY = "select 1 as col1"; private AtomicBoolean closed = new AtomicBoolean(false); private Properties properties; diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java index 15f345d..e639f48 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/RainbowQueryResultSet.java @@ -5,6 +5,7 @@ import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse; import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse; import com.salesforce.cdp.queryservice.util.ArrowUtil; +import com.salesforce.cdp.queryservice.util.ExtractArrowUtil; import com.salesforce.cdp.queryservice.util.RainbowDataStream; import lombok.extern.slf4j.Slf4j; @@ -12,6 +13,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -21,19 +23,15 @@ public class RainbowQueryResultSet extends QueryServiceResultSet{ List data ; private RainbowDataStream dataStream; private Iterator streamIterator; - private ArrowUtil arrowUtil; + private ExtractArrowUtil arrowUtil; - public RainbowQueryResultSet(Iterator response, QueryServiceAbstractStatement statement) { + public RainbowQueryResultSet(Iterator response, QueryServiceAbstractStatement statement) throws SQLException { streamIterator = response; - arrowUtil = new ArrowUtil(); dataStream = new RainbowDataStream(response); - initialiseArrowReader(); + arrowUtil = new ExtractArrowUtil(dataStream); this.statement = statement; } - private void initialiseArrowReader() { - arrowUtil.createArrowReaderForStream(dataStream); - } @Override public boolean next() throws SQLException { try { @@ -95,31 +93,20 @@ public Object getObject(int columnIndex) throws SQLException { @Override public Object getObject(String columnLabel) throws SQLException { - throw new SQLException("Not Implemented"); + errorOutIfClosed(); + int columnIndex = getColumnIndexByName(columnLabel); + return getObject(columnIndex); } @Override protected Object getValue(Object row, String columnLabel) throws SQLException { - throw new SQLException("Not Implemented"); + errorOutIfClosed(); + int columnIndex = getColumnIndexByName(columnLabel); + return getValue(row,columnIndex); } private Object getValue(Object row, int columnIndex) throws SQLException { - return valueToObject(((ListValue) row).getValues(columnIndex)); - } - - private static Object valueToObject(Value value) { - switch (value.getKindCase()) { - case NULL_VALUE: - return null; - case NUMBER_VALUE: - return value.getNumberValue(); - case STRING_VALUE: - return value.getStringValue(); - case BOOL_VALUE: - return value.getBoolValue(); - default: - throw new IllegalArgumentException(String.format("Unsupported protobuf value %s", value)); - } + return ((ArrayList)row).get(columnIndex-1); } private int getColumnIndexByName(String columnName) throws SQLException { diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java index b9a993d..f58b746 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ArrowUtil.java @@ -48,9 +48,6 @@ public class ArrowUtil { - private ArrowStreamReader arrowStreamReader; - private RootAllocator streamRootAllocator; - /** * Converts the arrow stream in to List> so that it can then be converted into result set format. * @param queryServiceResponse Response received from query service @@ -62,6 +59,7 @@ public List getResultSetDataFromArrowStream(QueryServiceResponse querySe byte[] bytes = Base64.getDecoder().decode(queryServiceResponse.getArrowStream()); ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); List fieldVectors = null; + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); List data = new ArrayList<>(); try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(inputStream, allocator)) { @@ -117,70 +115,8 @@ public List getResultSetDataFromArrowStream(QueryServiceResponse querySe } return data; } - public void createArrowReaderForStream(InputStream inputStream){ - if(arrowStreamReader == null){ - streamRootAllocator = new RootAllocator(Long.MAX_VALUE); - arrowStreamReader =new ArrowStreamReader(inputStream, streamRootAllocator ); - } - } - - public ResultSetMetaData getMetadata() throws SQLException { - if(this.arrowStreamReader == null) - return null; - try { - VectorSchemaRoot schemaRoot = arrowStreamReader.getVectorSchemaRoot(); - List columnNames =new ArrayList<>(); - List columnTypes=new ArrayList<>(); - Map columnNameToPosition= new HashMap<>(); - int i=1; - for(FieldVector field:schemaRoot.getFieldVectors()){ - columnNames.add(field.getName()); - columnTypes.add(ArrowTypeHelper.getJdbcType(field.getMinorType())); - columnNameToPosition.put(field.getName(), i++); - } - ResultSetMetaData metaData = new QueryServiceResultSetMetaData(columnNames,columnTypes,null,columnNameToPosition); - return metaData; - } catch (IOException | SQLException e) { - throw new SQLException("Error while getting metadata"); - } - } - public List getRowsFromRainbowResponse() throws SQLException { - if(arrowStreamReader == null){ - throw new SQLException("Arrow Reader not created for RainbowStream "); - } - List fieldVectors = null; - List data = new ArrayList<>(); - try{ - VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); - fieldVectors = root.getFieldVectors(); - while (arrowStreamReader.loadNextBatch()) { - int rowCount = fieldVectors.get(0).getValueCount(); - for(int i=0;i row = new ArrayList<>(); - for(FieldVector fieldVector : fieldVectors) { - Object fieldValue = this.getFieldValue(fieldVector,i); - row.add(fieldValue); - } - data.add(row); - } - }}catch(Exception e){ - if(fieldVectors != null){ - for(FieldVector fieldVector : fieldVectors) { - if(fieldVector != null) { - fieldVector.close(); - } - } - fieldVectors = null; - } - if(streamRootAllocator != null) { - streamRootAllocator.close(); - streamRootAllocator = null; - } - } - return data; - } - private Object getFieldValue(FieldVector fieldVector, int index) throws SQLException { + Object getFieldValue(FieldVector fieldVector, int index) throws SQLException { Types.MinorType type = Types.getMinorTypeForArrowType(fieldVector.getField().getType()); if(fieldVector.isNull(index)) { @@ -212,32 +148,13 @@ private Object getFieldValue(FieldVector fieldVector, int index) throws SQLExcep }else if(type ==Types.MinorType.TIMENANO){ return ((TimeNanoVector)fieldVector).getObject(index); }else if(type == Types.MinorType.TIMESTAMPNANOTZ){ - return ((TimeStampNanoTZVector)fieldVector).getObject(index); + long epochNano =((TimeStampNanoTZVector)fieldVector).getObject(index); + String date = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss").format(new java.util.Date (epochNano/1000000)); + return date; }else if(type == Types.MinorType.TIMESTAMPNANO){ return ((TimeStampNanoVector)fieldVector).getObject(index); } throw new SQLException(MessageFormat.format("Unknown arrow type {0}", type.name())); } - public void closeReader() { - try{ - VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); - List vectors = root.getFieldVectors(); - if(vectors != null){ - for(FieldVector fieldVector : vectors) { - if(fieldVector != null) { - fieldVector.close(); - } - } - vectors = null; - } - if(streamRootAllocator != null) { - streamRootAllocator.close(); - streamRootAllocator = null; - } - - } catch (IOException e) { - e.printStackTrace(); - } - } } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java new file mode 100644 index 0000000..ce3b418 --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java @@ -0,0 +1,204 @@ +package com.salesforce.cdp.queryservice.util; + +import com.salesforce.cdp.queryservice.core.QueryServiceResultSetMetaData; +import com.salesforce.cdp.queryservice.model.QueryServiceResponse; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeNanoVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.util.Text; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class contains the utilities for processing the arrow stream. + */ +public class ExtractArrowUtil extends ArrowUtil { + + + private ArrowStreamReader arrowStreamReader; + private RootAllocator streamRootAllocator; + private VectorSchemaRoot vectorSchemaRoot; + + public ExtractArrowUtil(InputStream inputStream) throws SQLException { + super(); + initialiseArrowReaderForStream(inputStream); + try { + vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot(); + } catch (IOException e) { + throw new SQLException("Error while getting VectorSchemaRoot"); + } + } + + /** + * Converts the arrow stream in to List> so that it can then be converted into result set format. + * @param queryServiceResponse Response received from query service + * @return List of data map. + * @throws SQLException + */ + public List getResultSetDataFromArrowStream(QueryServiceResponse queryServiceResponse, boolean isCursorBasedPaginationReq) throws SQLException { + + byte[] bytes = Base64.getDecoder().decode(queryServiceResponse.getArrowStream()); + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + List fieldVectors = null; + + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + List data = new ArrayList<>(); + try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(inputStream, allocator)) { + + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + fieldVectors = root.getFieldVectors(); + while (arrowStreamReader.loadNextBatch()) { + int rowCount = fieldVectors.get(0).getValueCount(); + for(int i=0;i row = new ArrayList<>(); + for(FieldVector fieldVector : fieldVectors) { + Object fieldValue = this.getFieldValue(fieldVector,i); + row.add(fieldValue); + } + data.add(row); + } else { + Map row = new HashMap<>(); + for(FieldVector fieldVector : fieldVectors) { + String fieldName = fieldVector.getField().getName(); + Object fieldValue = this.getFieldValue(fieldVector,i); + row.put(fieldName,fieldValue); + } + data.add(row); + } + } + } + } catch (IOException e) { + e.printStackTrace(); + throw new SQLException("Failed to parse the arrow stream", e); + } + finally { + if(inputStream != null) { + try { + inputStream.close(); + } + catch (IOException ex) { + ex.printStackTrace(); + } + inputStream = null; + } + if(fieldVectors != null){ + for(FieldVector fieldVector : fieldVectors) { + if(fieldVector != null) { + fieldVector.close(); + } + } + fieldVectors = null; + } + allocator.close(); + allocator = null; + } + return data; + } + + + public void initialiseArrowReaderForStream(InputStream inputStream){ + if(arrowStreamReader == null){ + streamRootAllocator = new RootAllocator(Long.MAX_VALUE); + arrowStreamReader =new ArrowStreamReader(inputStream, streamRootAllocator ); + } + } + + public ResultSetMetaData getMetadata() throws SQLException { + if(this.arrowStreamReader == null) + return null; + try { + VectorSchemaRoot schemaRoot = arrowStreamReader.getVectorSchemaRoot(); + List columnNames =new ArrayList<>(); + List columnTypes=new ArrayList<>(); + Map columnNameToPosition= new HashMap<>(); + int i=1; + for(FieldVector field:schemaRoot.getFieldVectors()){ + columnNames.add(field.getName()); + columnTypes.add(ArrowTypeHelper.getJdbcType(field.getMinorType())); + columnNameToPosition.put(field.getName(), i++); + } + ResultSetMetaData metaData = new QueryServiceResultSetMetaData(columnNames,columnTypes,null,columnNameToPosition); + return metaData; + } catch (IOException | SQLException e) { + throw new SQLException("Error while getting metadata"); + } + } + + public List getRowsFromRainbowResponse() throws SQLException { + if(arrowStreamReader == null){ + throw new SQLException("Arrow Reader not created for RainbowStream "); + } + List fieldVectors = null; + List data = new ArrayList<>(); + try{ + fieldVectors = vectorSchemaRoot.getFieldVectors(); + if (arrowStreamReader.loadNextBatch()) { + int rowCount = fieldVectors.get(0).getValueCount(); + for(int i=0;i row = new ArrayList<>(); + for(FieldVector fieldVector : fieldVectors) { + Object fieldValue = this.getFieldValue(fieldVector,i); + row.add(fieldValue); + } + data.add(row); + } + }}catch(Exception e){ + if(fieldVectors != null){ + for(FieldVector fieldVector : fieldVectors) { + if(fieldVector != null) { + fieldVector.close(); + } + } + fieldVectors = null; + } + if(streamRootAllocator != null) { + streamRootAllocator.close(); + streamRootAllocator = null; + } + } + return data; + } + + public void closeReader() { + List vectors = vectorSchemaRoot.getFieldVectors(); + if(vectors != null){ + for(FieldVector fieldVector : vectors) { + if(fieldVector != null) { + fieldVector.close(); + } + } + vectors = null; + } + if(streamRootAllocator != null) { + streamRootAllocator.close(); + streamRootAllocator = null; + } + } +} From bc5e450ac405b150f78cf35e167e27fa7c0e07b2 Mon Sep 17 00:00:00 2001 From: psasikaladevi Date: Wed, 10 Aug 2022 10:58:53 +0530 Subject: [PATCH 6/6] Update ExtractArrowUtil.java --- .../queryservice/util/ExtractArrowUtil.java | 71 +------------------ 1 file changed, 1 insertion(+), 70 deletions(-) diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java index ce3b418..ba56f02 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/ExtractArrowUtil.java @@ -54,76 +54,7 @@ public ExtractArrowUtil(InputStream inputStream) throws SQLException { } } - /** - * Converts the arrow stream in to List> so that it can then be converted into result set format. - * @param queryServiceResponse Response received from query service - * @return List of data map. - * @throws SQLException - */ - public List getResultSetDataFromArrowStream(QueryServiceResponse queryServiceResponse, boolean isCursorBasedPaginationReq) throws SQLException { - - byte[] bytes = Base64.getDecoder().decode(queryServiceResponse.getArrowStream()); - ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - List fieldVectors = null; - - RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); - List data = new ArrayList<>(); - try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(inputStream, allocator)) { - - VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); - fieldVectors = root.getFieldVectors(); - while (arrowStreamReader.loadNextBatch()) { - int rowCount = fieldVectors.get(0).getValueCount(); - for(int i=0;i row = new ArrayList<>(); - for(FieldVector fieldVector : fieldVectors) { - Object fieldValue = this.getFieldValue(fieldVector,i); - row.add(fieldValue); - } - data.add(row); - } else { - Map row = new HashMap<>(); - for(FieldVector fieldVector : fieldVectors) { - String fieldName = fieldVector.getField().getName(); - Object fieldValue = this.getFieldValue(fieldVector,i); - row.put(fieldName,fieldValue); - } - data.add(row); - } - } - } - } catch (IOException e) { - e.printStackTrace(); - throw new SQLException("Failed to parse the arrow stream", e); - } - finally { - if(inputStream != null) { - try { - inputStream.close(); - } - catch (IOException ex) { - ex.printStackTrace(); - } - inputStream = null; - } - if(fieldVectors != null){ - for(FieldVector fieldVector : fieldVectors) { - if(fieldVector != null) { - fieldVector.close(); - } - } - fieldVectors = null; - } - allocator.close(); - allocator = null; - } - return data; - } - - - public void initialiseArrowReaderForStream(InputStream inputStream){ + public void initialiseArrowReaderForStream(InputStream inputStream){ if(arrowStreamReader == null){ streamRootAllocator = new RootAllocator(Long.MAX_VALUE); arrowStreamReader =new ArrowStreamReader(inputStream, streamRootAllocator );