File Coverage

blib/lib/AlignDB/Run.pm
Criterion Covered Total %
statement 40 43 93.0
branch 4 4 100.0
condition n/a
subroutine 11 12 91.6
pod 1 2 50.0
total 56 61 91.8


line stmt bran cond sub pod time code
1             package AlignDB::Run;
2 2     2   61569 use Moose;
  2         578867  
  2         11  
3 2     2   10506 use POE;
  2         52975  
  2         13  
4 2     2   92440 use POE::Wheel::Run;
  2         31383  
  2         48  
5 2     2   12 use POE::Filter::Line;
  2         2  
  2         862  
6              
7             our $VERSION = '1.0.2';
8              
9             has parallel => ( is => 'rw', isa => 'Int', default => sub {4}, );
10             has jobs => ( is => 'rw', isa => 'ArrayRef', default => sub { [] }, );
11             has code => ( is => 'rw', isa => 'CodeRef', required => 1 );
12             has opt => ( is => 'rw', isa => 'HashRef', default => sub { {} } );
13              
14             sub BUILD {
15 1     1 0 1632 my $self = shift;
16              
17             POE::Session->create(
18             inline_states => {
19 1     1   185 _start => sub { $_[KERNEL]->yield("next_task") },
20             next_task => \&_next_task,
21 26     26   5666 task_message => sub { print "$_[ARG0]\n"; },
22             task_done => \&_task_done,
23             sig_child => \&_sig_child,
24             },
25             heap => {
26             parallel => $self->parallel,
27             jobs => $self->jobs,
28             code => $self->code,
29             opt => $self->opt,
30             count => 0,
31 1         35 all => scalar @{ $self->jobs },
  1         20  
32             },
33             );
34             }
35              
36             sub run {
37              
38             # Run until there are no more tasks.
39 1     1 1 1013 $poe_kernel->run;
40             }
41              
42             sub _next_task {
43 27     27   1994 my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];
44              
45 27         37 my $parallel = $heap->{parallel};
46 27         25 my $jobs = $heap->{jobs};
47 27         28 my $code = $heap->{code};
48 27         23 my $opt = $heap->{opt};
49              
50 27         23 while (1) {
51 53         1956 my $running = scalar keys %{ $heap->{task} };
  53         73  
52 53 100       240 last if $running >= $parallel;
53              
54 32         21 my $next = shift @{$jobs};
  32         163  
55 32 100       81 last unless defined $next;
56              
57 26         26 $heap->{count}++;
58 26         488 printf "===Do task %u out of %u===\n", $heap->{count}, $heap->{all};
59              
60             my $task = POE::Wheel::Run->new(
61             Program => sub {
62              
63             # Required for this to work on MSWin32
64 0     0   0 binmode(STDOUT);
65 0         0 binmode(STDERR);
66              
67 0         0 $code->( $next, $opt );
68             },
69 26         401 StdioFilter => POE::Filter::Line->new,
70             StderrFilter => POE::Filter::Line->new,
71             StdoutEvent => 'task_message',
72             StderrEvent => 'task_message',
73             CloseEvent => 'task_done',
74             );
75              
76 26         62346 $heap->{task}->{ $task->ID } = $task;
77 26         158 $kernel->sig_child( $task->PID, "sig_child" );
78             }
79             }
80              
81             # Delete the child wheel, and try to start a new task to take its place.
82             sub _task_done {
83 26     26   7430 my ( $kernel, $heap, $task_id ) = @_[ KERNEL, HEAP, ARG0 ];
84 26         84 delete $heap->{task}->{$task_id};
85 26         5546 $kernel->yield("next_task");
86             }
87              
88             # Detect the CHLD signal as each of our children exits.
89             sub _sig_child {
90 26     26   2546 my ( $heap, $sig, $pid, $exit_val ) = @_[ HEAP, ARG0, ARG1, ARG2 ];
91 26         55 delete $heap->{$pid};
92             }
93              
94             1;
95              
96             __END__
97              
98             =pod
99              
100             =encoding UTF-8
101              
102             =head1 NAME
103              
104             AlignDB::Run - Run in parallel without pains.
105              
106             =head1 SYNOPSIS
107              
108             use AlignDB::Run;
109              
110             my $worker = sub {
111             my $job = shift;
112              
113             print "$job\n";
114             return;
115             };
116              
117             my $run = AlignDB::Run->new(
118             parallel => 4,
119             jobs => [1 .. 8],
120             code => $worker,
121             opt => {foo => "bar",}
122             );
123             $run->run;
124              
125             =head1 ATTRIBUTES
126              
127             C<parallel> - run in parallel mode
128              
129             C<jobs> - All jobs to be done
130              
131             C<code> - code ref
132              
133             C<opt> - hash ref
134              
135             =head1 METHODS
136              
137             =head2 run
138              
139             Start run your code
140              
141             =head1 AUTHOR
142              
143             Qiang Wang <wang-q@outlook.com>
144              
145             =head1 COPYRIGHT AND LICENSE
146              
147             This software is copyright (c) 2009- by Qiang Wang.
148              
149             This is free software; you can redistribute it and/or modify it under
150             the same terms as the Perl 5 programming language system itself.
151              
152             =cut