Apache Beam pipelines with Scala: part 3 - dynamic processing
INPUT => OUTPUT. For our needs we want to keep the code in control messages minimal, so we convert the input data into a JsonObject before passing it to the function, and we get a TableRow as the result. Thus V is JsonObject => TableRow. We expect the following to work as a "control" message. The class names are fully qualified because the snippet is compiled on its own, without the imports of the pipeline code:

[code language="scala"]
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", json.get("text").getAsString)
}
[/code]

Now it's time to define an object that lets us do something like:

[code language="scala"]val tableRow = code.evalFor(json)[/code]

This is such an object:

[code language="scala"]
import com.google.api.services.bigquery.model.TableRow
import com.google.gson.JsonObject
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox // provided by the scala-compiler artifact

object Dynamic {
  // One compiler toolbox per JVM
  private val toolbox = currentMirror.mkToolBox()
  // Compiled functions are cached by their source code (Memoize is a project helper, not part of the standard library)
  private val dynamic = new Memoize(10)((code: String) => {
    val tree = toolbox.parse(code)
    toolbox.eval(tree).asInstanceOf[JsonObject => TableRow]
  })
  def apply(code: String) = dynamic(code)
}
[/code]

We keep a single toolbox and a single memoized compiler, dynamic, per JVM. So now we can simply use:

[code language="scala"]val tableRow = Dynamic(code)(json)[/code]

To make it work as the String method shown above, we use an implicit value class:

[code language="scala"]
// Must live in a top-level object (e.g. a package object) to be both implicit and a value class
implicit class RichString(val code: String) extends AnyVal {
  def evalFor(arg: JsonObject): TableRow = Dynamic(code)(arg)
}
[/code]

The only thing we change from our previous side input example is a slightly altered MyDoFn:

[code language="scala"]
import com.google.api.services.bigquery.model.TableRow
import com.google.gson.JsonParser
import com.typesafe.scalalogging.LazyLogging
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.values.PCollectionView

import scala.util.{Failure, Success, Try}

class MyDoFn(sideView: PCollectionView[java.util.List[String]]) extends DoFn[String, TableRow] with LazyLogging {
  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    val t0 = System.currentTimeMillis()
    // The "control" code is the first element of the side input
    val sideInput = c.sideInput(sideView).get(0)
    val inputString = c.element()
    logger.info(s"Getting new data=$inputString")
    Try {
      val json = new JsonParser().parse(inputString).getAsJsonObject
      // Compiled on first use, then served from the cache (RichString must be in scope)
      sideInput.evalFor(json)
    } match {
      case Success(row) =>
        logger.info(s"Inserting to BiqQuery: $row")
        c.output(row)
      case Failure(ex) =>
        logger.info(s"Unable to parse message: $inputString", ex)
    }
    val t1 = System.currentTimeMillis()
    logger.info(s"Processed data in ${t1 - t0} ms")
  }
}
[/code]

I added logging of the processing time to show how much time the cached functions save. We are ready to test it now. Start it locally with sbt:

[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-dynamic\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-dynamic (in build file:/C:/Users/Valentin/workspace/beam-dynamic/)
[info] Running com.pythian.Beam
[/code]

Now publish our first version of the code to the "control" topic, and then publish data to the "data" topic:

[code language="scala"]
// for the "control" topic
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", "placeholder")
}

// for the "data" topic
{"id":1,"text":"row1"}
{"id":2,"text":"row2"}
{"id":3,"text":"row3"}
[/code]

We can see how the data is processed in the logs:

[code language="scala"]
2017/12/11 17:47:24.049 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":1,"text":"row1"}
2017/12/11 17:47:24.544 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":2,"text":"row2"}
2017/12/11 17:47:25.816 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=1, data=placeholder}
2017/12/11 17:47:25.816 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=2, data=placeholder}
2017/12/11 17:47:25.880 INFO com.pythian.Beam$MyDoFn - Processed data in 923 ms
2017/12/11 17:47:25.880 INFO com.pythian.Beam$MyDoFn - Processed data in 336 ms
2017/12/11 17:47:30.076 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":3,"text":"row3"}
2017/12/11 17:47:30.076 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=3, data=placeholder}
2017/12/11 17:47:30.077 INFO com.pythian.Beam$MyDoFn - Processed data in 1 ms
[/code]

It took about 1 second to compile the code for id=1. The message with id=2 had to wait on the synchronized cache while the "control" function was being compiled, and id=3 took just 1 ms.

Now let's change our parsing code and publish another 3 data rows:

[code language="scala"]
// for the "control" topic
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", json.get("text").getAsString)
}

// for the "data" topic
{"id":4,"text":"row4"}
{"id":5,"text":"row5"}
{"id":6,"text":"row6"}
[/code]

And here is the log:

[code language="scala"]
2017/12/11 17:54:09.859 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":4,"text":"row4"}
2017/12/11 17:54:10.237 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=4, data=row4}
2017/12/11 17:54:10.238 INFO com.pythian.Beam$MyDoFn - Processed data in 379 ms
2017/12/11 17:54:26.885 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":5,"text":"row5"}
2017/12/11 17:54:26.885 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=5, data=row5}
2017/12/11 17:54:26.886 INFO com.pythian.Beam$MyDoFn - Processed data in 0 ms
2017/12/11 17:54:42.868 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":6,"text":"row6"}
2017/12/11 17:54:42.869 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=6, data=row6}
2017/12/11 17:54:42.871 INFO com.pythian.Beam$MyDoFn - Processed data in 1 ms
[/code]

As expected, the new code was applied, and later messages were processed much faster than the first one. Please note that, as with any other dynamic approach, you have to take care of permissions for the "control" topic: whoever can publish to it can run arbitrary code in your pipeline. You can find the code here.
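Dynamic relies on a Memoize helper that this post does not show. Below is a minimal sketch of what such a helper could look like. It assumes that new Memoize(10) means "keep at most 10 compiled functions, evicting the least recently used one"; the project's actual class may behave differently.

To keep the example runnable without Beam, Gson, BigQuery or scala-compiler, the hypothetical DynamicSketch object replaces ToolBox compilation with a hypothetical stand-in, fakeCompile. It turns a tiny code string into a String => String function and counts how often it runs, so the caching effect is visible.

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical bounded, thread-safe memoizer. The project's real Memoize is not
// shown in the post; this sketch assumes the first argument is the cache size.
final class Memoize[K, V](maxSize: Int)(compute: K => V) extends (K => V) {
  // accessOrder = true: iteration order is least-recently-accessed first,
  // so removeEldestEntry evicts the least recently used entry.
  private val cache = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
      size() > maxSize
  }

  // One lock for the whole cache: a caller that arrives while another value is
  // being computed waits, and each key is computed once while it stays cached.
  def apply(key: K): V = synchronized {
    if (cache.containsKey(key)) cache.get(key)
    else {
      val value = compute(key)
      cache.put(key, value)
      value
    }
  }
}

// Hypothetical stand-in for the post's Dynamic object. Instead of
// toolbox.parse + toolbox.eval it "compiles" a tiny code string into a
// String => String function, so the example runs without scala-compiler.
object DynamicSketch {
  // Counts how often the stand-in compiler runs, to make the caching visible.
  val compilations = new AtomicInteger(0)

  private def fakeCompile(code: String): String => String = {
    compilations.incrementAndGet()
    code match {
      case "upper"  => (s: String) => s.toUpperCase
      case "length" => (s: String) => s.length.toString
      case other    => throw new IllegalArgumentException(s"Unknown code: $other")
    }
  }

  private val dynamic = new Memoize[String, String => String](10)(code => fakeCompile(code))

  def apply(code: String): String => String = dynamic(code)

  // Same enrichment as RichString in the post, with String in place of
  // JsonObject/TableRow. A plain implicit class is used here; the post's
  // `extends AnyVal` variant also needs a top-level host object.
  implicit class RichString(val code: String) {
    def evalFor(arg: String): String = DynamicSketch(code)(arg)
  }
}
```

A single lock keeps the sketch simple and is consistent with the first log, where the message with id=2 waited while the control function compiled. Its cost is that a slow compilation also blocks lookups of functions that are already cached. java.util.concurrent.ConcurrentHashMap.computeIfAbsent would let plain reads proceed meanwhile, at the price of losing the built-in LRU bound of LinkedHashMap.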