From 2cafaf5ed08ee6366ef88b459c7ddb19094f54a2 Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 28 May 2026 13:16:47 +0530 Subject: [PATCH 1/2] Adding SparkSQL to DatabaseType and adding SaprkSQLPaginationQueryProvider Signed-off-by: Rahul Kumar --- .../support/SparkSqlPagingQueryProvider.java | 49 +++++++++ .../SqlPagingQueryProviderFactoryBean.java | 2 + .../infrastructure/support/DatabaseType.java | 2 +- .../SparkSqlPagingQueryProviderTests.java | 100 ++++++++++++++++++ 4 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java create mode 100644 spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProviderTests.java diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java new file mode 100644 index 0000000000..8eacdbfc21 --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java @@ -0,0 +1,49 @@ +/* + * Copyright 2006-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.infrastructure.item.database.support; + +import org.springframework.batch.infrastructure.item.database.PagingQueryProvider; +import org.springframework.util.StringUtils; + +/** + * SparkSQL implementation of a {@link PagingQueryProvider} using database specific features. + * + * @author Rahul Kumar + * @since 2.0 + */ +public class SparkSqlPagingQueryProvider extends AbstractSqlPagingQueryProvider { + + @Override + public String generateFirstPageQuery(int pageSize) { + return SqlPagingQueryUtils.generateLimitSqlQuery(this, false, buildLimitClause(pageSize)); + } + + @Override + public String generateRemainingPagesQuery(int pageSize) { + if (StringUtils.hasText(getGroupClause())) { + return SqlPagingQueryUtils.generateLimitGroupedSqlQuery(this, buildLimitClause(pageSize)); + } + else { + return SqlPagingQueryUtils.generateLimitSqlQuery(this, true, buildLimitClause(pageSize)); + } + } + + private String buildLimitClause(int pageSize) { + return "LIMIT " + pageSize; + } + +} diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java index eb74523eb3..c3b7764cfd 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SqlPagingQueryProviderFactoryBean.java @@ -30,6 +30,7 @@ import static org.springframework.batch.infrastructure.support.DatabaseType.SQLITE; import static org.springframework.batch.infrastructure.support.DatabaseType.SQLSERVER; import static org.springframework.batch.infrastructure.support.DatabaseType.SYBASE; +import static org.springframework.batch.infrastructure.support.DatabaseType.SPARKSQL; import java.util.HashMap; import java.util.LinkedHashMap; @@ -89,6 +90,7 @@ public class SqlPagingQueryProviderFactoryBean implements FactoryBean DATABASE_TYPES = Arrays.stream(DatabaseType.values()) .collect(Collectors.toMap(DatabaseType::getProductName, Function.identity())); diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProviderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProviderTests.java new file mode 100644 index 0000000000..6fb0fbc367 --- /dev/null +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProviderTests.java @@ -0,0 +1,100 @@ +/* + * Copyright 2006-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.infrastructure.item.database.support; + +import org.junit.jupiter.api.Test; +import org.springframework.batch.infrastructure.item.database.Order; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author Rahul Kumar + */ +class SparkSqlPagingQueryProviderTests extends AbstractSqlPagingQueryProviderTests { + + SparkSqlPagingQueryProviderTests() { + pagingQueryProvider = new SparkSqlPagingQueryProvider(); + } + + @Test + @Override + void testGenerateFirstPageQuery() { + String sql = "SELECT id, name, age FROM foo WHERE bar = 1 ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateFirstPageQuery(pageSize); + assertEquals(sql, s); + } + + @Test + @Override + void testGenerateRemainingPagesQuery() { + String sql = "SELECT id, name, age FROM foo WHERE (bar = 1) AND ((id > ?)) ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateRemainingPagesQuery(pageSize); + assertEquals(sql, s); + } + + @Override + @Test + void testGenerateFirstPageQueryWithGroupBy() { + pagingQueryProvider.setGroupClause("dep"); + String sql = "SELECT id, name, age FROM foo WHERE bar = 1 GROUP BY dep ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateFirstPageQuery(pageSize); + assertEquals(sql, s); + } + + @Override + @Test + void testGenerateRemainingPagesQueryWithGroupBy() { + pagingQueryProvider.setGroupClause("dep"); + String sql = "SELECT * FROM (SELECT id, name, age FROM foo WHERE bar = 1 GROUP BY dep) AS MAIN_QRY WHERE ((id > ?)) ORDER BY id ASC LIMIT 100"; + String s = pagingQueryProvider.generateRemainingPagesQuery(pageSize); + assertEquals(sql, s); + } + + @Test + void testFirstPageSqlWithAliases() { + Map sorts = new HashMap<>(); + sorts.put("owner.id", Order.ASCENDING); + + this.pagingQueryProvider = new SparkSqlPagingQueryProvider(); + this.pagingQueryProvider.setSelectClause("SELECT owner.id as ownerid, first_name, last_name, dog_name "); + this.pagingQueryProvider.setFromClause("FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id "); + this.pagingQueryProvider.setSortKeys(sorts); + + String firstPage = this.pagingQueryProvider.generateFirstPageQuery(5); + String remainingPagesQuery = this.pagingQueryProvider.generateRemainingPagesQuery(5); + + assertEquals( + "SELECT owner.id as ownerid, first_name, last_name, dog_name FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id ORDER BY owner.id ASC LIMIT 5", + firstPage); + assertEquals( + "SELECT owner.id as ownerid, first_name, last_name, dog_name FROM dog_owner owner INNER JOIN dog ON owner.id = dog.id WHERE ((owner.id > ?)) ORDER BY owner.id ASC LIMIT 5", + remainingPagesQuery); + } + + @Override + String getFirstPageSqlWithMultipleSortKeys() { + return "SELECT id, name, age FROM foo WHERE bar = 1 ORDER BY name ASC, id DESC LIMIT 100"; + } + + @Override + String getRemainingSqlWithMultipleSortKeys() { + return "SELECT id, name, age FROM foo WHERE (bar = 1) AND ((name > ?) OR (name = ? AND id < ?)) ORDER BY name ASC, id DESC LIMIT 100"; + } + +} From e8828bf0ae5777d11befa110c260b2c3145cc72d Mon Sep 17 00:00:00 2001 From: Rahul Kumar Date: Thu, 28 May 2026 13:37:31 +0530 Subject: [PATCH 2/2] Formating fix added. Signed off by gloul408@gmail.com Signed-off-by: Rahul Kumar --- .../support/SparkSqlPagingQueryProvider.java | 35 ++++++++++--------- .../infrastructure/support/DatabaseType.java | 3 +- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java index 8eacdbfc21..75009297a5 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/database/support/SparkSqlPagingQueryProvider.java @@ -20,30 +20,31 @@ import org.springframework.util.StringUtils; /** - * SparkSQL implementation of a {@link PagingQueryProvider} using database specific features. + * SparkSQL implementation of a {@link PagingQueryProvider} using database specific + * features. * * @author Rahul Kumar * @since 2.0 */ public class SparkSqlPagingQueryProvider extends AbstractSqlPagingQueryProvider { - @Override - public String generateFirstPageQuery(int pageSize) { - return SqlPagingQueryUtils.generateLimitSqlQuery(this, false, buildLimitClause(pageSize)); - } + @Override + public String generateFirstPageQuery(int pageSize) { + return SqlPagingQueryUtils.generateLimitSqlQuery(this, false, buildLimitClause(pageSize)); + } - @Override - public String generateRemainingPagesQuery(int pageSize) { - if (StringUtils.hasText(getGroupClause())) { - return SqlPagingQueryUtils.generateLimitGroupedSqlQuery(this, buildLimitClause(pageSize)); - } - else { - return SqlPagingQueryUtils.generateLimitSqlQuery(this, true, buildLimitClause(pageSize)); - } - } + @Override + public String generateRemainingPagesQuery(int pageSize) { + if (StringUtils.hasText(getGroupClause())) { + return SqlPagingQueryUtils.generateLimitGroupedSqlQuery(this, buildLimitClause(pageSize)); + } + else { + return SqlPagingQueryUtils.generateLimitSqlQuery(this, true, buildLimitClause(pageSize)); + } + } - private String buildLimitClause(int pageSize) { - return "LIMIT " + pageSize; - } + private String buildLimitClause(int pageSize) { + return "LIMIT " + pageSize; + } } diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/support/DatabaseType.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/support/DatabaseType.java index 03b4558c00..dd79e75aff 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/support/DatabaseType.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/support/DatabaseType.java @@ -43,7 +43,8 @@ public enum DatabaseType { DERBY("Apache Derby"), DB2("DB2"), DB2VSE("DB2VSE"), DB2ZOS("DB2ZOS"), DB2AS400("DB2AS400"), HSQL("HSQL Database Engine"), SQLSERVER("Microsoft SQL Server"), MYSQL("MySQL"), ORACLE("Oracle"), - POSTGRES("PostgreSQL"), SYBASE("Sybase"), H2("H2"), SQLITE("SQLite"), HANA("HDB"), MARIADB("MariaDB"), SPARKSQL("SparkSQL"); + POSTGRES("PostgreSQL"), SYBASE("Sybase"), H2("H2"), SQLITE("SQLite"), HANA("HDB"), MARIADB("MariaDB"), + SPARKSQL("SparkSQL"); private static final Map DATABASE_TYPES = Arrays.stream(DatabaseType.values()) .collect(Collectors.toMap(DatabaseType::getProductName, Function.identity()));