Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.io.Closeable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;

import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -51,16 +53,36 @@ default DataSource create(Configuration hdpConfig) throws SQLException {
* @return The pooling type string associated with the data source.
*/
String getPoolingType();

/**
* @param hdpConfig
* @param hdpConfig Hadoop configuration object
* @param prefix the prefix for connection pool specific properties
* @param timeDurationConfigs map of property names (without prefix) to their default values in milliseconds.
* @return subset of properties prefixed by a connection pool specific substring
*/
static Properties getPrefixedProperties(Configuration hdpConfig, String factoryPrefix) {
static Properties getPrefixedProperties(Configuration hdpConfig, String prefix,
Map<String, Long> timeDurationConfigs) {
Properties dataSourceProps = new Properties();
Iterables.filter(
hdpConfig, (entry -> entry.getKey() != null && entry.getKey().startsWith(factoryPrefix)))
.forEach(entry -> dataSourceProps.put(entry.getKey(), entry.getValue()));
hdpConfig, (entry -> entry.getKey() != null && entry.getKey().startsWith(prefix)))
.forEach(entry -> {
String fullKey = entry.getKey();
String keyName = fullKey.substring(prefix.length() + 1);
Long defaultVal = timeDurationConfigs.get(keyName);
if (defaultVal != null) {
long timeMs = hdpConfig.getTimeDuration(fullKey, defaultVal, TimeUnit.MILLISECONDS);
dataSourceProps.setProperty(fullKey, String.valueOf(timeMs));
} else {
dataSourceProps.setProperty(fullKey, entry.getValue());
}
});

// Setting defaults for time duration configs if not set already
timeDurationConfigs.forEach((keyName, defaultVal) -> {
String fullKey = prefix + "." + keyName;
dataSourceProps.putIfAbsent(fullKey, String.valueOf(defaultVal));
});

return dataSourceProps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class HikariCPDataSourceProvider implements DataSourceProvider {
private static final Logger LOG = LoggerFactory.getLogger(HikariCPDataSourceProvider.class);

static final String HIKARI = "hikaricp";
static final long DEFAULT_CONNECTION_TIMEOUT_MS = 60000;

private static final Map<String, Long> TIME_DURATION_CONFIGS = Map.of(
"connectionTimeout", DEFAULT_CONNECTION_TIMEOUT_MS
);

@Override
public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLException {
Expand All @@ -50,8 +55,9 @@ public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLExc
String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig);
String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig);

Properties properties = replacePrefix(DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI));

Properties properties = replacePrefix(
DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI, TIME_DURATION_CONFIGS));

HikariConfig config;
try {
config = new HikariConfig(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testDefaultHikariCpProperties() throws SQLException {
DataSource ds = dsp.create(conf);
Assert.assertTrue(ds instanceof HikariDataSource);
HikariDataSource hds = (HikariDataSource) ds;
Assert.assertEquals(30000L, hds.getConnectionTimeout());
Assert.assertEquals(60000L, hds.getConnectionTimeout());
Assert.assertEquals(1800000L, hds.getMaxLifetime());
Assert.assertEquals(0L, hds.getLeakDetectionThreshold());
Assert.assertEquals(1L, hds.getInitializationFailTimeout());
Expand All @@ -99,6 +99,32 @@ public void testSetHikariCpProperties() throws SQLException {
Assert.assertEquals(-1L, hds.getInitializationFailTimeout());
}

@Test
public void testHikariCpConnectionTimeout() throws SQLException {
Object[][] timeoutConfigs = {
{"90000", 90000L},
{"60s", 60000L},
{"90000ms", 90000L}
};

for (Object[] config : timeoutConfigs) {
String timeoutValue = (String) config[0];
long expectedValue = (long) config[1];

MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI);
conf.set(HikariCPDataSourceProvider.HIKARI + ".connectionTimeout", timeoutValue);
conf.set(HikariCPDataSourceProvider.HIKARI + ".initializationFailTimeout", "-1");

DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
Assert.assertNotNull(dsp);

DataSource ds = dsp.create(conf);
Assert.assertTrue(ds instanceof HikariDataSource);
HikariDataSource hds = (HikariDataSource) ds;
Assert.assertEquals(expectedValue, hds.getConnectionTimeout());
}
}

@Test
public void testHikariCpMaxPoolSize() {
MetastoreConf.setVar(conf, ConfVars.CONNECTION_POOLING_TYPE, HikariCPDataSourceProvider.HIKARI);
Expand Down
Loading