Null Pointer Error on Avro to BigQuery load #5512

Open
prakharb10 opened this issue Oct 9, 2024 · 2 comments
Labels
bug Something isn't working gcp io

Comments

@prakharb10

Environment

  • scioVersion = "0.14.5"
  • beamVersion = "2.56.0"
  • scalaVersion = "2.13.14"

I have a simple Dataflow that reads Avro files from GCS and saves them into BigQuery.

val schema = new Schema.Parser().parse(
    ""
    )

sc.avroFile(
      s"gs://<path>/*.avro",
      schema
    ).saveAsBigQueryTable(
      table = Table.Spec(
        "table_name"
      ),
      writeDisposition = WRITE_TRUNCATE,
      createDisposition = CREATE_NEVER
    )

However, the following exception is thrown:

{
  "textPayload": "Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException\n\torg.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)\n\torg.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:816)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\torg.apache.beam.fn.harness.MapFnRunners$ExplodedWindowedValueMapperFactory.lambda$create$0(MapFnRunners.java:133)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.output(FnApiDoFnRunner.java:2195)\n\torg.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1.processElement(PrepareWrite.java:84)\n\torg.apache.beam.sdk.io.gcp.bigquery.PrepareWrite$1$DoFnInvoker.invokeProcessElement(Unknown 
Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingParDo(FnApiDoFnRunner.java:816)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1792)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$WindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2219)\n\torg.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:322)\n\torg.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1100)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:143)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:659)\n\torg.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:654)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)\n\torg.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)\n\torg.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)\n\torg.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:158)\n\torg.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:537)\n\torg.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)\n\torg.apache.beam.fn
.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)\n\tjava.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tjava.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\torg.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)\n\tjava.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tjava.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tjava.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.NullPointerException\n\tcom.spotify.scio.bigquery.BigQueryTypedTable$$anonfun$6.apply(BigQueryIO.scala:446)\n\tcom.spotify.scio.bigquery.BigQueryTypedTable$$anonfun$6.apply(BigQueryIO.scala:445)\n\torg.apache.beam.sdk.io.gcp.bigquery.RowWriterFactory$AvroRowWriterFactory.createRowWriter(RowWriterFactory.java:144)\n\torg.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.createAndInsertWriter(WriteBundlesToFiles.java:200)\n\torg.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:225)\n",
}
@RustedBones
Contributor

Thanks for the bug report.
I can confirm there's something wrong in Beam's code: with this setup, the ConstantTableDestinations created for the BigQuery table always returns null when getSchema is called.
Since you didn't provide a schema in the write API, Beam tried to infer one from the table information.

You can, however, work around this by converting the Avro schema to a table schema yourself:

import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils

// Derive the BigQuery TableSchema from the Avro schema via Beam's Schema
val tableSchema = BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(schema))
  ...

  .saveAsBigQueryTable(
      table = Table.Spec(
        "table_name"
      ),
      schema = tableSchema,
      writeDisposition = WRITE_TRUNCATE,
      createDisposition = CREATE_NEVER
    )
TBH this API is not great: it should require an Avro Schema when writing generic records.

I'm currently down a rabbit hole trying to increase coherence on the BQ API side. I hope to fix this as part of that refactoring.

@RustedBones RustedBones added bug Something isn't working gcp io labels Oct 16, 2024
@prakharb10
Author

Thanks for the info. I should mention that I did try converting an Avro record to a TableRow using

val row = AvroConverters.toTableRow(genericRecord)

This did not work: some data types, such as date, were not converted, and the API threw an error when inserting rows into the table.
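For background on why date fields are a common failure point for generic converters: Avro's `date` logical type is encoded as an Int of days since the Unix epoch, while BigQuery's JSON load API expects a DATE value as an ISO `YYYY-MM-DD` string. A converter that passes the raw Int through will produce rows the API rejects. A minimal sketch of the explicit mapping (the function name is hypothetical, not part of scio or Beam):

```scala
import java.time.LocalDate

object AvroDateSketch {
  // Avro `date` logical type: days since 1970-01-01, stored as an Int.
  // BigQuery's load API expects DATE as an ISO "YYYY-MM-DD" string,
  // so the value must be converted rather than passed through.
  def avroDateToBigQuery(daysSinceEpoch: Int): String =
    LocalDate.ofEpochDay(daysSinceEpoch.toLong).toString
}
```

A converter handling a GenericRecord by hand would apply this per date field before building the TableRow.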
