Apache Beam pipelines with Scala: part 3 - dynamic processing
In the third part of the series we will develop a pipeline that transforms messages from the "data" Pub/Sub topic using messages from the "control" topic as source code for our data processor. The idea is to use the Scala toolBox. It's much easier than doing the same in Java — essentially it takes just three lines of code:

[code language="scala"]
val toolbox = currentMirror.mkToolBox()
val tree = toolbox.parse(code)
toolbox.eval(tree)
[/code]

The problem we have to solve is performance. Parsing and evaluating this code for each processed element would be very inefficient. On the other hand, the "control" code is not something that changes frequently, so we can definitely benefit from memoization. We could cache results per DoFn instance, but it's much better to cache them at the JVM level; we just need to take care of thread safety. For the sake of simplicity I will use a synchronized approach, although it would be more idiomatic and efficient to use Futures. One more thing to take care of with memoization: how many results will we keep cached? Here is my simple attempt:

[code language="scala"]
class Memoize[K, V](size: Int)(fun: K => V) {
  private val cache = scala.collection.concurrent.TrieMap.empty[K, V]
  private val queue = scala.collection.mutable.Queue.empty[K]

  def apply(k: K): V =
    cache.getOrElse(k, this.synchronized {
      cache.getOrElse(k, {
        if (queue.size >= size) {
          cache.remove(queue.dequeue())
        }
        queue.enqueue(k)
        val v = fun(k)
        cache.put(k, v)
        v
      })
    })
}
[/code]

Please note that every call asking for a result for a not-yet-cached argument will block until the cache entry is computed.

The next thing to think about is what K and V should be in our function. K is obviously String. For V we need something that performs the parsing itself, a function
INPUT => OUTPUT. For our needs we want to minimize the code in control messages, so we convert the input data into a JsonObject before passing it to the function, and we want a TableRow as the result. Thus V is JsonObject => TableRow. A "control" message could then look like this:

[code language="scala"]
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", json.get("text").getAsString)
}
[/code]

Now it's time to define an object that lets us write something like:

[code language="scala"]val tableRow = code.evalFor(json)[/code]

Here is that object:

[code language="scala"]
object Dynamic {
  private val toolbox = currentMirror.mkToolBox()
  private val dynamic = new Memoize(10)((code: String) => {
    val tree = toolbox.parse(code)
    toolbox.eval(tree).asInstanceOf[JsonObject => TableRow]
  })
  def apply(code: String) = dynamic(code)
}
[/code]

We keep single per-JVM instances of the toolbox and the dynamic memoized compiler, so now we can simply use:

[code language="scala"]val tableRow = Dynamic(code)(json)[/code]

To make it work as a String method as above, we use an implicit value class:

[code language="scala"]
implicit class RichString(val code: String) extends AnyVal {
  def evalFor(arg: JsonObject): TableRow = Dynamic(code)(arg)
}
[/code]

Now the only thing we change from our previous side input example is a slightly altered MyDoFn:

[code language="scala"]
class MyDoFn(sideView: PCollectionView[java.util.List[String]])
    extends DoFn[String, TableRow] with LazyLogging {

  @ProcessElement
  def processElement(c: ProcessContext) {
    val t0 = System.currentTimeMillis()
    val sideInput = c.sideInput(sideView).get(0)
    val inputString = c.element()
    logger.info(s"Getting new data=$inputString")
    Try {
      val json = new JsonParser().parse(inputString).getAsJsonObject
      sideInput.evalFor(json)
    } match {
      case Success(row) =>
        logger.info(s"Inserting to BigQuery: $row")
        c.output(row)
      case Failure(ex) =>
        logger.info(s"Unable to parse message: $inputString", ex)
    }
    val t1 = System.currentTimeMillis()
    logger.info(s"Processed data in ${t1 - t0} ms")
  }
}
[/code]

I added processing-time logging to show how much time the cached functions save. We are ready to test it now. Start it locally with
sbt:

[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-dynamic\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-dynamic (in build file:/C:/Users/Valentin/workspace/beam-dynamic/)
[info] Running com.pythian.Beam
[/code]

Now publish our first attempt at the code to the "control" topic and then publish data to the "data" topic:

[code language="scala"]
// for the "control" topic
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", "placeholder")
}

// for the "data" topic
{"id":1,"text":"row1"}
{"id":2,"text":"row2"}
{"id":3,"text":"row3"}
[/code]

We can see how the data is processed in the logs:

[code language="scala"]
2017/12/11 17:47:24.049 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":1,"text":"row1"}
2017/12/11 17:47:24.544 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":2,"text":"row2"}
2017/12/11 17:47:25.816 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=1, data=placeholder}
2017/12/11 17:47:25.816 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=2, data=placeholder}
2017/12/11 17:47:25.880 INFO com.pythian.Beam$MyDoFn - Processed data in 923 ms
2017/12/11 17:47:25.880 INFO com.pythian.Beam$MyDoFn - Processed data in 336 ms
2017/12/11 17:47:30.076 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":3,"text":"row3"}
2017/12/11 17:47:30.076 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=3, data=placeholder}
2017/12/11 17:47:30.077 INFO com.pythian.Beam$MyDoFn - Processed data in 1 ms
[/code]

It took about 1 second to compile the code for id=1; for id=2 the thread waited for a while on synchronized while the "control" function was being compiled; and for id=3 it took just 1 ms. Now let's change our parsing code and publish another 3 data rows:

[code language="scala"]
// for the "control" topic
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", json.get("text").getAsString)
}

// for the "data" topic
{"id":4,"text":"row4"}
{"id":5,"text":"row5"}
{"id":6,"text":"row6"}
[/code]

And here is the log:

[code language="scala"]
2017/12/11 17:54:09.859 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":4,"text":"row4"}
2017/12/11 17:54:10.237 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=4, data=row4}
2017/12/11 17:54:10.238 INFO com.pythian.Beam$MyDoFn - Processed data in 379 ms
2017/12/11 17:54:26.885 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":5,"text":"row5"}
2017/12/11 17:54:26.885 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=5, data=row5}
2017/12/11 17:54:26.886 INFO com.pythian.Beam$MyDoFn - Processed data in 0 ms
2017/12/11 17:54:42.868 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":6,"text":"row6"}
2017/12/11 17:54:42.869 INFO com.pythian.Beam$MyDoFn - Inserting to BigQuery: {id=6, data=row6}
2017/12/11 17:54:42.871 INFO com.pythian.Beam$MyDoFn - Processed data in 1 ms
[/code]

As expected, the new code was applied and the later messages were processed much faster than the first one. Please note that, as with any other dynamic approach, you have to take care of permissions on the "control" topic. You can find the code here.
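As a footnote, here is a quick sanity check of the Memoize cache described above. The demo harness (the MemoizeDemo object and the call counter) is my illustration, not part of the original pipeline; the cheap string-length function stands in for toolbox compilation. It shows the two properties we rely on: the wrapped function runs once per distinct key, and the oldest key is evicted once the cache is full.

```scala
import java.util.concurrent.atomic.AtomicInteger

// The Memoize class from the post, reproduced so the demo is self-contained.
class Memoize[K, V](size: Int)(fun: K => V) {
  private val cache = scala.collection.concurrent.TrieMap.empty[K, V]
  private val queue = scala.collection.mutable.Queue.empty[K]

  def apply(k: K): V =
    cache.getOrElse(k, this.synchronized {
      cache.getOrElse(k, {
        if (queue.size >= size) {
          cache.remove(queue.dequeue()) // evict the oldest key
        }
        queue.enqueue(k)
        val v = fun(k)
        cache.put(k, v)
        v
      })
    })
}

object MemoizeDemo extends App {
  val calls = new AtomicInteger(0)
  // A cheap stand-in for toolbox compilation: count how often it actually runs.
  val memo = new Memoize[String, Int](2)(s => { calls.incrementAndGet(); s.length })

  memo("alpha"); memo("alpha")  // the second call is served from the cache
  assert(calls.get == 1)
  memo("beta"); memo("gamma")   // capacity is 2, so "alpha" gets evicted...
  memo("alpha")                 // ...and has to be recomputed
  assert(calls.get == 4)
  println("Memoize behaves as expected")
}
```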
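And since I mentioned that a Future-based approach would be more idiomatic than synchronized, here is a rough sketch of what that could look like. This is my illustration only: the AsyncMemoize name is made up, and unlike Memoize above it does no eviction. The idea is that TrieMap.putIfAbsent guarantees a single Promise per key, so only the first caller compiles the code, while concurrent callers just wait on the same Future instead of blocking on a lock.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical lock-free memoizer: one Future per key, no eviction.
class AsyncMemoize[K, V](fun: K => V) {
  private val cache = TrieMap.empty[K, Future[V]]

  def apply(k: K): Future[V] = {
    val p = Promise[V]()
    cache.putIfAbsent(k, p.future) match {
      case Some(existing) => existing    // already computed, or in flight
      case None =>                       // we won the race: compute exactly once
        p.complete(Try(fun(k)))
        p.future
    }
  }
}

object AsyncMemoizeDemo extends App {
  val memo = new AsyncMemoize[String, Int](_.length)
  println(Await.result(memo("hello"), 1.second)) // prints 5
}
```

A failed compilation would be cached as a failed Future here; in a real pipeline you would probably want to remove failed entries so a corrected "control" message can be retried.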