cloudytechi147
Member
I've got a streaming Dataflow pipeline, written in Java with Beam 2.35. It commits data to BigQuery via the Storage Write API. Initially, the code looks like this:
BigQueryIO.writeTableRows()
    .withTimePartitioning(/* some column */)
    .withClustering(/* another column */)
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(30))
    .withNumStorageWriteApiStreams(20) // want to make this dynamic
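For reference, the partitioning and clustering arguments are built from the BigQuery model classes, roughly like this (the column names here are placeholders, not my real schema):

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TimePartitioning;

// Placeholder field names, just to show the shape of the arguments.
TimePartitioning partitioning =
    new TimePartitioning().setType("DAY").setField("event_timestamp");
Clustering clustering =
    new Clustering().setFields(java.util.Collections.singletonList("customer_id"));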
This code runs in different environments, e.g. Dev and Prod. When I deploy to Dev I want 2 Storage Write API streams, in Prod I want 20, and I'm trying to pass/resolve these values at deploy time with Cloud Build.
The cloudbuild-dev.yaml looks like this:
steps:
  - lots-of-steps
    args:
      - --numStorageWriteApiStreams=${_NUM_STORAGEWRITEAPI_STREAMS}

substitutions:
  _PROJECT: dev-project
  _NUM_STORAGEWRITEAPI_STREAMS: '2'
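For completeness, the pipeline picks those args up in main() with PipelineOptionsFactory; MyPipelineOptions is just my name for the options interface shown below:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public static void main(String[] args) {
  // args include --numStorageWriteApiStreams=2 (Dev) or =20 (Prod),
  // forwarded by the Cloud Build step above.
  MyPipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(MyPipelineOptions.class);
  Pipeline pipeline = Pipeline.create(options);
  // ... rest of the pipeline ...
  pipeline.run();
}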
I expose the substitution in the job code with a pipeline options interface:
ValueProvider<String> getNumStorageWriteApiStreams();
void setNumStorageWriteApiStreams(ValueProvider<String> numStorageWriteApiStreams);
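The full interface looks roughly like this (the name MyPipelineOptions and extending DataflowPipelineOptions are just my setup):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ValueProvider;

public interface MyPipelineOptions extends DataflowPipelineOptions {
  @Description("Number of Storage Write API streams to use")
  ValueProvider<String> getNumStorageWriteApiStreams();
  void setNumStorageWriteApiStreams(ValueProvider<String> numStorageWriteApiStreams);
}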
I then refactor the writeTableRows() call to invoke getNumStorageWriteApiStreams()
BigQueryIO.writeTableRows()
    .withTimePartitioning(/* some column */)
    .withClustering(/* another column */)
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(30))
    .withNumStorageWriteApiStreams(Integer.parseInt(String.valueOf(options.getNumStorageWriteApiStreams())))
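In case it helps to see what I'm aiming for, here is roughly the shape I'd expect to end up with. I'm assuming that because the value is supplied on the command line at deploy time (not via a template), the ValueProvider is a static one and .get() is safe at graph-construction time; I'm not certain that assumption holds, which is part of my question:

// Hypothetical variant: resolve the option while the pipeline graph is built.
// withNumStorageWriteApiStreams() takes a plain int, so the value has to be
// known at construction time; .get() should work here because the value comes
// from the deploy-time command line rather than a runtime template parameter.
int numStreams = Integer.parseInt(options.getNumStorageWriteApiStreams().get());

BigQueryIO.writeTableRows()
    .withTimePartitioning(/* some column */)
    .withClustering(/* another column */)
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(30))
    .withNumStorageWriteApiStreams(numStreams)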
I'd appreciate any help I can get here, thanks!