Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse;
import com.salesforce.cdp.queryservice.model.QueryServiceResponse;
import com.salesforce.cdp.queryservice.model.Type;
Expand All @@ -27,6 +28,7 @@
import com.salesforce.cdp.queryservice.util.QueryExecutor;
import com.salesforce.cdp.queryservice.util.HttpHelper;
import com.salesforce.cdp.queryservice.util.QueryGrpcExecutor;
import com.salesforce.cdp.queryservice.util.QueryRainbowExecutor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;

Expand Down Expand Up @@ -55,6 +57,8 @@ public abstract class QueryServiceAbstractStatement {

private QueryGrpcExecutor queryGrpcExecutor;

private QueryRainbowExecutor queryRainbowExecutor;

private static final String KEY_TYPE = "type";
private static final String KEY_TYPE_CODE = "typeCode";
private static final String KEY_PLACE_IN_ORDER = "placeInOrder";
Expand All @@ -67,6 +71,7 @@ public QueryServiceAbstractStatement(QueryServiceConnection queryServiceConnecti
this.resultSetConcurrency = resultSetConcurrency;
this.queryExecutor = createQueryExecutor();
this.queryGrpcExecutor = createQueryGrpcExecutor();
this.queryRainbowExecutor = createQueryRainbowExecutor();
}

public ResultSet executeQuery(String sql) throws SQLException {
Expand All @@ -79,8 +84,11 @@ public ResultSet executeQuery(String sql) throws SQLException {
boolean requireManagedPagination = isTableauQuery() && !isCursorBasedPaginationReq;
Optional<Integer> limit = requireManagedPagination ? Optional.of(Constants.MAX_LIMIT) : Optional.empty();
Optional<String> orderby = requireManagedPagination ? Optional.of("1 ASC") : Optional.empty();

if(isEnableStreamFlow) {
if(connection.isRainbowConnection()){
Iterator<AnsiSqlExtractQueryResponse> response = queryRainbowExecutor.executeQuery(sql);
return createResultSetFromRainbowResponse(response);
}
else if(isEnableStreamFlow) {
Iterator<AnsiSqlQueryStreamResponse> response = queryGrpcExecutor.executeQueryWithRetry(sql);
return createResultSetFromResponse(response);
} else {
Expand Down Expand Up @@ -118,7 +126,14 @@ private boolean isTableauQuery() throws SQLException {
String userAgent = connection.getClientInfo(Constants.USER_AGENT);
return Constants.TABLEAU_USER_AGENT_VALUE.equals(userAgent);
}

private ResultSet createResultSetFromRainbowResponse(Iterator<AnsiSqlExtractQueryResponse> response) throws SQLException {
try{
return new RainbowQueryResultSet(response,this);
}
catch (Exception e){
throw new SQLException(QUERY_EXCEPTION);
}
}
private ResultSet createResultSetFromResponse(QueryServiceResponse queryServiceResponse, boolean isCursorBasedPaginationReq) throws SQLException {
ArrowUtil arrowUtil = new ArrowUtil();
paginationRequired = !queryServiceResponse.isDone();
Expand Down Expand Up @@ -227,4 +242,8 @@ protected QueryExecutor createQueryExecutor() {
protected QueryGrpcExecutor createQueryGrpcExecutor() {
return new QueryGrpcExecutor(connection);
}

protected QueryRainbowExecutor createQueryRainbowExecutor(){
return new QueryRainbowExecutor(connection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class QueryServiceConnection implements Connection {
private boolean isCursorBasedPaginationReq = true;
private final boolean isSocksProxyDisabled;
private boolean enableStreamFlow = false;
private boolean rainbowConnection = false;
private String tenantUrl;

public QueryServiceConnection(String url, Properties properties) throws SQLException {
Expand All @@ -52,6 +53,9 @@ public QueryServiceConnection(String url, Properties properties) throws SQLExcep
// default `enableArrowStream` is false
enableArrowStream = Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_ARROW_STREAM));

//Rainbow connection by default false
rainbowConnection = Boolean.parseBoolean(properties.getProperty(Constants.RAINBOW_CLIENT,Constants.FALSE_STR));

// default `isCursorBasedPaginationReq` is true
isCursorBasedPaginationReq = Boolean.parseBoolean(this.properties.getProperty(Constants.CURSOR_BASED_PAGINATION, Constants.TRUE_STR));

Expand Down Expand Up @@ -456,4 +460,8 @@ public String getTenantUrl() {
public void setTenantUrl(String tenantUrl) {
this.tenantUrl = tenantUrl;
}

public boolean isRainbowConnection() {
return rainbowConnection;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package com.salesforce.cdp.queryservice.core;

import com.google.protobuf.ListValue;
import com.google.protobuf.Value;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlExtractQueryResponse;
import com.salesforce.a360.queryservice.grpc.v1.AnsiSqlQueryStreamResponse;
import com.salesforce.cdp.queryservice.util.ArrowUtil;
import com.salesforce.cdp.queryservice.util.RainbowDataStream;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

@Slf4j
public class RainbowQueryResultSet extends QueryServiceResultSet{
List<Object> data ;
private RainbowDataStream dataStream;
private Iterator<AnsiSqlExtractQueryResponse> streamIterator;
private ArrowUtil arrowUtil;

public RainbowQueryResultSet(Iterator<AnsiSqlExtractQueryResponse> response, QueryServiceAbstractStatement statement) {
streamIterator = response;
arrowUtil = new ArrowUtil();
dataStream = new RainbowDataStream(response);
initialiseArrowReader();
this.statement = statement;
}

private void initialiseArrowReader() {
arrowUtil.createArrowReaderForStream(dataStream);
}
@Override
public boolean next() throws SQLException {
try {
errorOutIfClosed();

if (currentRow == -1 && isNextChunkPresent()) {
getNextChunk();
} else {
currentRow++;
}

if (currentRow < data.size()) {
return true;
}

if (isNextChunkPresent()) {
getNextChunk();
if (data != null && data.size() > 0) {
return true;
}
}
arrowUtil.closeReader();
closeDataStream();
// Closing as this is move forward only cursor.
log.info("Resultset {} does not have any more rows. Total {} pages retrieved", this, currentPageNum);
return false;
}
catch (SQLException e){

closeDataStream();
throw e;
}
}

private void closeDataStream() {
if(dataStream != null) {
try {
dataStream.close();
}
catch (IOException ex) {
ex.printStackTrace();
}
dataStream = null;
}
}

@Override
public Object getObject(int columnIndex) throws SQLException {
try{
errorOutIfClosed();
Object value = getValue(data.get(currentRow), columnIndex);
wasNull.set(value == null);
return value;}
catch(SQLException e) {
closeDataStream();
throw e;
}
}

@Override
public Object getObject(String columnLabel) throws SQLException {
throw new SQLException("Not Implemented");
}

@Override
protected Object getValue(Object row, String columnLabel) throws SQLException {
throw new SQLException("Not Implemented");
}

private Object getValue(Object row, int columnIndex) throws SQLException {
return valueToObject(((ListValue) row).getValues(columnIndex));
}

private static Object valueToObject(Value value) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicate method, we should move it to util

switch (value.getKindCase()) {
case NULL_VALUE:
return null;
case NUMBER_VALUE:
return value.getNumberValue();
case STRING_VALUE:
return value.getStringValue();
case BOOL_VALUE:
return value.getBoolValue();
default:
throw new IllegalArgumentException(String.format("Unsupported protobuf value %s", value));
}
}

private int getColumnIndexByName(String columnName) throws SQLException {
return ((QueryServiceResultSetMetaData)resultSetMetaData).getColumnNameToPosition().get(columnName);
}

private void getNextChunk() throws SQLException {
log.trace("Fetching page with number {} for resultset {}", ++currentPageNum, this);

try {
List<Object> rows = arrowUtil.getRowsFromRainbowResponse();
if(rows != null && rows.size()>0){
this.data = rows;
currentRow=0;
}
} catch (Exception e) {
log.error("Error while getting the data chunk {}", this, e);
closeDataStream();
throw new SQLException(e.getMessage());
}
}

@Override
protected ResultSet getNextPageData() throws SQLException {
throw new SQLException("This method is not implemented");
}

@Override
protected void updateState(ResultSet resultSet) throws SQLException {
throw new SQLException("This method is not implemented");
}

@Override
public boolean isAfterLast() throws SQLException {
return !this.isNextChunkPresent() && this.currentRow >= this.data.size();
}

@Override
public boolean isLast() throws SQLException {
return !this.isNextChunkPresent() && this.currentRow == this.data.size() - 1;
}

private boolean isNextChunkPresent() throws SQLException {
try {
return streamIterator.hasNext();
} catch (Exception e) {
throw new SQLException(e.getMessage());
}
}
}
Loading