Apache Beam pipelines with Scala: part 2 - side input
In the second part of this series we will develop a pipeline that transforms messages from a "data" Pub/Sub topic, with the ability to control the process via a "control" topic. How to effectively pass mutable input into a DoFn is not obvious, but there is a clue in the documentation:
If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.

Having this in hand, we can use a construction like this:

[code language="scala"]
val sideView = p.apply(PubsubIO.readStrings()
    .fromSubscription(fullSideInputSubscriptionName))
  .apply(Window.into[String](new GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .discardingFiredPanes())
  .apply(View.asList())
[/code]

We have a single global window, fire the trigger for incoming rows, keep only the new values and convert the result into a view so it can be used as a side input. A couple of things to note here. First of all, keep in mind that the system is eventually consistent and does not preserve message order, so in this case you may want to use
accumulatingFiredPanes
instead. Secondly, a trigger firing is supposed to provide at least one value, but it may theoretically provide several, and if you use
View.asSingleton
your pipeline will fail. The last thing to note is that the pipeline will wait for the first message from the side input before it starts processing the steps that use it. Now we change our template slightly to use sideView:

[code language="scala"]
p.apply(PubsubIO.readStrings().fromSubscription(fullSubscriptionName))
  .apply(ParDo.of(new MyDoFn(sideView)).withSideInputs(sideView))
  .apply(BigQueryIO
    .writeTableRows()
    .to(targetTable)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))
[/code]
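If message ordering is a concern, the accumulating variant mentioned above changes only the accumulation mode of the side-input window. A minimal sketch, assuming the same p pipeline and subscription as before (with accumulating panes the view's list keeps every value seen so far, so the reading DoFn would need to pick the relevant element itself):

```scala
// Sketch: same side-input window as above, but panes accumulate
// instead of being discarded, so earlier control values survive
// late or out-of-order firings.
val accumulatingSideView = p
  .apply(PubsubIO.readStrings().fromSubscription(fullSideInputSubscriptionName))
  .apply(Window.into[String](new GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .accumulatingFiredPanes()) // the only change vs. the version above
  .apply(View.asList())
```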
The DoFn uses this side input as a switch that determines whether we process the data or ignore incoming messages:

[code language="scala"]
class MyDoFn(sideView: PCollectionView[java.util.List[String]])
    extends DoFn[String, TableRow] with LazyLogging {
  @ProcessElement
  def processElement(c: ProcessContext) {
    val sideInput = c.sideInput(sideView).get(0)
    val inputString = c.element()
    if (sideInput == "ENABLED") {
      ...
    } else {
      logger.info(s"Ignoring input messages, sideInput=$sideInput")
    }
  }
}
[/code]

To test our pipeline we will send a sequence of messages into both
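For reference, one possible shape of the full DoFn follows; the regex-based message parsing and the BigQuery field names are illustrative assumptions, not the original code:

```scala
import com.google.api.services.bigquery.model.TableRow
import com.typesafe.scalalogging.LazyLogging
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.values.PCollectionView

class MyDoFn(sideView: PCollectionView[java.util.List[String]])
    extends DoFn[String, TableRow] with LazyLogging {

  // Hypothetical helper: naive extraction for the {"id":N,"text":"..."}
  // messages used in this post; a real pipeline would use a JSON parser.
  private val MessagePattern = """\{"id":(\d+),"text":"([^"]*)"\}""".r

  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    val sideInput = c.sideInput(sideView).get(0)
    val inputString = c.element()
    if (sideInput == "ENABLED") {
      inputString match {
        case MessagePattern(id, text) =>
          val row = new TableRow().set("id", id.toInt).set("data", text)
          logger.info(s"Inserting to BigQuery: $row")
          c.output(row)
        case _ =>
          logger.warn(s"Skipping malformed message: $inputString")
      }
    } else {
      logger.info(s"Ignoring input messages, sideInput=$sideInput")
    }
  }
}
```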
Pub/Sub topics. Start the pipeline locally:

[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-sideinput\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-sideinput (in build file:/C:/Users/Valentin/workspace/beam-sideinput/)
[info] Running com.pythian.Beam
[/code]

Send a few messages to the "data" topic:

[code language="scala"]
{"id":1,"text":"row1"}
{"id":2,"text":"row2"}
{"id":3,"text":"row3"}
[/code]

Nothing is processed or ignored in the logs, since we haven't yet published anything to the "control" topic. Now let's publish
"ENABLED"
to the "control" topic. After a short while we will see the three rows above processed:

[code language="scala"]
2017/12/11 14:43:09.773 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=2, data=row2}
2017/12/11 14:43:09.773 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=3, data=row3}
2017/12/11 14:43:09.773 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=1, data=row1}
[/code]

Send one more message to the "data" topic -- it is processed as expected:

[code language="scala"]
{"id":4,"text":"row4"}
...
2017/12/11 14:44:42.805 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=4, data=row4}
[/code]

Now send
"DISABLED"
to the "control" topic and try yet another message to the "data" topic:

[code language="scala"]
{"id":5,"text":"row5"}
...
2017/12/11 14:46:32.097 INFO com.pythian.Beam$MyDoFn - Ignoring input messages, sideInput=DISABLED
[/code]

Exactly as expected. Note that with this pipeline running on Dataflow, side-input propagation may be delayed, so some new "data" messages may still be processed after you publish the disabling message. You can find the code
here. In the third part of this series we will build a more powerful pipeline that uses side input messages as source code for data processors. This can be helpful for building flexible pipelines that can be tuned on the fly without restarting them.
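As a closing note, the test messages above can be published in whatever way is convenient; for completeness, here is a sketch of doing it programmatically with the Google Cloud Pub/Sub Java client (the project and topic names are placeholders):

```scala
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, TopicName}

// Placeholder names -- substitute your own project and topic.
val publisher = Publisher.newBuilder(TopicName.of("my-project", "data")).build()
try {
  val message = PubsubMessage.newBuilder()
    .setData(ByteString.copyFromUtf8("""{"id":1,"text":"row1"}"""))
    .build()
  publisher.publish(message).get() // block until Pub/Sub acks with a message id
} finally {
  publisher.shutdown()
}
```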