File Coverage

blib/lib/Parallel/Tiny.pm
Criterion Covered Total %
statement 75 76 98.6
branch 12 18 66.6
condition 7 14 50.0
subroutine 16 17 94.1
pod 2 2 100.0
total 112 127 88.1


line stmt bran cond sub pod time code
1             package Parallel::Tiny;
2              
3 11     11   933724 use strict;
  11         11  
  11         264  
4 11     11   33 use warnings;
  11         11  
  11         209  
5 11     11   77 use 5.12.0;
  11         110  
6              
7 11     11   4543 use POSIX qw(WNOHANG);
  11         37653  
  11         55  
8 11     11   14509 use Sys::Prctl qw(prctl);
  11         8624  
  11         517  
9              
10             ## prctl() defaults
11 11     11   44 use constant PR_SET_PDEATHSIG => 1;
  11         11  
  11         572  
12 11     11   33 use constant SIGHUP => 1;
  11         11  
  11         363  
13              
14             ## new() defaults
15 11     11   33 use constant DEFAULT_ERROR_TIMEOUT => 10;
  11         11  
  11         352  
16 11     11   22 use constant DEFAULT_REAP_TIMEOUT => .1;
  11         22  
  11         330  
17 11     11   33 use constant DEFAULT_SUBNAME => 'run';
  11         11  
  11         330  
18 11     11   33 use constant DEFAULT_WORKERS => 1;
  11         11  
  11         319  
19 11     11   33 use constant DEFAULT_WORKER_TOTAL => 1;
  11         11  
  11         4675  
20              
21             our $VERSION = 0.20;
22              
23             =head1 NAME
24              
25             Parallel::Tiny
26              
27             =head1 DESCRIPTION
28              
29             Provides a very simple, no frills fork manager.
30              
31             =head1 SYNOPSIS
32              
33             my $obj = My::Handler->new();
34              
35             my $forker = Parallel::Tiny->new(
36             handler => $obj,
37             subname => 'start', # My::Handler must implement start()
38             workers => 4,
39             worker_total => 'infinite',
40             );
41              
42             $forker->run();
43              
44             =head1 METHODS
45              
46             =over
47              
48             =item new()
49              
50             Returns a new Parallel::Tiny fork manager.
51              
52             Takes arguments as a hash or hashref with the following keys:
53              
54             handler - an object you provide which has a run() method (unless you define "subname")
55             (required)
56              
57             reap_timeout - the number of seconds to wait between runs of
58             waitpid() to reap children
59             (default .1)
60              
61             subname - the name of the sub you want to invoke on child spawn
62             (default 'run')
63              
64             workers - the number of simultaneous forked processes you
65             want to allow at one time
66             (default 1)
67              
68             worker_total - the total number of processes that you want to run
69             (default 1)
70              
71             You can, for instance, say that you want to run 100 processes,
72             but only 4 at a time like this:
73              
74             my $forker = Parallel::Tiny->new(
75             handler => $obj,
76             workers => 4,
77             worker_total => 100,
78             );
79              
80             If you want you can provide 'infinite' for worker_total.
81             If you do this, you're responsible for stopping the fork manager!
82              
83             Signals:
84             ---
85              
86             If the parent is sent SIGTERM, the parent will wait to reap all children.
87              
88             If the parent is killed before its children finish, children are configured to receive HUP.
89              
90             =cut
91              
92             sub new {
93 24     24 1 572 my $class = shift;
94 24 50       156 my $args = ref($_[0]) ? $_[0] : {@_};
95              
96             # set some defaults
97 24   50     206 $args->{reap_timeout} ||= DEFAULT_REAP_TIMEOUT;
98 24   50     186 $args->{subname} ||= DEFAULT_SUBNAME;
99 24   50     88 $args->{workers} ||= DEFAULT_WORKERS;
100 24   50     59 $args->{worker_total} ||= DEFAULT_WORKER_TOTAL;
101              
102             # special configuration
103 24 50       87 undef $args->{worker_total} if $args->{worker_total} eq 'infinite';
104              
105             # check args
106 24 50       104 die 'no handler provided' unless $args->{handler};
107 24 50       253 die "handler doesn't implement $args->{subname}()" unless $args->{handler}->can($args->{subname});
108              
109             return bless({
110             _continue => 1,
111             _handler => $args->{handler},
112             _jobs => {},
113             _reap_timeout => $args->{reap_timeout},
114             _subname => $args->{subname},
115             _workers => $args->{workers},
116             _worker_total => $args->{worker_total},
117 24         510 }, $class);
118             }
119              
120             =item run()
121              
122             Start spooling jobs according to the configuration.
123              
124             =cut
125              
126             sub run {
127 21     21 1 2851 my $self = shift;
128              
129 21     0   266 local $SIG{TERM} = sub { $self->{_continue} = 0 };
  0         0  
130              
131             # setup the fork manager
132 21         64 my $handler = $self->{_handler};
133 21         53 my $subname = $self->{_subname};
134              
135 21         49 while ($self->_waitqueue()) {
136             # parent work
137 65         39144 my $pid = fork();
138 65 100       1253 if ($pid) {
139 55 50 33     1020 $self->{_worker_total}-- if defined $self->{_worker_total} and $self->{_worker_total} > 0;
140 55         1155 $self->{_jobs}{$pid} = 1;
141 55         665 next;
142             }
143              
144             # child work
145 10         724 prctl(PR_SET_PDEATHSIG, SIGHUP);
146 10         3271 $SIG{$_} = 'DEFAULT' for keys(%SIG);
147 10         366 $0 = $0 . ' - worker';
148 10         275 $handler->$subname();
149              
150             # child cleanup
151 10         453 exit 0;
152             }
153              
154             # wait for children
155 11         1859752 while ( wait() != -1 ) {}
156              
157 11         570 return 1;
158             }
159              
160             ## Private
161             ############
162              
163             ## waits for another job slot to become available
164             ## short circuits if we've received SIGTERM or reached worker total threshold
165             sub _waitqueue {
166 76     76   170 my $self = shift;
167              
168             # check for any stopping conditions
169 76 100 66     562 return 0 if defined $self->{_worker_total} and $self->{_worker_total} <= 0;
170              
171             # wait to reap at least one child
172 65         75 while (keys(%{ $self->{_jobs} }) >= $self->{_workers}) {
  30570         66814  
173 30505 50       41177 return 0 unless $self->{_continue};
174 30505         35645 $self->_reapchildren();
175 30505         1974850 sleep $self->{_reap_timeout};
176             }
177              
178 65         180 return 1;
179             }
180              
181             ## checks all currently running jobs and reaps any that have finished, opening
182             ## their slot.
183             sub _reapchildren {
184 30505     30505   21019 my $self = shift;
185 30505         17856 foreach my $pid (keys(%{ $self->{_jobs} })) {
  30505         43668  
186 61010         91490 my $waitpid = waitpid($pid, WNOHANG);
187 61010 100       87245 delete $self->{_jobs}{$pid} if $waitpid > 0;
188             }
189             }
190              
191             =back
192              
193             =cut
194              
195             1;
196              
197             # ABSTRACT: Provides a very simple, no frills fork manager.
198