A better solution would be to add the file to an internal queue and process the files in a subprocess using POE::Wheel::Run. Of course, we will limit the number of subprocesses.
#!/usr/bin/perl
use strict;
use warnings;
use File::Basename;
use File::Find ();
use Getopt::Std;
use Linux::Inotify2;
use POE qw( Kernel Session Wheel::Run );
# Turn on autoflush for the currently selected output handle so
# progress messages show up immediately.
$| = 1;
#######################################
#######################################
# Shared state:
#   @found_dirs           - directories collected by find_wanted()
#   $max_concurrent_tasks - upper bound on simultaneous sub processes
our (@found_dirs, $max_concurrent_tasks);
##############
# watch_add_dir($heap_ref, $session, $dir_name)
#
# Register $dir_name with the inotify object stored in
# $heap_ref->{inotify}. IN_CREATE and IN_CLOSE_WRITE events are
# delivered to the "watch_hdlr" state of $session through a POE
# postback.
#
# Returns true when the watch was installed. On failure a warning
# with the reason is emitted and a false value is returned.
##############
sub watch_add_dir {
    my ($heap_ref, $session, $dir_name) = @_;

    my $watch = $heap_ref->{inotify}->watch(
        $dir_name,
        IN_CREATE | IN_CLOSE_WRITE,
        $session->postback("watch_hdlr"),
    );

    ##############
    # watch() returns undef and sets $! when the watch cannot be
    # created, so only announce the directory once the watch
    # really exists.
    ##############
    unless ($watch) {
        warn " Unable to watch directory $dir_name: $!\n";
        return;
    }

    print " Watching directory $dir_name\n";
    return 1;
}
##############
# watch_hdlr -- POE state run for every inotify notification.
#
# It is reached through the postback created in watch_add_dir();
# the inotify event object is read from $_[ARG1][0].
#
#  * A newly created directory is added to the watch list.
#  * A file closed after writing whose last extension is ".mp3"
#    (case-insensitive) is appended to $heap->{task}{task_files}
#    and "task_next_file" is yielded to start working the queue.
#
# Every file seen on IN_CLOSE_WRITE is remembered in
# $heap->{inotify}{files} so that later notifications for the same
# path are ignored.
##############
sub watch_hdlr {
    my ($kernel, $heap, $session, $event) = ( $_[KERNEL], $_[HEAP], $_[SESSION], $_[ARG1][0] );
    my $name = $event->fullname;

    ##############
    # We can receive many notifications for a file. If we have
    # already handled the file, do nothing.
    ##############
    unless ($heap->{inotify}{files}{$name}) {
        if ($event->IN_CREATE && -d $name) {
            ##############
            # A new directory was added, so it has to be watched too.
            ##############
            print "New directory: $name\n";
            watch_add_dir($heap, $session, $name);
        } elsif ($event->IN_CLOSE_WRITE) {
            ##############
            # A descriptor opened for writing was closed. We assume
            # the file is complete at this point, as the operation
            # is expected to be a copy into the watched directory.
            #
            # Only the LAST extension is wanted: the pattern must
            # not cross a dot, otherwise "01. Song.mp3" would yield
            # ". Song.mp3" and never be recognised as an mp3.
            ##############
            my $ext = ( fileparse($name, '\.[^.]*') )[2];
            if (lc($ext) eq '.mp3') {
                ##############
                # Queue the file and ask for the queue to be worked.
                ##############
                push @{ $heap->{task}{task_files} }, $name;
                $kernel->yield("task_next_file");
            }

            ##############
            # Mark the file as handled. Processing modifies the
            # file, which produces further IN_CLOSE_WRITE events;
            # without this mark we would loop forever.
            ##############
            $heap->{inotify}{files}{$name} = 1;
        }
    }

    ##############
    # While possible, it is highly unlikely that we will overflow
    # the notification buffers within the Linux kernel. If so, we
    # should report that.
    ##############
    print "events for $name have been lost\n" if $event->IN_Q_OVERFLOW;
}
##############
# task_next_file -- POE state that starts queued files.
#
# Files waiting in $heap->{task}{task_files} are handed, one
# POE::Wheel::Run sub process each, to process_file() until
# $max_concurrent_tasks wheels are running. Whatever stays in the
# queue is picked up when "task_done" yields back to this state.
##############
sub task_next_file {
    my ($kernel, $heap) = @_[ KERNEL, HEAP ];
    my $tasks = $heap->{task} ||= {};

    while (1) {
        ##############
        # $heap->{task} stores the running wheels keyed by wheel ID
        # *and* the pending queue under "task_files". The queue
        # entry is not a running task, so it must not be counted;
        # counting it would lower the real limit by one and start
        # nothing at all when the limit is 1.
        ##############
        my $running = grep { $_ ne 'task_files' } keys %{$tasks};
        last if $running >= $max_concurrent_tasks;

        ##############
        # An empty queue means there is nothing left to start.
        ##############
        my $next_task_file = shift @{ $tasks->{task_files} };
        last unless defined $next_task_file;

        ##############
        # Use POE::Wheel::Run to run process_file() for this file
        # in a sub process.
        ##############
        my $task = POE::Wheel::Run->new(
            Program     => sub { process_file($next_task_file) },
            StdoutEvent => "task_output",
            CloseEvent  => "task_done",
        );

        ##############
        # Keep the wheel in the heap under its ID (it is removed
        # again by "task_done") and register the "sig_child"
        # handler for the child's PID.
        ##############
        $tasks->{ $task->ID } = $task;
        $kernel->sig_child( $task->PID, "sig_child" );
    }
    return;
}
##############
# process_file($file)
#
# Announce $file on the selected output handle, then run eyeD3
# on it twice: first with --to-v1.1, then with --remove-v2.
# eyeD3's standard output is read and discarded. If eyeD3 cannot
# be started or fails, we don't really care: nothing is reported
# and the next step is still attempted.
##############
sub process_file {
    my $file = shift;
    print " Processed \"$file\"\n";

    for my $option ('--to-v1.1', '--remove-v2') {
        ##############
        # List-form pipe open runs eyeD3 without a shell, so the
        # file name reaches it verbatim even when it contains
        # quotes, "$" or backticks.
        ##############
        open(my $eyed3, '-|', 'eyeD3', $option, $file) or next;

        # Drain the output so eyeD3 never blocks on a full pipe.
        my @discarded = <$eyed3>;

        # The exit status is deliberately ignored.
        close $eyed3;
    }
    return;
}
##############
# find_wanted -- File::Find "wanted" callback.
#
# Appends the full path of the entry being visited to @found_dirs
# when that entry is a directory.
##############
sub find_wanted {
    my $path = $File::Find::name;
    -d $path and push @found_dirs, $path;
}
#######################################
#######################################
#######################################
##############
# Command line:
#   -d <directory>  directory tree to watch (required)
#   -t <number>     maximum number of concurrent sub processes
#                   (optional, defaults to 2)
##############
my %arg_options;
my $watch_dir;
getopts('d:t:', \%arg_options);

##############
# Without an existing directory there is nothing to watch. Say so
# and report failure instead of silently exiting with success.
##############
unless ($arg_options{d} && -d $arg_options{d}) {
    warn "Usage: $0 -d <directory to watch> [-t <max concurrent tasks>]\n";
    exit 1;
}
$watch_dir = $arg_options{d};

##############
# Accept only a positive integer for -t. The pattern is anchored
# at both ends so values such as "3x" or "00" fall back to the
# default instead of producing a bogus (or zero) limit.
##############
if (defined $arg_options{t} && $arg_options{t} =~ /\A[1-9]\d*\z/) {
    $max_concurrent_tasks = $arg_options{t};
} else {
    $max_concurrent_tasks = 2;
}

##############
# We need to watch all existing sub directories, so find them and
# collect them in @found_dirs; they are added to the watched
# directories when the inotify object is created in _start.
##############
File::Find::find({ wanted => \&find_wanted }, $watch_dir);

POE::Session->create(
    inline_states => {
        _start => sub {
            ##############
            # Alias this POE session to 'notify' so it can easily
            # be referenced later if needed.
            ##############
            $_[KERNEL]->alias_set('notify');

            ##############
            # Create the Linux::Inotify2 object.
            ##############
            $_[HEAP]{inotify} = Linux::Inotify2->new
                or die "Unable to create new inotify object: $!";

            ##############
            # Watch the preexisting directories collected in
            # @found_dirs.
            ##############
            foreach my $dir (@found_dirs) {
                watch_add_dir($_[HEAP], $_[SESSION], $dir);
            }

            ##############
            # Hash used by watch_hdlr() to tell whether a file
            # has been handled already.
            ##############
            $_[HEAP]{inotify}{files} = {};

            ##############
            # The inotify notifications arrive on a file
            # descriptor. Wrap that descriptor in a Perl file
            # handle ("<&=" is an fdopen, not a dup) so POE can
            # tell us when there is something to read.
            ##############
            open(my $inotify_FH, '<&=', $_[HEAP]{inotify}->fileno)
                or die "Can’t fdopen: $!\n";

            ##############
            # Ask POE to send "inotify_poll" whenever the
            # descriptor becomes readable.
            ##############
            $_[KERNEL]->select_read( $inotify_FH, "inotify_poll" );
        },

        ##############
        # Read the pending inotify events; this triggers the
        # callbacks registered in watch_add_dir().
        ##############
        inotify_poll => sub {
            $_[HEAP]{inotify}->poll;
        },

        watch_hdlr => \&watch_hdlr,

        ##############
        # Process the next file in the queue.
        ##############
        task_next_file => \&task_next_file,

        ##############
        # Print the output of the job.
        ##############
        task_output => sub {
            my $result = $_[ARG0];
            print "$result\n";
        },

        ##############
        # When we are done with a file, drop its wheel and go
        # process the next file if there is one waiting.
        ##############
        task_done => sub {
            my ($kernel, $heap, $task_id) = @_[ KERNEL, HEAP, ARG0 ];
            delete $heap->{task}{$task_id};
            $kernel->yield("task_next_file");
        },

        ##############
        # Registered through sig_child() in task_next_file().
        # Nothing is stored in the heap per PID (the wheel is
        # released in task_done), so there is nothing to clean
        # up here.
        ##############
        sig_child => sub {
            return;
        },
    },
);

POE::Kernel->run();
exit 0;