[HUDI-8551] Validate MOR Without Precombine Set and Fix Related Bugs #12317
base: master
Changes from all commits: 76fa927, 9155119, c95f9fb, 77f2b83, 7643abc, e4efb8f, 03b2522, 03dd57a
```diff
@@ -112,7 +112,9 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
       this.payloadClass = Option.empty();
     }
     this.orderingFieldName = Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() -> hoodieTableMetaClient.getTableConfig().getPreCombineField());
-    this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName);
+
+    // Don't throw exception due to [HUDI-8574]
+    this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName, false);
     this.orderingFieldDefault = orderingFieldTypeOpt.map(type -> readerContext.castValue(0, type)).orElse(0);
     this.props = props;
     this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
```

> **Reviewer:** Why can't we check for `isEmptyOrNull` on `orderingFieldName` and then avoid calling `findNestedFieldType`?
>
> **Author:** See the comment above; the ordering field might not actually exist in the schema.
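For context on the flag being debated: with `throwOnNotFound = false`, the lookup returns an empty option instead of failing when the ordering field is absent from the schema. A minimal standalone sketch of that behavior, using a nested `Map` as a stand-in for an Avro schema (this helper is illustrative only, not Hudi's actual `AvroSchemaUtils` implementation):

```java
import java.util.Map;
import java.util.Optional;

public class FieldLookup {
    // Resolve a dotted field path against a nested map "schema". When the
    // field is missing, either throw (old behavior) or return empty
    // (new, fault-tolerant behavior) depending on throwOnNotFound.
    static Optional<String> findNestedFieldType(Map<String, ?> schema,
                                                String path,
                                                boolean throwOnNotFound) {
        Object node = schema;
        for (String part : path.split("\\.")) {
            if (!(node instanceof Map) || !((Map<?, ?>) node).containsKey(part)) {
                if (throwOnNotFound) {
                    throw new IllegalArgumentException("Field " + path + " not found");
                }
                return Optional.empty();  // tolerate a nonexistent ordering field
            }
            node = ((Map<?, ?>) node).get(part);
        }
        return node instanceof String ? Optional.of((String) node) : Optional.empty();
    }
}
```

The caller then falls back to a default ordering value when the option is empty, which is what the `orderingFieldDefault` line in the diff does.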
```diff
@@ -160,12 +160,16 @@ private Schema generateRequiredSchema() {
     List<Schema.Field> addedFields = new ArrayList<>();
     for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) {
       if (!findNestedField(requestedSchema, field).isPresent()) {
         Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
         if (!foundFieldOpt.isPresent()) {
-          throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
+          //see [HUDI-8574]
+          if (!field.equals(hoodieTableConfig.getPreCombineField())) {
+            throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
+          }
+        } else {
+          Schema.Field foundField = foundFieldOpt.get();
+          addedFields.add(foundField);
         }
-        Schema.Field foundField = foundFieldOpt.get();
-        addedFields.add(foundField);
       }
     }
```

> **Reviewer:** I checked the implementation of `getMandatoryFieldsForMerging`. It does not return the precombine field if it's not set. So can you help me understand why we need the changes below?
>
> **Author:** Same as the above comments.
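The net effect of the change above is that exactly one kind of missing mandatory field is tolerated: the table's precombine field. A standalone sketch of that filtering logic (plain Java, with strings standing in for `Schema.Field` and a `Set` standing in for the data schema; names are illustrative, not Hudi's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class RequiredFields {
    // Collect mandatory merge fields that exist in the data schema. A field
    // that is absent is tolerated only when it equals the precombine field
    // (see HUDI-8574); any other missing mandatory field is still an error.
    static List<String> collect(List<String> mandatory, Set<String> dataSchema, String preCombine) {
        List<String> added = new ArrayList<>();
        for (String field : mandatory) {
            if (!dataSchema.contains(field)) {
                if (!field.equals(preCombine)) {
                    throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
                }
            } else {
                added.add(field);
            }
        }
        return added;
    }
}
```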
```diff
@@ -82,9 +82,11 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchema) {
     @Override
     public Pair<List<Schema.Field>, List<Schema.Field>> getBootstrapRequiredFields() {
       Pair<List<Schema.Field>, List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields();
-      if (readerContext.supportsParquetRowIndex()) {
-        if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) {
+      if (readerContext.supportsParquetRowIndex() && (this.needsBootstrapMerge || this.needsMORMerge)) {
+        if (!dataAndMetaCols.getLeft().isEmpty()) {
           dataAndMetaCols.getLeft().add(getPositionalMergeField());
+        }
+        if (!dataAndMetaCols.getRight().isEmpty()) {
           dataAndMetaCols.getRight().add(getPositionalMergeField());
         }
       }
```

> **Author:** Previously we expected there to be a precombine if the table was MOR. So if you tried to read only meta cols, you would still need to read the precombine, so left and right would both have values. If you tried to read only data cols, you would still read `_hoodie_record_key` for MOR merging in case we have to fall back to key-based merging, so left and right would again both have values. Now that we don't require a precombine field, if you only read the meta cols you don't need to read any data cols. But we still want to do positional merging for MOR, so we need to add the positional merge field.
>
> **Reviewer:** Why can't we make this simpler?
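The author's explanation boils down to replacing an indirect proxy condition ("both column lists are non-empty") with the condition it was standing in for ("some merge is actually pending"). A hypothetical restatement of the two predicates as pure functions, just to make the behavioral difference explicit:

```java
public class PositionalMergeDecision {
    // Old condition: the non-empty-cols check acted as a proxy for
    // "merging needed", which breaks once the precombine field is optional
    // (a meta-cols-only read now has an empty data-cols list).
    static boolean oldCondition(boolean supportsRowIndex, int leftCols, int rightCols) {
        return supportsRowIndex && leftCols > 0 && rightCols > 0;
    }

    // New condition: ask directly whether a bootstrap or MOR merge is pending.
    static boolean newCondition(boolean supportsRowIndex, boolean needsBootstrapMerge, boolean needsMORMerge) {
        return supportsRowIndex && (needsBootstrapMerge || needsMORMerge);
    }
}
```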
```diff
@@ -143,9 +143,12 @@ object HoodieCreateRecordUtils {
       avroRecWithoutMeta
     }

-    val hoodieRecord = if (shouldCombine && !precombineEmpty) {
-      val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine,
-        false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
+    //TODO [HUDI-8574] we can throw exception if field doesn't exist
+    // lazy so that we don't evaluate if we short circuit the boolean expression in the if below
+    lazy val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine,
+      true, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
+
+    val hoodieRecord = if (shouldCombine && !precombineEmpty && orderingVal != null) {
       DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey,
         config.getPayloadClass, recordLocation)
     } else {
```

> **Reviewer:** Do we really need `lazy` here? What's the issue with the previous code? Maybe I am missing something.
>
> **Author:** I was trying to add validation that the precombine existed, but decided against it for this PR due to backwards compatibility and scope creep. So this made sense when that other code was here; now we could just pull it into the `if`.
>
> **Author:** Actually, I still think it should be like this. We need to be fault tolerant for the case where the precombine field does not exist.
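The `lazy val` in the hunk above exists so the (possibly failing) field extraction runs only when the earlier checks in the `&&` chain pass. The same effect can be sketched in Java with a memoizing `Supplier` (illustrative code, not Hudi's; the counter exists only to demonstrate when evaluation happens):

```java
import java.util.function.Supplier;

public class LazyOrdering {
    // Java analogue of Scala's `lazy val`: compute the delegate at most once,
    // and only if get() is actually called.
    static <T> Supplier<T> memoize(Supplier<T> delegate) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;
            @Override public synchronized T get() {
                if (!computed) { value = delegate.get(); computed = true; }
                return value;
            }
        };
    }

    static int extractions = 0;  // counts how often the ordering field is actually read

    // Mirrors `shouldCombine && !precombineEmpty && orderingVal != null`:
    // because && short-circuits, orderingVal.get() never runs when the
    // earlier checks already fail.
    static boolean shouldCombineRecord(boolean shouldCombine, boolean precombineEmpty,
                                       Supplier<Object> orderingVal) {
        return shouldCombine && !precombineEmpty && orderingVal.get() != null;
    }
}
```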
```diff
@@ -32,7 +32,7 @@ import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
 import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
-import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor}
@@ -50,7 +50,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.{SPARK_SQL_MERGE_INTO_PREPPED_KEY, WRITE_TABLE_VERSION}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
+import org.apache.hudi.exception.{HoodieAvroSchemaException, HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
@@ -746,7 +746,7 @@ class HoodieSparkSqlWriterInternal {
       String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())
     ))

-    HoodieTableMetaClient.newTableBuilder()
+    val metaClient = HoodieTableMetaClient.newTableBuilder()
       .setTableType(HoodieTableType.valueOf(tableType))
       .setTableName(tableName)
       .setRecordKeyFields(recordKeyFields)
@@ -755,7 +755,9 @@ class HoodieSparkSqlWriterInternal {
       .setPayloadClassName(payloadClass)
       .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
       .setRecordMergeStrategyId(recordMergerStrategy)
-      .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
+      // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
+      // but we are interested in what user has set, hence fetching from optParams.
+      .setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null))
       .setBootstrapIndexClass(bootstrapIndexClass)
       .setBaseFileFormat(baseFileFormat)
       .setBootstrapBasePath(bootstrapBasePath)
```

> **Author:** This is what we do for non-bootstrap, so I added this for consistency.
>
> **Reviewer:** Why can't we just remove the default value for the precombine field only (see `hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java`, line 166 at `7b773fc`) and then chase the test failures?
>
> **Author:** I am very much in favor of doing that, but users that rely on `ts` as the precombine without setting it would see a behavior change. In the 6-to-8 upgrade flow, we could check the table schema, and if precombine is not set and the schema includes a field `ts`, we could add it to `hoodie.properties`. @yihua didn't think we should make that change, though.
>
> **Reviewer:** The approach of this PR is much more complicated than required. Previously, the … Note that if …
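The `optParams` change above hinges on a subtle distinction: reading through a config object that applies a default (`ts`) cannot tell "user explicitly set `ts`" apart from "user set nothing", while reading the raw option map can. A hypothetical illustration (the key string and `ts` default match those discussed in the thread, but the helpers are not Hudi code):

```java
import java.util.Map;

public class PreCombineResolution {
    static final String KEY = "hoodie.datasource.write.precombine.field";

    // Config-object style: the default "ts" is applied, losing the
    // information of whether the user set anything at all.
    static String resolveWithDefault(Map<String, String> opts) {
        return opts.getOrDefault(KEY, "ts");
    }

    // Raw-options style (what the diff switches to): null when unset,
    // so table creation can record "no precombine" faithfully.
    static String resolveUserSet(Map<String, String> opts) {
        return opts.get(KEY);
    }
}
```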
> **Reviewer:** Why do we trigger a lookup using `findNestedFieldType` in the first place if the ordering field is not set? Why can't we deduce from the table config that the ordering field is not set and bypass calling `findNestedFieldType`? In other words, why do we need the `throwOnNotFound` argument?
>
> **Author:** The ordering field is set; it is just set to a nonexistent field. Per https://issues.apache.org/jira/browse/HUDI-8574, we have not been validating that the precombine actually exists in the table schema, so the current behavior of the filegroup reader will always throw an exception when trying to read a filegroup with log files. We don't want to completely break reading for those users. We could do a `LOG.warn` though?