I've got a streaming Dataflow pipeline, written in Java with Apache Beam 2.35, that commits data to BigQuery via the Storage Write API. Initially, the code looks like this:
BigQueryIO.writeTableRows()
    .withTimePartitioning(/* some column */)
    .withClustering(/* another column */)
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(30))
    .withNumStorageWriteApiStreams(20) // want to make this dynamic
This code runs in different environments, e.g. Dev and Prod. When I deploy to Dev I want 2 Storage Write API streams, in Prod I want 20, and I'm trying to pass/resolve these values at deploy time with Cloud Build.
The cloudbuild-dev.yaml looks like this:
steps:
  - # lots-of-steps
    args:
      - --numStorageWriteApiStreams=${_NUM_STORAGEWRITEAPI_STREAMS}

substitutions:
  _PROJECT: dev-project
  _NUM_STORAGEWRITEAPI_STREAMS: '2'
I expose the substitution in the job code with a pipeline options interface:
ValueProvider<String> getNumStorageWriteApiStreams();

void setNumStorageWriteApiStreams(ValueProvider<String> numStorageWriteApiStreams);
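For context, these sit in a pipeline options interface that looks roughly like this (the interface name, the DataflowPipelineOptions parent, and the annotations are just illustrative; the getter/setter pair is as above):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ValueProvider;

public interface MyPipelineOptions extends DataflowPipelineOptions {

  @Description("Number of Storage Write API streams, set per environment by Cloud Build")
  @Default.String("2")
  ValueProvider<String> getNumStorageWriteApiStreams();

  void setNumStorageWriteApiStreams(ValueProvider<String> numStorageWriteApiStreams);
}

Cloud Build simply appends --numStorageWriteApiStreams=2 (or =20 in Prod) to the job's launch arguments, and Beam's PipelineOptionsFactory binds that flag to this getter by name.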
I then refactor the writeTableRows() call to use getNumStorageWriteApiStreams():
BigQueryIO.writeTableRows()
    .withTimePartitioning(/* some column */)
    .withClustering(/* another column */)
    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
    .withTriggeringFrequency(Duration.standardSeconds(30))
    .withNumStorageWriteApiStreams(Integer.parseInt(String.valueOf(options.getNumStorageWriteApiStreams())))
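For completeness, the options object is built from the launch arguments in main() roughly like this (a minimal sketch; the class name and surrounding transforms are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingJob {
  public static void main(String[] args) {
    // --numStorageWriteApiStreams=... arrives here as a regular pipeline argument,
    // injected by the Cloud Build substitution shown above.
    MyPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MyPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);

    // ... source transforms, then the BigQueryIO.writeTableRows() call above ...

    pipeline.run();
  }
}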
I'd appreciate any help I can get here. Thanks!