File Coverage

blib/lib/Parallel/Map/Segmented.pm
Criterion Covered Total %
statement 32 35 91.4
branch 5 8 62.5
condition 2 3 66.6
subroutine 9 10 90.0
pod 1 1 100.0
total 49 57 85.9


line stmt bran cond sub pod time code
1             package Parallel::Map::Segmented;
2             $Parallel::Map::Segmented::VERSION = '0.4.1';
3 6     6   861388 use strict;
  6         11  
  6         201  
4 6     6   24 use warnings;
  6         20  
  6         345  
5 6     6   2891 use autodie;
  6         108279  
  6         29  
6 6     6   45448 use 5.014;
  6         48  
7              
8 6     6   3896 use Parallel::ForkManager::Segmented::Base v0.4.0;
  6         58325  
  6         257  
9 6     6   48 use parent 'Parallel::ForkManager::Segmented::Base';
  6         10  
  6         40  
10              
11 6     6   3907 use Parallel::Map qw/ pmap_void /;
  6         849711  
  6         2100  
12              
13             sub run
14             {
15 5     5 1 1944844 my ( $self, $args ) = @_;
16              
17 5         37 my $processed = $self->process_args($args);
18 5 100       201 return if not $processed;
19             my ( $WITH_PM, $batch_cb, $batch_size, $nproc, $stream_cb, ) =
20 4         11 @{$processed}{qw/ WITH_PM batch_cb batch_size nproc stream_cb /};
  4         23  
21              
22 4         25 my $batch = $stream_cb->( { size => 1 } )->{items};
23 4 50       99 return if not defined $batch;
24 4         45 $batch_cb->($batch);
25 4 50       29632 if ($WITH_PM)
26             {
27 0     0   0 pmap_void sub { $batch_cb->(shift); Future->done; }, generate => sub {
  0         0  
28 28   66 28   307049 return $stream_cb->( { size => $batch_size } )->{items} // ();
29             },
30 4 50       73 ( $nproc ? ( forks => $nproc ) : () ),
31             ;
32             }
33             else
34             {
35 0         0 $self->serial_run($processed);
36             }
37 4         41574 return;
38             }
39              
40             1;
41              
42             __END__