File Coverage

blib/lib/Parallel/Queue.pm
Criterion Covered Total %
statement 48 48 100.0
branch 19 24 79.1
condition 10 11 90.9
subroutine 9 9 100.0
pod 0 2 0.0
total 86 94 91.4


line stmt bran cond sub pod time code
1             ########################################################################
2             # housekeeping
3             ########################################################################
4              
5             package Parallel::Queue v4.0.0;
6 218     218   383600 use v5.24;
  218         781  
7 218     218   1004 use mro qw( c3 );
  218         654  
  218         1260  
8              
9 218     218   85283 use mro::EVERY;
  218         259191  
  218         1131  
10              
11 218     218   6523 use Carp qw( croak );
  218         479  
  218         7545  
12 218     218   1057 use Symbol qw( qualify_to_ref );
  218         433  
  218         6708  
13              
14             use Scalar::Util
15             qw
16 218         254351 (
17             blessed
18             reftype
19             looks_like_number
20 218     218   929 );
  218         382  
21              
22             ########################################################################
23             # package variables
24             ########################################################################
25              
26             our @CARP_NOT = ( __PACKAGE__, qw( mro mro::EVERY ) );
27              
28             # config state
29              
30             my %defaultz =
31             (
32             fork => ! $^P
33             , qw
34             (
35             export runqueue
36             finish 0
37             debug 0
38             verbose 0
39             )
40             );
41              
42             my %argz = %defaultz;
43              
44             ########################################################################
45             # utility subs
46             ########################################################################
47              
48             my $format
49             = sub
50             {
51             state $dumper
52             = do
53             {
54             require Data::Dumper;
55             Data::Dumper->can( 'Dumper' )
56             };
57              
58             local $Data::Dumper::Terse = 1;
59             local $Data::Dumper::Indent = 1;
60             local $Data::Dumper::Sortkeys = 1;
61              
62             local $Data::Dumper::Purity = 0;
63             local $Data::Dumper::Deepcopy = 0;
64             local $Data::Dumper::Quotekeys = 0;
65              
66             my $head = shift;
67              
68             say join "\n" =>
69             map
70             {
71             ref $_
72             ? $_->$dumper
73             : $_
74             }
75             ( "($$) $head" => @_ );
76              
77             return;
78             };
79              
80             my $stub = sub{};
81             my $debug = $stub;
82              
83             ########################################################################
84             # execution handlers
85             ########################################################################
86              
87             my $next_job
88             = sub
89             {
90             state $redo = __SUB__;
91             state $object = '';
92              
93             my $job = '';
94              
95             if( $object )
96             {
97             # objects may have their own internal stack.
98             # if there aren't any arguments then pass
99             # nothing rather than undef from shift.
100              
101             if( my $job = $object->next_job )
102             {
103             return $job;
104             }
105             else
106             {
107             $object = '';
108             }
109             }
110              
111             @_ or return;
112            
113             # silently ignore empty slots.
114              
115             my $next = shift
116             or goto *$redo;
117              
118             my $class = blessed $next;
119              
120             if( $class && $class->can( 'next_job' ) )
121             {
122             $debug->( "New iterator: '$class'" );
123              
124             $object = $next;
125             goto &$redo
126             }
127             elsif( 'CODE' eq reftype $next )
128             {
129             $next
130             }
131             else
132             {
133             my $nastygram
134             = $format->( 'Bothched queue: un-blessed, non-coderef', $_[0] );
135              
136             $argz{ finish }
137             or croak $nastygram;
138              
139             say STDERR $nastygram;
140             goto &$redo
141             }
142             };
143              
144             my $run_nofork
145             = sub
146             {
147             # discard the count, iterate the queue without forking.
148             shift;
149              
150             $debug->( 'Non-forking queue' );
151              
152             while( my $sub = &$next_job )
153             {
154             # these should all exit zero.
155              
156             my $exit
157             = eval
158             {
159             $sub->()
160             or next
161             };
162              
163             say STDERR "\nNon-zero exit: $exit, $@\n";
164              
165             if( $argz{ finish } )
166             {
167             say 'Non-zero exit: Continuing queue.';
168             }
169             else
170             {
171             say 'Non-zero exit: Aborting queue.';
172             last;
173             }
174             }
175              
176             return
177             };
178              
179             my $fork_job
180             = sub
181             {
182             # don't check @_: next_job may have an object
183             # returning next jobs w/ an empty queue.
184              
185             my $job = &$next_job
186             or return;
187              
188             if( ( my $pid = fork ) > 0 )
189             {
190             $debug->( "fork: $pid" );
191             return
192             }
193             elsif( defined $pid )
194             {
195             # child passes the exit status of the perl sub call
196             # to the caller as our exit status. the O/S will deal
197             # with signal values.
198             #
199             # aside: failing to exit here will cause runaway
200             # phorkatosis.
201              
202             $debug->( "\tExecuting: '$job'" );
203              
204             my $exitval = eval { $job->() } || 0;
205              
206             # either way, this process needs to exit.
207              
208             $@
209             ? die
210             : exit $exitval
211             }
212             else
213             {
214             # pass back the fork failure for the caller to deal with.
215              
216             die "Phorkafobia: $!";
217             }
218             };
219              
220             my $fork_queue
221             = sub
222             {
223             # count was validated in runqueue.
224              
225             my $count = shift;
226              
227             # what's on the stack?
228             # the jobs to run!
229             # which may be none.
230             # if so, we're done.
231              
232             $debug->( "Forking initial $count jobs." );
233              
234             &$fork_job for 1 .. $count;
235              
236             $debug->( "Processing remainder of queue." );
237              
238             my $reap_only = '';
239              
240             while( ( my $pid = wait ) > 0 )
241             {
242             $debug->( "Complete: $pid ($?)" );
243              
244             if( $? )
245             {
246             # this assumes normal *NIX 16-bit exit values,
247             # with a status in the high byte and signum
248             # in the lower. notice that $status is not
249             # masked to 8 bits, however. this allows us to
250             # deal with non-zero exits on > 16-bit systems.
251             #
252             # caller can trap the signals.
253              
254             my $failure
255             = do
256             {
257             if( my $exit = $? >> 8 )
258             {
259             "exit( $exit ) by $pid"
260             }
261             elsif( my $signal = $? & 0xFF )
262             {
263             "kill SIG-$signal on $pid"
264             }
265             else
266             {
267             "coredump by $pid"
268             }
269             };
270              
271             my $result
272             = ( $reap_only = ! $argz{ finish } )
273             ? "Non-zero exit: Reaping only to complete queue."
274             : "Non-zero exit: Continuing queue."
275             ;
276              
277             say STDERR "\n$failure\n$result\n";
278             }
279              
280             $reap_only
281             or &$fork_job
282             }
283              
284             return
285             };
286              
287             # debug or zero count run the jobs without forking,
288             # simplifies most debugging issues.
289              
290             sub runqueue
291             {
292 1279     1279 0 5165616 my $count = $_[0];
293              
294 1279 50       8068 looks_like_number $count
295             or croak "Bogus runqueue: '$count' non-numeric";
296              
297 1279 50       5207 $count < 0
298             and croak "Bogus runqueue: negative count ($count)";
299              
300 1279 100 100     10108 $argz{ fork } && $count
301             ? &$fork_queue
302             : &$run_nofork
303             ;
304              
305             # return the unused portion.
306             # this includes any incomplete iterators.
307              
308             @_
309 1075         4062 }
310              
311             sub configure
312             {
313             # discard the current patckage
314 1488 100   1488 0 1300923 shift if $_[0] eq __PACKAGE__;
315              
316             %argz
317             = map
318             {
319 1488         4010 my ( $arg, $val ) = split /=/, $_, 2;
  2116         6743  
320              
321 2116   100     10092 $val //= 1;
322              
323 2116 100       8958 $val = !$val
324             if $arg =~ s{^ no}{}x;
325              
326 2116         14559 ( $arg => $val )
327             }
328             @_;
329              
330             @argz{ qw( fork verbose ) } = ( '', 1 )
331 1488 100       4588 if delete $argz{ debug };
332              
333 1488         3880 for( $argz{ export } )
334             {
335             # numeric for true gets default name.
336             # name for true gets whatever's there.
337              
338 1488 100       4726 $_ or next;
339              
340             # default does the right thing.
341              
342             looks_like_number $_
343 1 50       4 and delete $argz{ export };
344              
345 1 50       4 m{ \W }x
346             and croak "Botched export: '$_' contains non-word chars.";
347             }
348              
349 1488         11394 while( my($k,$v) = each %defaultz )
350             {
351 7440   100     29601 $argz{ $k } //= $v;
352             }
353              
354             $debug
355             = $argz{ verbose }
356 1488 100       5514 ? $format
357             : $stub
358             ;
359              
360 1488         4038 $debug->( 'Configuration:', \%argz );
361              
362 1488 50 66     7451 if( $argz{ fork } && $^P )
363             {
364 637         69153 say STDERR
365             'Debugger forking. Check TERM=xterm or $DB::debug_TTY.';
366             }
367              
368             return
369 1488         4117 }
370              
371             sub import
372             {
373 218     218   1434 &configure;
374              
375 218 100       3069 if( my $export = $argz{ export } )
376             {
377 133         267 my $caller = caller;
378 133         817 my $ref = qualify_to_ref $export => $caller;
379              
380 133         3906 undef &{ *$ref };
  133         777  
381              
382 133         536 $debug->( "Installing $export -> $caller" );
383              
384 133         6528 *$ref = \&runqueue
385             }
386             }
387              
388             # keep require happy
389              
390             1
391              
392             __END__