Oozing Caribou

Oct 17, 2013 / By Yanick Champoux

Tags: , , , ,

Meet Oozie’s Workflows

Oozie is a workflow scheduler for Hadoop, but that’s not terribly important right now. What is important is that it defines its workflows using an XML dialect. And as all XML things go, the result is… shall we say, less than easy on the eyes and the typing fingers. As a piece of evidence, I bring you the simple example workflow that ships with the Oozie distribution:


<!-- Demo Oozie workflow: clean the output directory, fork into a Pig
     job and a streaming map-reduce job, join, run a final map-reduce,
     then decide whether to move the result before ending. -->
<workflow-app name="demo-wf">

    <start to="cleanup-node"/>

    <!-- Delete any output left over from a previous run. -->
    <action name="cleanup-node">
        <fs>
            <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo"/>
        </fs>
        <ok to="fork-node"/>
        <error to="fail"/>
    </action>

    <!-- Run the Pig and streaming branches in parallel. -->
    <fork name="fork-node">
        <path start="pig-node"/>
        <path start="streaming-node"/>
    </fork>

    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.map.output.compress</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <param>INPUT=/user/${wf:user()}/${examplesRoot}/input-data/text</param>
            <param>OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node</param>
        </pig>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>

    <action name="streaming-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node"/>
            </prepare>
            <streaming>
                <mapper>/bin/cat</mapper>
                <reducer>/usr/bin/wc</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>

                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>

    <!-- Wait for both forked branches before moving on. -->
    <join name="join-node" to="mr-node"/>

    <!-- Final map-reduce pass over the combined output of both branches. -->
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>

                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.DemoMapper</value>
                </property>
                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.DemoReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node,/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="decision-node"/>
        <error to="fail"/>
    </action>

    <!-- If the mr-node output directory exists, archive it via hdfs-node;
         otherwise go straight to the end. -->
    <decision name="decision-node">
        <switch>
            <case to="hdfs-node">${fs:exists(concat(concat(concat(concat(concat(nameNode, '/user/'), wf:user()), '/'), examplesRoot), '/output-data/demo/mr-node')) == "true"}</case>
            <default to="end"/>
        </switch>
    </decision>

    <action name="hdfs-node">
        <fs>
            <move source="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node"
                  target="/user/${wf:user()}/${examplesRoot}/output-data/demo/final-data"/>
        </fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <!-- Any action's error transition lands here and aborts the workflow. -->
    <kill name="fail">
        <message>Demo workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="end"/>

</workflow-app>

Not the worst tag soup ever, I’ll admit. But still, that’s hefty on the eyes.

For the Love of the FSM, DSL That ML

At the core, the XML representation of the workflow is a fine thing. It’s very easily machine parsable and well-defined. It’s just not very friendly to us humans, and it’s one case where I think DSLs do wonders to abstract most of the tediousness and verbosity of the job.

Enter Template::Caribou, that toy templating system of mine. While its primary raison d'être is HTML templating, it has been designed such that it’s friendly to any XML dialect. Indeed, with the help of a Hive tag library (currently available on the ‘hive’ branch of the GitHub repo of Caribou), here is how the workflow above could look.

First, we need a wrapping class:

#!/usr/bin/perl

use strict;
use warnings;

# Minimal wrapper class for rendering Caribou templates: composing the
# Template::Caribou role makes the class a template renderer, and the
# ::Files role makes it load '*.bou' template files from 'dirs'
# (here, the current directory — so 'demo' resolves to ./demo.bou).
package Workflow;

use Moose;
use Template::Caribou;

with 'Template::Caribou';
with 'Template::Caribou::Files' => {
    dirs => [ '.' ],
};

# Render the 'demo' template to stdout.
# NOTE(review): this still runs under 'package Workflow'; it works, but a
# 'package main;' before the driver code would be tidier — confirm intent.
my $template = Workflow->new;

print $template->render('demo');

and there is demo.bou, in all its glory:

# Caribou template producing the Oozie 'demo-wf' workflow XML:
# cleanup, fork into a Pig job and a streaming map-reduce job, join,
# a final map-reduce, then a decision node that optionally archives
# the output before ending.
use Template::Caribou::Tags::Hive ':all';

workflow 'demo-wf',
    start => 'cleanup-node',
    end => 'end',
    sub {

    # Delete any output left over from a previous run.
    action 'cleanup-node',
        ok => 'fork-node',
        error => 'fail',
        sub {
            fs {
                oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo';
            }
    };

    oozie_fork 'fork-node', qw/
        pig-node
        streaming-node
    /;

    action 'pig-node',
        ok => 'join-node',
        error => 'fail',
        sub { pig
            # lowercase '${jobTracker}', matching the property name used
            # by the original workflow XML (was '${JobTracker}', which
            # would reference an undefined EL variable)
            'job-tracker' => '${jobTracker}',
            'name-node' => '${nameNode}',
            prepare => sub {
                oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node';
            },
            configuration => {
                'mapred.job.queue.name' => '${queueName}',
                'mapred.map.output.compress' => 'false',
            },
            script => 'id.pig',
            params => [
                'INPUT=/user/${wf:user()}/${examplesRoot}/input-data/text',
                'OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node',
            ],
    };

    action 'streaming-node',
        ok => 'join-node',
        error => 'fail',
        sub {
        map_reduce
            job_tracker => '${jobTracker}',
            name_node => '${nameNode}',
            prepare => sub {
                # was a bare "delete => '...'" pair, which inside a sub is
                # a void-context no-op; use the oozie_delete tag like the
                # other actions in this template
                oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node';
            },
            streaming => {
                mapper => '/bin/cat',
                reducer => '/usr/bin/wc',
            },
            configuration => {
                'mapred.job.queue.name' => '${queueName}',
                'mapred.input.dir' => '/user/${wf:user()}/${examplesRoot}/input-data/text',
                'mapred.output.dir' => '/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node',
            },
        ;
    };

    # Both forked branches converge here before the final map-reduce.
    oozie_join 'join-node' => 'mr-node';

    action 'mr-node',
        ok => 'decision-node',
        error => 'fail',
        sub {
            map_reduce
                job_tracker => '${jobTracker}',
                name_node => '${nameNode}',
                prepare => sub {
                    oozie_delete '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node';
                },
                configuration => {
                    'mapred.job.queue.name' => '${queueName}',
                    'mapred.mapper.class' => 'org.apache.oozie.example.DemoMapper',
                    'mapred.mapoutput.key.class' => 'org.apache.hadoop.io.Text',
                    'mapred.mapoutput.value.class' => 'org.apache.hadoop.io.IntWritable',
                    'mapred.reducer.class' => 'org.apache.oozie.example.DemoReducer',
                    'mapred.map.tasks' => 1,
                    'mapred.input.dir' => '/user/${wf:user()}/${examplesRoot}/output-data/demo/pig-node,/user/${wf:user()}/${examplesRoot}/output-data/demo/streaming-node',
                    'mapred.output.dir' => '/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node'
                },
        };

    # Archive the output via hdfs-node if it exists, otherwise end.
    decision 'decision-node', 'end',
        'hdfs-node' => q[${fs:exists(concat(concat(concat(concat(concat(nameNode, '/user/'), wf:user()), '/'), examplesRoot), '/output-data/demo/mr-node')) == "true"}];

    action 'hdfs-node',
        ok => 'end',
        error => 'fail',
        sub {
        fs {
            move '${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/demo/mr-node' => '/user/${wf:user()}/${examplesRoot}/output-data/demo/final-data';
        };
    };

    # Every action's error transition lands here.
    oozie_kill 'fail' => <<'URGH';
Demo workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
URGH

};

Reading the whole thing still doesn’t feel like Christmas, but it’s an improvement. And now that it’s part of a templating system, we can split the different actions into their own template/file, and then use a little bit of programmatic magic to slurp them all into the main workflow.

Also, while the example we’re using is simple enough that no great feats of simplification can be done, it’s easy to think of cases where a for loop will be our best friend. Like if we need to create lots of actions based on some parameter:


    # somewhere in the 'workflow' template

    # Fan out to one 'update_<table>' action per table.
    # (assumes $self->tables_to_update returns a list of table names
    #  -- TODO confirm against the enclosing template class)
    oozie_fork 'update_tables' => map {
        'update_' . $_
    } $self->tables_to_update;

    # sub-template creating the 'update_*table*' action node
    show( 'table_update', table_name => $_ ) for $self->tables_to_update;

    # Converge once every per-table action has completed.
    oozie_join 'all_updates_done' => 'some-next-node';

Of course, instead of for loops we could also do something similar with an XSLT transform. But… y’know… no. Just… no.

Bonus Feature: Workflow Graph!

Unrelated to the stuff above, but just because I find it cute: want to make a quick graph of the workflow? Here’s a quick and dirty way to do it:

#!/usr/bin/env perl

# Quick-and-dirty ASCII visualization of an Oozie workflow: slurps the
# workflow XML from the files given on the command line (or stdin) and
# prints the node graph.

use strict;
use warnings;

use Web::Query;
use Graph::Easy;

my $doc   = Web::Query->new_from_html( join '', <> );
my $chart = Graph::Easy->new;

# The <start> element points at the first real node.
$doc->find( 'start' )->each( sub {
    my ( undef, $elem ) = @_;
    $chart->add_edge( 'START' => $elem->attr('to') );
} );

# Make sure the terminal node appears even before any edge reaches it.
$doc->find( 'end' )->each( sub {
    my ( undef, $elem ) = @_;
    $chart->add_node( $elem->attr('name') );
} );

# Each action carries an "ok" and an "error" transition; label the edges.
$doc->find('action')->each( sub {
    my ( undef, $elem ) = @_;
    for my $outcome (qw/ ok error /) {
        my $target = $elem->find($outcome)->attr('to') or next;
        my $edge = $chart->add_edge( $elem->attr('name') => $target );
        $edge->set_attribute( label => $outcome );
    }
} );

# A fork fans out to every <path> it contains.
$doc->find('fork')->each( sub {
    my ( undef, $fork ) = @_;
    my $from = $fork->attr('name');
    $fork->find('path')->each( sub {
        my ( undef, $path ) = @_;
        $chart->add_edge( $from => $path->attr('start') );
    } );
} );

# Joins are plain name -> to edges.
$doc->find('join')->each( sub {
    my ( undef, $elem ) = @_;
    $chart->add_edge( $elem->attr('name') => $elem->attr('to') );
} );

# Decisions branch on every <case> plus the <default>.
$doc->find('decision')->each( sub {
    my ( undef, $decision ) = @_;
    my $from = $decision->attr('name');
    $decision->find('case,default')->each( sub {
        my ( undef, $branch ) = @_;
        $chart->add_edge( $from => $branch->attr('to') );
    } );

} );

print $chart->as_ascii;

Which gives us


$ perl graph.pl workflow.xml

                                               +----------------+  ok
  +------------------------------------------- | streaming-node | ------------------------------+
  |                                            +----------------+                               |
  |                                              ^                                              |
  |                                              |                                              |                                   +-----------------------------------------+
  |                                              |                                              v                                   |                                         v
  |  +-------+          +--------------+  ok   +----------------+          +----------+  ok   +-----------+     +---------+  ok   +---------------+     +-----------+  ok   +-----+
  |  | START | -------> | cleanup-node | ----> |   fork-node    | -------> | pig-node | ----> | join-node | --> | mr-node | ----> | decision-node | --> | hdfs-node | ----> | end |
  |  +-------+          +--------------+       +----------------+          +----------+       +-----------+     +---------+       +---------------+     +-----------+       +-----+
  |                       |                                                  |                                    |                                       |
  |                       | error                                            |                                    |                                       |
  |                       v                                                  |                                    |                                       |
  |            error    +---------------------------------------+  error     |                                    |                                       |
  +-------------------> |                                       | <----------+                                    |                                       |
                        |                                       |                                                 |                                       |
                        |                                       |  error                                          |                                       |
                        |                 fail                  | <-----------------------------------------------+                                       |
                        |                                       |                                                                                         |
                        |                                       |  error                                                                                  |
                        |                                       | <---------------------------------------------------------------------------------------+
                        +---------------------------------------+

Leave a Reply

  • (will not be published)

XHTML: You can use these tags: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>