[HUDI-8551] Validate MOR Without Precombine Set and Fix Related Bugs #12317

Open: wants to merge 8 commits into master
Conversation

@jonvex (Contributor) commented Nov 22, 2024

Change Logs

Test MOR with the precombine field unset, and with a precombine field that is not in the table schema.

Test read and write for:

  • Spark SQL with no record key
  • partial-update MERGE INTO
  • CDC
  • bootstrap
  • clustering
  • compaction

Fix issues with:

  • bootstrap MOR position-based merge when only the Parquet row-index metadata column is read
  • getOrderingValue in Spark and Hive records now checks for a null or empty precombine field (see the sketch after this list)
  • the file group reader schema handler no longer throws an exception if the precombine field cannot be found
  • the base record buffer no longer throws an exception if the precombine field cannot be found
  • HoodieSparkSqlWriter no longer sets the precombine default ("ts") as a table config during bootstrap if the user does not explicitly set it; this matches the behavior of non-bootstrap flows
  • the Spark bootstrap provider no longer fails if it can't find the precombine field in the record
  • createRecordUtils (used by HoodieSparkSqlWriter to turn the DataFrame into HoodieRecords): the Avro record flow no longer fails if we can't find the precombine field in the record
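
A minimal sketch of the getOrderingValue guard described in the list above, assuming the HoodieRecord#getOrderingValue(Schema, Properties) shape and the ConfigUtils/StringUtils helpers; the getValue call is a hypothetical stand-in for the record-type-specific field lookup:

// Sketch only, inside a record implementation such as HoodieSparkRecord or HoodieHiveRecord.
@Override
public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
  String orderingField = ConfigUtils.getOrderingField(props);
  if (StringUtils.isNullOrEmpty(orderingField)) {
    // No precombine configured (null or empty): fall back to natural ordering.
    return 0;
  }
  // Otherwise resolve the ordering value from the record as before.
  return (Comparable<?>) getValue(recordSchema, orderingField); // getValue is hypothetical
}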

Impact

Fixes issues for MOR without a precombine field set (or with one incorrectly set) and validates that it does not fail across a wide range of scenarios.

Risk level (write none, low medium or high below)

low

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Nov 22, 2024
@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Nov 24, 2024
@@ -82,9 +82,11 @@ private static InternalSchema addPositionalMergeCol(InternalSchema internalSchem
   @Override
   public Pair<List<Schema.Field>, List<Schema.Field>> getBootstrapRequiredFields() {
     Pair<List<Schema.Field>, List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields();
-    if (readerContext.supportsParquetRowIndex()) {
-      if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) {
+    if (readerContext.supportsParquetRowIndex() && (this.needsBootstrapMerge || this.needsMORMerge)) {
@jonvex (Contributor, Author):
Previously we expected there to be a precombine field if the table was MOR. So if you tried to read just the meta cols, you would still need to read the precombine field, and therefore the left and right lists would both have values. If you tried to read only the data cols, you would still read hoodie_record_key for MOR merging, in case we have to fall back to key-based merging, so again both lists would have values.

Now that we don't require a precombine field, if you only read the meta cols then you don't need to read any data cols. But we still want to do positional merging for MOR, so we need to add the positional merge field, as sketched below.
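
A condensed sketch of the resulting flow, using names from the hunk above; treating the positional merge column append as a single hypothetical helper is an assumption (the hunk header shows an addPositionalMergeCol(InternalSchema) variant):

@Override
public Pair<List<Schema.Field>, List<Schema.Field>> getBootstrapRequiredFields() {
  Pair<List<Schema.Field>, List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields();
  // Gate on the merge flags instead of requiring both lists to be non-empty:
  // with no precombine field, a meta-cols-only read leaves the data side empty,
  // yet MOR positional merging still needs the row-index column.
  if (readerContext.supportsParquetRowIndex() && (needsBootstrapMerge || needsMORMerge)) {
    appendPositionalMergeCol(dataAndMetaCols); // hypothetical helper
  }
  return dataAndMetaCols;
}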

Reviewer (Contributor):
Why can't we make this simpler?

  • Poll the table config and find whether the precombine field is set.
  • If yes, apply the changes to the existing code block here.
  • If not set, remove the precombine field from both dataAndMetaCols.getLeft() and dataAndMetaCols.getRight().

@@ -755,7 +755,9 @@ class HoodieSparkSqlWriterInternal {
         .setPayloadClassName(payloadClass)
         .setRecordMergeMode(RecordMergeMode.getValue(hoodieConfig.getString(HoodieWriteConfig.RECORD_MERGE_MODE)))
         .setRecordMergeStrategyId(recordMergerStrategy)
-        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
+        // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value,
@jonvex (Contributor, Author):
This is what we do for non-bootstrap, so I added this for consistency.

Reviewer (Contributor):

Why can't we just remove the default value for the precombine field only, and then chase the test failures?
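
For illustration, a hedged sketch of what that suggestion would amount to; the key name matches the existing hoodie.datasource.write.precombine.field option, while the exact builder chain and documentation string are assumptions:

// Hypothetical: drop the implicit "ts" default so an unset precombine stays unset.
public static final ConfigProperty<String> PRECOMBINE_FIELD = ConfigProperty
    .key("hoodie.datasource.write.precombine.field")
    .noDefaultValue()
    .withDocumentation("Field used in preCombining before the actual write");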

@jonvex (Contributor, Author):
I am very much in favor of doing that. But users that rely on ts as the precombine field without setting it will see a behavior change. In the 6-to-8 upgrade flow, we could check the table schema, and if precombine is not set and the schema includes a field "ts", then we could add it to hoodie.properties (see the sketch below). @yihua didn't think we should make that change, though.
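
For illustration only, a hedged sketch of the upgrade-time backfill described here; this PR does not implement it, and the helper names are assumptions:

// Hypothetical six-to-eight upgrade step: backfill "ts" as the precombine field when it exists.
if (StringUtils.isNullOrEmpty(tableConfig.getPreCombineField())
    && tableSchema.getField("ts") != null) {
  // Persisting this to hoodie.properties would preserve the old implicit-"ts" behavior.
  tableConfig.setValue(HoodieTableConfig.PRECOMBINE_FIELD, "ts");
}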

@yihua (Contributor) commented Nov 25, 2024:

The approach of this PR is much more complicated than required. Previously, the precombine field was set to ts if the user did not configure it. We can still keep that. If the ts field does not exist, we use natural ordering, i.e., setting 0 as the ordering value (in the else branch which creates the record; the second DefaultHoodieRecordPayload constructor fills in 0 as the ordering value):

val hoodieRecord = if (shouldCombine && !precombineEmpty) {
  val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine,
    false, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
  DataSourceUtils.createHoodieRecord(processedRecord, orderingVal, hoodieKey,
    config.getPayloadClass, recordLocation)
} else {
  DataSourceUtils.createHoodieRecord(processedRecord, hoodieKey,
    config.getPayloadClass, recordLocation)
}

public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
  super(record, orderingVal);
}

public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
  this(record.isPresent() ? record.get() : null, 0); // natural order
}

Note that if ts or another configured ordering field exists in the schema, nothing should break, which is the existing behavior.

@@ -216,7 +220,11 @@ public static Option<Schema.Type> findNestedFieldType(Schema schema, String fiel
     for (String part : parts) {
       Schema.Field foundField = resolveNullableSchema(schema).getField(part);
       if (foundField == null) {
-        throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
+        if (throwOnNotFound) {
Reviewer (Contributor):

Why do we trigger a lookup using findNestedFieldType in the first place if the ordering field is not set? Why can't we deduce from the table config that the ordering field is not set and bypass calling findNestedFieldType?

In other words, why do we need the throwOnNotFound argument?

@jonvex (Contributor, Author):

The ordering field is set. It is just set to a nonexistent field. Per https://issues.apache.org/jira/browse/HUDI-8574, we have not been validating that the precombine field actually exists in the table schema, so the current behavior of the file group reader will always throw an exception when trying to read a file group with log files. We don't want to completely break reading for those users. We could do a LOG.warn, though?

Before:

this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
    ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName);

After:

// Don't throw exception due to [HUDI-8574]
this.orderingFieldTypeOpt = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
    ? Option.empty() : AvroSchemaUtils.findNestedFieldType(readerSchema, this.orderingFieldName, false);
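
For context, a sketch of how the lenient lookup can behave with the new flag, reconstructed from the hunk above; the loop scaffolding outside the shown diff lines is an assumption, and the two-argument overload would presumably delegate with throwOnNotFound = true to preserve the old behavior:

public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName, boolean throwOnNotFound) {
  Schema currentSchema = schema;
  for (String part : fieldName.split("\\.")) {
    Schema.Field foundField = resolveNullableSchema(currentSchema).getField(part);
    if (foundField == null) {
      if (throwOnNotFound) {
        throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
      }
      // Tolerate a configured precombine that is missing from the schema (HUDI-8574).
      return Option.empty();
    }
    currentSchema = foundField.schema();
  }
  return Option.of(resolveNullableSchema(currentSchema).getType());
}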
Reviewer (Contributor):

Why can't we check for isEmptyOrNull on orderingFieldName and then avoid calling findNestedFieldType?

@jonvex (Contributor, Author):

See the comment above; the ordering field might not actually exist in the schema.

@@ -160,12 +160,16 @@ private Schema generateRequiredSchema() {
     List<Schema.Field> addedFields = new ArrayList<>();
     for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) {
Reviewer (Contributor):

I checked the implementation of getMandatoryFieldsForMerging. It does not return the precombine field if it's not set. So can you help me understand why we need the changes below?

@jonvex (Contributor, Author):

Same as the comments above.


// TODO [HUDI-8574]: we can throw an exception if the field doesn't exist
// lazy so that we don't evaluate if we short-circuit the boolean expression in the if below
lazy val orderingVal = HoodieAvroUtils.getNestedFieldVal(avroRec, precombine,
  true, consistentLogicalTimestampEnabled).asInstanceOf[Comparable[_]]
Reviewer (Contributor):

Do we really need lazy here? What's the issue with the previous code? Maybe I am missing something.

@jonvex (Contributor, Author):

I was trying to add validation that the precombine field existed, but decided against it for this PR due to backwards compatibility and scope creep. So this made sense when that other code was here. But now we can just pull it into the if.

@jonvex (Contributor, Author):

Actually, I still think it should be like this. We need to be fault-tolerant for the case where the precombine field does not exist.


@nsivabalan (Contributor):

Did you check auto record key generation? Specifically, can you check whether we have a test where auto-generation of record keys is enabled for an MOR table and there is no precombine key set?

@nsivabalan (Contributor):

Shouldn't we also remove the default value in HoodieStreamer for the source ordering field?

@Parameter(names = {"--source-ordering-field"}, description = "Field within source record to decide how"
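
A hedged sketch of what removing that default might look like; the field declaration follows HoodieStreamer's JCommander config style, and the description string, truncated above, is left elided:

// Hypothetical: no implicit "ts" default, so an unset source ordering field stays unset.
@Parameter(names = {"--source-ordering-field"},
    description = "Field within source record to decide how" /* ... */)
public String sourceOrderingField = null;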

@jonvex (Contributor, Author) commented Nov 25, 2024:

> Did you check for auto record key generation. specifically, can you check if we have a test where, auto generation of record keys are enabled for an MOR table and there is no precombine key set.

The test in TestCreateTable does that. I have revised the test to ensure that updating the records works as well

@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Nov 25, 2024
@@ -167,7 +167,7 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
       HoodieAvroUtils.removeFields(skeletonRequiredSchema, rowIndexColumn))

     // If we need to do position based merging with log files we will leave the row index column at the end
-    val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) {
+    val dataProjection = if (getShouldMergeUseRecordPosition) {
Reviewer (Contributor):

Could we avoid unrelated changes in this PR?

@jonvex (Contributor, Author):

Moved the file group reader changes and the schema handler test to #12340.

@nsivabalan (Contributor):

Can we first try to understand the following:

  • Can we skip setting precombine with MOR? Can you check 0.12.0, 0.13.0, 0.14.0 and 0.15.0? You can do a quick start and ensure compaction kicks in.
  • We should at least have the same support/behavior as before and not regress.

@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:XL PR with lines of changes > 1000 labels Nov 26, 2024
@hudi-bot:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@jonvex jonvex changed the title [HUDI-8551] fix no precombine set for mor [HUDI-8551] Validate MOR Without Precombine Set and Fix Related Bugs Nov 27, 2024