Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.infrastructure.item.database.support;

import org.springframework.batch.infrastructure.item.database.PagingQueryProvider;
import org.springframework.util.StringUtils;

/**
* SparkSQL implementation of a {@link PagingQueryProvider} using database specific
* features.
*
* @author Rahul Kumar
* @since 2.0
*/
public class SparkSqlPagingQueryProvider extends AbstractSqlPagingQueryProvider {

@Override
public String generateFirstPageQuery(int pageSize) {
return SqlPagingQueryUtils.generateLimitSqlQuery(this, false, buildLimitClause(pageSize));
}

@Override
public String generateRemainingPagesQuery(int pageSize) {
if (StringUtils.hasText(getGroupClause())) {
return SqlPagingQueryUtils.generateLimitGroupedSqlQuery(this, buildLimitClause(pageSize));
}
else {
return SqlPagingQueryUtils.generateLimitSqlQuery(this, true, buildLimitClause(pageSize));
}
}

private String buildLimitClause(int pageSize) {
return "LIMIT " + pageSize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.springframework.batch.infrastructure.support.DatabaseType.SQLITE;
import static org.springframework.batch.infrastructure.support.DatabaseType.SQLSERVER;
import static org.springframework.batch.infrastructure.support.DatabaseType.SYBASE;
import static org.springframework.batch.infrastructure.support.DatabaseType.SPARKSQL;

import java.util.HashMap;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class SqlPagingQueryProviderFactoryBean implements FactoryBean<PagingQuer
providers.put(SQLITE, new SqlitePagingQueryProvider());
providers.put(SQLSERVER, new SqlServerPagingQueryProvider());
providers.put(SYBASE, new SybasePagingQueryProvider());
providers.put(SPARKSQL, new SparkSqlPagingQueryProvider());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public enum DatabaseType {

DERBY("Apache Derby"), DB2("DB2"), DB2VSE("DB2VSE"), DB2ZOS("DB2ZOS"), DB2AS400("DB2AS400"),
HSQL("HSQL Database Engine"), SQLSERVER("Microsoft SQL Server"), MYSQL("MySQL"), ORACLE("Oracle"),
POSTGRES("PostgreSQL"), SYBASE("Sybase"), H2("H2"), SQLITE("SQLite"), HANA("HDB"), MARIADB("MariaDB");
POSTGRES("PostgreSQL"), SYBASE("Sybase"), H2("H2"), SQLITE("SQLite"), HANA("HDB"), MARIADB("MariaDB"),
SPARKSQL("SparkSQL");

private static final Map<String, DatabaseType> DATABASE_TYPES = Arrays.stream(DatabaseType.values())
.collect(Collectors.toMap(DatabaseType::getProductName, Function.identity()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.infrastructure.item.database.support;

import org.junit.jupiter.api.Test;
import org.springframework.batch.infrastructure.item.database.Order;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* @author Rahul Kumar
*/
class SparkSqlPagingQueryProviderTests extends AbstractSqlPagingQueryProviderTests {

SparkSqlPagingQueryProviderTests() {
pagingQueryProvider = new SparkSqlPagingQueryProvider();
}

@Test
@Override
void testGenerateFirstPageQuery() {
String sql = "SELECT id, name, age FROM foo WHERE bar = 1 ORDER BY id ASC LIMIT 100";
String s = pagingQueryProvider.generateFirstPageQuery(pageSize);
assertEquals(sql, s);
}

@Test
@Override
void testGenerateRemainingPagesQuery() {
String sql = "SELECT id, name, age FROM foo WHERE (bar = 1) AND ((id > ?)) ORDER BY id ASC LIMIT 100";
String s = pagingQueryProvider.generateRemainingPagesQuery(pageSize);
assertEquals(sql, s);
}

@Override
@Test
void testGenerateFirstPageQueryWithGroupBy() {
pagingQueryProvider.setGroupClause("dep");
String sql = "SELECT id, name, age FROM foo WHERE bar = 1 GROUP BY dep ORDER BY id ASC LIMIT 100";
String s = pagingQueryProvider.generateFirstPageQuery(pageSize);
assertEquals(sql, s);
}

@Override
@Test
void testGenerateRemainingPagesQueryWithGroupBy() {
pagingQueryProvider.setGroupClause("dep");
String sql = "SELECT * FROM (SELECT id, name, age FROM foo WHERE bar = 1 GROUP BY dep) AS MAIN_QRY WHERE ((id > ?)) ORDER BY id ASC LIMIT 100";
String s = pagingQueryProvider.generateRemainingPagesQuery(pageSize);
assertEquals(sql, s);
}

@Test
void testFirstPageSqlWithAliases() {
Map<String, Order> sorts = new HashMap<>();
sorts.put("owner.id", Order.ASCENDING);

this.pagingQueryProvider = new SparkSqlPagingQueryProvider();
this.pagingQueryProvider.setSelectClause("SELECT owner.id as ownerid, first_name, last_name, dog_name ");
this.pagingQueryProvider.setFromClause("FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id ");
this.pagingQueryProvider.setSortKeys(sorts);

String firstPage = this.pagingQueryProvider.generateFirstPageQuery(5);
String remainingPagesQuery = this.pagingQueryProvider.generateRemainingPagesQuery(5);

assertEquals(
"SELECT owner.id as ownerid, first_name, last_name, dog_name FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id ORDER BY owner.id ASC LIMIT 5",
firstPage);
assertEquals(
"SELECT owner.id as ownerid, first_name, last_name, dog_name FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id WHERE ((owner.id > ?)) ORDER BY owner.id ASC LIMIT 5",
remainingPagesQuery);
}

@Override
String getFirstPageSqlWithMultipleSortKeys() {
return "SELECT id, name, age FROM foo WHERE bar = 1 ORDER BY name ASC, id DESC LIMIT 100";
}

@Override
String getRemainingSqlWithMultipleSortKeys() {
return "SELECT id, name, age FROM foo WHERE (bar = 1) AND ((name > ?) OR (name = ? AND id < ?)) ORDER BY name ASC, id DESC LIMIT 100";
}

}