From 81a67533dd45cd6785d0f5ebc4b998ed3960e329 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 16 Nov 2024 12:09:20 -0800 Subject: [PATCH] WIP --- .../dml/TestPartialUpdateForMergeInto.scala | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala index 2da26ebb0528..2fa48b037aed 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala @@ -17,26 +17,28 @@ package org.apache.spark.sql.hudi.dml +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils} import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieReaderConfig, HoodieStorageConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.function.SerializableFunctionUnchecked import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.log.HoodieLogFileReader import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType +import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig, SyncableFileSystemView} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestUtils -import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.common.util.CompactionUtils +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.metadata.HoodieTableMetadata -import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils} import org.apache.avro.Schema import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} -import java.util.function.Predicate import java.util.{Collections, List, Optional} +import java.util.function.Predicate import scala.collection.JavaConverters._ @@ -238,6 +240,25 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { if (tableType.equals("mor")) { validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts", "description")), true) + + spark.sql(s"set ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key} = 3") + // Partial updates that trigger compaction + spark.sql( + s""" + |merge into $tableName t0 + |using ( select 2 as id, '_a2' as name, 18.0 as price, 1025 as ts + |union select 3 as id, '_a3' as name, 28.0 as price, 1280 as ts) s0 + |on t0.id = s0.id + |when matched then update set price = s0.price, _ts = s0.ts + |""".stripMargin) + validateCompactionExecuted(basePath) + checkAnswer(s"select id, name, price, _ts, description from $tableName")( + Seq(1, "a1", 18.0, 1025, "a1: updated desc1"), + Seq(2, "a2", 20.0, 1270, "a2: updated desc2"), + Seq(3, "a3", 28.0, 1280, "a3: desc3") + ) + spark.sql(s"set ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key}" + + s" = ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue()}") } if (tableType.equals("cow")) { @@ -426,4 +447,13 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { assertEquals(expectedSchema, actualSchema) } } + + def validateCompactionExecuted(basePath: String): Unit = { + val storageConf = HoodieTestUtils.getDefaultStorageConf + val metaClient: HoodieTableMetaClient = + HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build + val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.lastInstant().get() + assertEquals(HoodieTimeline.COMMIT_ACTION, lastCommit.getAction) + CompactionUtils.getCompactionPlan(metaClient, lastCommit.requestedTime()) + } }