Building a custom routing NiFi processor with Scala
In this post we will build a toy example NiFi processor which is nevertheless quite efficient and has powerful capabilities. The processor logic is straightforward: it reads incoming files line by line, applies a given function to transform each line into a key-value pair, groups the pairs by key, writes the values to output files and transfers those files into the relationships specified for each group key. As a starting point I used nifi-processor-bundle-scala, which you can build and deploy right away with sbt.

Relationships

We have three permanent relationships specified in the trait RouterRelationships:
- RelUnmatched: receives files for keys that have no matching dynamic relationship configured;
- RelFailure: receives the incoming flow file if any failure happens during processing;
- RelIncompatible: all lines that couldn't be processed are collected into one file and moved here.
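These are plain NiFi relationships. As a rough sketch of the trait (the names follow the post, but the builder calls and descriptions are my assumptions, not the actual source):

[code language="scala"]
import org.apache.nifi.processor.Relationship

// Hypothetical sketch of the RouterRelationships trait.
trait RouterRelationships {
  val RelUnmatched: Relationship = new Relationship.Builder()
    .name("unmatched")
    .description("Files whose keys have no matching dynamic relationship")
    .build()

  val RelFailure: Relationship = new Relationship.Builder()
    .name("failure")
    .description("Incoming flow files that failed during processing")
    .build()

  val RelIncompatible: Relationship = new Relationship.Builder()
    .name("incompatible")
    .description("Lines that could not be transformed")
    .build()
}
[/code]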
Properties

The main property, PROCESSOR, holds the source code of a processing function with the String => (String, String) type. Other properties are dynamic and define relationships as described above. We have to implement the getSupportedDynamicPropertyDescriptor and onPropertyModified methods in our processor class; a sketch follows below.
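Here is a minimal sketch of what those two methods might look like, assuming the processor keeps its dynamic relationships in an AtomicReference map named dynamicRelationships (the field name matches the onTrigger snippet below; the method bodies are my assumptions):

[code language="scala"]
import java.util.concurrent.atomic.AtomicReference
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.util.StandardValidators

// Assumed field: name -> dynamic relationship, updated on property changes.
val dynamicRelationships =
  new AtomicReference[Map[String, Relationship]](Map.empty)

// Any unknown property name becomes a dynamic property.
override protected def getSupportedDynamicPropertyDescriptor(
    name: String): PropertyDescriptor =
  new PropertyDescriptor.Builder()
    .name(name)
    .dynamic(true)
    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    .build()

// Keep the relationship map in sync with the dynamic properties.
override def onPropertyModified(descriptor: PropertyDescriptor,
                                oldValue: String, newValue: String): Unit =
  if (descriptor.isDynamic) {
    val updated =
      if (newValue == null)                      // property was removed
        dynamicRelationships.get - descriptor.getName
      else
        dynamicRelationships.get + (descriptor.getName ->
          new Relationship.Builder().name(descriptor.getName).build())
    dynamicRelationships.set(updated)
  }
[/code]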
onTrigger

This is where all the processing logic lives. Once a non-null flow file arrives, we try to get the compiled processing function from a cache, or actually compile it:

[code language="scala"]
val processorCode = context.getProperty(PROCESSOR).getValue
val transform = compile(processorCode).get
[/code]
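The compile function itself is not shown here; one plausible implementation, assuming Scala 2 with the scala-compiler artifact on the runtime classpath, uses the reflection toolbox (which is also why caching matters, since toolbox compilation is expensive):

[code language="scala"]
// Hypothetical sketch of runtime compilation via the Scala 2 reflection toolbox.
// Assumes org.scala-lang:scala-compiler is available at runtime.
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
import scala.util.Try

object LineProcessorCompiler {
  private val toolbox = currentMirror.mkToolBox()

  // Parse and evaluate the property's source code into the expected function type.
  def compile(code: String): Try[String => (String, String)] = Try {
    toolbox.eval(toolbox.parse(code)).asInstanceOf[String => (String, String)]
  }
}
[/code]

Then we make preparations for RoutingSink (we will come back to it later): create a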
Sink that takes care of creating output flow files, prepare the key-value and key classes for routing, and finally create an instance of RoutingSink:

[code language="scala"]
val groupBy = RoutingSink.create[KeyValue, KeyType, FlowFile](FlowFileSink, _.keyType)
[/code]

After this we are ready to read the input file line by line, transform each line into a key-value pair and send it to the sink:

[code language="scala"]
val br = new BufferedReader(new InputStreamReader(in))
var line: String = null
while ( {
  line = br.readLine()
  line != null
}) {
  val kv = Try {
    val (k, v) = transform(line)
    GoodKeyValue(k, v)
  }.getOrElse(ErrorKeyValue(line))
  groupBy.add(kv)
}
in.close()
[/code]

Finally we close the sink, get the list of output flow files and transfer each file to its corresponding relationship:

[code language="scala"]
val files = groupBy.close()
files.foreach {
  case (ErrorKeyType, file) => session.transfer(file, RelIncompatible)
  case (DataKeyType(name), file) =>
    val rel = dynamicRelationships.get.getOrElse(name, RelUnmatched)
    session.transfer(file, rel)
}
[/code]
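The snippets above lean on a small key and key-value ADT whose definitions are not shown; a reconstruction consistent with the usage (names taken from the snippets, exact shape assumed) could be:

[code language="scala"]
// Hypothetical reconstruction of the routing key types used above.
sealed trait KeyType
case object ErrorKeyType extends KeyType                    // lines that failed to transform
final case class DataKeyType(name: String) extends KeyType  // lines grouped by key

sealed trait KeyValue { def keyType: KeyType; def value: String }
final case class GoodKeyValue(key: String, value: String) extends KeyValue {
  def keyType: KeyType = DataKeyType(key)
}
final case class ErrorKeyValue(value: String) extends KeyValue {
  def keyType: KeyType = ErrorKeyType
}
[/code]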
RoutingSink

RoutingSink is a hand-made, Akka actor based implementation of a routing worker pool where each worker processes data for exactly one key. A worker actor creates an instance of its own Sink and adds all incoming data to it. The router actor performs housekeeping and communication.
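To make the idea concrete, here is a minimal, non-generic sketch of such a pool; the real RoutingSink is generic over key and value types and also handles closing and collecting the per-key sinks:

[code language="scala"]
// Simplified illustration of the routing-worker-pool idea, not the post's code.
import akka.actor.{Actor, ActorRef, Props}

final case class Line(key: String, value: String)

// Each worker owns the sink for exactly one key.
class Worker(writeLine: String => Unit) extends Actor {
  def receive: Receive = { case Line(_, v) => writeLine(v) }
}

// The router creates workers lazily, one per key, and forwards lines to them.
class Router(sinkFor: String => (String => Unit)) extends Actor {
  private var workers = Map.empty[String, ActorRef]

  def receive: Receive = {
    case msg @ Line(key, _) =>
      val worker = workers.getOrElse(key, {
        val w = context.actorOf(Props(new Worker(sinkFor(key))))
        workers += key -> w
        w
      })
      worker forward msg
  }
}
[/code]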
Build and deploy

In order to build the processor, run mvn compile test package. It will also execute the tests before building the NAR file. After packaging you should find the NAR file at nifi-akka-bundle/nifi-akka-nar/target/nifi-akka-nar-1.0.nar. To use the processor in NiFi, copy this file into the lib directory. Make sure your NiFi version matches the one the processor was built against (1.4.0 at the time of writing). Start NiFi and try to create something simple like the flow in the picture below. Add one property for the dynamic relationship "black", with the following code for decomposing JSON:

[code language="scala"]
import pythian.nifi.processors.LineProcessor.GSON
import com.google.gson.JsonObject

(line: String) => {
  val jo = GSON.fromJson(line, classOf[JsonObject])
  val tpe = jo.get("type").getAsString
  val payload = jo.get("payload")
  val data = GSON.toJson(payload)
  (tpe, data)
}
[/code]

Add a few GetFile and PutFile processors and start the flow:
![](https://www.pythian.com/hs-fs/hubfs/Imported_Blog_Media/nifi1.png?width=2415&height=1533&name=nifi1.png)
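For example, with a made-up input line {"type":"black","payload":{"id":1}}, the function above returns the pair ("black", {"id":1}), so that line ends up in the output file routed to the "black" relationship. A line whose type has no matching dynamic property goes to RelUnmatched, and a line that fails to parse lands in RelIncompatible.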