Test-Driving Reflex

May 18, 2012 / By Yanick Champoux

At $work, we have a need for a little job daemon that would poll jobs and process them. If there was only one kind of job involved, the solution could be nothing more complicated than:

while (1) {
    my @jobs = poll_jobs();    # may be empty; keep the daemon alive anyway
    process( $_ ) for @jobs;
    sleep $a_wee_bit;
}

But there is more than one type of job, so the solution we need has to be a little more complex. In fact, that’s the kind of problem that event-driven programs typically handle well. As I don’t often dabble with that kind of stuff, I jumped on the occasion to play around a little. Perl doesn’t lack event-based systems (POE and AnyEvent are two big names there), but I decided to have fun with Reflex, a Moose-based system built on top of POE.

To get to my goal, I decided that I would have a generic Poller class. For each type of job to monitor and run, I would create a different object with parameters telling it how often to poll, how to poll, and what to do with the stuff it polls. Sounds good? Perfect, then let’s go.

First things first, let’s declare the class and import our favorite logging role:

package Poller;

use 5.10.0;

use strict;
use warnings;

use Moose;

extends 'Reflex::Base';

with 'MooseX::Role::Loggable';

has '+log_to_stdout' => (
    default => 1,
);

We also want a queue to hold the jobs that we poll until we have time to process them.

has queue => (
    is      => 'ro',
    isa     => 'ArrayRef',
    traits  => [ 'Array' ],
    default => sub { [] },    # start empty so push/shift/count always have an array
    handles => {
        add_to_queue => 'push',
        shift_queue  => 'shift',
        queue_size   => 'count',
    },
);

And now, event stuff. For that, we’re going to use Reflex::Role::Interval:

has polling_interval => (
    is      => 'ro',
    default => 5,
);

has [ qw/ polling_auto_start / ] => (
    is      => 'ro',
    default => 1,
);

has process_interval => (
    is      => 'ro',
    default => 0,
);

has [ qw/
        process_auto_start
        process_auto_repeat
        polling_auto_repeat
/ ] => (
    is => 'ro',
    default => 0,
);

has [ qw/ polling_function process_function / ] => (
    is       => 'ro',
    required => 1,
);

with 'Reflex::Role::Interval' => {
    att_interval      => $_."_interval",
    att_auto_start    => $_."_auto_start",
    att_auto_repeat   => $_."_auto_repeat",
} for qw/ polling process /;

A little verbose, but there is nothing very arcane in there. It’s just the declaration of all the settings related to the two work loops (polling and processing) that we need.
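To make the `with ... for` loop less magical: it applies the role once per prefix, each time with a parameter hash built from `$_`. Here is a minimal core-Perl sketch that only builds and prints those hashes. It does not load Reflex, and `interval_params` is a made-up helper name for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: build the parameter hash that the loop above
# hands to the role for a given prefix.
sub interval_params {
    my ($prefix) = @_;
    return {
        att_interval    => "${prefix}_interval",
        att_auto_start  => "${prefix}_auto_start",
        att_auto_repeat => "${prefix}_auto_repeat",
    };
}

for my $prefix (qw/ polling process /) {
    my $params = interval_params($prefix);
    print "$_ => $params->{$_}\n" for sort keys %$params;
}
```

Each value is the name of one of the attributes declared earlier, which is how every application of the role finds its own settings.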

The good news, though, is that after this, we only need to set the callbacks for both work loops:

sub on_polling_interval_tick { $_[0]->polling_function->(@_) }
sub on_process_interval_tick { $_[0]->process_function->(@_) }
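The callbacks forward `@_` untouched, so the stored code reference receives the object itself as its first argument. A small core-Perl sketch of that dispatch, using a hypothetical stand-in class named `Tiny` rather than the real Poller:

```perl
use strict;
use warnings;

package Tiny;    # hypothetical stand-in for Poller, no Moose or Reflex

sub new { my ( $class, %args ) = @_; return bless {%args}, $class }

# plain accessor for the stored code reference
sub polling_function { $_[0]{polling_function} }

# same shape as on_polling_interval_tick above
sub on_tick { $_[0]->polling_function->(@_) }

package main;

my $obj = Tiny->new(
    # the callback sees the object as its first argument
    polling_function => sub { my ($self) = @_; return ref $self },
);

print $obj->on_tick, "\n";
```

That is why a polling or process function can start with `my $self = shift;` and call the object’s methods directly.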

Since we are feeling fancy and don’t want to poll when we still have work to do, we decide what we do next based on the status of the job queue:

after [ qw/
    on_process_interval_tick
    on_polling_interval_tick
/ ] => sub {
    my $self = shift;

    my $method = join '_',
        'repeat',
        ( $self->queue_size ? 'process' : 'polling' ),
        'interval';

    $self->$method;
};
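The method name is assembled at runtime from the state of the queue. As a sketch, here is the same expression pulled out into a standalone function that takes the queue size as a plain number (`next_timer_method` is a made-up name for illustration):

```perl
use strict;
use warnings;

# Same join as in the modifier above, with the queue size passed in
# instead of being read from the object.
sub next_timer_method {
    my ($queue_size) = @_;
    return join '_',
        'repeat',
        ( $queue_size ? 'process' : 'polling' ),
        'interval';
}

print next_timer_method($_), "\n" for 0, 3;
```

An empty queue selects `repeat_polling_interval`, and anything else selects `repeat_process_interval`, so only one of the two timers is re-armed after each tick.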

And we are done.

Poller->meta->make_immutable;

1;

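With this, creating our daemon that polls different jobs is pleasantly straightforward:

use 5.10.0;    # enables "state"

use strict;
use warnings;

use Reflex;
use Poller;    # assumes the class above is saved as Poller.pm

my $foo = Poller->new(
    polling_function => sub {
        state $i = 0;
        poll( \$i, @_ );    # hand poll() a reference to this poller's counter
    },
    process_function => \&process,
);

my $bar = Poller->new(
    polling_interval => 2,  # a little faster
    polling_function => sub {
        state $i = 'a';
        poll( \$i, @_ );
    },
    process_function => \&process,
);

sub poll {
    my ( $counter, $self ) = @_;

    # queue up to four new fake jobs, advancing the caller's counter
    my @new = map { ++$$counter } 1..rand 5;

    $self->log( "polling " . join ', ', @new );
    $self->add_to_queue(@new);
}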

sub process {
    my $self = shift;

    my $item = $self->shift_queue or return;

    $self->log( "processing $item" );
}

Reflex->run_all;

A test run to convince ourselves that everything works as advertised:

$ perl daemon.pl
[5286] polling b, c
[5286] processing b
[5286] processing c
[5286] polling d, e, f
[5286] processing d
[5286] processing e
[5286] processing f
[5286] polling 1, 2, 3
[5286] processing 1
[5286] processing 2
[5286] processing 3
[5286] polling g, h, i, j
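The pattern in this log, where each batch is drained before the next poll happens, can be reproduced without any event loop. Here is a sketch in core Perl for a single poller, with fixed made-up batches in place of the random ones:

```perl
use strict;
use warnings;

my @batches = ( [qw/ b c /], [qw/ d e f /] );    # made-up poll results
my @queue;
my @log;

while ( @batches or @queue ) {
    if (@queue) {
        # work is pending: process one item, do not poll
        push @log, "processing " . shift @queue;
    }
    else {
        # queue is empty: poll for the next batch
        my @new = @{ shift @batches };
        push @log, "polling " . join ', ', @new;
        push @queue, @new;
    }
}

print "$_\n" for @log;
```

This is only the decision logic. In the real daemon, the timers handle the waiting and let several pollers interleave.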

Bottom line? Reflex is one heck of a shiny toy. A caveat, though, for the potential emptor: it’s still fairly beta, and the documentation does not always reflect the current implementation. But it’s definitely something to keep on the radar.
