Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions src/main/scala/models/batches/SqlServerChangeTracking.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ class SqlServerChangeTrackingMergeBatch(
batchSchema: ArcaneSchema,
targetName: String,
tablePropertiesSettings: TablePropertiesSettings,
mergeKey: String,
watermarkValue: Option[String]
mergeKey: String
) extends StagedVersionedBatch
with MergeableBatch:

Expand All @@ -128,29 +127,39 @@ class SqlServerChangeTrackingMergeBatch(
columns = schema.map(f => f.name)
)

override val completedWatermarkValue: Option[String] = watermarkValue
override val completedWatermarkValue: Option[String] = None

object SqlServerChangeTrackingMergeBatch:
def empty(watermarkValue: Option[String]): SqlServerChangeTrackingMergeBatch = new SqlServerChangeTrackingMergeBatch(
"",
ArcaneSchema.empty(),
"",
EmptyTablePropertiesSettings,
"",
watermarkValue
)
def apply(
batchName: String,
batchSchema: ArcaneSchema,
targetName: String,
tablePropertiesSettings: TablePropertiesSettings,
watermarkValue: Option[String]
tablePropertiesSettings: TablePropertiesSettings
): SqlServerChangeTrackingMergeBatch =
new SqlServerChangeTrackingMergeBatch(
batchName,
batchSchema,
targetName,
tablePropertiesSettings,
batchSchema.mergeKey.name,
watermarkValue
batchSchema.mergeKey.name
)

class SqlServerChangeTrackingWatermarkBatch(
targetName: String,
watermarkValue: String
) extends StagedVersionedBatch
with MergeableBatch:

override val name: String = "watermark"
override val schema: ArcaneSchema = ArcaneSchema.empty()
override val targetTableName: String = targetName

override def reduceExpr: String = ""

override val batchQuery: MergeQuery = SqlServerChangeTrackingMergeQuery.empty

override val completedWatermarkValue: Option[String] = Some(watermarkValue)

object SqlServerChangeTrackingWatermarkBatch:
def apply(targetName: String, watermarkValue: String): SqlServerChangeTrackingWatermarkBatch =
new SqlServerChangeTrackingWatermarkBatch(targetName, watermarkValue)
6 changes: 3 additions & 3 deletions src/main/scala/models/batches/StagedBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ trait StagedBatch:
*/
val name: String

/** Schema for the table that holds batch data
/** Schema of the staging table created fo this batch
*/
val schema: ArcaneSchema

Expand All @@ -39,10 +39,10 @@ trait StagedBatch:
*/
def reduceExpr: String

/** Check if current batch is an empty batch - batch without schema or name is empty and should be discarded
/** Check if current batch is an empty batch - batch without name is empty and should be discarded
* @return
*/
def isEmpty: Boolean = name.isBlank || schema.isEmpty
def isEmpty: Boolean = name.isBlank

/** Common trait for StagedBatch that performs a backfill operation on the table.
*/
Expand Down
40 changes: 31 additions & 9 deletions src/main/scala/models/batches/SynapseLink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ class SynapseLinkMergeBatch(
batchSchema: ArcaneSchema,
targetName: String,
tablePropertiesSettings: TablePropertiesSettings,
mergeKey: String,
watermarkValue: Option[String]
mergeKey: String
) extends StagedVersionedBatch
with MergeableBatch:
override val name: String = batchName
Expand All @@ -125,25 +124,48 @@ class SynapseLinkMergeBatch(
columns = schema.map(f => f.name)
)

override val completedWatermarkValue: Option[String] = watermarkValue
override val completedWatermarkValue: Option[String] = None

/** Watermark-only batch
*/
class SynapseLinkWatermarkBatch(
watermarkValue: String,
targetName: String
) extends StagedVersionedBatch
with MergeableBatch:
override val name: String = "watermark"
override val schema: ArcaneSchema = ArcaneSchema.empty()
override val targetTableName: String = targetName

override def reduceExpr: String = ""

override val batchQuery: MergeQuery = SynapseLinkMergeQuery.empty

override val completedWatermarkValue: Option[String] = Some(watermarkValue)

object SynapseLinkWatermarkBatch:
def apply(
watermarkValue: String,
targetName: String
): SynapseLinkWatermarkBatch =
new SynapseLinkWatermarkBatch(
watermarkValue,
targetName
)

object SynapseLinkMergeBatch:
def empty(watermarkValue: Option[String]): SynapseLinkMergeBatch =
new SynapseLinkMergeBatch("", ArcaneSchema.empty(), "", EmptyTablePropertiesSettings, "", watermarkValue)
def apply(
batchName: String,
batchSchema: ArcaneSchema,
targetName: String,
tablePropertiesSettings: TablePropertiesSettings,
watermarkValue: Option[String]
tablePropertiesSettings: TablePropertiesSettings
): SynapseLinkMergeBatch =
new SynapseLinkMergeBatch(
batchName,
batchSchema,
targetName,
tablePropertiesSettings,
batchSchema.mergeKey.name,
watermarkValue
batchSchema.mergeKey.name
)

class SynapseLinkBackfillMergeBatch(
Expand Down
35 changes: 25 additions & 10 deletions src/main/scala/models/batches/UpsertBlob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ class UpsertBlobMergeBatch(
batchSchema: ArcaneSchema,
targetName: String,
tablePropertiesSettings: TablePropertiesSettings,
mergeKey: String,
watermarkValue: Option[String]
mergeKey: String
) extends StagedVersionedBatch
with MergeableBatch:
override val name: String = batchName
Expand All @@ -106,7 +105,7 @@ class UpsertBlobMergeBatch(
| SELECT * FROM $name ORDER BY ROW_NUMBER() OVER (PARTITION BY ${schema.mergeKey.name} ORDER BY ${BlobBatchCommons.versionField.name} DESC) FETCH FIRST 1 ROWS WITH TIES
|)""".stripMargin

override val completedWatermarkValue: Option[String] = watermarkValue
override val completedWatermarkValue: Option[String] = None

override val batchQuery: MergeQuery =
if schema.isEmpty then UpsertBlobMergeQuery.empty
Expand All @@ -120,25 +119,41 @@ class UpsertBlobMergeBatch(
)

object UpsertBlobMergeBatch:
def empty(watermarkValue: Option[String]): UpsertBlobMergeBatch =
new UpsertBlobMergeBatch("", ArcaneSchema.empty(), "", EmptyTablePropertiesSettings, "", watermarkValue)

def apply(
batchName: String,
batchSchema: ArcaneSchema,
targetName: String,
tablePropertiesSettings: TablePropertiesSettings,
watermarkValue: Option[String]
tablePropertiesSettings: TablePropertiesSettings
): UpsertBlobMergeBatch =
new UpsertBlobMergeBatch(
batchName,
batchSchema,
targetName,
tablePropertiesSettings,
batchSchema.mergeKey.name,
watermarkValue
batchSchema.mergeKey.name
)

class UpsertBlobWatermarkBatch(
targetName: String,
watermarkValue: String
) extends StagedVersionedBatch
with MergeableBatch:
override val name: String = "watermark"
override val schema: ArcaneSchema = ArcaneSchema.empty()
override val targetTableName: String = targetName

override def reduceExpr: String = ""

override val completedWatermarkValue: Option[String] = Some(watermarkValue)

override val batchQuery: MergeQuery = UpsertBlobMergeQuery.empty

object UpsertBlobWatermarkBatch:
def apply(
targetName: String,
watermarkValue: String
): UpsertBlobWatermarkBatch = new UpsertBlobWatermarkBatch(targetName, watermarkValue)

class UpsertBlobBackfillMergeBatch(
batchName: String,
batchSchema: ArcaneSchema,
Expand Down
36 changes: 36 additions & 0 deletions src/main/scala/models/maintenance/JdbcAnalyzeRequest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.sneaksanddata.arcane.framework
package models.maintenance

case class JdbcAnalyzeRequest(
tableName: String,
includedColumns: Seq[String]
) extends MaintenanceRequest:
def toSqlExpression: String =
if includedColumns.nonEmpty then
val columnList = includedColumns.map(col => s"'$col'").mkString(",")
s"ANALYZE $tableName WITH (columns = ARRAY[$columnList])"
else s"ANALYZE $tableName"

object JdbcAnalyzeRequest:
/** A request to analyze a table.
*
* @param tableName
* The name of the table to optimize.
* @param optimizeThreshold
* Columns to run ANALYZE on.
* @param includedColumns
* The file size threshold.
* @param batchCount
* The current batch counter.
*/
def apply(
tableName: String,
optimizeThreshold: Long,
includedColumns: Seq[String],
batchCount: Long
): Option[JdbcAnalyzeRequest] =

require(optimizeThreshold > 0, "Optimize threshold must be greater than 0")

if (batchCount + 1) % optimizeThreshold == 0 then Some(JdbcAnalyzeRequest(tableName, includedColumns))
else None
30 changes: 30 additions & 0 deletions src/main/scala/models/maintenance/JdbcOptimizationRequest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.sneaksanddata.arcane.framework
package models.maintenance

case class JdbcOptimizationRequest(tableName: String, fileSizeThreshold: String) extends MaintenanceRequest:
def toSqlExpression: String =
s"ALTER TABLE $tableName execute optimize(file_size_threshold => '$fileSizeThreshold')"

object JdbcOptimizationRequest:
/** A request to optimize a table.
*
* @param tableName
* The name of the table to optimize.
* @param optimizeThreshold
* The threshold for optimization.
* @param fileSizeThreshold
* The file size threshold.
* @param batchCount
* The current batch counter.
*/
def apply(
tableName: String,
optimizeThreshold: Long,
fileSizeThreshold: String,
batchCount: Long
): Option[JdbcOptimizationRequest] =

require(optimizeThreshold > 0, "Optimize threshold must be greater than 0")

if (batchCount + 1) % optimizeThreshold == 0 then Some(JdbcOptimizationRequest(tableName, fileSizeThreshold))
else None
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.sneaksanddata.arcane.framework
package models.maintenance

case class JdbcOrphanFilesExpirationRequest(
tableName: String,
retentionThreshold: String
) extends MaintenanceRequest:
def toSqlExpression: String =
s"ALTER TABLE $tableName execute remove_orphan_files(retention_threshold => '$retentionThreshold')"

object JdbcOrphanFilesExpirationRequest:
/** A request to remove orphan files.
*
* @param tableName
* The name of the table to optimize.
* @param optimizeThreshold
* The threshold for optimization.
* @param retentionThreshold
* The file size threshold.
* @param batchCount
* The current batch counter.
*/
def apply(
tableName: String,
optimizeThreshold: Long,
retentionThreshold: String,
batchCount: Long
): Option[JdbcOrphanFilesExpirationRequest] =

require(optimizeThreshold > 0, "Optimize threshold must be greater than 0")

if (batchCount + 1) % optimizeThreshold == 0 then
Some(JdbcOrphanFilesExpirationRequest(tableName, retentionThreshold))
else None
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.sneaksanddata.arcane.framework
package models.maintenance

case class JdbcSnapshotExpirationRequest(
tableName: String,
retentionThreshold: String
) extends MaintenanceRequest:
def toSqlExpression: String =
s"ALTER TABLE $tableName execute expire_snapshots(retention_threshold => '$retentionThreshold')"

object JdbcSnapshotExpirationRequest:
/** A request to expire outdated snapshots.
*
* @param tableName
* The name of the table to optimize.
* @param optimizeThreshold
* The threshold for optimization.
* @param retentionThreshold
* The file size threshold.
* @param batchCount
* The current batch counter.
*/
def apply(
tableName: String,
optimizeThreshold: Long,
retentionThreshold: String,
batchCount: Long
): Option[JdbcSnapshotExpirationRequest] =

require(optimizeThreshold > 0, "Optimize threshold must be greater than 0")

if (batchCount + 1) % optimizeThreshold == 0 then Some(JdbcSnapshotExpirationRequest(tableName, retentionThreshold))
else None
5 changes: 5 additions & 0 deletions src/main/scala/models/maintenance/MaintenanceRequest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.sneaksanddata.arcane.framework
package models.maintenance

trait MaintenanceRequest:
def toSqlExpression: String
9 changes: 0 additions & 9 deletions src/main/scala/models/schemas/DataCell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@ object DataCell:
value
)

/** Extension method to get the schema of a DataRow.
*/
extension (row: DataRow)
def schema: ArcaneSchema =
row.foldLeft(ArcaneSchema.empty()) {
case (schema, cell) if cell.name == MergeKeyField.name => schema ++ Seq(MergeKeyField)
case (schema, cell) => schema ++ Seq(Field(cell.name, cell.Type))
}

/** Checks if the cell holds a watermark
*/
extension (cell: DataCell) def isWatermark: Boolean = cell.name == watermarkCellName
Expand Down
Loading
Loading