HOWTO: Running multiple processes in a single Perl script using POE

The Perl module POE is best known for simulating multiple user-space threads inside a single Perl application through cooperative, event-driven multitasking.

POE is also very useful for forking and managing child processes.  When POE handles this, the programmer no longer has to write the forking and child-management code by hand.
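
For comparison, here is a minimal sketch of the bookkeeping a script has to do when it forks and reaps children by hand.  It uses only core Perl.  The names $max_children, %running, and @finished are invented for this illustration and are not part of the POE example, and the child does no real work.

```perl
use strict;
use warnings;

my $max_children = 3;    # hypothetical limit for this sketch
my @tasks        = qw(one two three four five);
my %running;             # pid => task name, for children still alive
my @finished;            # task names, in the order the children exited

while ( @tasks or %running ) {

    # Fork new children until the limit is reached or the tasks run out.
    while ( @tasks and keys(%running) < $max_children ) {
        my $task = shift @tasks;
        my $pid  = fork();
        die "fork failed: $!" unless defined $pid;

        if ( $pid == 0 ) {
            # Child: the real work would go here.
            exit 0;
        }

        # Parent: remember which task this child is running.
        $running{$pid} = $task;
    }

    # Block until one child exits, then free its slot.
    my $pid = wait();
    last if $pid == -1;
    push @finished, delete $running{$pid};
}

print "finished ", scalar(@finished), " tasks\n";
```

Even this small version has to track PIDs, throttle the forks, and reap every child, and it still has no way to get results back from the children.  That is the part POE takes over.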

Set the maximum number of child processes that may run at the same time with MAX_CONCURRENT_TASKS.  For example, to allow up to three child processes to do the work:

sub MAX_CONCURRENT_TASKS () { 3 }
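
The empty prototype () in that declaration is what lets Perl treat MAX_CONCURRENT_TASKS as a constant that takes no arguments.  The short sketch below uses only core Perl.  The name MAX_TASKS_ALT and the %running hash are made up here, to show the core constant pragma as an equivalent spelling and to show the constant used in arithmetic.

```perl
use strict;
use warnings;

# Same idiom as in the script: a sub with an empty prototype.
sub MAX_CONCURRENT_TASKS () { 3 }

# Hypothetical alternative spelling using the core "constant" pragma.
use constant MAX_TASKS_ALT => 3;

# Pretend two children are already running (pid => task name).
my %running = ( 101 => 'one', 102 => 'two' );

# The constant can be used as a bareword in numeric expressions.
my $slots_free = MAX_CONCURRENT_TASKS - keys(%running);

print "limit: ", MAX_CONCURRENT_TASKS, ", alt limit: ", MAX_TASKS_ALT, "\n";
print "free slots: $slots_free\n";
```

Either form works for the script.  The sub form is simply what the original example uses.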

Each child process runs the do_stuff() function, so put the work you want to run in parallel there.


The following script comes from the POE Cookbook recipe "Child Processes 3":

#!/usr/bin/perl

# This program forks children to handle a number of slow tasks.  It
# uses POE::Filter::Reference so the child tasks can send back
# arbitrary Perl data.  The constant MAX_CONCURRENT_TASKS limits the
# number of forked processes that can run at any given time.

use warnings;
use strict;

use POE qw(Wheel::Run Filter::Reference);

sub MAX_CONCURRENT_TASKS () { 3 }

my @tasks = qw(one two three four five six seven eight nine ten);

# Start the session that will manage all the children.  The _start and
# next_task events are handled by the same function.

POE::Session->create(
  inline_states => {
    _start      => \&start_tasks,
    next_task   => \&start_tasks,
    task_result => \&handle_task_result,
    task_done   => \&handle_task_done,
    task_debug  => \&handle_task_debug,
  }
);

# Start as many tasks as needed so that the number of tasks is no more
# than MAX_CONCURRENT_TASKS.  Every wheel event is accompanied by the
# wheel's ID.  This function saves each wheel by its ID so it can be
# referred to when its events are handled.

# Wheel::Run's Program may be a code reference.  Here it's called via
# a short anonymous sub so we can pass in parameters.

sub start_tasks {
    my $heap = $_[HEAP];

    while ( keys( %{ $heap->{task} } ) < MAX_CONCURRENT_TASKS ) {
        my $next_task = shift @tasks;
        last unless defined $next_task;

        print "Starting task for $next_task...\n";

        my $task = POE::Wheel::Run->new(
          Program      => sub { do_stuff($next_task) },
          StdoutFilter => POE::Filter::Reference->new(),
          StdoutEvent  => "task_result",
          StderrEvent  => "task_debug",
          CloseEvent   => "task_done"
        );

        $heap->{task}->{ $task->ID } = $task;
    }
}

# This function is not a POE function!  It is a plain sub that will be
# run in a forked off child.  It uses POE::Filter::Reference so that
# it can return arbitrary information.  All POE filters can be used by
# themselves, but their parameters and return values are always list
# references.

sub do_stuff {
  binmode(STDOUT);    # Required for this to work on MSWin32
  my $task   = shift;
  my $filter = POE::Filter::Reference->new();

  # Simulate a long, blocking task.

  sleep( rand 5 );

  # Generate a bogus result.  Note that this result will be passed by
  # reference back to the parent process via POE::Filter::Reference.
  my %result = (task => $task, status => "seems ok to me" );

  # Generate some output via the filter.  Note the strange use of list
  # references.
  my $output = $filter->put( [ \%result ] );
  print @$output;
}

# Handle information returned from the task.  Since we're using
# POE::Filter::Reference, the $result is however it was created in the
# child process.  In this sample, it's a hash reference.

sub handle_task_result {
  my $result = $_[ARG0];
  print "Result for $result->{task}: $result->{status}\n";
}

# Catch and display information from the child's STDERR.  This was
# useful for debugging since the child's warnings and errors were not
# being displayed otherwise.

sub handle_task_debug {
  my $result = $_[ARG0];
  print "Debug: $result\n";
}

# The task is done.  Delete the child wheel, and try to start a new
# task to take its place.

sub handle_task_done {
  my ( $kernel, $heap, $task_id ) = @_[ KERNEL, HEAP, ARG0 ];
  delete $heap->{task}->{$task_id};
  $kernel->yield("next_task");
}

# Run until there are no more tasks.

$poe_kernel->run();

exit 0;
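
The "strange use of list references" noted in do_stuff() is easier to see in isolation.  The sketch below assumes POE is installed and runs both halves of the exchange in one process, with no forking: put() plays the child's part and get() plays the part that Wheel::Run performs in the parent.  The variable names $chunks and $decoded are chosen for this illustration.

```perl
use strict;
use warnings;
use POE::Filter::Reference;

my $filter = POE::Filter::Reference->new();

# Child side: put() takes an array reference holding one or more
# references, and returns an array reference of serialized chunks.
my %result = ( task => "one", status => "seems ok to me" );
my $chunks = $filter->put( [ \%result ] );

# Parent side: get() takes an array reference of raw chunks and
# returns an array reference of the reconstructed references.
my $decoded = $filter->get($chunks);

for my $r (@$decoded) {
    print "Result for $r->{task}: $r->{status}\n";
}
```

In the full script the chunks travel over the child's STDOUT pipe instead, and the StdoutFilter given to POE::Wheel::Run does the decoding before the task_result event fires.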