Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/main/scala/models/batches/StagedBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ trait StagedBatch:
*/
def reduceExpr: String

/** Query that should be used to dispose of this batch data.
* @return
* SQL query text
*/
def disposeExpr: String = s"DROP TABLE $name"

/** Checks whether the current batch is empty — a batch without a schema or a name is considered empty and should be discarded
* @return
*/
Expand Down
24 changes: 24 additions & 0 deletions src/main/scala/services/base/DisposeServiceClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.sneaksanddata.arcane.framework
package services.base

import models.batches.StagedBatch

import zio.Task

/** Result of disposing of a single batch.
 *
 * @param isSuccess
 *   true if the batch data was disposed of successfully, false otherwise.
 */
final case class BatchDisposeResult(isSuccess: Boolean)

/** A service client capable of disposing of staged data batches.
 */
trait DisposeServiceClient:
  type Batch = StagedBatch

  /** Removes the data backing the provided batch.
   *
   * @param batch
   *   The batch whose data should be disposed of.
   * @return
   *   The result of the dispose operation.
   */
  def disposeBatch(batch: Batch): Task[BatchDisposeResult]
19 changes: 0 additions & 19 deletions src/main/scala/services/base/MergeServiceClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import zio.Task
*/
type BatchApplicationResult = Boolean

/** The result of disposing of a batch.
*/
class BatchDisposeResult

/** A service client that merges data batches.
*/
trait MergeServiceClient:
Expand All @@ -27,18 +23,3 @@ trait MergeServiceClient:
* The result of applying the batch.
*/
def applyBatch(batch: Batch): Task[BatchApplicationResult]

/** A service client that disposes of data batches.
*/
trait DisposeServiceClient:

type Batch = StagedBatch

/** Disposes of a batch.
*
* @param batch
* The batch to dispose.
* @return
* The result of disposing of the batch.
*/
def disposeBatch(batch: Batch): Task[BatchDisposeResult]
25 changes: 2 additions & 23 deletions src/main/scala/services/merging/JdbcMergeServiceClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,16 @@ package services.merging

import logging.ZIOLogAnnotations.*
import models.app.PluginStreamContext
import models.settings.staging.{
AlwaysImpl,
BackfillOnlyImpl,
JdbcCredentialType,
JdbcMergeServiceClientSettings,
NeverImpl
}
import models.settings.staging.{AlwaysImpl, BackfillOnlyImpl, JdbcMergeServiceClientSettings, NeverImpl}
import services.base.*
import services.merging.maintenance.{*, given}
import services.metrics.DeclaredMetrics
import services.metrics.DeclaredMetrics.*

import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Types.{DecimalType, ListType, StructType, TimestampType}
import zio.{Schedule, Task, ZIO, ZLayer}

import java.io.IOException
import java.sql.*
import scala.jdk.CollectionConverters.*

trait JdbcTableManager extends TableManager:
/** @inheritdoc
Expand Down Expand Up @@ -52,8 +42,7 @@ class JdbcMergeServiceClient(
isBackfilling: Boolean
) extends MergeServiceClient
with JdbcTableManager
with AutoCloseable
with DisposeServiceClient:
with AutoCloseable:

require(options.isValid, "Invalid JDBC url provided for the consumer")

Expand Down Expand Up @@ -89,16 +78,6 @@ class JdbcMergeServiceClient(
executeBatchQuery(batch.batchQuery.query, batch.name, "Applying", _ => true)
.gaugeDuration(declaredMetrics.batchMergeDuration)

/** @inheritdoc
*/
override def disposeBatch(batch: Batch): Task[BatchDisposeResult] =
ZIO
.unless(batch.isEmpty)(
executeBatchQuery(batch.disposeExpr, batch.name, "Disposing", _ => new BatchDisposeResult)
.gaugeDuration(declaredMetrics.batchDisposeDuration)
)
.map(_.getOrElse(new BatchDisposeResult))

/** @inheritdoc
*/
override def optimizeTable(maybeRequest: Option[TableOptimizationRequest]): Task[BatchOptimizationResult] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.sneaksanddata.arcane.framework
package services.merging.cleanup

import services.base.{BatchDisposeResult, DisposeServiceClient}
import services.iceberg.base.StagingEntityManager

import zio.Task

/** Batch dispose client implementation that uses Iceberg REST Catalog API to delete staging
 * entities.
 *
 * @param stagingEntityManager
 *   Manager used to delete staging entities from the Iceberg catalog.
 */
class CatalogDisposeServiceClient(
    stagingEntityManager: StagingEntityManager
) extends DisposeServiceClient:
  /** @inheritdoc
   */
  override def disposeBatch(batch: Batch): Task[BatchDisposeResult] =
    // Delete the staging entity named after the batch and wrap the outcome flag in the result type.
    stagingEntityManager.delete(batch.name).map(BatchDisposeResult(_))
Original file line number Diff line number Diff line change
Expand Up @@ -104,27 +104,6 @@ object JdbcMergeServiceClientTests extends ZIOSpecDefault:
_ <- ZIO.attemptBlocking(rs.next())
yield assertTrue(rs.getInt(1) == 10)
},
test("disposes of a batch") {
for
tableName <- ZIO.succeed("table_disposed")
batch <- ZIO.succeed(
SynapseLinkMergeBatch(
s"test.staged_$tableName",
schema,
s"test.$tableName",
TestTablePropertiesSettings,
None
)
)
_ <- setupTable(tableName)

connection <- getConnection
mergeServiceClient = getJdbcMergeServiceClient
_ <- mergeServiceClient.disposeBatch(batch)
rs <- ZIO.attemptBlocking(connection.getMetaData.getTables(null, null, s"staged_$tableName", null))
stagingTableExists <- ZIO.attemptBlocking(rs.next())
yield assertTrue(!stagingTableExists)
},
test("optimizes a table") {
for
tableName <- ZIO.succeed("table_optimized")
Expand Down
Loading