File Coverage

blib/lib/MyCPAN/Indexer/Dispatcher/Parallel.pm
Criterion Covered Total %
statement 27 103 26.2
branch 0 10 0.0
condition 0 4 0.0
subroutine 11 20 55.0
pod 3 3 100.0
total 41 140 29.2


line stmt bran cond sub pod time code
1             package MyCPAN::Indexer::Dispatcher::Parallel;
2 1     1   1205 use strict;
  1         2  
  1         33  
3 1     1   6 use warnings;
  1         2  
  1         29  
4              
5 1     1   6 use parent qw(MyCPAN::Indexer::Component);
  1         2  
  1         5  
6 1     1   55 use vars qw($VERSION $logger);
  1         3  
  1         73  
7             $VERSION = '1.28_12';
8              
9 1     1   6 use Log::Log4perl;
  1         2  
  1         7  
10              
11             BEGIN {
12 1     1   57 $logger = Log::Log4perl->get_logger( 'Dispatcher' );
13             }
14              
15 1     1   637 BEGIN {
16             # override since Tk overrides exit and this needs the real exit
17 1     1   60 no warnings 'redefine';
  1         3  
  1         31  
18 1     1   1209 use Parallel::ForkManager;
  1         32310  
  1         39  
19              
20             sub Parallel::ForkManager::finish {
21 0     0 1   $logger->debug( "In Parallel::ForkManager::finish" );
22 0           my ($s, $x) = @_;
23 0 0 0       CORE::exit ($x || 0) if $s->{in_child};
24 0 0         if( $s->{max_proc} == 0 ) { # max_proc == 0
25 0           $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0);
26 0           delete $s->{processes}->{$$};
27             }
28 0           return 0;
29             }
30             }
31              
32              
33             =head1 NAME
34              
35             MyCPAN::Indexer::Dispatcher::Parallel - Pass out work to sub-processes
36              
37             =head1 SYNOPSIS
38              
39             Use this in C by specifying it as the queue class:
40              
41             # in backpan_indexer.config
42             dispatch_class MyCPAN::Indexer::Dispatcher::Parallel
43              
44             =head1 DESCRIPTION
45              
46             This class takes the list of distributions to process and passes them
47             out to the code that will do the work.
48              
49             =head2 Methods
50              
51             =over 4
52              
53             =item component_type
54              
55             This is a dispatcher type
56              
57             =cut
58              
59 0     0 1   sub component_type { $_[0]->dispatcher_type }
60              
61             =item get_dispatcher
62              
63             Takes the C<$Notes> hash and adds the C key with a code
64             reference. This module uses C to run
65             jobs in parallel, and looks at the
66              
67             It also sets up keys for PID, whose value is an anonymous array
68             of process IDs. That array matches up with the one in the key
69             C which keeps track of the distributions it's processing.
70             It adds:
71              
72             dispatcher => sub { ... },
73             PID => [],
74             recent => [],
75              
76             =cut
77              
78              
79             sub get_dispatcher
80             {
81 0     0 1   my( $self ) = @_;
82              
83 0           $self->set_note( 'Threads', $self->get_config->parallel_jobs );
84 0           $self->set_note( 'dispatcher', $self->_make_forker );
85 0           $self->set_note( 'interface_callback', $self->_make_interface_callback );
86             }
87              
88             sub _make_forker
89             {
90 0     0     my( $self ) = @_;
91              
92 0   0       Parallel::ForkManager->new(
93             $self->get_config->parallel_jobs || 1
94             );
95             }
96              
97             sub _make_interface_callback
98             {
99 0     0     my( $self ) = @_;
100              
101 0           foreach my $key ( qw(PID recent errors ) )
102             {
103 0           $self->set_note( $key, [] );
104             }
105              
106 0           $self->set_note( 'Total', scalar @{ $self->get_note( 'queue' ) } );
  0            
107 0           $self->set_note( 'Left', $self->get_note('Total') );
108 0           $self->set_note( 'Errors', 0 );
109 0           $self->set_note( 'Done', 0 );
110 0           $self->set_note( 'Started', scalar localtime );
111 0           $self->set_note( 'Finished', 0 );
112              
113 0           $self->set_note( 'queue_cursor', 0 );
114              
115             my $interface_callback = sub {
116 0     0     $self->_remove_old_processes;
117              
118 0           $logger->debug( sprintf
119             "Finished: %s Left: %s",
120 0           map { $self->get_note( $_ ) } qw(Finished Left)
121             );
122              
123 0 0         unless( $self->get_note( 'Left' ) )
124             {
125 0           $logger->debug( "Waiting on all children [" . time . "]" );
126 0           $self->get_note( 'dispatcher' )->wait_all_children;
127 0           $self->set_note( 'Finished', 1 );
128 0           return;
129             };
130              
131 0           $self->set_note_unless_defined( '_started', time );
132              
133 0           $self->set_note(
134             '_elapsed',
135             time - $self->get_note( '_started' )
136             );
137              
138 0           $self->set_note(
139             'Elapsed',
140             _elapsed( $self->get_note( '_elapsed' ) )
141             );
142              
143 0           my $item = $self->get_note_list_element(
144             'queue',
145             $self->increment_note( 'queue_cursor' )
146             );
147              
148 0           my $info;
149              
150 0 0         if( my $pid = $self->get_note( 'dispatcher' )->start )
151             { #parent
152 0           $self->unshift_onto_note( 'PID', $pid );
153 0           $self->unshift_onto_note( 'recent', $item );
154              
155 0           $self->increment_note( 'Done' );
156              
157 0           $self->set_note(
158             'Left',
159             $self->get_note( 'Total' ) - $self->get_note( 'Done' )
160             );
161              
162 0           $logger->debug(
163             sprintf "Total: %s Done: %s Left: %s Finished: %s",
164 0           map { $self->get_note( $_ ) } qw( Total Done Left Finished )
165             );
166              
167 1     1   13 no warnings;
  1         4  
  1         576  
168             $self->set_note(
169             'Rate',
170 0           eval { $self->get_note( 'Done' ) / $self->get_note( '_elapsed' ) }
  0            
171             );
172              
173             }
174             else
175             { # child
176 0           $info = $self->get_note( 'child_task' )->( $item );
177 0           $self->get_note( 'dispatcher' )->finish;
178 0           $logger->error( "The child is still running!" );
179             }
180              
181 0           $info;
182 0           };
183              
184 0           $self->set_note( 'interface_callback', $interface_callback );
185             }
186              
187             sub _remove_old_processes
188             {
189 0     0     my( $self ) = @_;
190              
191 0           my $pid = $self->get_note( 'PID' );
192              
193 0           my @delete_indices = grep
194 0           { ! kill 0, $pid->[$_] }
195             0 .. $#$pid;
196              
197 0           my $recent = $self->get_note( 'recent' );
198              
199 0           foreach my $index ( reverse @delete_indices )
200             {
201 0           splice @$recent, $index, 1;
202 0           splice @$pid, $index, 1;
203             }
204             }
205              
206             sub _remove_stuck_processes
207             {
208 0     0     my( $self ) = @_;
209              
210 0           my $pids = $self->get_note( 'PID' );
211              
212 0           foreach my $pid ( @$pids )
213             {
214 0           $logger->debug( "Trying to remove stuck process $pid" );
215 0           my %children =
216 0 0         map { $_->{pid}, 1 }
217 0           grep { $_->{'ppid'} == $$ and $_->{'size'} > 1234 } #XXX
218 0           @{ Proc::ProcessTable->new->table };
219             }
220             }
221              
222             BEGIN {
223 1     1   44 my %hash = ( days => 864000, hours => 3600, minutes => 60 );
224              
225             sub _elapsed
226             {
227 0     0     my $seconds = shift;
228              
229 0           my @v;
230 0           foreach my $key ( qw(days hours minutes) )
231             {
232 0           push @v, int( $seconds / $hash{$key} );
233 0           $seconds -= $v[-1] * $hash{$key}
234             }
235              
236 0           push @v, $seconds;
237              
238 0           sprintf "%dd %02dh %02dm %02ds", @v;
239             }
240             }
241             1;
242              
243             1;
244              
245             =back
246              
247              
248             =head1 SEE ALSO
249              
250             MyCPAN::Indexer, MyCPAN::Indexer::Tutorial
251              
252             =head1 SOURCE AVAILABILITY
253              
254             This code is in Github:
255              
256             git://github.com/briandfoy/mycpan-indexer.git
257              
258             =head1 AUTHOR
259              
260             brian d foy, C<< >>
261              
262             =head1 COPYRIGHT AND LICENSE
263              
264             Copyright (c) 2008-2013, brian d foy, All Rights Reserved.
265              
266             You may redistribute this under the same terms as Perl itself.
267              
268             =cut