
Apache Beam Pipelines With Scala: Part 2 - Side Input

In the second part of this series we will develop a pipeline that transforms messages from a "data" Pub/Sub topic, with the ability to control the process via a "control" topic. How to pass mutable input into a DoFn effectively is not obvious, but there is a clue in the documentation:
If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.
Having this in hand, we can use a construction like this:

[code language="scala"]
val sideView = p.apply(PubsubIO.readStrings()
    .fromSubscription(fullSideInputSubscriptionName))
  .apply(Window.into[String](new GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .discardingFiredPanes())
  .apply(View.asList())
[/code]

We have a single global window, fire the trigger for every incoming row, keep only the new values, and convert the result to a view so that it can be used as a side input.

A couple of things to note here. First, keep in mind that the system is eventually consistent and doesn't preserve message order, so you may want to use accumulatingFiredPanes instead. Second, a trigger firing is supposed to provide at least one value, but in theory it may deliver several, and if you use View.asSingleton your pipeline will fail. Finally, the pipeline will wait for the first message from the side input before it starts processing the steps that use it.

Now we change our template slightly to use sideView in it:

[code language="scala"]
p.apply(PubsubIO.readStrings().fromSubscription(fullSubscriptionName))
  .apply(ParDo.of(new MyDoFn(sideView)).withSideInputs(sideView))
  .apply(BigQueryIO
    .writeTableRows()
    .to(targetTable)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))
[/code]

The DoFn uses this side input as a switch: it either processes the data or ignores incoming messages:

[code language="scala"]
class MyDoFn(sideView: PCollectionView[java.util.List[String]])
    extends DoFn[String, TableRow] with LazyLogging {

  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    // Current control value published to the "control" topic
    val sideInput = c.sideInput(sideView).get(0)
    val inputString = c.element()
    if (sideInput == "ENABLED") {
      ...
    } else {
      logger.info(s"Ignoring input messages, sideInput=$sideInput")
    }
  }
}
[/code]

To test our pipeline we will send a sequence of messages into both Pub/Sub topics.
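Before running the test, a note on the ordering caveat above. If you switch to accumulatingFiredPanes, the side input list can hold several control messages, and it is safer not to assume that get(0) is the most recent one. The following is a minimal sketch of one way to handle that, not part of the original pipeline. It assumes a hypothetical control message format of "<epochMillis>:<STATE>" (for example "1512999789000:ENABLED"), and the names ControlState, Control, parse and isEnabled are made up for illustration.

```scala
// Sketch only: picks the most recent control message from a side input list.
// Assumption (not in the original pipeline): control messages are published
// as "<epochMillis>:<STATE>", e.g. "1512999789000:ENABLED".
object ControlState {

  final case class Control(timestamp: Long, state: String)

  // Returns None for anything that does not match the assumed format.
  def parse(raw: String): Option[Control] = {
    val idx = raw.indexOf(':')
    if (idx <= 0) None
    else scala.util.Try(raw.substring(0, idx).toLong).toOption
      .map(ts => Control(ts, raw.substring(idx + 1).trim))
  }

  // True only if the message with the highest timestamp says ENABLED.
  // An empty list or a list with only malformed messages counts as disabled.
  def isEnabled(messages: java.util.List[String]): Boolean = {
    var latest: Option[Control] = None
    val it = messages.iterator()
    while (it.hasNext) {
      parse(it.next()).foreach { c =>
        if (latest.forall(_.timestamp <= c.timestamp)) latest = Some(c)
      }
    }
    latest.exists(_.state == "ENABLED")
  }
}
```

Under these assumptions, the DoFn could call ControlState.isEnabled(c.sideInput(sideView)) instead of comparing get(0) with "ENABLED". Treating an empty or malformed list as disabled is a design choice made for this sketch; you may prefer the opposite default.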
Start the pipeline locally:

[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-sideinput\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-sideinput (in build file:/C:/Users/Valentin/workspace/beam-sideinput/)
[info] Running com.pythian.Beam
[/code]

Send a few messages to the "data" topic:

[code language="scala"]
{"id":1,"text":"row1"}
{"id":2,"text":"row2"}
{"id":3,"text":"row3"}
[/code]

Nothing is processed or ignored in the logs, because we haven't yet published anything to the "control" topic. Now let's publish "ENABLED" to the "control" topic. After a while we will see the three rows above being processed:

[code language="scala"]
2017/12/11 14:43:09.773 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=2, data=row2}
2017/12/11 14:43:09.773 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=3, data=row3}
2017/12/11 14:43:09.773 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=1, data=row1}
[/code]

Send one more message to the "data" topic. It is processed as expected:

[code language="scala"]
{"id":4,"text":"row4"}
...
2017/12/11 14:44:42.805 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=4, data=row4}
[/code]

Now send "DISABLED" to the "control" topic and try yet another message to the "data" topic:

[code language="scala"]
{"id":5,"text":"row5"}
...
2017/12/11 14:46:32.097 INFO com.pythian.Beam$MyDoFn - Ignoring input messages, sideInput=DISABLED
[/code]

Exactly as expected. Note that when this pipeline runs on Dataflow, there may be a delay in propagation, so some new "data" messages may still be processed after you have published the disabling message.

You can find the code here.

In the third part of this series we will build a powerful pipeline that uses side input messages as source code for data processors. This may be helpful for building flexible pipelines that can be tuned on the fly without restarting them.
