diff --git a/dependency-reduced-pom.xml b/dependency-reduced-pom.xml new file mode 100644 index 0000000..0e8c260 --- /dev/null +++ b/dependency-reduced-pom.xml @@ -0,0 +1,262 @@ + + + 4.0.0 + com.queryService + Salesforce-CDP-jdbc + 1.16.0 + + + + maven-shade-plugin + 3.4.0 + + + package + + shade + + + + + + + org.apache + ${shadeBase}.apache + + + io.netty + ${shadeBase}.io.netty + + + + + *:* + + META-INF/LICENSE* + META-INF/NOTICE* + META-INF/DEPENDENCIES + META-INF/maven/** + META-INF/services/com.fasterxml.* + META-INF/*.xml + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + .netbeans_automatic_build + git.properties + google-http-client.properties + storage.v1.json + pipes-fork-server-default-log4j2.xml + dependencies.properties + pipes-fork-server-default-log4j2.xml + + + + org.apache.arrow:arrow-vector + + codegen/** + + + + + + + + + + + maven-compiler-plugin + 3.7.0 + + ${java.version} + ${java.version} + + + + maven-assembly-plugin + 3.1.1 + + + make-assembly + package + + single + + + + + + jar-with-dependencies + + + + + maven-surefire-plugin + 2.19.1 + + + org.junit.platform + junit-platform-surefire-provider + ${junit.platform.version} + + + + + + + + org.powermock + powermock-module-junit4 + 1.7.1 + test + + + powermock-module-junit4-common + org.powermock + + + hamcrest-core + org.hamcrest + + + + + org.projectlombok + lombok + 1.18.22 + provided + + + org.junit.platform + junit-platform-launcher + 1.0.0-M4 + test + + + junit-platform-engine + org.junit.platform + + + + + org.junit.jupiter + junit-jupiter-engine + 5.0.0-M4 + test + + + junit-jupiter-api + org.junit.jupiter + + + junit-platform-engine + org.junit.platform + + + + + org.junit.vintage + junit-vintage-engine + 4.12.0-M4 + test + + + junit-platform-engine + org.junit.platform + + + + + junit + junit + 4.13.2 + test + + + hamcrest-core + org.hamcrest + + + + + org.apache.tomcat + tomcat-catalina + 9.0.17 + test + + + tomcat-servlet-api + org.apache.tomcat + + + tomcat-jsp-api + org.apache.tomcat + + + tomcat-juli + org.apache.tomcat + + + tomcat-annotations-api + org.apache.tomcat + + + tomcat-api + org.apache.tomcat + + + tomcat-jni + org.apache.tomcat + + + tomcat-coyote + org.apache.tomcat + + + tomcat-util + org.apache.tomcat + + + tomcat-util-scan + org.apache.tomcat + + + tomcat-jaspic-api + org.apache.tomcat + + + + + org.powermock + powermock-api-mockito + 1.7.1 + test + + + powermock-api-mockito-common + org.powermock + + + mockito-core + org.mockito + + + + + org.assertj + assertj-core + 3.20.2 + test + + + + 1.18.22 + 1.7.32 + 1.8 + UTF-8 + 1.7.1 + com.salesforce.cdp.queryservice.internal + 5.0.0-M4 + 4.12.0-M4 + 1.0.0-M4 + + diff --git a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java index 8913f10..57284a0 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java +++ b/src/main/java/com/salesforce/cdp/queryservice/core/QueryServiceConnection.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.salesforce.cdp.queryservice.model.Token; import com.salesforce.cdp.queryservice.util.Constants; +import com.salesforce.cdp.queryservice.util.TokenHelper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -58,7 +59,14 @@ public QueryServiceConnection(String url, Properties properties) throws SQLExcep this.isSocksProxyDisabled = Boolean.parseBoolean(this.properties.getProperty(Constants.DISABLE_SOCKS_PROXY)); // default `enableStreamFlow` is false - enableStreamFlow = Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_STREAM_FLOW, Constants.FALSE_STR)); + if(this.properties.containsKey(Constants.ENABLE_STREAM_FLOW)) { + enableStreamFlow = Boolean.parseBoolean(this.properties.getProperty(Constants.ENABLE_STREAM_FLOW, Constants.FALSE_STR)); + } + String streamFlow = System.getenv("enablestreamflow"); + if(Boolean.parseBoolean(streamFlow) == true) { + enableStreamFlow = true; + } + // use isValid to test connection this.isValid(20); @@ -351,8 +359,16 @@ public boolean isValid(int timeout) throws SQLException { } try { - PreparedStatement statement = this.prepareStatement(TEST_CONNECT_QUERY); - return statement.execute(); + if(properties.containsKey(Constants.CORETOKEN) && TokenHelper.tokenExistsInCache(this.properties.getProperty(Constants.CORETOKEN))) { + log.info("Reusing connection"); + return true; + } + else { + + PreparedStatement statement = this.prepareStatement(TEST_CONNECT_QUERY); + return statement.execute(); + } + } catch (Exception e) { log.error("Exception while connecting to server", e); if(isEnableStreamFlow()) { diff --git a/src/main/java/com/salesforce/cdp/queryservice/interceptors/GrpcInterceptor.java b/src/main/java/com/salesforce/cdp/queryservice/interceptors/GrpcInterceptor.java index e15e5de..beffec3 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/interceptors/GrpcInterceptor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/interceptors/GrpcInterceptor.java @@ -35,7 +35,14 @@ public void start(final Listener responseListener, final Metadata headers Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER), authHeaders.get(Constants.ACCESS_TOKEN)); if (properties.containsKey(Constants.USER_AGENT)) { - headers.put(Key.of(Constants.USER_AGENT_GRPC, Metadata.ASCII_STRING_MARSHALLER), properties.get(Constants.USER_AGENT).toString()); + String enableV2InGRPCStream = System.getenv("enableV2InGRPCStream"); + if(Boolean.parseBoolean(enableV2InGRPCStream) == true) { + headers.put(Key.of(Constants.USER_AGENT_GRPC, Metadata.ASCII_STRING_MARSHALLER), properties.get(Constants.USER_AGENT).toString() + "-JDBCV2STREAM"); + } + else { + headers.put(Key.of(Constants.USER_AGENT_GRPC, Metadata.ASCII_STRING_MARSHALLER), properties.get(Constants.USER_AGENT).toString()); + } + } else { headers.put(Key.of(Constants.USER_AGENT_GRPC, Metadata.ASCII_STRING_MARSHALLER), Constants.USER_AGENT_VALUE); } diff --git a/src/main/java/com/salesforce/cdp/queryservice/interceptors/HttpEventListener.java b/src/main/java/com/salesforce/cdp/queryservice/interceptors/HttpEventListener.java new file mode 100644 index 0000000..e759c00 --- /dev/null +++ b/src/main/java/com/salesforce/cdp/queryservice/interceptors/HttpEventListener.java @@ -0,0 +1,64 @@ +package com.salesforce.cdp.queryservice.interceptors; + +import lombok.extern.slf4j.Slf4j; +import okhttp3.*; + +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.time.LocalDateTime; + +@Slf4j +public class HttpEventListener extends EventListener { + + @Override + public void connectStart(Call call, InetSocketAddress inetSocketAddress, Proxy proxy) { + log.info("connectStart"); + } + + @Override + public void connectEnd(Call call, InetSocketAddress inetSocketAddress, Proxy proxy, Protocol protocol) { + log.info("connectEnd"); + } + + + + @Override + public void callStart(Call call) { + log.info("callStart"); + } + +// @Override +// public void requestHeadersEnd(Call call, Request request) { +// log.info("requestHeadersEnd"); +// } +// +// @Override +// public void responseHeadersEnd(Call call, Response response) { +// log.info("responseHeadersEnd"); +// } + + @Override + public void callEnd(Call call) { + log.info("callEnd"); + } + + @Override + public void requestBodyEnd(Call call, long byteCount){ + log.info("requestBodyEnd {}", byteCount); + } + + @Override + public void requestBodyStart(Call call) { + log.info("requestBodyStart"); + } + + @Override + public void responseBodyStart(Call call) { + log.info("responseBodyStart"); + } + + @Override + public void responseBodyEnd(Call call, long byteCount) { + log.info("responseBodyEnd"); + } +} diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java index 1bf7be7..a847829 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryExecutor.java @@ -18,6 +18,7 @@ import com.google.gson.Gson; import com.salesforce.cdp.queryservice.core.QueryServiceConnection; +import com.salesforce.cdp.queryservice.interceptors.HttpEventListener; import com.salesforce.cdp.queryservice.interceptors.RetryInterceptor; import com.salesforce.cdp.queryservice.model.AnsiQueryRequest; import lombok.extern.slf4j.Slf4j; @@ -29,16 +30,20 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.concurrent.TimeUnit; @Slf4j public class QueryExecutor extends QueryTokenExecutor { private static final OkHttpClient DEFAULT_QUERY_CLIENT; static { + // By default, add retry interceptors only for query service related calls // todo: delay adding retry interceptor so that user configured value can be used DEFAULT_QUERY_CLIENT = DEFAULT_CLIENT.newBuilder() .addInterceptor(new RetryInterceptor(DEFAULT_MAX_RETRY)) + .eventListener(new HttpEventListener()) + .connectionPool(new ConnectionPool(30, 5, TimeUnit.MINUTES)) .build(); } @@ -121,7 +126,8 @@ protected Response getResponse(Request request) throws IOException { // use queryClient to fetch metadata or to execute the query Response response = queryClient.newCall(request).execute(); long endTime = System.currentTimeMillis(); - log.info("Total time taken to get response for url {} is {} ms", request.url(), endTime - startTime); + String traceId = response.header(Constants.TRACE_ID); + log.info("Total time taken to get response for url {} is {} ms with traceid {}", request.url(), endTime - startTime, traceId); return response; } } diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java b/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java index 92a7528..52a13b4 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/QueryGrpcExecutor.java @@ -90,19 +90,27 @@ public Iterator executeQueryWithRetry(String sql) th try { return Failsafe.with(retryPolicy) .get(() -> { - long startTime = System.currentTimeMillis(); - AtomicReference headers = new AtomicReference(); - AtomicReference trailers = new AtomicReference(); - Iterator response = executeQuery(sql, headers, trailers); - // This checks if there is failure in first chunk itself. - // NOTE: failure in later chunks is not handled intentionally here. - // as in that case, we expect a new request from client. - response.hasNext(); - - String traceId = getTraceIdFromGrpcResponseHeader(headers); - log.info("Time taken to get first chunk for traceId {} is {} ms", traceId, System.currentTimeMillis() - startTime); - - return response; + try{ + long startTime = System.currentTimeMillis(); + AtomicReference headers = new AtomicReference(); + AtomicReference trailers = new AtomicReference(); + Iterator response = executeQuery(sql, headers, trailers); + // This checks if there is failure in first chunk itself. + // NOTE: failure in later chunks is not handled intentionally here. + // as in that case, we expect a new request from client. + response.hasNext(); + + String traceId = getTraceIdFromGrpcResponseHeader(headers); + log.info("Time taken to get first chunk for traceId {} is {} ms", traceId, System.currentTimeMillis() - startTime); + + return response; + } + catch (Exception e) { +// log.info("Palani Failed " + e.getMessage()); +// log.info("Palani " + e.getStackTrace()); + throw e; + } + }); } catch (FailsafeException e) { if (e.getCause() != null) { diff --git a/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java b/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java index bc29c3f..727f57e 100644 --- a/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java +++ b/src/main/java/com/salesforce/cdp/queryservice/util/TokenHelper.java @@ -61,6 +61,10 @@ private TokenHelper() { //NOOP } + public static boolean tokenExistsInCache(String coreToken) { + return tokenCache.getIfPresent(coreToken) != null; + } + /** * Gets the token for specified credentials *