Apache Beam pipelines with Scala: part 1 - template
Start by defining the pipeline options. You may either use the fromArgs method or set the parameters manually.

[code language="scala"]
trait TestOptions extends PipelineOptions with DataflowPipelineOptions

val options = PipelineOptionsFactory.create().as(classOf[TestOptions])
[/code]
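As a quick sketch of the two approaches mentioned above (the command-line args come from your main method, and the project id shown here is a placeholder, not a value from the original template):

[code language="scala"]
// Sketch: parse and validate options from the main method's command-line args
val argsOptions = PipelineOptionsFactory
  .fromArgs(args: _*)
  .withValidation()
  .as(classOf[TestOptions])

// Or create empty options and set parameters manually
val manualOptions = PipelineOptionsFactory.create().as(classOf[TestOptions])
manualOptions.setProject("my-project-id") // hypothetical project id
[/code]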
The next thing to do is to define the input subscription name and the output table reference object for the pipeline I/O.

[code language="scala"]
val fullSubscriptionName = s"projects/$projectId/subscriptions/$subscription"

val targetTable = new TableReference()
  .setProjectId(projectId)
  .setDatasetId(dataset)
  .setTableId(tableName)
[/code]

Now we can describe our
DoFn. It processes JSON string messages and tries to convert them into a
TableRow:

[code language="scala"]
class MyDoFn extends DoFn[String, TableRow] with LazyLogging {
  @ProcessElement
  def processElement(c: ProcessContext) {
    val inputString = c.element()
    logger.info(s"Received message: $inputString")
    Try {
      Transport.getJsonFactory.fromString(inputString, classOf[TableRow])
    } match {
      case Success(row) =>
        logger.info(s"Converted to TableRow: $row")
        c.output(row)
      case Failure(ex) =>
        logger.info(s"Unable to parse message: $inputString", ex)
    }
  }
}
[/code]

The last thing to do is to combine all the parts together using the pipeline object:

[code language="scala"]
val p = Pipeline.create(options)

p.apply("read-pubsub", PubsubIO
    .readStrings()
    .fromSubscription(fullSubscriptionName))
  .apply("process", ParDo.of(new MyDoFn))
  .apply("write-bq", BigQueryIO
    .writeTableRows()
    .to(targetTable)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))
[/code]

And we are ready to run it:

[code language="scala"]
p.run()
[/code]
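Depending on the runner, run() may return as soon as the job is submitted. If you prefer to block until the pipeline reaches a terminal state, you can keep the returned PipelineResult; a small sketch, not part of the original template:

[code language="scala"]
// Sketch: keep the result handle and block until the pipeline reaches a terminal state
val result = p.run()
result.waitUntilFinish()
println(s"Pipeline finished in state: ${result.getState}")
[/code]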
Run pipeline locally
To start your pipeline locally, you need to specify DirectRunner in the pipeline options, as sketched below. Then you can simply start the pipeline with the sbt run command.
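One way to do this is to set the runner on the options object before creating the pipeline; a minimal sketch, not the post's exact code:

[code language="scala"]
import org.apache.beam.runners.direct.DirectRunner

// Sketch: execute the pipeline on the local DirectRunner
options.setRunner(classOf[DirectRunner])
[/code]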
[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-template\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-template (in build file:/C:/Users/Valentin/workspace/beam-template/)
[info] Running com.pythian.Beam
[/code]

Then you can publish a message from the Cloud Console into your topic to test it:

[code language="scala"]
{ "id": 1, "data": "test data" }
[/code]

In a while you should see something like:

[code language="scala"]
2017/12/11 01:54:16.581 INFO com.pythian.Beam$MyDoFn - Received message: { "id": 1, "data": "test data" }
2017/12/11 01:54:16.588 INFO com.pythian.Beam$MyDoFn - Converted to TableRow: {"id":1,"data":"test data"}
[/code]

You can now select your row from BigQuery (note that the table preview won't show rows that are still in the streaming buffer):

[code language="scala"]
select * from test_nikotin.test where id = 1
[/code]
Run pipeline in Dataflow
Once you are done with your tests, you are ready to start the pipeline on GCP. Configure the runner to DataflowRunner, as sketched below, and run sbt.
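A minimal sketch of the switch; the staging bucket and the streaming flag are illustrative assumptions, while the setters come from DataflowPipelineOptions:

[code language="scala"]
import org.apache.beam.runners.dataflow.DataflowRunner

// Sketch: target the Dataflow service instead of the local runner
options.setRunner(classOf[DataflowRunner])
options.setProject("myproject")                      // GCP project id (placeholder)
options.setStagingLocation("gs://my-bucket/staging") // hypothetical staging bucket
options.setStreaming(true)                           // the Pub/Sub source is unbounded
[/code]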
[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
...
[info] Running com.pythian.Beam
...
2017/12/11 01:50:04.937 INFO o.a.b.r.dataflow.util.PackageUtil - Uploading 112 files from PipelineOptions.filesToStage to staging location to prepare for execution.
2017/12/11 01:50:09.093 INFO o.a.b.r.dataflow.util.PackageUtil - Staging files complete: 111 files cached, 1 files newly uploaded
2017/12/11 01:50:09.196 INFO o.a.b.r.d.DataflowPipelineTranslator - Adding read-pubsub/PubsubUnboundedSource as step s1
...
Dataflow SDK version: 2.1.0
2017/12/11 01:50:11.064 INFO o.a.b.r.dataflow.DataflowRunner - To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/myproject/dataflow/job/2017-12-10_14_50_12-10326138943752681303
Submitted job: 2017-12-10_14_50_12-10326138943752681303
2017/12/11 01:50:11.064 INFO o.a.b.r.dataflow.DataflowRunner - To cancel the job using the 'gcloud' tool, run:
> gcloud beta dataflow jobs --project=myproject cancel 2017-12-10_14_50_12-10326138943752681303
[success] Total time: 17 s, completed Dec 11, 2017 1:50:11 AM
[/code]

You can navigate to
the Dataflow service in the Cloud Console to verify it is running as expected and check the logs in Cloud Logging.

You can find the code here. In the second part I'll build a pipeline with a control flow from another Pub/Sub topic via a side input.