Substitute ints into Dataflow via Cloudbuild yaml

I've got a streaming Dataflow pipeline, written in Java with Apache Beam 2.35. It writes data to BigQuery via the Storage Write API. Initially, the code looks like this:

BigQueryIO.writeTableRows()
  .withTimePartitioning(/* some column */)
  .withClustering(/* another column */)
  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
  .withTriggeringFrequency(Duration.standardSeconds(30))
  .withNumStorageWriteApiStreams(20) // want to make this dynamic

This code runs in different environments, e.g. Dev and Prod. When I deploy to Dev I want 2 Storage Write API streams, in Prod I want 20, and I'm trying to pass these values in at the moment I deploy with Cloud Build.

The cloudbuild-dev.yaml looks like this:

steps:
  - lots-of-steps
args:
 --numStorageWriteApiStreams=${_NUM_STORAGEWRITEAPI_STREAMS}
substitutions:
  _PROJECT: dev-project
  _NUM_STORAGEWRITEAPI_STREAMS: '2'
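
For context, Cloud Build substitution values are plain text, so after substitution the job receives the argument as the string `--numStorageWriteApiStreams=2` and something has to turn it into an int. The sketch below shows that conversion in isolation. `StreamCountArg` and its `parse` method are hypothetical names made up for illustration, not part of Beam or Cloud Build; in a real Beam job this parsing is normally left to `PipelineOptionsFactory`.

```java
// Hypothetical helper for illustration only; not part of Beam or Cloud Build.
// Extracts an int from a "--name=value" argument such as the one the
// substitution produces.
final class StreamCountArg {
    private static final String PREFIX = "--numStorageWriteApiStreams=";

    // Returns the parsed stream count, or defaultValue when the flag is absent.
    static int parse(String[] args, int defaultValue) {
        for (String arg : args) {
            if (arg.startsWith(PREFIX)) {
                String raw = arg.substring(PREFIX.length()).trim();
                // Throws NumberFormatException if the text is not a valid int.
                int value = Integer.parseInt(raw);
                if (value <= 0) {
                    throw new IllegalArgumentException(
                        "stream count must be positive: " + value);
                }
                return value;
            }
        }
        return defaultValue;
    }
}
```

Failing fast on a non-numeric or non-positive value keeps a mistyped substitution from reaching the pipeline.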

I expose the substitution in the job code with an options interface:

ValueProvider<String> getNumStorageWriteApiStreams();
void setNumStorageWriteApiStreams(ValueProvider<String> numStorageWriteApiStreams);

I then refactor the writeTableRows() call to invoke getNumStorageWriteApiStreams():

BigQueryIO.writeTableRows()
  .withTimePartitioning(/* some column */)
  .withClustering(/* another column */)
  .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
  .withTriggeringFrequency(Duration.standardSeconds(30))
  .withNumStorageWriteApiStreams(Integer.parseInt(String.valueOf(options.getNumStorageWriteApiStreams())))
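
One thing that may be worth checking in the call above: `String.valueOf(Object)` returns the object's `toString()`, so it stringifies the provider wrapper itself rather than calling `get()` on it. Whether that text happens to equal the wrapped value depends entirely on how the wrapper implements `toString()`. The sketch below shows the difference using `StandInProvider`, a hypothetical interface invented for this example; it is NOT Beam's `ValueProvider`, and it says nothing about how Beam's own implementations print themselves.

```java
// Hypothetical stand-in for a value wrapper; NOT Beam's ValueProvider.
interface StandInProvider<T> {
    T get();
}

final class ProviderPitfall {
    // Stringifies the wrapper object itself, i.e. whatever toString() returns.
    static String viaValueOf(StandInProvider<String> provider) {
        return String.valueOf(provider);
    }

    // Unwraps the value first, then parses the wrapped text as an int.
    static int viaGet(StandInProvider<String> provider) {
        return Integer.parseInt(provider.get());
    }
}
```

With a wrapper that does not override `toString()`, `viaValueOf` yields the default object description, which `Integer.parseInt` would reject, while `viaGet` yields the int. If the stream count is always known at deploy time, declaring the option as a plain `int` or `Integer` instead of a `ValueProvider<String>` might sidestep the wrapper altogether, though that is a suggestion rather than something the original code confirms.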

I'd appreciate any help I can get here, thanks.
