diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java new file mode 100644 index 0000000000..75009297a5 --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java @@ -0,0 +1,50 @@ +/* + * Copyright 2006-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.infrastructure.item.database.support; + +import org.springframework.batch.infrastructure.item.database.PagingQueryProvider; +import org.springframework.util.StringUtils; + +/** + * SparkSQL implementation of a {@link PagingQueryProvider} using database specific + * features. + * + * @author Rahul Kumar + * @since 2.0 + */ +public class SparkSqlPagingQueryProvider extends AbstractSqlPagingQueryProvider { + + @Override + public String generateFirstPageQuery(int pageSize) { + return SqlPagingQueryUtils.generateLimitSqlQuery(this, false, buildLimitClause(pageSize)); + } + + @Override + public String generateRemainingPagesQuery(int pageSize) { + if (StringUtils.hasText(getGroupClause())) { + return SqlPagingQueryUtils.generateLimitGroupedSqlQuery(this, buildLimitClause(pageSize)); + } + else { + return SqlPagingQueryUtils.generateLimitSqlQuery(this, true, buildLimitClause(pageSize)); + } + } + + private String buildLimitClause(int pageSize) { + return "LIMIT " + pageSize; + } + +} diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java index eb74523eb3..c3b7764cfd 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java @@ -30,6 +30,7 @@ import static org.springframework.batch.infrastructure.support.DatabaseType.SQLITE; import static org.springframework.batch.infrastructure.support.DatabaseType.SQLSERVER; import static org.springframework.batch.infrastructure.support.DatabaseType.SYBASE; +import static org.springframework.batch.infrastructure.support.DatabaseType.SPARKSQL; import java.util.HashMap; import java.util.LinkedHashMap; @@ -89,6 +90,7 @@ public class SqlPagingQueryProviderFactoryBean implements FactoryBean DATABASE_TYPES = Arrays.stream(DatabaseType.values()) .collect(Collectors.toMap(DatabaseType::getProductName, Function.identity())); diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProviderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProviderTests.java new file mode 100644 index 0000000000..6fb0fbc367 --- /dev/null +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProviderTests.java @@ -0,0 +1,100 @@ +/* + * Copyright 2006-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.infrastructure.item.database.support; + +import org.junit.jupiter.api.Test; +import org.springframework.batch.infrastructure.item.database.Order; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author Rahul Kumar + */ +class SparkSqlPagingQueryProviderTests extends AbstractSqlPagingQueryProviderTests { + + SparkSqlPagingQueryProviderTests() { + pagingQueryProvider = new SparkSqlPagingQueryProvider(); + } + + @Test + @Override + void testGenerateFirstPageQuery() { + String sql = "SELECT id, name, age FROM foo WHERE bar = 1 ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateFirstPageQuery(pageSize); + assertEquals(sql, s); + } + + @Test + @Override + void testGenerateRemainingPagesQuery() { + String sql = "SELECT id, name, age FROM foo WHERE (bar = 1) AND ((id > ?)) ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateRemainingPagesQuery(pageSize); + assertEquals(sql, s); + } + + @Override + @Test + void testGenerateFirstPageQueryWithGroupBy() { + pagingQueryProvider.setGroupClause("dep"); + String sql = "SELECT id, name, age FROM foo WHERE bar = 1 GROUP BY dep ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateFirstPageQuery(pageSize); + assertEquals(sql, s); + } + + @Override + @Test + void testGenerateRemainingPagesQueryWithGroupBy() { + pagingQueryProvider.setGroupClause("dep"); + String sql = "SELECT * FROM (SELECT id, name, age FROM foo WHERE bar = 1 GROUP BY dep) AS MAIN_QRY WHERE ((id > ?)) ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateRemainingPagesQuery(pageSize); + assertEquals(sql, s); + } + + @Test + void testFirstPageSqlWithAliases() { + Map sorts = new HashMap<>(); + sorts.put("owner.id", Order.ASCENDING); + + this.pagingQueryProvider = new SparkSqlPagingQueryProvider(); + this.pagingQueryProvider.setSelectClause("SELECT owner.id as ownerid, first_name, last_name, dog_name "); + this.pagingQueryProvider.setFromClause("FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id "); + this.pagingQueryProvider.setSortKeys(sorts); + + String firstPage = this.pagingQueryProvider.generateFirstPageQuery(5); + String remainingPagesQuery = this.pagingQueryProvider.generateRemainingPagesQuery(5); + + assertEquals( + "SELECT owner.id as ownerid, first_name, last_name, dog_name FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id ORDER BY owner.id ASC LIMIT 5", + firstPage); + assertEquals( + "SELECT owner.id as ownerid, first_name, last_name, dog_name FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id WHERE ((owner.id > ?)) ORDER BY owner.id ASC LIMIT 5", + remainingPagesQuery); + } + + @Override + String getFirstPageSqlWithMultipleSortKeys() { + return "SELECT id, name, age FROM foo WHERE bar = 1 ORDER BY name ASC, id DESC LIMIT 100"; + } + + @Override + String getRemainingSqlWithMultipleSortKeys() { + return "SELECT id, name, age FROM foo WHERE (bar = 1) AND ((name > ?) OR (name = ? AND id < ?)) ORDER BY name ASC, id DESC LIMIT 100"; + } + +}